Skip to content

Commit

Permalink
Add health check for Kafka processors
Browse files Browse the repository at this point in the history
Signed-off-by: nscuro <[email protected]>
  • Loading branch information
nscuro committed Dec 30, 2023
1 parent 40897cb commit befe8cb
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.dependencytrack.event.kafka.processor;

import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.eclipse.microprofile.health.Liveness;

import static org.dependencytrack.event.kafka.processor.KafkaProcessorsInitializer.PROCESSOR_MANAGER;

@Liveness
public class KafkaProcessorsHealthCheck implements HealthCheck {

@Override
public HealthCheckResponse call() {
return PROCESSOR_MANAGER.probeHealth();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,32 @@
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

public class KafkaRecordProcessorInitializer implements ServletContextListener {
public class KafkaProcessorsInitializer implements ServletContextListener {

private static final Logger LOGGER = Logger.getLogger(KafkaRecordProcessorInitializer.class);
private static final Logger LOGGER = Logger.getLogger(KafkaProcessorsInitializer.class);

private final RecordProcessorManager processorManager = new RecordProcessorManager();
static final RecordProcessorManager PROCESSOR_MANAGER = new RecordProcessorManager();

@Override
public void contextInitialized(final ServletContextEvent event) {
LOGGER.info("Initializing Kafka processors");

processorManager.register(VulnerabilityMirrorProcessor.PROCESSOR_NAME,
PROCESSOR_MANAGER.register(VulnerabilityMirrorProcessor.PROCESSOR_NAME,
new VulnerabilityMirrorProcessor(), KafkaTopics.NEW_VULNERABILITY);
processorManager.register(RepositoryMetaResultProcessor.PROCESSOR_NAME,
PROCESSOR_MANAGER.register(RepositoryMetaResultProcessor.PROCESSOR_NAME,
new RepositoryMetaResultProcessor(), KafkaTopics.REPO_META_ANALYSIS_RESULT);
if (Config.getInstance().getPropertyAsBoolean(ConfigKey.TMP_DELAY_BOM_PROCESSED_NOTIFICATION)) {
processorManager.register(DelayedBomProcessedNotificationProcessor.PROCESSOR_NAME,
PROCESSOR_MANAGER.register(DelayedBomProcessedNotificationProcessor.PROCESSOR_NAME,
new DelayedBomProcessedNotificationProcessor(), KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE);
}
processorManager.startAll();

PROCESSOR_MANAGER.startAll();
}

@Override
public void contextDestroyed(final ServletContextEvent event) {
LOGGER.info("Stopping Kafka processors");
processorManager.close();
PROCESSOR_MANAGER.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import alpine.common.metrics.Metrics;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import io.github.resilience4j.core.IntervalFunction;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
Expand All @@ -14,6 +15,7 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.dependencytrack.event.kafka.KafkaTopics.Topic;
import org.eclipse.microprofile.health.HealthCheckResponse;

import java.time.Duration;
import java.util.HashMap;
Expand Down Expand Up @@ -87,6 +89,31 @@ public void startAll() {
}
}

public HealthCheckResponse probeHealth() {
final var responseBuilder = HealthCheckResponse.named("kafka-processors");

boolean isUp = true;
for (final Map.Entry<String, ManagedRecordProcessor> managedProcessorEntry : managedProcessors.entrySet()) {
final String processorName = managedProcessorEntry.getKey();
final ParallelStreamProcessor<?, ?> parallelConsumer = managedProcessorEntry.getValue().parallelConsumer();
final boolean isProcessorUp = !parallelConsumer.isClosedOrFailed();

responseBuilder.withData(processorName, isProcessorUp
? HealthCheckResponse.Status.UP.name()
: HealthCheckResponse.Status.DOWN.name());
if (isProcessorUp
&& parallelConsumer instanceof final ParallelEoSStreamProcessor<?, ?> concreteParallelConsumer
&& concreteParallelConsumer.getFailureCause() != null) {
responseBuilder.withData("%s_failure_reason".formatted(processorName),
concreteParallelConsumer.getFailureCause().getMessage());
}

isUp &= isProcessorUp;
}

return responseBuilder.status(isUp).build();
}

@Override
public void close() {
for (final Map.Entry<String, ManagedRecordProcessor> managedProcessorEntry : managedProcessors.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import alpine.server.health.checks.DatabaseHealthCheck;
import io.github.mweirauch.micrometer.jvm.extras.ProcessMemoryMetrics;
import io.github.mweirauch.micrometer.jvm.extras.ProcessThreadMetrics;
import org.dependencytrack.event.kafka.processor.KafkaProcessorsHealthCheck;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
Expand All @@ -36,7 +37,7 @@ public class HealthCheckInitializer implements ServletContextListener {
public void contextInitialized(final ServletContextEvent event) {
LOGGER.info("Registering health checks");
HealthCheckRegistry.getInstance().register("database", new DatabaseHealthCheck());
HealthCheckRegistry.getInstance().register("kafka-streams", new KafkaStreamsHealthCheck());
HealthCheckRegistry.getInstance().register("kafka-processors", new KafkaProcessorsHealthCheck());

// TODO: Move this to its own initializer if it turns out to be useful
LOGGER.info("Registering extra process metrics");
Expand Down
2 changes: 1 addition & 1 deletion src/main/webapp/WEB-INF/web.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
<listener-class>org.dependencytrack.event.EventSubsystemInitializer</listener-class>
</listener>
<listener>
<listener-class>org.dependencytrack.event.kafka.processor.KafkaRecordProcessorInitializer</listener-class>
<listener-class>org.dependencytrack.event.kafka.processor.KafkaProcessorsInitializer</listener-class>
</listener>
<listener>
<listener-class>org.dependencytrack.event.kafka.streams.KafkaStreamsInitializer</listener-class>
Expand Down

0 comments on commit befe8cb

Please sign in to comment.