Processing pipeline support #1004

Draft · wants to merge 18 commits into master
DefaultProcessingPipeline.java
@@ -0,0 +1,77 @@
/*
* Licensed to David Pilato (the "Author") under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Author licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fr.pilato.elasticsearch.crawler.fs;

import fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collections;

/**
* The processing pipeline that will be used if not overridden.
*/
public class DefaultProcessingPipeline implements ProcessingPipeline {
private static final Logger logger = LogManager.getLogger(MethodHandles.lookup().lookupClass());

protected Config config;
protected TikaProcessor tika;
protected EsIndexProcessor es;

/**
* Parses the file with Tika and indexes it to Elasticsearch.
* Subclasses can override this method to define their own custom processing.
* @param ctx the context holding the file input stream as well as the document to build
*/
@Override
public void processFile(FsCrawlerContext ctx) {
logger.debug("Starting processing of file {}", ctx.getFullFilename());
extractTextWithTika(ctx);
indexToEs(ctx);
}

/**
* Indexes the document using {@link EsIndexProcessor},
* unless the extracted content is excluded by the configured filters.
*/
protected void indexToEs(FsCrawlerContext ctx) {
if (FsCrawlerUtil.isIndexable(ctx.getDoc().getContent(), config.getFsSettings().getFs().getFilters())) {
es.process(ctx);
} else {
logger.debug("We ignore file [{}] because it does not match all the patterns {}", ctx.getFile().getName(),
config.getFsSettings().getFs().getFilters());
}
}

/**
* Parses text and metadata from the document using {@link TikaProcessor}
*/
protected void extractTextWithTika(FsCrawlerContext ctx) {
tika.process(ctx);
}

@Override
public void init(Config config) throws IOException {
this.config = config;
tika = new TikaProcessor(config.getFsSettings(), config.getMessageDigest(), Collections.emptyMap());
es = new EsIndexProcessor(config.getFsSettings(), config.getEsClient());
}
}
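
The pipeline above is the extension point: a subclass only needs to override processFile and reuse the protected steps. A minimal sketch of such a subclass follows (hypothetical, not part of this PR; the "department" field is an invented example):

package fr.pilato.elasticsearch.crawler.fs;

/**
 * Hypothetical custom pipeline that tags every document before indexing.
 */
public class TaggingProcessingPipeline extends DefaultProcessingPipeline {
    @Override
    public void processFile(FsCrawlerContext ctx) {
        extractTextWithTika(ctx);                     // inherited step: fill ctx.getDoc() from the stream
        ctx.getExtraDoc().put("department", "legal"); // extra field, merged into the JSON by EsIndexProcessor
        indexToEs(ctx);                               // inherited step: index if the content passes the filters
    }
}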
EsIndexProcessor.java
@@ -0,0 +1,87 @@
/*
* Licensed to David Pilato (the "Author") under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Author licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fr.pilato.elasticsearch.crawler.fs;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import fr.pilato.elasticsearch.crawler.fs.beans.DocParser;
import fr.pilato.elasticsearch.crawler.fs.client.ElasticsearchClient;
import fr.pilato.elasticsearch.crawler.fs.framework.SignTool;
import fr.pilato.elasticsearch.crawler.fs.settings.FsSettings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.File;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Type;
import java.security.NoSuchAlgorithmException;
import java.util.Map;

/**
* Pulls {@link fr.pilato.elasticsearch.crawler.fs.beans.Doc} from context,
* merges it with 'extraDoc' if it exists and then indexes it to Elasticsearch
*/
public class EsIndexProcessor implements Processor {
private static final Logger logger = LogManager.getLogger(MethodHandles.lookup().lookupClass());
private final FsSettings fsSettings;
private final ElasticsearchClient esClient;

public EsIndexProcessor(FsSettings fsSettings, ElasticsearchClient esClient) {
this.fsSettings = fsSettings;
this.esClient = esClient;
}

@Override
public void process(FsCrawlerContext ctx) throws ProcessingException {
try {
long startTime = System.currentTimeMillis();
String index = fsSettings.getElasticsearch().getIndex();
String id = generateId(ctx);
String pipeline = fsSettings.getElasticsearch().getPipeline();
String json = DocParser.toJson(ctx.getDoc());
if (!ctx.getExtraDoc().isEmpty()) {
json = mergeExtraDoc(json, ctx.getExtraDoc());
}
esClient.index(index, id, json, pipeline);
logger.debug("Indexed {}/{}?pipeline={} in {}ms", index, id, pipeline,
System.currentTimeMillis() - startTime);
logger.trace("JSon indexed : {}", json);
} catch (JsonProcessingException e) {
throw new ProcessingException(e);
}
}

protected static String mergeExtraDoc(String json, Map<String, Object> extraDoc) {
// Deserialize the Doc JSON into a map, overlay the extra fields, then serialize back.
// Note: extraDoc keys overwrite any identically named fields from the Doc.
Type mapType = new TypeToken<Map<String, Object>>() {}.getType();
Map<String,Object> doc = new Gson().fromJson(json, mapType);
doc.putAll(extraDoc);
return new Gson().toJson(doc);
}

protected String generateId(FsCrawlerContext ctx) {
try {
// Either the raw file name, or a hash of "<path>/<filename>" (parent directory first, file name second).
return fsSettings.getFs().isFilenameAsId() ?
ctx.getFile().getName() :
SignTool.sign((new File(ctx.getFilepath(), ctx.getFile().getName())).toString());
} catch (NoSuchAlgorithmException e) {
throw new ProcessingException(e);
}
}
}
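
To make the merge step concrete, here is a quick standalone check of what mergeExtraDoc produces (not part of the PR; it sits in the same package because the method is protected, and assumes Gson on the classpath):

package fr.pilato.elasticsearch.crawler.fs;

import java.util.Map;

public class MergeExtraDocExample {
    public static void main(String[] args) {
        // Overlay one extra field onto a serialized Doc.
        String merged = EsIndexProcessor.mergeExtraDoc(
                "{\"content\":\"hello\"}", Map.of("department", "legal"));
        System.out.println(merged); // {"content":"hello","department":"legal"}
    }
}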
FsCrawlerContext.java
@@ -0,0 +1,121 @@
/*
* Licensed to David Pilato (the "Author") under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Author licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fr.pilato.elasticsearch.crawler.fs;

import fr.pilato.elasticsearch.crawler.fs.beans.Doc;
import fr.pilato.elasticsearch.crawler.fs.crawler.FileAbstractModel;

import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

/**
* A class that keeps necessary context for the processing pipeline.
* Each processor will typically update the 'doc' as needed.
*/
public class FsCrawlerContext {
private final FileAbstractModel file;
private final String filepath;
private final InputStream inputStream;
private final String fullFilename;
private final Map<String, Object> extraDoc;
private Doc doc;

public FsCrawlerContext(Builder builder) {
this.file = builder.file;
this.filepath = builder.filepath;
this.inputStream = builder.inputStream;
this.fullFilename = builder.fullFilename;
this.doc = builder.doc;
this.extraDoc = builder.extraDoc;
}

public FileAbstractModel getFile() {
return file;
}

public String getFilepath() {
return filepath;
}

/** Replaces the current doc with a fresh {@link Doc} instance. */
public void setDoc() {
this.doc = new Doc();
}

public Doc getDoc() {
return doc;
}

public InputStream getInputStream() {
return inputStream;
}

public String getFullFilename() {
return fullFilename;
}

public Map<String, Object> getExtraDoc() {
return extraDoc;
}


public static class Builder {
private FileAbstractModel file;
private String filepath;
private InputStream inputStream;
private String fullFilename;
private Doc doc = new Doc();
private Map<String,Object> extraDoc = new HashMap<>();

public Builder withFileModel(FileAbstractModel file) {
this.file = file;
return this;
}

public Builder withFilePath(String filepath) {
this.filepath = filepath;
return this;
}

public Builder withInputStream(InputStream inputStream) {
this.inputStream = inputStream;
return this;
}

public Builder withFullFilename(String fullFilename) {
this.fullFilename = fullFilename;
return this;
}

public Builder withDoc(Doc doc) {
this.doc = doc;
return this;
}

public Builder withExtraDoc(Map<String,Object> extraDoc) {
this.extraDoc = extraDoc;
return this;
}

public FsCrawlerContext build() {
return new FsCrawlerContext(this);
}

}
}
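
For reference, a minimal sketch of how a caller might assemble a context with this builder and hand it to a pipeline (hypothetical caller shape, not code from this PR; fileModel, stream and pipeline would come from the crawler):

package fr.pilato.elasticsearch.crawler.fs;

import fr.pilato.elasticsearch.crawler.fs.crawler.FileAbstractModel;

import java.io.InputStream;
import java.util.Map;

class FsCrawlerContextExample {
    static void crawlOne(FileAbstractModel fileModel, InputStream stream, ProcessingPipeline pipeline) throws Exception {
        FsCrawlerContext ctx = new FsCrawlerContext.Builder()
                .withFileModel(fileModel)
                .withFilePath("/data/docs")                  // directory containing the file
                .withFullFilename("/data/docs/report.pdf")   // absolute name, used in log messages
                .withInputStream(stream)
                .withExtraDoc(Map.of("department", "legal")) // merged into the JSON at index time
                .build();
        pipeline.processFile(ctx);
    }
}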