Add processing pipeline support #1619

Closed · wants to merge 2 commits
37 changes: 37 additions & 0 deletions cli/pom.xml
@@ -31,6 +31,31 @@
<artifactId>jcommander</artifactId>
</dependency>

<!-- Open Telemetry -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
<version>1.24.0-alpha</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.24.0-alpha</version>
</dependency>


<!-- Test dependencies -->
<dependency>
<groupId>fr.pilato.elasticsearch.crawler</groupId>
@@ -40,4 +65,16 @@

</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>1.24.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

</project>
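
Note on the autoconfigure dependency added above: opentelemetry-sdk-extension-autoconfigure builds the SDK from standard otel.* system properties or OTEL_* environment variables, so no wiring code beyond initialize() is needed. Below is a minimal sketch of a local configuration, not part of this PR; the service name, endpoint and exporter choices are illustrative.

import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;

public final class OtelAutoConfigExample {
    public static void main(String[] args) {
        // Standard autoconfigure properties; the equivalent OTEL_* environment
        // variables work the same way. The values below are illustrative only.
        System.setProperty("otel.service.name", "fscrawler");
        System.setProperty("otel.exporter.otlp.endpoint", "http://localhost:4317");
        System.setProperty("otel.traces.exporter", "otlp");
        System.setProperty("otel.metrics.exporter", "none");

        OpenTelemetrySdk sdk = AutoConfiguredOpenTelemetrySdk.initialize()
                .getOpenTelemetrySdk();
        System.out.println("OpenTelemetry SDK initialized: " + sdk);
    }
}
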
@@ -54,6 +54,22 @@
import java.util.List;
import java.util.Scanner;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;


import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.*;

/**
@@ -104,6 +120,9 @@ public static class FsCrawlerCommand {


public static void main(String[] args) throws Exception {

setupOtel();

// create a scanner so we can read the command-line input
Scanner scanner = new Scanner(System.in);

@@ -413,4 +432,14 @@ private static void sleep() {
Thread.currentThread().interrupt();
}
}

private static void setupOtel() {
    // Build the OpenTelemetry SDK from the standard autoconfiguration sources
    // (otel.* system properties / OTEL_* environment variables).
    OpenTelemetrySdk sdk = AutoConfiguredOpenTelemetrySdk.initialize()
            .getOpenTelemetrySdk();
}
}
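
Most of the OpenTelemetry imports added above (the OTLP exporters, BatchSpanProcessor, PeriodicMetricReader, Resource, W3CTraceContextPropagator) are not used by setupOtel() yet and point at a manual, non-autoconfigured setup. Below is a hedged sketch of what that could look like, not part of this PR; the localhost endpoint and the fscrawler service name are illustrative.

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;

public final class ManualOtelSetupExample {

    static OpenTelemetry setupOtelManually() {
        // Describe this service; the name is illustrative.
        Resource resource = Resource.getDefault().merge(
                Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, "fscrawler")));

        // Export spans in batches over OTLP/gRPC to a local collector.
        SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
                .setResource(resource)
                .addSpanProcessor(BatchSpanProcessor.builder(
                        OtlpGrpcSpanExporter.builder()
                                .setEndpoint("http://localhost:4317")
                                .build())
                        .build())
                .build();

        // Export metrics periodically over OTLP/gRPC.
        SdkMeterProvider meterProvider = SdkMeterProvider.builder()
                .setResource(resource)
                .registerMetricReader(PeriodicMetricReader.builder(
                        OtlpGrpcMetricExporter.builder()
                                .setEndpoint("http://localhost:4317")
                                .build())
                        .build())
                .build();

        // Register the SDK globally with W3C trace-context propagation.
        return OpenTelemetrySdk.builder()
                .setTracerProvider(tracerProvider)
                .setMeterProvider(meterProvider)
                .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
                .buildAndRegisterGlobal();
    }
}
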
68 changes: 65 additions & 3 deletions core/pom.xml
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>fscrawler-parent</artifactId>
@@ -45,6 +45,18 @@
</plugins>
</build>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>1.24.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- Our framework -->
<dependency>
@@ -108,6 +120,30 @@
<artifactId>jersey-hk2</artifactId>
</dependency>

<!-- Telemetry -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<!-- <dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.24.0-alpha</version>
</dependency> -->
<!-- <dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-annotations</artifactId>
<version>1.23.0</version>
</dependency> -->

<!-- Test dependencies -->
<dependency>
<groupId>fr.pilato.elasticsearch.crawler</groupId>
@@ -120,6 +156,32 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.10.0</version>
<exclusions>
<exclusion>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
</exclusion>
<exclusion>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
<version>1.8.10</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-common</artifactId>
<version>1.8.10</version>
</dependency>

</dependencies>

</project>
@@ -0,0 +1,97 @@
/*
* Licensed to David Pilato (the "Author") under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Author licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fr.pilato.elasticsearch.crawler.fs;

import fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.lang.invoke.MethodHandles;


/**
* The processing pipeline that will be used if not overridden.
*
*/
public class DefaultProcessingPipeline implements ProcessingPipeline {
private static final Logger logger = LogManager.getLogger(MethodHandles.lookup().lookupClass());

protected Config config;
protected TikaProcessor tika;
protected EsIndexProcessor es;
protected TagsProcessor tags;
protected UpdateIdProcessor id;

/**
* Parse the file with Tika and index to ES.
* Subclasses can override this method to define their own custom processing.
* @param ctx the context holding the file input stream and the document being built
*/
@Override
public void processFile(FsCrawlerContext ctx) {
logger.debug("Starting processing of file {}", ctx.getFullFilename());
extractTextWithTika(ctx);
updateTags(ctx);
updateId(ctx);
indexToEs(ctx);
}

/**
* Indexes document using {@link EsIndexProcessor}
*/
protected void indexToEs(FsCrawlerContext ctx) {
if (FsCrawlerUtil.isIndexable(ctx.getDoc().getContent(), config.getFsSettings().getFs().getFilters())) {
es.process(ctx);
} else {
logger.debug("We ignore file [{}] because it does not match all the patterns {}", ctx.getFile().getName(),
config.getFsSettings().getFs().getFilters());
}
}

/**
* Parse text and metadata from document using {@link TikaProcessor}
*/
protected void extractTextWithTika(FsCrawlerContext ctx) {
tika.process(ctx);
}

/**
* Overrides tags using {@link TagsProcessor} and config from ctx
*/
protected void updateTags(FsCrawlerContext ctx) {
tags.process(ctx);
}

/**
* Overrides _id using {@link UpdateIdProcessor}
*/
protected void updateId(FsCrawlerContext ctx) {
id.process(ctx);
}

@Override
public void init(Config config) throws IOException {
this.config = config;
tika = new TikaProcessor(config.getFsSettings(), config.getMessageDigest());
es = new EsIndexProcessor(config.getFsSettings(), config.getDocumentService());
tags = new TagsProcessor();
id = new UpdateIdProcessor();
}
}
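
Because the individual steps above are protected methods, a custom pipeline can override processFile and reorder or drop steps without reimplementing them. Below is a minimal sketch of such a subclass, not part of this PR; the class name is illustrative.

package fr.pilato.elasticsearch.crawler.fs;

// Hypothetical subclass (not part of this PR): reuses the protected steps of
// DefaultProcessingPipeline but skips the tag and _id overrides.
public class TextOnlyProcessingPipeline extends DefaultProcessingPipeline {

    @Override
    public void processFile(FsCrawlerContext ctx) {
        extractTextWithTika(ctx);
        indexToEs(ctx);
    }
}
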
@@ -0,0 +1,86 @@
/*
* Licensed to David Pilato (the "Author") under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Author licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fr.pilato.elasticsearch.crawler.fs;

import fr.pilato.elasticsearch.crawler.fs.framework.FSCrawlerLogger;
import fr.pilato.elasticsearch.crawler.fs.service.FsCrawlerDocumentService;
import fr.pilato.elasticsearch.crawler.fs.settings.FsSettings;
import io.opentelemetry.context.Scope;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.lang.invoke.MethodHandles;
import static fr.pilato.elasticsearch.crawler.fs.framework.FsCrawlerUtil.*;
import static fr.pilato.elasticsearch.crawler.fs.framework.JsonUtil.serialize;
import static fr.pilato.elasticsearch.crawler.fs.framework.JsonUtil.transform;

/**
* Pulls {@link fr.pilato.elasticsearch.crawler.fs.beans.Doc} from context,
* merges it with 'extraDoc' if it exists and then indexes it to Elasticsearch
*/
public class EsIndexProcessor extends ProcessorAbstract {
private static final Logger logger = LogManager.getLogger(MethodHandles.lookup().lookupClass());
private final FsSettings fsSettings;
private final FsCrawlerDocumentService documentService;

public EsIndexProcessor(FsSettings fsSettings, FsCrawlerDocumentService esClient) {
this.fsSettings = fsSettings;
this.documentService = esClient;
}

@Override
public void process(FsCrawlerContext ctx) throws ProcessingException {
    var span = tracer.spanBuilder("es-index").startSpan();
    // Make the span current while indexing and always release the scope and
    // end the span, even if indexing fails.
    try (Scope scope = span.makeCurrent()) {
        var doc = ctx.getDoc();
        var filename = ctx.getFile().getName();
        var fullFilename = ctx.getFullFilename();
        var id = ctx.getId();
        var stats = ctx.getScanStatistic();

        var postTransform = fsSettings.getFs().getPipeline().getPostTransform();

        // We index the data structure
        if (isIndexable(doc.getContent(), fsSettings.getFs().getFilters())) {
            FSCrawlerLogger.documentDebug(id,
                    computeVirtualPathName(stats.getRootPath(), fullFilename),
                    "Indexing content");

            var serializedData = serialize(doc);

            if (postTransform != null) {
                serializedData = transform(serializedData, postTransform);
            }

            documentService.indexRawJson(
                    fsSettings.getElasticsearch().getIndex(),
                    id,
                    serializedData,
                    fsSettings.getElasticsearch().getPipeline());
        } else {
            logger.debug("We ignore file [{}] because it does not match all the patterns {}", filename,
                    fsSettings.getFs().getFilters());
        }
    } finally {
        span.end();
    }
}
}
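
EsIndexProcessor is one concrete pipeline step; the default pipeline wires it together with TikaProcessor, TagsProcessor and UpdateIdProcessor. Below is a minimal sketch of an additional step, not part of this PR, assuming ProcessorAbstract only requires process(FsCrawlerContext) to be implemented, as this class suggests; the class name is illustrative.

package fr.pilato.elasticsearch.crawler.fs;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.lang.invoke.MethodHandles;

// Hypothetical processor (not part of this PR): logs each file as it flows
// through the pipeline; a real processor would typically mutate ctx.getDoc().
public class LoggingProcessor extends ProcessorAbstract {
    private static final Logger logger = LogManager.getLogger(MethodHandles.lookup().lookupClass());

    @Override
    public void process(FsCrawlerContext ctx) throws ProcessingException {
        logger.info("Pipeline is processing [{}]", ctx.getFullFilename());
    }
}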