From 7c537317ebee7ba8d73df65fce4522384cf2f382 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Fri, 4 Sep 2020 22:48:51 +0200 Subject: [PATCH 01/17] Processing pipeline support (#1003). --- .../crawler/fs/DefaultProcessingPipeline.java | 53 +++++++ .../crawler/fs/EsIndexProcessor.java | 63 ++++++++ .../crawler/fs/FsCrawlerContext.java | 146 ++++++++++++++++++ .../crawler/fs/FsParserAbstract.java | 104 +++++++------ .../crawler/fs/ProcessingException.java | 30 ++++ .../crawler/fs/ProcessingPipeline.java | 29 ++++ .../elasticsearch/crawler/fs/Processor.java | 27 ++++ .../crawler/fs/TikaProcessor.java | 54 +++++++ .../crawler/fs/settings/FsSettings.java | 13 ++ .../crawler/fs/settings/Pipeline.java | 72 +++++++++ 10 files changed, 540 insertions(+), 51 deletions(-) create mode 100644 core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java create mode 100644 core/src/main/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessor.java create mode 100644 core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsCrawlerContext.java create mode 100644 core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingException.java create mode 100644 core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingPipeline.java create mode 100644 core/src/main/java/fr/pilato/elasticsearch/crawler/fs/Processor.java create mode 100644 core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java create mode 100644 settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Pipeline.java diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java new file mode 100644 index 000000000..bf9a51524 --- /dev/null +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java @@ -0,0 +1,53 @@ +/* + * Licensed to David Pilato (the "Author") under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Author licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package fr.pilato.elasticsearch.crawler.fs; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.isIndexable; + +/** + * The processing pipeline that will be used if not overridden. 
+ * + */ +public class DefaultProcessingPipeline implements ProcessingPipeline { + private static final Logger logger = LogManager.getLogger(DefaultProcessingPipeline.class); + private final TikaProcessor tika; + private final EsIndexProcessor es; + + public DefaultProcessingPipeline() { + tika = new TikaProcessor(); + es = new EsIndexProcessor(); + } + + @Override + public void processFile(FsCrawlerContext ctx) { + // Extracting content with Tika + tika.process(ctx); + + // Index to es + if (isIndexable(ctx.getDoc().getContent(), ctx.getFsSettings().getFs().getFilters())) { + es.process(ctx); + } else { + logger.debug("We ignore file [{}] because it does not match all the patterns {}", ctx.getFile().getName(), + ctx.getFsSettings().getFs().getFilters()); + } + } +} diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessor.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessor.java new file mode 100644 index 000000000..5a44d0141 --- /dev/null +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessor.java @@ -0,0 +1,63 @@ +/* + * Licensed to David Pilato (the "Author") under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Author licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package fr.pilato.elasticsearch.crawler.fs; + +import com.fasterxml.jackson.core.JsonProcessingException; +import fr.pilato.elasticsearch.crawler.fs.beans.DocParser; +import fr.pilato.elasticsearch.crawler.fs.framework.SignTool; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.File; +import java.lang.invoke.MethodHandles; +import java.security.NoSuchAlgorithmException; + +/** + * Pulls {@link fr.pilato.elasticsearch.crawler.fs.beans.Doc} from context and indexes it to Elasticsearch + */ +public class EsIndexProcessor implements Processor { + private static final Logger logger = LogManager.getLogger(MethodHandles.lookup().lookupClass()); + + @Override + public void process(FsCrawlerContext ctx) throws ProcessingException { + try { + long startTime = System.currentTimeMillis(); + String index = ctx.getFsSettings().getElasticsearch().getIndex(); + String id = generateId(ctx); + String pipeline = ctx.getFsSettings().getElasticsearch().getPipeline(); + String json = DocParser.toJson(ctx.getDoc()); + ctx.getEsClient().index(index, id, json, pipeline); + logger.debug("Indexed {}/{}?pipeline={} in {}ms", index, id, pipeline, + System.currentTimeMillis() - startTime); + logger.trace("JSon indexed : {}", json); + } catch (JsonProcessingException e) { + throw new ProcessingException(e); + } + } + + String generateId(FsCrawlerContext ctx) { + try { + return ctx.getFsSettings().getFs().isFilenameAsId() ? 
+ ctx.getFile().getName() : + SignTool.sign((new File(ctx.getFile().getName(), ctx.getFilepath())).toString()); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } +} diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsCrawlerContext.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsCrawlerContext.java new file mode 100644 index 000000000..e80c3897f --- /dev/null +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsCrawlerContext.java @@ -0,0 +1,146 @@ +/* + * Licensed to David Pilato (the "Author") under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Author licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package fr.pilato.elasticsearch.crawler.fs; + +import fr.pilato.elasticsearch.crawler.fs.beans.Doc; +import fr.pilato.elasticsearch.crawler.fs.client.ElasticsearchClient; +import fr.pilato.elasticsearch.crawler.fs.crawler.FileAbstractModel; +import fr.pilato.elasticsearch.crawler.fs.settings.FsSettings; + +import java.io.InputStream; +import java.security.MessageDigest; + +/** + * A class that keeps necessary context for the processing pipeline. + * Each processor will typically update the 'doc' as needed. 
+ */ +public class FsCrawlerContext { + private final FileAbstractModel file; + private final FsSettings fsSettings; + private final String filepath; + private final ElasticsearchClient esClient; + private final InputStream inputStream; + private final String fullFilename; + private Doc doc; + private final MessageDigest messageDigest; + + public FsCrawlerContext(Builder builder) { + this.file = builder.file; + this.fsSettings = builder.fsSettings; + this.filepath = builder.filepath; + this.messageDigest = builder.messageDigest; + this.esClient = builder.esClient; + this.inputStream = builder.inputStream; + this.fullFilename = builder.fullFilename; + this.doc = builder.doc; + } + + public FileAbstractModel getFile() { + return file; + } + + public FsSettings getFsSettings() { + return fsSettings; + } + + public String getFilepath() { + return filepath; + } + + public void setDoc() { + this.doc = new Doc(); + } + + public Doc getDoc() { + return doc; + } + + public MessageDigest getMessageDigest() { + return messageDigest; + } + + public ElasticsearchClient getEsClient() { + return esClient; + } + + public InputStream getInputStream() { + return inputStream; + } + + public String getFullFilename() { + return fullFilename; + } + + + public static class Builder { + private FileAbstractModel file; + private FsSettings fsSettings; + private String filepath; + private MessageDigest messageDigest; + private ElasticsearchClient esClient; + private InputStream inputStream; + private String fullFilename; + private Doc doc = new Doc(); + + public Builder withFileModel(FileAbstractModel file) { + this.file = file; + return this; + } + + public Builder withFsSettings(FsSettings fsSettings) { + this.fsSettings = fsSettings; + return this; + } + + public Builder withFilePath(String filepath) { + this.filepath = filepath; + return this; + } + + public Builder withMessageDigest(MessageDigest messageDigest) { + this.messageDigest = messageDigest; + return this; + } + + public 
Builder withEsClient(ElasticsearchClient esClient) { + this.esClient = esClient; + return this; + } + + public Builder withInputStream(InputStream inputStream) { + this.inputStream = inputStream; + return this; + } + + public Builder withFullFilename(String fullFilename) { + this.fullFilename = fullFilename; + return this; + } + + public Builder withDoc(Doc doc) { + this.doc = doc; + return this; + } + + public FsCrawlerContext build() { + return new FsCrawlerContext(this); + } + + } +} diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java index 6eb6c397f..fe54a7297 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java @@ -19,34 +19,22 @@ package fr.pilato.elasticsearch.crawler.fs; -import fr.pilato.elasticsearch.crawler.fs.beans.Attributes; -import fr.pilato.elasticsearch.crawler.fs.beans.Doc; -import fr.pilato.elasticsearch.crawler.fs.beans.DocParser; -import fr.pilato.elasticsearch.crawler.fs.beans.FsJob; -import fr.pilato.elasticsearch.crawler.fs.beans.FsJobFileHandler; -import fr.pilato.elasticsearch.crawler.fs.beans.PathParser; -import fr.pilato.elasticsearch.crawler.fs.beans.ScanStatistic; -import fr.pilato.elasticsearch.crawler.fs.client.ESSearchHit; -import fr.pilato.elasticsearch.crawler.fs.client.ESSearchRequest; -import fr.pilato.elasticsearch.crawler.fs.client.ESSearchResponse; -import fr.pilato.elasticsearch.crawler.fs.client.ESTermQuery; -import fr.pilato.elasticsearch.crawler.fs.client.ElasticsearchClient; +import fr.pilato.elasticsearch.crawler.fs.beans.*; +import fr.pilato.elasticsearch.crawler.fs.client.*; import fr.pilato.elasticsearch.crawler.fs.crawler.FileAbstractModel; import fr.pilato.elasticsearch.crawler.fs.crawler.FileAbstractor; import fr.pilato.elasticsearch.crawler.fs.framework.ByteSizeValue; import 
fr.pilato.elasticsearch.crawler.fs.framework.OsValidator; import fr.pilato.elasticsearch.crawler.fs.framework.SignTool; import fr.pilato.elasticsearch.crawler.fs.settings.FsSettings; -import fr.pilato.elasticsearch.crawler.fs.tika.TikaDocParser; +import fr.pilato.elasticsearch.crawler.fs.settings.Pipeline; import fr.pilato.elasticsearch.crawler.fs.tika.XmlDocParser; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.BufferedReader; import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; +import java.io.*; +import java.lang.reflect.InvocationTargetException; import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; import java.nio.file.Path; @@ -57,12 +45,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.stream.Collectors; -import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.computeVirtualPathName; -import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.isFileSizeUnderLimit; -import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.isIndexable; -import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.localDateTimeToDate; +import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.*; public abstract class FsParserAbstract extends FsParser { private static final Logger logger = LogManager.getLogger(FsParserAbstract.class); @@ -80,6 +66,7 @@ public abstract class FsParserAbstract extends FsParser { private final Integer loop; private final MessageDigest messageDigest; private final String pathSeparator; + private ProcessingPipeline pipeline; private ScanStatistic stats; @@ -88,6 +75,14 @@ public abstract class FsParserAbstract extends FsParser { this.fsJobFileHandler = new FsJobFileHandler(config); this.esClient = esClient; this.loop = loop; + try { + Class clazz = 
FsParserAbstract.class.getClassLoader().loadClass(Pipeline.DEFAULT().getClassName()); + this.pipeline = (ProcessingPipeline) clazz.getDeclaredConstructor().newInstance(); + logger.info("Created processing pipeline {}", this.pipeline.getClass().getName()); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { + logger.error("Could not create processing pipeline"); + e.printStackTrace(); + } logger.debug("creating fs crawler thread [{}] for [{}] every [{}]", fsSettings.getName(), fsSettings.getFs().getUrl(), fsSettings.getFs().getUpdateRate()); @@ -229,7 +224,7 @@ private void updateFsJob(String jobName, LocalDateTime scanDate) throws Exceptio fsJobFileHandler.write(jobName, fsJob); } - private void addFilesRecursively(FileAbstractor path, String filepath, LocalDateTime lastScanDate) + private void addFilesRecursively(FileAbstractor abstractor, String filepath, LocalDateTime lastScanDate) throws Exception { logger.debug("indexing [{}] content", filepath); @@ -239,9 +234,9 @@ private void addFilesRecursively(FileAbstractor path, String filepath, LocalD return; } - final Collection children = path.getFiles(filepath); - Collection fsFiles = new ArrayList<>(); - Collection fsFolders = new ArrayList<>(); + final Collection children = abstractor.getFiles(filepath); + final Collection fsFolders = new HashSet<>(); + final Collection fsFiles = new HashSet<>(); if (children != null) { boolean ignoreFolder = false; @@ -260,22 +255,22 @@ private void addFilesRecursively(FileAbstractor path, String filepath, LocalD logger.trace("FileAbstractModel = {}", child); String filename = child.getName(); - String virtualFileName = computeVirtualPathName(stats.getRootPath(), new File(filepath, filename).toString()); + String virtualFileName = computeVirtualPathName(stats.getRootPath(), new File(filepath, filename).toString()); - // https://github.com/dadoonet/fscrawler/issues/1 : Filter documents - 
boolean isIndexable = isIndexable(child.isDirectory(), virtualFileName, fsSettings.getFs().getIncludes(), fsSettings.getFs().getExcludes()); + // https://github.com/dadoonet/fscrawler/issues/1 : Filter documents + boolean isIndexable = isIndexable(child.isDirectory(), virtualFileName, fsSettings.getFs().getIncludes(), fsSettings.getFs().getExcludes()); - logger.debug("[{}] can be indexed: [{}]", virtualFileName, isIndexable); + logger.debug("[{}] can be indexed: [{}]", virtualFileName, isIndexable); if (isIndexable) { if (child.isFile()) { - logger.debug(" - file: {}", virtualFileName); - fsFiles.add(filename); + logger.debug(" - file: {}", virtualFileName); + fsFiles.add(filename); if (child.getLastModifiedDate().isAfter(lastScanDate) || (child.getCreationDate() != null && child.getCreationDate().isAfter(lastScanDate))) { if (isFileSizeUnderLimit(fsSettings.getFs().getIgnoreAbove(), child.getSize())) { try { indexFile(child, stats, filepath, - fsSettings.getFs().isIndexContent() || fsSettings.getFs().isStoreSource() ? path.getInputStream(child) : null, child.getSize()); + fsSettings.getFs().isIndexContent() || fsSettings.getFs().isStoreSource() ? abstractor.getInputStream(child) : null, child.getSize()); stats.addFile(); } catch (java.io.FileNotFoundException e) { if (fsSettings.getFs().isContinueOnError()) { @@ -283,13 +278,19 @@ private void addFilesRecursively(FileAbstractor path, String filepath, LocalD } else { throw e; } + } catch (ProcessingException pe) { + if (fsSettings.getFs().isContinueOnError()) { + logger.warn("Processing error for {}, skipping...: {}", filename, pe.getMessage()); + } else { + throw pe; + } } - } else { - logger.debug("file [{}] has a size [{}] above the limit [{}]. We skip it.", filename, + } else { + logger.debug("file [{}] has a size [{}] above the limit [{}]. 
We skip it.", filename, new ByteSizeValue(child.getSize()), fsSettings.getFs().getIgnoreAbove()); - } - } else { - logger.debug(" - not modified: creation date {} , file date {}, last scan date {}", + } + } else { + logger.debug(" - not modified: creation date {} , file date {}, last scan date {}", child.getCreationDate(), child.getLastModifiedDate(), lastScanDate); } } else if (child.isDirectory()) { @@ -298,7 +299,7 @@ private void addFilesRecursively(FileAbstractor path, String filepath, LocalD fsFolders.add(child.getFullpath()); indexDirectory(child.getFullpath()); } - addFilesRecursively(path, child.getFullpath(), lastScanDate); + addFilesRecursively(abstractor, child.getFullpath(), lastScanDate); } else { logger.debug(" - other: {}", filename); logger.debug("Not a file nor a dir. Skipping {}", child.getFullpath()); @@ -480,19 +481,20 @@ private void indexFile(FileAbstractModel fileAbstractModel, ScanStatistic stats, // https://github.com/dadoonet/fscrawler/issues/185 : Support Xml files doc.setObject(XmlDocParser.generateMap(inputStream)); } else { - // Extracting content with Tika - TikaDocParser.generate(fsSettings, inputStream, filename, fullFilename, doc, messageDigest, filesize); - } - - // We index the data structure - if (isIndexable(doc.getContent(), fsSettings.getFs().getFilters())) { - esIndex(fsSettings.getElasticsearch().getIndex(), - generateIdFromFilename(filename, dirname), - DocParser.toJson(doc), - fsSettings.getElasticsearch().getPipeline()); - } else { - logger.debug("We ignore file [{}] because it does not match all the patterns {}", filename, - fsSettings.getFs().getFilters()); + FsCrawlerContext context = new FsCrawlerContext.Builder() + .withFileModel(fileAbstractModel) + .withFullFilename(fullFilename) + .withFilePath(dirname) + .withFsSettings(fsSettings) + .withMessageDigest(messageDigest) + .withEsClient(esClient) + .withInputStream(inputStream) + .withDoc(doc) + .build(); + + // Do various parsing such as Tika etc. 
+ // The last thing the pipeline will do is index to ES + pipeline.processFile(context); } } else { if (fsSettings.getFs().isJsonSupport()) { diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingException.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingException.java new file mode 100644 index 000000000..937c06d03 --- /dev/null +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingException.java @@ -0,0 +1,30 @@ +/* + * Licensed to David Pilato (the "Author") under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Author licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package fr.pilato.elasticsearch.crawler.fs; + +import java.io.IOException; + +/** + * Thrown by processor plugins + */ +public class ProcessingException extends RuntimeException { + public ProcessingException(IOException e) { + super(e); + } +} diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingPipeline.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingPipeline.java new file mode 100644 index 000000000..2e861569f --- /dev/null +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingPipeline.java @@ -0,0 +1,29 @@ +/* + * Licensed to David Pilato (the "Author") under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Author licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package fr.pilato.elasticsearch.crawler.fs; + +/** + * Custom pipeline implementations will implement this interface. + * A pipeline will find the document with basic metadata in the context, + * and can do various processing, and should end with indexing the document + * to Elasticsearch through the {@link EsIndexProcessor} + */ +public interface ProcessingPipeline { + void processFile(FsCrawlerContext ctx); +} diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/Processor.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/Processor.java new file mode 100644 index 000000000..9f8f3ad25 --- /dev/null +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/Processor.java @@ -0,0 +1,27 @@ +/* + * Licensed to David Pilato (the "Author") under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Author licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package fr.pilato.elasticsearch.crawler.fs; + +/** + * A processor reads the context, and will typically update the + * 'doc'. It as access to various context and also ES. + */ +public interface Processor { + public void process(FsCrawlerContext ctx) throws ProcessingException; +} diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java new file mode 100644 index 000000000..d59073b66 --- /dev/null +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java @@ -0,0 +1,54 @@ +/* + * Licensed to David Pilato (the "Author") under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Author licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package fr.pilato.elasticsearch.crawler.fs; + +import fr.pilato.elasticsearch.crawler.fs.beans.DocParser; +import fr.pilato.elasticsearch.crawler.fs.tika.TikaDocParser; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; + +/** + * Tikaprocessor will parse full text and update the Doc instance on context + */ +public class TikaProcessor implements Processor { + private static final Logger logger = LogManager.getLogger(MethodHandles.lookup().lookupClass()); + + @Override + public void process(FsCrawlerContext ctx) throws ProcessingException { + try { + long startTime = System.currentTimeMillis(); + TikaDocParser.generate( + ctx.getFsSettings(), + ctx.getInputStream(), + ctx.getFile().getName(), + ctx.getFullFilename(), + ctx.getDoc(), + ctx.getMessageDigest(), + ctx.getFile().getSize()); + logger.debug("Parsing document {} with Tika in {}ms", ctx.getFile().getName(), + System.currentTimeMillis() - startTime); + logger.trace("Parsed doc={}", DocParser.toJson(ctx.getDoc())); + } catch (IOException e) { + throw new ProcessingException(e); + } + } +} diff --git a/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/FsSettings.java b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/FsSettings.java index 6897ec191..ce3cda4d0 100644 --- a/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/FsSettings.java +++ b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/FsSettings.java @@ -29,6 +29,7 @@ public class FsSettings { private Server server; private Elasticsearch elasticsearch; private Rest rest; + private Pipeline pipeline; public FsSettings() { @@ -46,12 +47,17 @@ public static Builder builder(String name) { return new Builder().setName(name); } + public Pipeline getPipeline() { + return pipeline; + } + public static class Builder { private String name; private Fs fs = Fs.DEFAULT; private Server 
server = null; private Elasticsearch elasticsearch = Elasticsearch.DEFAULT(); private Rest rest = null; + private Pipeline pipeline = Pipeline.DEFAULT(); private Builder setName(String name) { this.name = name; @@ -81,6 +87,10 @@ public Builder setRest(Rest rest) { public FsSettings build() { return new FsSettings(name, fs, server, elasticsearch, rest); } + + public Pipeline getPipeline() { + return pipeline; + } } public String getName() { @@ -134,6 +144,7 @@ public boolean equals(Object o) { if (!Objects.equals(fs, that.fs)) return false; if (!Objects.equals(server, that.server)) return false; if (!Objects.equals(rest, that.rest)) return false; + if (!Objects.equals(pipeline, that.pipeline)) return false; return Objects.equals(elasticsearch, that.elasticsearch); } @@ -145,6 +156,7 @@ public int hashCode() { result = 31 * result + (server != null ? server.hashCode() : 0); result = 31 * result + (rest != null ? rest.hashCode() : 0); result = 31 * result + (elasticsearch != null ? elasticsearch.hashCode() : 0); + result = 31 * result + (pipeline != null ? pipeline.hashCode() : 0); return result; } @@ -155,6 +167,7 @@ public String toString() { ", server=" + server + ", elasticsearch=" + elasticsearch + ", rest=" + rest + + ", pipeline=" + pipeline + '}'; } } diff --git a/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Pipeline.java b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Pipeline.java new file mode 100644 index 000000000..0eeb7b321 --- /dev/null +++ b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Pipeline.java @@ -0,0 +1,72 @@ +/* + * Licensed to David Pilato (the "Author") under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Author licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package fr.pilato.elasticsearch.crawler.fs.settings; + +/** + * Makes it possible to provide your own pipeline + */ +public class Pipeline { + private final String className; + + private Pipeline(Builder builder) { + this.className = builder.className; + } + + public static Pipeline DEFAULT() { + return Pipeline.builder() + .addClass("fr.pilato.elasticsearch.crawler.fs.DefaultProcessingPipeline") + .build(); + } + + private static Builder builder() { + return new Builder(); + } + + public String getClassName() { + return className; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Pipeline pipeline = (Pipeline) o; + + return className != null ? className.equals(pipeline.className) : pipeline.className == null; + } + + @Override + public int hashCode() { + return className != null ? 
className.hashCode() : 0; + } + + public static class Builder { + private String className; + + public Builder addClass(String className) { + this.className = className; + return this; + } + + public Pipeline build() { + return new Pipeline(this); + } + } +} From 72aa0b79cdb2969d966c5c6a094b11611be92381 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Fri, 4 Sep 2020 23:30:21 +0200 Subject: [PATCH 02/17] Move config into `fs` and actually use it Add some documentation (not tested) --- .../crawler/fs/FsParserAbstract.java | 2 +- docs/source/admin/fs/index.rst | 2 ++ docs/source/admin/fs/local-fs.rst | 24 +++++++++++++++++++ .../elasticsearch/crawler/fs/settings/Fs.java | 22 +++++++++++++++-- .../crawler/fs/settings/FsSettings.java | 13 ---------- .../crawler/fs/settings/Pipeline.java | 10 ++++---- 6 files changed, 53 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java index fe54a7297..57761d7db 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java @@ -76,7 +76,7 @@ public abstract class FsParserAbstract extends FsParser { this.esClient = esClient; this.loop = loop; try { - Class clazz = FsParserAbstract.class.getClassLoader().loadClass(Pipeline.DEFAULT().getClassName()); + Class clazz = FsParserAbstract.class.getClassLoader().loadClass(fsSettings.getFs().getPipeline().getClassName()); this.pipeline = (ProcessingPipeline) clazz.getDeclaredConstructor().newInstance(); logger.info("Created processing pipeline {}", this.pipeline.getClass().getName()); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { diff --git a/docs/source/admin/fs/index.rst b/docs/source/admin/fs/index.rst index 65fd8ad18..f3f3d36b0 
100644 --- a/docs/source/admin/fs/index.rst +++ b/docs/source/admin/fs/index.rst @@ -33,6 +33,8 @@ The job file must comply to the following ``yaml`` specifications: language: "eng" path: "/path/to/tesseract/if/not/available/in/PATH" data_path: "/path/to/tesseract/tessdata/if/needed" + pipeline: + class_name: "com.example.MyCustomProcessingPipeline" server: hostname: "localhost" port: 22 diff --git a/docs/source/admin/fs/local-fs.rst b/docs/source/admin/fs/local-fs.rst index c60ff8566..308ee3af1 100644 --- a/docs/source/admin/fs/local-fs.rst +++ b/docs/source/admin/fs/local-fs.rst @@ -54,6 +54,8 @@ Here is a list of Local FS settings (under ``fs.`` prefix)`: +----------------------------+-----------------------+---------------------------------+ | ``fs.follow_symlinks`` | ``false`` | `Follow Symlinks`_ | +----------------------------+-----------------------+---------------------------------+ +| ``fs.pipeline.class_name`` | ``DefaultProcessingPipeline`` | :ref:`pipeline_class` | ++----------------------------+-----------------------+---------------------------------+ .. _root-directory: @@ -576,6 +578,28 @@ continue with the rest of the directory tree you can set fs: continue_on_error: true +.. _pipeline_class: + +Pipeline Class +^^^^^^^^^^^^^^^^^ + +.. versionadded:: 2.8 + +By default FSCrawler will parse binary documents with Apache Tika, +and then index to Elasticsearch. If you require custom processing, +implement the `ProcessingPipeline` interface, place the jar file in +FSCrawler's classpath, and configure the class name of your custom +pipeline class in the job config file. Your custom pipeline class +could extend the `DefaultProcessingPipeline` class to reuse the tika +and indexing processors: + +..
code:: yaml + + name: "test" + fs: + pipeline: + class_name: com.example.CustomProcessingPipeline + Language detection ^^^^^^^^^^^^^^^^^^ diff --git a/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Fs.java b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Fs.java index 3aef0eb33..20ff40379 100644 --- a/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Fs.java +++ b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Fs.java @@ -58,6 +58,7 @@ public class Fs { private Ocr ocr = new Ocr(); private ByteSizeValue ignoreAbove = null; private boolean followSymlinks = false; + private Pipeline pipeline; public static Builder builder() { return new Builder(); @@ -91,6 +92,7 @@ public static class Builder { private Ocr ocr = new Ocr(); private ByteSizeValue ignoreAbove = null; private boolean followSymlinks = false; + private Pipeline pipeline = Pipeline.DEFAULT(); public Builder setUrl(String url) { this.url = url; @@ -246,10 +248,15 @@ public Builder setFollowSymlinks(boolean followSymlinks) { return this; } + public Builder setPipeline(Pipeline pipeline) { + this.pipeline = pipeline; + return this; + } + public Fs build() { return new Fs(url, updateRate, includes, excludes, filters, jsonSupport, filenameAsId, addFilesize, removeDeleted, addAsInnerObject, storeSource, indexedChars, indexContent, attributesSupport, rawMetadata, - checksum, xmlSupport, indexFolders, langDetect, continueOnError, ocr, ignoreAbove, followSymlinks); + checksum, xmlSupport, indexFolders, langDetect, continueOnError, ocr, ignoreAbove, followSymlinks, pipeline); } } @@ -260,7 +267,8 @@ public Fs( ) { private Fs(String url, TimeValue updateRate, List includes, List excludes, List filters, boolean jsonSupport, boolean filenameAsId, boolean addFilesize, boolean removeDeleted, boolean addAsInnerObject, boolean storeSource, Percentage indexedChars, boolean indexContent, boolean attributesSupport, boolean rawMetadata, String 
checksum, boolean xmlSupport, - boolean indexFolders, boolean langDetect, boolean continueOnError, Ocr ocr, ByteSizeValue ignoreAbove, boolean followSymlinks) { + boolean indexFolders, boolean langDetect, boolean continueOnError, Ocr ocr, ByteSizeValue ignoreAbove, boolean followSymlinks, + Pipeline pipeline) { this.url = url; this.updateRate = updateRate; this.includes = includes; @@ -284,6 +292,7 @@ private Fs(String url, TimeValue updateRate, List includes, List this.ocr = ocr; this.ignoreAbove = ignoreAbove; this.followSymlinks = followSymlinks; + this.pipeline = pipeline; } public String getUrl() { @@ -446,6 +455,15 @@ public void setContinueOnError(boolean continueOnError) { this.continueOnError = continueOnError; } + public Pipeline getPipeline() { + return pipeline; + } + + public void setPipeline(Pipeline pipeline) { + this.pipeline = pipeline; + } + + @Deprecated public void setPdfOcr(boolean pdfOcr) { String strategy; diff --git a/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/FsSettings.java b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/FsSettings.java index ce3cda4d0..6897ec191 100644 --- a/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/FsSettings.java +++ b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/FsSettings.java @@ -29,7 +29,6 @@ public class FsSettings { private Server server; private Elasticsearch elasticsearch; private Rest rest; - private Pipeline pipeline; public FsSettings() { @@ -47,17 +46,12 @@ public static Builder builder(String name) { return new Builder().setName(name); } - public Pipeline getPipeline() { - return pipeline; - } - public static class Builder { private String name; private Fs fs = Fs.DEFAULT; private Server server = null; private Elasticsearch elasticsearch = Elasticsearch.DEFAULT(); private Rest rest = null; - private Pipeline pipeline = Pipeline.DEFAULT(); private Builder setName(String name) { this.name = name; @@ -87,10 +81,6 @@ 
public Builder setRest(Rest rest) { public FsSettings build() { return new FsSettings(name, fs, server, elasticsearch, rest); } - - public Pipeline getPipeline() { - return pipeline; - } } public String getName() { @@ -144,7 +134,6 @@ public boolean equals(Object o) { if (!Objects.equals(fs, that.fs)) return false; if (!Objects.equals(server, that.server)) return false; if (!Objects.equals(rest, that.rest)) return false; - if (!Objects.equals(pipeline, that.pipeline)) return false; return Objects.equals(elasticsearch, that.elasticsearch); } @@ -156,7 +145,6 @@ public int hashCode() { result = 31 * result + (server != null ? server.hashCode() : 0); result = 31 * result + (rest != null ? rest.hashCode() : 0); result = 31 * result + (elasticsearch != null ? elasticsearch.hashCode() : 0); - result = 31 * result + (pipeline != null ? pipeline.hashCode() : 0); return result; } @@ -167,7 +155,6 @@ public String toString() { ", server=" + server + ", elasticsearch=" + elasticsearch + ", rest=" + rest + - ", pipeline=" + pipeline + '}'; } } diff --git a/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Pipeline.java b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Pipeline.java index 0eeb7b321..2108486b3 100644 --- a/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Pipeline.java +++ b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Pipeline.java @@ -22,10 +22,12 @@ * Makes it possible to provide your own pipeline */ public class Pipeline { - private final String className; + private String className = "fr.pilato.elasticsearch.crawler.fs.DefaultProcessingPipeline"; - private Pipeline(Builder builder) { - this.className = builder.className; + public Pipeline() { } + + private Pipeline(String className) { + this.className = className; } public static Pipeline DEFAULT() { @@ -66,7 +68,7 @@ public Builder addClass(String className) { } public Pipeline build() { - return new Pipeline(this); + return new 
Pipeline(className); } } } From a451ce10567fcd7a4e274887e62f548494c64d65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Sat, 5 Sep 2020 00:22:50 +0200 Subject: [PATCH 03/17] Move config into `fs` and actually use it Add support for an 'extraDoc' on context that gets merged into 'doc' before indexing Add some documentation (not tested) --- .../crawler/fs/EsIndexProcessor.java | 14 ++++++++++++ .../crawler/fs/FsCrawlerContext.java | 15 +++++++++++++ .../crawler/fs/EsIndexProcessorTest.java | 22 +++++++++++++++++++ 3 files changed, 51 insertions(+) create mode 100644 core/src/test/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessorTest.java diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessor.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessor.java index 5a44d0141..8a0920321 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessor.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessor.java @@ -19,6 +19,8 @@ package fr.pilato.elasticsearch.crawler.fs; import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import fr.pilato.elasticsearch.crawler.fs.beans.DocParser; import fr.pilato.elasticsearch.crawler.fs.framework.SignTool; import org.apache.logging.log4j.LogManager; @@ -26,7 +28,9 @@ import java.io.File; import java.lang.invoke.MethodHandles; +import java.lang.reflect.Type; import java.security.NoSuchAlgorithmException; +import java.util.Map; /** * Pulls {@link fr.pilato.elasticsearch.crawler.fs.beans.Doc} from context and indexes it to Elasticsearch @@ -42,6 +46,9 @@ public void process(FsCrawlerContext ctx) throws ProcessingException { String id = generateId(ctx); String pipeline = ctx.getFsSettings().getElasticsearch().getPipeline(); String json = DocParser.toJson(ctx.getDoc()); + if (!ctx.getExtraDoc().isEmpty()) { + json = mergeExtraDoc(json, 
ctx.getExtraDoc()); + } ctx.getEsClient().index(index, id, json, pipeline); logger.debug("Indexed {}/{}?pipeline={} in {}ms", index, id, pipeline, System.currentTimeMillis() - startTime); @@ -51,6 +58,13 @@ public void process(FsCrawlerContext ctx) throws ProcessingException { } } + protected String mergeExtraDoc(String json, Map extraDoc) { + Type mapType = new TypeToken>() {}.getType(); + Map doc = new Gson().fromJson(json, mapType); + doc.putAll(extraDoc); + return new Gson().toJson(doc); + } + String generateId(FsCrawlerContext ctx) { try { return ctx.getFsSettings().getFs().isFilenameAsId() ? diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsCrawlerContext.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsCrawlerContext.java index e80c3897f..b29061a2f 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsCrawlerContext.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsCrawlerContext.java @@ -25,6 +25,8 @@ import java.io.InputStream; import java.security.MessageDigest; +import java.util.HashMap; +import java.util.Map; /** * A class that keeps necessary context for the processing pipeline. 
@@ -37,7 +39,9 @@ public class FsCrawlerContext { private final ElasticsearchClient esClient; private final InputStream inputStream; private final String fullFilename; + private final Map extraDoc; private Doc doc; + private final MessageDigest messageDigest; public FsCrawlerContext(Builder builder) { @@ -49,6 +53,7 @@ public FsCrawlerContext(Builder builder) { this.inputStream = builder.inputStream; this.fullFilename = builder.fullFilename; this.doc = builder.doc; + this.extraDoc = builder.extraDoc; } public FileAbstractModel getFile() { @@ -87,6 +92,10 @@ public String getFullFilename() { return fullFilename; } + public Map getExtraDoc() { + return extraDoc; + } + public static class Builder { private FileAbstractModel file; @@ -97,6 +106,7 @@ public static class Builder { private InputStream inputStream; private String fullFilename; private Doc doc = new Doc(); + private Map extraDoc = new HashMap<>(); public Builder withFileModel(FileAbstractModel file) { this.file = file; @@ -138,6 +148,11 @@ public Builder withDoc(Doc doc) { return this; } + public Builder withExtraDoc(Map extraDoc) { + this.extraDoc = extraDoc; + return this; + } + public FsCrawlerContext build() { return new FsCrawlerContext(this); } diff --git a/core/src/test/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessorTest.java b/core/src/test/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessorTest.java new file mode 100644 index 000000000..1e57b39f5 --- /dev/null +++ b/core/src/test/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessorTest.java @@ -0,0 +1,22 @@ +package fr.pilato.elasticsearch.crawler.fs; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.*; + +public class EsIndexProcessorTest { + + @Test + public void mergeExtraDoc() { + Map extra = new HashMap<>(); + Map geo = new HashMap<>(); + geo.put("hello", "world"); + extra.put("geo", geo); + String json = "{\"file\" : {\"extension\" : \"pdf\"} }"; + String 
modifiedJson = new EsIndexProcessor().mergeExtraDoc(json, extra); + assertEquals("{\"file\":{\"extension\":\"pdf\"},\"geo\":{\"hello\":\"world\"}}", modifiedJson); + } +} \ No newline at end of file From 0a37d83383eafae465a2c5b7b1bf8b5bb5bb570c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Sat, 5 Sep 2020 00:57:47 +0200 Subject: [PATCH 04/17] Address some automated PR review feedback --- .../pilato/elasticsearch/crawler/fs/EsIndexProcessor.java | 7 ++++--- .../elasticsearch/crawler/fs/ProcessingException.java | 4 +--- .../elasticsearch/crawler/fs/EsIndexProcessorTest.java | 2 +- .../fr/pilato/elasticsearch/crawler/fs/settings/Fs.java | 2 +- .../elasticsearch/crawler/fs/settings/Pipeline.java | 8 +++----- 5 files changed, 10 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessor.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessor.java index 8a0920321..dfaede455 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessor.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessor.java @@ -33,7 +33,8 @@ import java.util.Map; /** - * Pulls {@link fr.pilato.elasticsearch.crawler.fs.beans.Doc} from context and indexes it to Elasticsearch + * Pulls {@link fr.pilato.elasticsearch.crawler.fs.beans.Doc} from context, + * merges it with 'extraDoc' if it exists and then indexes it to Elasticsearch */ public class EsIndexProcessor implements Processor { private static final Logger logger = LogManager.getLogger(MethodHandles.lookup().lookupClass()); @@ -65,13 +66,13 @@ protected String mergeExtraDoc(String json, Map extraDoc) { return new Gson().toJson(doc); } - String generateId(FsCrawlerContext ctx) { + protected String generateId(FsCrawlerContext ctx) { try { return ctx.getFsSettings().getFs().isFilenameAsId() ? 
ctx.getFile().getName() : SignTool.sign((new File(ctx.getFile().getName(), ctx.getFilepath())).toString()); } catch (NoSuchAlgorithmException e) { - throw new RuntimeException(e); + throw new ProcessingException(e); } } } diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingException.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingException.java index 937c06d03..368870296 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingException.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingException.java @@ -18,13 +18,11 @@ */ package fr.pilato.elasticsearch.crawler.fs; -import java.io.IOException; - /** * Thrown by processor plugins */ public class ProcessingException extends RuntimeException { - public ProcessingException(IOException e) { + public ProcessingException(Throwable e) { super(e); } } diff --git a/core/src/test/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessorTest.java b/core/src/test/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessorTest.java index 1e57b39f5..5b91e5e00 100644 --- a/core/src/test/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessorTest.java +++ b/core/src/test/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessorTest.java @@ -5,7 +5,7 @@ import java.util.HashMap; import java.util.Map; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; public class EsIndexProcessorTest { diff --git a/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Fs.java b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Fs.java index 20ff40379..0c6d2c6c0 100644 --- a/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Fs.java +++ b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Fs.java @@ -92,7 +92,7 @@ public static class Builder { private Ocr ocr = new Ocr(); private ByteSizeValue ignoreAbove = null; private boolean followSymlinks = false; - private Pipeline 
pipeline = Pipeline.DEFAULT(); + private Pipeline pipeline = Pipeline.DEFAULT; public Builder setUrl(String url) { this.url = url; diff --git a/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Pipeline.java b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Pipeline.java index 2108486b3..a3855e38f 100644 --- a/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Pipeline.java +++ b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Pipeline.java @@ -30,11 +30,9 @@ private Pipeline(String className) { this.className = className; } - public static Pipeline DEFAULT() { - return Pipeline.builder() - .addClass("fr.pilato.elasticsearch.crawler.fs.DefaultProcessingPipeline") - .build(); - } + public static final Pipeline DEFAULT = Pipeline.builder() + .addClass("fr.pilato.elasticsearch.crawler.fs.DefaultProcessingPipeline") + .build(); private static Builder builder() { return new Builder(); From 225f348659ba26da9399b916f679253b4b3759c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Sat, 5 Sep 2020 01:06:54 +0200 Subject: [PATCH 05/17] Fix Codacy issues --- .../crawler/fs/FsParserAbstract.java | 15 ++++++++++++--- .../crawler/fs/settings/Pipeline.java | 8 ++++---- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java index 57761d7db..e6b59bdea 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java @@ -19,15 +19,24 @@ package fr.pilato.elasticsearch.crawler.fs; -import fr.pilato.elasticsearch.crawler.fs.beans.*; -import fr.pilato.elasticsearch.crawler.fs.client.*; +import fr.pilato.elasticsearch.crawler.fs.beans.Attributes; +import fr.pilato.elasticsearch.crawler.fs.beans.Doc; +import 
fr.pilato.elasticsearch.crawler.fs.beans.DocParser; +import fr.pilato.elasticsearch.crawler.fs.beans.FsJob; +import fr.pilato.elasticsearch.crawler.fs.beans.FsJobFileHandler; +import fr.pilato.elasticsearch.crawler.fs.beans.PathParser; +import fr.pilato.elasticsearch.crawler.fs.beans.ScanStatistic; +import fr.pilato.elasticsearch.crawler.fs.client.ESSearchHit; +import fr.pilato.elasticsearch.crawler.fs.client.ESSearchRequest; +import fr.pilato.elasticsearch.crawler.fs.client.ESSearchResponse; +import fr.pilato.elasticsearch.crawler.fs.client.ESTermQuery; +import fr.pilato.elasticsearch.crawler.fs.client.ElasticsearchClient; import fr.pilato.elasticsearch.crawler.fs.crawler.FileAbstractModel; import fr.pilato.elasticsearch.crawler.fs.crawler.FileAbstractor; import fr.pilato.elasticsearch.crawler.fs.framework.ByteSizeValue; import fr.pilato.elasticsearch.crawler.fs.framework.OsValidator; import fr.pilato.elasticsearch.crawler.fs.framework.SignTool; import fr.pilato.elasticsearch.crawler.fs.settings.FsSettings; -import fr.pilato.elasticsearch.crawler.fs.settings.Pipeline; import fr.pilato.elasticsearch.crawler.fs.tika.XmlDocParser; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Pipeline.java b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Pipeline.java index a3855e38f..22ae444bb 100644 --- a/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Pipeline.java +++ b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Pipeline.java @@ -22,6 +22,10 @@ * Makes it possible to provide your own pipeline */ public class Pipeline { + public static final Pipeline DEFAULT = Pipeline.builder() + .addClass("fr.pilato.elasticsearch.crawler.fs.DefaultProcessingPipeline") + .build(); + private String className = "fr.pilato.elasticsearch.crawler.fs.DefaultProcessingPipeline"; public Pipeline() { } @@ -30,10 +34,6 @@ 
private Pipeline(String className) { this.className = className; } - public static final Pipeline DEFAULT = Pipeline.builder() - .addClass("fr.pilato.elasticsearch.crawler.fs.DefaultProcessingPipeline") - .build(); - private static Builder builder() { return new Builder(); } From 8a57fb7fec87bccafffae8bff89faa9176e30ed9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Sat, 5 Sep 2020 01:14:41 +0200 Subject: [PATCH 06/17] More fixes --- .../elasticsearch/crawler/fs/FsParserAbstract.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java index e6b59bdea..67d87d5dc 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java @@ -41,8 +41,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.BufferedReader; import java.io.File; -import java.io.*; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.lang.reflect.InvocationTargetException; import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; @@ -57,7 +60,10 @@ import java.util.HashSet; import java.util.stream.Collectors; -import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.*; +import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.computeVirtualPathName; +import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.isFileSizeUnderLimit; +import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.isIndexable; +import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.localDateTimeToDate; public abstract class FsParserAbstract extends FsParser { private static final Logger logger = 
LogManager.getLogger(FsParserAbstract.class); From 0b54155490831be12990dc4b855a541c826c0b4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Sat, 5 Sep 2020 01:26:48 +0200 Subject: [PATCH 07/17] More fixes --- .../elasticsearch/crawler/fs/TikaProcessor.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java index d59073b66..b22f908c5 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java @@ -24,21 +24,26 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; +import java.io.InputStream; import java.lang.invoke.MethodHandles; /** - * Tikaprocessor will parse full text and update the Doc instance on context + * Tikaprocessor will parse full text and update the Doc instance on context. + *
<ul> + *     <li>Input: raw file input stream</li> + *     <li>Output: Doc with the parsed text and metadata</li> + * </ul>
*/ public class TikaProcessor implements Processor { private static final Logger logger = LogManager.getLogger(MethodHandles.lookup().lookupClass()); @Override public void process(FsCrawlerContext ctx) throws ProcessingException { - try { + try(InputStream stream = ctx.getInputStream()) { long startTime = System.currentTimeMillis(); TikaDocParser.generate( ctx.getFsSettings(), - ctx.getInputStream(), + stream, ctx.getFile().getName(), ctx.getFullFilename(), ctx.getDoc(), From 1dc0912df9b1df0bdf177f102ed3825eb9223957 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Sun, 6 Sep 2020 01:06:27 +0200 Subject: [PATCH 08/17] Avoid nullpointer when pipeline not defined --- .../elasticsearch/crawler/fs/DefaultProcessingPipeline.java | 4 +++- .../java/fr/pilato/elasticsearch/crawler/fs/settings/Fs.java | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java index bf9a51524..f5d76cf1a 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java @@ -21,6 +21,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.lang.invoke.MethodHandles; + import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.isIndexable; /** @@ -28,7 +30,7 @@ * */ public class DefaultProcessingPipeline implements ProcessingPipeline { - private static final Logger logger = LogManager.getLogger(DefaultProcessingPipeline.class); + private static final Logger logger = LogManager.getLogger(MethodHandles.lookup().lookupClass()); private final TikaProcessor tika; private final EsIndexProcessor es; diff --git a/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Fs.java 
b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Fs.java index 0c6d2c6c0..c4815c8c2 100644 --- a/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Fs.java +++ b/settings/src/main/java/fr/pilato/elasticsearch/crawler/fs/settings/Fs.java @@ -58,7 +58,7 @@ public class Fs { private Ocr ocr = new Ocr(); private ByteSizeValue ignoreAbove = null; private boolean followSymlinks = false; - private Pipeline pipeline; + private Pipeline pipeline = Pipeline.DEFAULT; public static Builder builder() { return new Builder(); @@ -92,7 +92,7 @@ public static class Builder { private Ocr ocr = new Ocr(); private ByteSizeValue ignoreAbove = null; private boolean followSymlinks = false; - private Pipeline pipeline = Pipeline.DEFAULT; + private Pipeline pipeline; public Builder setUrl(String url) { this.url = url; From 430de8bd136f18e5ad61897b8a2d68f354e104d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Sun, 6 Sep 2020 02:11:04 +0200 Subject: [PATCH 09/17] Introduce a init() method that takes a config with settings like fsSettings and esClient Also pass this to processors as needed. 
This makes the Context object only keep data related to the file being processed --- .../crawler/fs/DefaultProcessingPipeline.java | 25 ++++++---- .../crawler/fs/EsIndexProcessor.java | 19 +++++-- .../crawler/fs/FsCrawlerContext.java | 40 --------------- .../crawler/fs/FsParserAbstract.java | 21 ++++---- .../crawler/fs/ProcessingPipeline.java | 49 +++++++++++++++++++ .../crawler/fs/TikaProcessor.java | 13 ++++- .../crawler/fs/EsIndexProcessorTest.java | 2 +- 7 files changed, 100 insertions(+), 69 deletions(-) diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java index f5d76cf1a..4f340d4e1 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java @@ -18,26 +18,24 @@ */ package fr.pilato.elasticsearch.crawler.fs; +import fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.lang.invoke.MethodHandles; -import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.isIndexable; - /** * The processing pipeline that will be used if not overridden. 
* */ public class DefaultProcessingPipeline implements ProcessingPipeline { private static final Logger logger = LogManager.getLogger(MethodHandles.lookup().lookupClass()); - private final TikaProcessor tika; - private final EsIndexProcessor es; - public DefaultProcessingPipeline() { - tika = new TikaProcessor(); - es = new EsIndexProcessor(); - } + protected TikaProcessor tika; + protected EsIndexProcessor es; + protected Config config; + + public DefaultProcessingPipeline() { } @Override public void processFile(FsCrawlerContext ctx) { @@ -45,11 +43,18 @@ public void processFile(FsCrawlerContext ctx) { tika.process(ctx); // Index to es - if (isIndexable(ctx.getDoc().getContent(), ctx.getFsSettings().getFs().getFilters())) { + if (FsCrawlerUtil.isIndexable(ctx.getDoc().getContent(), config.getFsSettings().getFs().getFilters())) { es.process(ctx); } else { logger.debug("We ignore file [{}] because it does not match all the patterns {}", ctx.getFile().getName(), - ctx.getFsSettings().getFs().getFilters()); + config.getFsSettings().getFs().getFilters()); } } + + @Override + public void init(Config config) { + this.config = config; + tika = new TikaProcessor(config.getFsSettings(), config.getMessageDigest()); + es = new EsIndexProcessor(config.getFsSettings(), config.getEsClient()); + } } diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessor.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessor.java index dfaede455..f5c449a70 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessor.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessor.java @@ -22,7 +22,9 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import fr.pilato.elasticsearch.crawler.fs.beans.DocParser; +import fr.pilato.elasticsearch.crawler.fs.client.ElasticsearchClient; import fr.pilato.elasticsearch.crawler.fs.framework.SignTool; +import 
fr.pilato.elasticsearch.crawler.fs.settings.FsSettings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -38,19 +40,26 @@ */ public class EsIndexProcessor implements Processor { private static final Logger logger = LogManager.getLogger(MethodHandles.lookup().lookupClass()); + private final FsSettings fsSettings; + private final ElasticsearchClient esClient; + + public EsIndexProcessor(FsSettings fsSettings, ElasticsearchClient esClient) { + this.fsSettings = fsSettings; + this.esClient = esClient; + } @Override public void process(FsCrawlerContext ctx) throws ProcessingException { try { long startTime = System.currentTimeMillis(); - String index = ctx.getFsSettings().getElasticsearch().getIndex(); + String index = fsSettings.getElasticsearch().getIndex(); String id = generateId(ctx); - String pipeline = ctx.getFsSettings().getElasticsearch().getPipeline(); + String pipeline = fsSettings.getElasticsearch().getPipeline(); String json = DocParser.toJson(ctx.getDoc()); if (!ctx.getExtraDoc().isEmpty()) { json = mergeExtraDoc(json, ctx.getExtraDoc()); } - ctx.getEsClient().index(index, id, json, pipeline); + esClient.index(index, id, json, pipeline); logger.debug("Indexed {}/{}?pipeline={} in {}ms", index, id, pipeline, System.currentTimeMillis() - startTime); logger.trace("JSon indexed : {}", json); @@ -59,7 +68,7 @@ public void process(FsCrawlerContext ctx) throws ProcessingException { } } - protected String mergeExtraDoc(String json, Map extraDoc) { + protected static String mergeExtraDoc(String json, Map extraDoc) { Type mapType = new TypeToken>() {}.getType(); Map doc = new Gson().fromJson(json, mapType); doc.putAll(extraDoc); @@ -68,7 +77,7 @@ protected String mergeExtraDoc(String json, Map extraDoc) { protected String generateId(FsCrawlerContext ctx) { try { - return ctx.getFsSettings().getFs().isFilenameAsId() ? + return fsSettings.getFs().isFilenameAsId() ? 
ctx.getFile().getName() : SignTool.sign((new File(ctx.getFile().getName(), ctx.getFilepath())).toString()); } catch (NoSuchAlgorithmException e) { diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsCrawlerContext.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsCrawlerContext.java index b29061a2f..172686f50 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsCrawlerContext.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsCrawlerContext.java @@ -19,12 +19,9 @@ package fr.pilato.elasticsearch.crawler.fs; import fr.pilato.elasticsearch.crawler.fs.beans.Doc; -import fr.pilato.elasticsearch.crawler.fs.client.ElasticsearchClient; import fr.pilato.elasticsearch.crawler.fs.crawler.FileAbstractModel; -import fr.pilato.elasticsearch.crawler.fs.settings.FsSettings; import java.io.InputStream; -import java.security.MessageDigest; import java.util.HashMap; import java.util.Map; @@ -34,22 +31,15 @@ */ public class FsCrawlerContext { private final FileAbstractModel file; - private final FsSettings fsSettings; private final String filepath; - private final ElasticsearchClient esClient; private final InputStream inputStream; private final String fullFilename; private final Map extraDoc; private Doc doc; - private final MessageDigest messageDigest; - public FsCrawlerContext(Builder builder) { this.file = builder.file; - this.fsSettings = builder.fsSettings; this.filepath = builder.filepath; - this.messageDigest = builder.messageDigest; - this.esClient = builder.esClient; this.inputStream = builder.inputStream; this.fullFilename = builder.fullFilename; this.doc = builder.doc; @@ -60,10 +50,6 @@ public FileAbstractModel getFile() { return file; } - public FsSettings getFsSettings() { - return fsSettings; - } - public String getFilepath() { return filepath; } @@ -76,14 +62,6 @@ public Doc getDoc() { return doc; } - public MessageDigest getMessageDigest() { - return messageDigest; - } - - public ElasticsearchClient 
getEsClient() { - return esClient; - } - public InputStream getInputStream() { return inputStream; } @@ -99,10 +77,7 @@ public Map getExtraDoc() { public static class Builder { private FileAbstractModel file; - private FsSettings fsSettings; private String filepath; - private MessageDigest messageDigest; - private ElasticsearchClient esClient; private InputStream inputStream; private String fullFilename; private Doc doc = new Doc(); @@ -113,26 +88,11 @@ public Builder withFileModel(FileAbstractModel file) { return this; } - public Builder withFsSettings(FsSettings fsSettings) { - this.fsSettings = fsSettings; - return this; - } - public Builder withFilePath(String filepath) { this.filepath = filepath; return this; } - public Builder withMessageDigest(MessageDigest messageDigest) { - this.messageDigest = messageDigest; - return this; - } - - public Builder withEsClient(ElasticsearchClient esClient) { - this.esClient = esClient; - return this; - } - public Builder withInputStream(InputStream inputStream) { this.inputStream = inputStream; return this; diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java index 67d87d5dc..ceb49cc6a 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java @@ -90,14 +90,6 @@ public abstract class FsParserAbstract extends FsParser { this.fsJobFileHandler = new FsJobFileHandler(config); this.esClient = esClient; this.loop = loop; - try { - Class clazz = FsParserAbstract.class.getClassLoader().loadClass(fsSettings.getFs().getPipeline().getClassName()); - this.pipeline = (ProcessingPipeline) clazz.getDeclaredConstructor().newInstance(); - logger.info("Created processing pipeline {}", this.pipeline.getClass().getName()); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | 
NoSuchMethodException | InvocationTargetException e) { - logger.error("Could not create processing pipeline"); - e.printStackTrace(); - } logger.debug("creating fs crawler thread [{}] for [{}] every [{}]", fsSettings.getName(), fsSettings.getFs().getUrl(), fsSettings.getFs().getUpdateRate()); @@ -113,6 +105,16 @@ public abstract class FsParserAbstract extends FsParser { messageDigest = null; } + try { + Class clazz = FsParserAbstract.class.getClassLoader().loadClass(fsSettings.getFs().getPipeline().getClassName()); + pipeline = (ProcessingPipeline) clazz.getDeclaredConstructor().newInstance(); + pipeline.init(new ProcessingPipeline.Config(fsSettings, esClient, messageDigest, Collections.emptyMap())); + logger.info("Created processing pipeline {}", this.pipeline.getClass().getName()); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { + logger.error("Could not create processing pipeline"); + e.printStackTrace(); + } + // On Windows, when using SSH server, we need to force the "Linux" separator if (OsValidator.WINDOWS && fsSettings.getServer() != null) { logger.debug("We are running on Windows with SSH Server settings so we need to force the Linux separator."); @@ -500,9 +502,6 @@ private void indexFile(FileAbstractModel fileAbstractModel, ScanStatistic stats, .withFileModel(fileAbstractModel) .withFullFilename(fullFilename) .withFilePath(dirname) - .withFsSettings(fsSettings) - .withMessageDigest(messageDigest) - .withEsClient(esClient) .withInputStream(inputStream) .withDoc(doc) .build(); diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingPipeline.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingPipeline.java index 2e861569f..db99eb7bc 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingPipeline.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingPipeline.java @@ -18,6 +18,12 @@ */ 
package fr.pilato.elasticsearch.crawler.fs; +import fr.pilato.elasticsearch.crawler.fs.client.ElasticsearchClient; +import fr.pilato.elasticsearch.crawler.fs.settings.FsSettings; + +import java.security.MessageDigest; +import java.util.Map; + /** * Custom pipeline implementations will implement this interface. * A pipeline will find the document with basic metadata in the context, @@ -25,5 +31,48 @@ * to Elasticsearch through the {@link EsIndexProcessor} */ public interface ProcessingPipeline { + /** + * Process one file + * @param ctx the context in which to find the file inputstream as well as + * other details about the file + */ void processFile(FsCrawlerContext ctx); + + /** + * Initialize the pipeline with settings and ES client objects + */ + void init(Config config); + + public static class Config { + private final FsSettings fsSettings; + private final ElasticsearchClient esClient; + private MessageDigest messageDigest; + private Map configMap; + + public Config(FsSettings fsSettings, + ElasticsearchClient esClient, + MessageDigest messageDigest, + Map configMap) { + this.fsSettings = fsSettings; + this.esClient = esClient; + this.messageDigest = messageDigest; + this.configMap = configMap; + } + + public FsSettings getFsSettings() { + return fsSettings; + } + + public ElasticsearchClient getEsClient() { + return esClient; + } + + public Map getConfigMap() { + return configMap; + } + + public MessageDigest getMessageDigest() { + return messageDigest; + } + } } diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java index b22f908c5..84f096ce8 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java @@ -19,6 +19,7 @@ package fr.pilato.elasticsearch.crawler.fs; import fr.pilato.elasticsearch.crawler.fs.beans.DocParser; +import 
fr.pilato.elasticsearch.crawler.fs.settings.FsSettings; import fr.pilato.elasticsearch.crawler.fs.tika.TikaDocParser; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -26,6 +27,7 @@ import java.io.IOException; import java.io.InputStream; import java.lang.invoke.MethodHandles; +import java.security.MessageDigest; /** * Tikaprocessor will parse full text and update the Doc instance on context. @@ -36,18 +38,25 @@ */ public class TikaProcessor implements Processor { private static final Logger logger = LogManager.getLogger(MethodHandles.lookup().lookupClass()); + private FsSettings fsSettings; + private MessageDigest messageDigest; + + public TikaProcessor(FsSettings fsSettings, MessageDigest messageDigest) { + this.fsSettings = fsSettings; + this.messageDigest = messageDigest; + } @Override public void process(FsCrawlerContext ctx) throws ProcessingException { try(InputStream stream = ctx.getInputStream()) { long startTime = System.currentTimeMillis(); TikaDocParser.generate( - ctx.getFsSettings(), + fsSettings, stream, ctx.getFile().getName(), ctx.getFullFilename(), ctx.getDoc(), - ctx.getMessageDigest(), + messageDigest, ctx.getFile().getSize()); logger.debug("Parsing document {} with Tika in {}ms", ctx.getFile().getName(), System.currentTimeMillis() - startTime); diff --git a/core/src/test/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessorTest.java b/core/src/test/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessorTest.java index 5b91e5e00..5ccc5a165 100644 --- a/core/src/test/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessorTest.java +++ b/core/src/test/java/fr/pilato/elasticsearch/crawler/fs/EsIndexProcessorTest.java @@ -16,7 +16,7 @@ public void mergeExtraDoc() { geo.put("hello", "world"); extra.put("geo", geo); String json = "{\"file\" : {\"extension\" : \"pdf\"} }"; - String modifiedJson = new EsIndexProcessor().mergeExtraDoc(json, extra); + String modifiedJson = EsIndexProcessor.mergeExtraDoc(json, 
extra); assertEquals("{\"file\":{\"extension\":\"pdf\"},\"geo\":{\"hello\":\"world\"}}", modifiedJson); } } \ No newline at end of file From a7f8bf5addbb950b862ff08cab08706354fa537e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Sun, 6 Sep 2020 02:37:57 +0200 Subject: [PATCH 10/17] Remove empty constructor, add debug msg when a file enters the pipeline --- .../elasticsearch/crawler/fs/DefaultProcessingPipeline.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java index 4f340d4e1..9918d8366 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java @@ -35,10 +35,10 @@ public class DefaultProcessingPipeline implements ProcessingPipeline { protected EsIndexProcessor es; protected Config config; - public DefaultProcessingPipeline() { } - @Override public void processFile(FsCrawlerContext ctx) { + logger.debug("Starting processing of file {}", ctx.getFullFilename()); + // Extracting content with Tika tika.process(ctx); From 601112bae59f357cd27acfc40abe919651a4919e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Sun, 6 Sep 2020 16:16:14 +0200 Subject: [PATCH 11/17] TikaProcessor should not close stream, it is done elsewhere --- .../fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java index 84f096ce8..547c90622 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java @@ -48,11 +48,11 @@ public 
TikaProcessor(FsSettings fsSettings, MessageDigest messageDigest) { @Override public void process(FsCrawlerContext ctx) throws ProcessingException { - try(InputStream stream = ctx.getInputStream()) { + try { long startTime = System.currentTimeMillis(); TikaDocParser.generate( fsSettings, - stream, + ctx.getInputStream(), ctx.getFile().getName(), ctx.getFullFilename(), ctx.getDoc(), From 4b779d60252958e2eab4f10c4a664c1315db82ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Sun, 6 Sep 2020 17:39:30 +0200 Subject: [PATCH 12/17] Extract Tika and indexing into separate methods, to make it even easier to subclass the `DefaultProcessingPipeline`, calling these two methods plus your own in addition. Add some javadoc --- .../crawler/fs/DefaultProcessingPipeline.java | 25 +++++++++++++++---- .../crawler/fs/ProcessingPipeline.java | 8 ++++-- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java index 9918d8366..48d60fc66 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java @@ -31,18 +31,26 @@ public class DefaultProcessingPipeline implements ProcessingPipeline { private static final Logger logger = LogManager.getLogger(MethodHandles.lookup().lookupClass()); + protected Config config; protected TikaProcessor tika; protected EsIndexProcessor es; - protected Config config; + /** + * Parse the file with Tika and index to ES. + * Sub classes can override this method to define their own custom processing. 
+ * @param ctx the context in which to find the file inputstream as well as + */ @Override public void processFile(FsCrawlerContext ctx) { logger.debug("Starting processing of file {}", ctx.getFullFilename()); + extractTextWithTika(ctx); + indexToEs(ctx); + } - // Extracting content with Tika - tika.process(ctx); - - // Index to es + /** + * Indexes document using {@link EsIndexProcessor} + */ + protected void indexToEs(FsCrawlerContext ctx) { if (FsCrawlerUtil.isIndexable(ctx.getDoc().getContent(), config.getFsSettings().getFs().getFilters())) { es.process(ctx); } else { @@ -51,6 +59,13 @@ public void processFile(FsCrawlerContext ctx) { } } + /** + * Parse text and metadata from document using {@link TikaProcessor} + */ + protected void extractTextWithTika(FsCrawlerContext ctx) { + tika.process(ctx); + } + @Override public void init(Config config) { this.config = config; diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingPipeline.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingPipeline.java index db99eb7bc..968b612ae 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingPipeline.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingPipeline.java @@ -32,7 +32,7 @@ */ public interface ProcessingPipeline { /** - * Process one file + * Process one file. * @param ctx the context in which to find the file inputstream as well as * other details about the file */ @@ -43,7 +43,11 @@ public interface ProcessingPipeline { */ void init(Config config); - public static class Config { + /** + * This class holds configurations and the ES client for a processing pipeline. + * The configMap supports sending arbitrary configuration to a pipeline. 
+ */ + class Config { private final FsSettings fsSettings; private final ElasticsearchClient esClient; private MessageDigest messageDigest; From c2d5e31ff26e23be7825a0d4cbf4be36bf2c2547 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Sun, 6 Sep 2020 17:42:33 +0200 Subject: [PATCH 13/17] Revert some unnecessary changes, try to please codacy --- .../elasticsearch/crawler/fs/FsParserAbstract.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java index ceb49cc6a..94151b68a 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java @@ -241,7 +241,7 @@ private void updateFsJob(String jobName, LocalDateTime scanDate) throws Exceptio fsJobFileHandler.write(jobName, fsJob); } - private void addFilesRecursively(FileAbstractor abstractor, String filepath, LocalDateTime lastScanDate) + private void addFilesRecursively(FileAbstractor path, String filepath, LocalDateTime lastScanDate) throws Exception { logger.debug("indexing [{}] content", filepath); @@ -251,9 +251,9 @@ private void addFilesRecursively(FileAbstractor abstractor, String filepath, return; } - final Collection children = abstractor.getFiles(filepath); - final Collection fsFolders = new HashSet<>(); - final Collection fsFiles = new HashSet<>(); + final Collection children = path.getFiles(filepath); + Collection fsFiles = new ArrayList<>(); + Collection fsFolders = new ArrayList<>(); if (children != null) { boolean ignoreFolder = false; @@ -287,7 +287,7 @@ private void addFilesRecursively(FileAbstractor abstractor, String filepath, if (isFileSizeUnderLimit(fsSettings.getFs().getIgnoreAbove(), child.getSize())) { try { indexFile(child, stats, filepath, - fsSettings.getFs().isIndexContent() || 
fsSettings.getFs().isStoreSource() ? abstractor.getInputStream(child) : null, child.getSize()); + fsSettings.getFs().isIndexContent() || fsSettings.getFs().isStoreSource() ? path.getInputStream(child) : null, child.getSize()); stats.addFile(); } catch (java.io.FileNotFoundException e) { if (fsSettings.getFs().isContinueOnError()) { @@ -316,7 +316,7 @@ private void addFilesRecursively(FileAbstractor abstractor, String filepath, fsFolders.add(child.getFullpath()); indexDirectory(child.getFullpath()); } - addFilesRecursively(abstractor, child.getFullpath(), lastScanDate); + addFilesRecursively(path, child.getFullpath(), lastScanDate); } else { logger.debug(" - other: {}", filename); logger.debug("Not a file nor a dir. Skipping {}", child.getFullpath()); From d3c0272cae047b8f400b47c98dcfce17fc1f40d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Sun, 6 Sep 2020 17:53:30 +0200 Subject: [PATCH 14/17] Remove unused import --- .../fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java index 94151b68a..658b6e21f 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java @@ -57,7 +57,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.stream.Collectors; import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.computeVirtualPathName; From a4ab68aa1a7e0b022d01e4857d2b7b1b703d03a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Sun, 6 Sep 2020 17:53:53 +0200 Subject: [PATCH 15/17] Remove unused import --- .../java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java | 1 - 1 file changed, 1 deletion(-) diff --git 
a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java index 547c90622..aff6313f0 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java @@ -25,7 +25,6 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; -import java.io.InputStream; import java.lang.invoke.MethodHandles; import java.security.MessageDigest; From ca37bd33157e1866f96befac21b464371d1e04fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Mon, 7 Sep 2020 16:34:10 +0200 Subject: [PATCH 16/17] TikaProcessor configurable for what metadata fields to extract --- .../crawler/fs/DefaultProcessingPipeline.java | 6 ++-- .../crawler/fs/FsParserAbstract.java | 4 +-- .../crawler/fs/ProcessingException.java | 8 +++++ .../crawler/fs/ProcessingPipeline.java | 3 +- .../crawler/fs/TikaProcessor.java | 8 +++-- .../crawler/fs/tika/TikaDocParser.java | 33 +++++++++++++++++++ 6 files changed, 55 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java index 48d60fc66..6ef75eb47 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/DefaultProcessingPipeline.java @@ -22,7 +22,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; import java.lang.invoke.MethodHandles; +import java.util.Collections; /** * The processing pipeline that will be used if not overridden. 
@@ -67,9 +69,9 @@ protected void extractTextWithTika(FsCrawlerContext ctx) { } @Override - public void init(Config config) { + public void init(Config config) throws IOException { this.config = config; - tika = new TikaProcessor(config.getFsSettings(), config.getMessageDigest()); + tika = new TikaProcessor(config.getFsSettings(), config.getMessageDigest(), Collections.emptyMap()); es = new EsIndexProcessor(config.getFsSettings(), config.getEsClient()); } } diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java index 658b6e21f..60b807139 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java @@ -109,7 +109,7 @@ public abstract class FsParserAbstract extends FsParser { pipeline = (ProcessingPipeline) clazz.getDeclaredConstructor().newInstance(); pipeline.init(new ProcessingPipeline.Config(fsSettings, esClient, messageDigest, Collections.emptyMap())); logger.info("Created processing pipeline {}", this.pipeline.getClass().getName()); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { + } catch (Exception e) { logger.error("Could not create processing pipeline"); e.printStackTrace(); } @@ -452,7 +452,7 @@ private void indexFile(FileAbstractModel fileAbstractModel, ScanStatistic stats, // Create the Doc object (only needed when we have add_as_inner_object: true (default) or when we don't index json or xml) if (fsSettings.getFs().isAddAsInnerObject() || (!fsSettings.getFs().isJsonSupport() && !fsSettings.getFs().isXmlSupport())) { - String fullFilename = new File(dirname, filename).toString(); + String fullFilename = new File(dirname, filename).getAbsolutePath(); Doc doc = new Doc(); diff --git 
a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingException.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingException.java index 368870296..0817e7566 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingException.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingException.java @@ -25,4 +25,12 @@ public class ProcessingException extends RuntimeException { public ProcessingException(Throwable e) { super(e); } + + public ProcessingException(String reason) { + super(reason); + } + + public ProcessingException(String reason, Throwable e) { + super(reason, e); + } } diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingPipeline.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingPipeline.java index 968b612ae..bbcfbd884 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingPipeline.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/ProcessingPipeline.java @@ -21,6 +21,7 @@ import fr.pilato.elasticsearch.crawler.fs.client.ElasticsearchClient; import fr.pilato.elasticsearch.crawler.fs.settings.FsSettings; +import java.io.IOException; import java.security.MessageDigest; import java.util.Map; @@ -41,7 +42,7 @@ public interface ProcessingPipeline { /** * Initialize the pipeline with settings and ES client objects */ - void init(Config config); + void init(Config config) throws IOException; /** * This class holds configurations and the ES client for a processing pipeline. 
diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java index aff6313f0..ad93bd4fe 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/TikaProcessor.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.security.MessageDigest; +import java.util.Map; /** * Tikaprocessor will parse full text and update the Doc instance on context. @@ -39,10 +40,12 @@ public class TikaProcessor implements Processor { private static final Logger logger = LogManager.getLogger(MethodHandles.lookup().lookupClass()); private FsSettings fsSettings; private MessageDigest messageDigest; + private Map metadataMapping; - public TikaProcessor(FsSettings fsSettings, MessageDigest messageDigest) { + public TikaProcessor(FsSettings fsSettings, MessageDigest messageDigest, Map metadataMapping) { this.fsSettings = fsSettings; this.messageDigest = messageDigest; + this.metadataMapping = metadataMapping; } @Override @@ -56,7 +59,8 @@ public void process(FsCrawlerContext ctx) throws ProcessingException { ctx.getFullFilename(), ctx.getDoc(), messageDigest, - ctx.getFile().getSize()); + ctx.getFile().getSize(), + metadataMapping); logger.debug("Parsing document {} with Tika in {}ms", ctx.getFile().getName(), System.currentTimeMillis() - startTime); logger.trace("Parsed doc={}", DocParser.toJson(ctx.getDoc())); diff --git a/tika/src/main/java/fr/pilato/elasticsearch/crawler/fs/tika/TikaDocParser.java b/tika/src/main/java/fr/pilato/elasticsearch/crawler/fs/tika/TikaDocParser.java index bcb4e855c..6620d598b 100644 --- a/tika/src/main/java/fr/pilato/elasticsearch/crawler/fs/tika/TikaDocParser.java +++ b/tika/src/main/java/fr/pilato/elasticsearch/crawler/fs/tika/TikaDocParser.java @@ -20,6 +20,7 @@ package fr.pilato.elasticsearch.crawler.fs.tika; import 
fr.pilato.elasticsearch.crawler.fs.beans.Doc; +import fr.pilato.elasticsearch.crawler.fs.beans.Meta; import fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil; import fr.pilato.elasticsearch.crawler.fs.settings.FsSettings; import org.apache.commons.io.input.TeeInputStream; @@ -38,6 +39,10 @@ import java.util.ArrayList; import java.util.Base64; import java.util.List; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.function.Consumer; import java.util.function.Function; @@ -54,6 +59,23 @@ public class TikaDocParser { public static void generate(FsSettings fsSettings, InputStream inputStream, String filename, String fullFilename, Doc doc, MessageDigest messageDigest, long filesize) throws IOException { + generate(fsSettings, inputStream, filename, fullFilename, doc, messageDigest, filesize, Collections.emptyMap()); + } + + /** + * Generate the document with metadata based on input stream + * @param fsSettings config + * @param inputStream stream of the file + * @param filename simple file name + * @param fullFilename full file name with path + * @param doc {@link Doc} which will end up as Elastic doc + * @param messageDigest digest to use for hash generation + * @param filesize size of the file + * @param metadataMappings mapping of TikaKey->ElasticField for what additional / custom metadata to extract + * @throws IOException + */ + public static void generate(FsSettings fsSettings, InputStream inputStream, String filename, String fullFilename, Doc doc, + MessageDigest messageDigest, long filesize, Map metadataMappings) throws IOException { logger.trace("Generating document [{}]", fullFilename); // Extracting content with Tika // See #38: https://github.com/dadoonet/fscrawler/issues/38 @@ -133,6 +155,7 @@ public static void generate(FsSettings fsSettings, InputStream inputStream, Stri } // File + Arrays.stream(metadata.names()).forEach(n -> logger.trace("Meta: {}={}", n, 
metadata.get(n))); // Standard Meta setMeta(fullFilename, metadata, TikaCoreProperties.CREATOR, doc.getMeta()::setAuthor, Function.identity()); setMeta(fullFilename, metadata, TikaCoreProperties.TITLE, doc.getMeta()::setTitle, Function.identity()); @@ -173,6 +196,16 @@ public static void generate(FsSettings fsSettings, InputStream inputStream, Stri setMeta(fullFilename, metadata, TikaCoreProperties.RATING, doc.getMeta()::setRating, (value) -> value == null ? null : Integer.parseInt(value)); setMeta(fullFilename, metadata, TikaCoreProperties.COMMENTS, doc.getMeta()::setComments, Function.identity()); + // Additional metadata is mapped into an ES field inside 'meta' + if (!metadataMappings.isEmpty()) { + Meta meta = doc.getMeta(); + metadataMappings.entrySet().forEach(m -> { + String metaValue = metadata.get(m.getKey()); + meta.addRaw(m.getValue(), metaValue); + logger.debug("Mapped custom metadata {}={} from {}", m.getValue(), metaValue, m.getKey()); + }); + } + // Add support for more OOTB standard metadata if (fsSettings.getFs().isRawMetadata()) { From fe03ca8f3ad74771976043f667c92f3fbca2ff71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Mon, 7 Sep 2020 16:42:51 +0200 Subject: [PATCH 17/17] Throw exception if pipeline is not found --- .../fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java index 60b807139..0090c7552 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParserAbstract.java @@ -104,14 +104,14 @@ public abstract class FsParserAbstract extends FsParser { messageDigest = null; } + // Look for a custom processing pipeline try { Class clazz = 
FsParserAbstract.class.getClassLoader().loadClass(fsSettings.getFs().getPipeline().getClassName()); pipeline = (ProcessingPipeline) clazz.getDeclaredConstructor().newInstance(); pipeline.init(new ProcessingPipeline.Config(fsSettings, esClient, messageDigest, Collections.emptyMap())); logger.info("Created processing pipeline {}", this.pipeline.getClass().getName()); } catch (Exception e) { - logger.error("Could not create processing pipeline"); - e.printStackTrace(); + throw new RuntimeException("Could not create processing pipeline " + fsSettings.getFs().getPipeline().getClassName() + ". giving up"); } // On Windows, when using SSH server, we need to force the "Linux" separator