diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/KafkaProcessorsHealthCheck.java b/src/main/java/org/dependencytrack/event/kafka/processor/KafkaProcessorsHealthCheck.java new file mode 100644 index 000000000..d70862b18 --- /dev/null +++ b/src/main/java/org/dependencytrack/event/kafka/processor/KafkaProcessorsHealthCheck.java @@ -0,0 +1,17 @@ +package org.dependencytrack.event.kafka.processor; + +import org.eclipse.microprofile.health.HealthCheck; +import org.eclipse.microprofile.health.HealthCheckResponse; +import org.eclipse.microprofile.health.Liveness; + +import static org.dependencytrack.event.kafka.processor.KafkaProcessorsInitializer.PROCESSOR_MANAGER; + +@Liveness +public class KafkaProcessorsHealthCheck implements HealthCheck { + + @Override + public HealthCheckResponse call() { + return PROCESSOR_MANAGER.probeHealth(); + } + +} diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/KafkaRecordProcessorInitializer.java b/src/main/java/org/dependencytrack/event/kafka/processor/KafkaProcessorsInitializer.java similarity index 67% rename from src/main/java/org/dependencytrack/event/kafka/processor/KafkaRecordProcessorInitializer.java rename to src/main/java/org/dependencytrack/event/kafka/processor/KafkaProcessorsInitializer.java index 546725540..8f23379ff 100644 --- a/src/main/java/org/dependencytrack/event/kafka/processor/KafkaRecordProcessorInitializer.java +++ b/src/main/java/org/dependencytrack/event/kafka/processor/KafkaProcessorsInitializer.java @@ -9,31 +9,32 @@ import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; -public class KafkaRecordProcessorInitializer implements ServletContextListener { +public class KafkaProcessorsInitializer implements ServletContextListener { - private static final Logger LOGGER = Logger.getLogger(KafkaRecordProcessorInitializer.class); + private static final Logger LOGGER = Logger.getLogger(KafkaProcessorsInitializer.class); - private final RecordProcessorManager processorManager = new RecordProcessorManager(); + static final RecordProcessorManager PROCESSOR_MANAGER = new RecordProcessorManager(); @Override public void contextInitialized(final ServletContextEvent event) { LOGGER.info("Initializing Kafka processors"); - processorManager.register(VulnerabilityMirrorProcessor.PROCESSOR_NAME, + PROCESSOR_MANAGER.register(VulnerabilityMirrorProcessor.PROCESSOR_NAME, new VulnerabilityMirrorProcessor(), KafkaTopics.NEW_VULNERABILITY); - processorManager.register(RepositoryMetaResultProcessor.PROCESSOR_NAME, + PROCESSOR_MANAGER.register(RepositoryMetaResultProcessor.PROCESSOR_NAME, new RepositoryMetaResultProcessor(), KafkaTopics.REPO_META_ANALYSIS_RESULT); if (Config.getInstance().getPropertyAsBoolean(ConfigKey.TMP_DELAY_BOM_PROCESSED_NOTIFICATION)) { - processorManager.register(DelayedBomProcessedNotificationProcessor.PROCESSOR_NAME, + PROCESSOR_MANAGER.register(DelayedBomProcessedNotificationProcessor.PROCESSOR_NAME, new DelayedBomProcessedNotificationProcessor(), KafkaTopics.NOTIFICATION_PROJECT_VULN_ANALYSIS_COMPLETE); } - processorManager.startAll(); + + PROCESSOR_MANAGER.startAll(); } @Override public void contextDestroyed(final ServletContextEvent event) { LOGGER.info("Stopping Kafka processors"); - processorManager.close(); + PROCESSOR_MANAGER.close(); } } diff --git a/src/main/java/org/dependencytrack/event/kafka/processor/api/RecordProcessorManager.java b/src/main/java/org/dependencytrack/event/kafka/processor/api/RecordProcessorManager.java index 317084dfb..9c9139674 100644 --- a/src/main/java/org/dependencytrack/event/kafka/processor/api/RecordProcessorManager.java +++ b/src/main/java/org/dependencytrack/event/kafka/processor/api/RecordProcessorManager.java @@ -5,6 +5,7 @@ import alpine.common.metrics.Metrics; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; +import io.confluent.parallelconsumer.ParallelEoSStreamProcessor; import io.confluent.parallelconsumer.ParallelStreamProcessor; import io.github.resilience4j.core.IntervalFunction; import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; @@ -14,6 +15,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.dependencytrack.event.kafka.KafkaTopics.Topic; +import org.eclipse.microprofile.health.HealthCheckResponse; import java.time.Duration; import java.util.HashMap; @@ -87,6 +89,31 @@ public void startAll() { } } + public HealthCheckResponse probeHealth() { + final var responseBuilder = HealthCheckResponse.named("kafka-processors"); + + boolean isUp = true; + for (final Map.Entry managedProcessorEntry : managedProcessors.entrySet()) { + final String processorName = managedProcessorEntry.getKey(); + final ParallelStreamProcessor parallelConsumer = managedProcessorEntry.getValue().parallelConsumer(); + final boolean isProcessorUp = !parallelConsumer.isClosedOrFailed(); + + responseBuilder.withData(processorName, isProcessorUp + ? HealthCheckResponse.Status.UP.name() + : HealthCheckResponse.Status.DOWN.name()); + if (isProcessorUp + && parallelConsumer instanceof final ParallelEoSStreamProcessor concreteParallelConsumer + && concreteParallelConsumer.getFailureCause() != null) { + responseBuilder.withData("%s_failure_reason".formatted(processorName), + concreteParallelConsumer.getFailureCause().getMessage()); + } + + isUp &= isProcessorUp; + } + + return responseBuilder.status(isUp).build(); + } + @Override public void close() { for (final Map.Entry managedProcessorEntry : managedProcessors.entrySet()) { diff --git a/src/main/java/org/dependencytrack/health/HealthCheckInitializer.java b/src/main/java/org/dependencytrack/health/HealthCheckInitializer.java index 60ad5dd04..7b7e9112f 100644 --- a/src/main/java/org/dependencytrack/health/HealthCheckInitializer.java +++ b/src/main/java/org/dependencytrack/health/HealthCheckInitializer.java @@ -24,6 +24,7 @@ import alpine.server.health.checks.DatabaseHealthCheck; import io.github.mweirauch.micrometer.jvm.extras.ProcessMemoryMetrics; import io.github.mweirauch.micrometer.jvm.extras.ProcessThreadMetrics; +import org.dependencytrack.event.kafka.processor.KafkaProcessorsHealthCheck; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; @@ -36,7 +37,7 @@ public class HealthCheckInitializer implements ServletContextListener { public void contextInitialized(final ServletContextEvent event) { LOGGER.info("Registering health checks"); HealthCheckRegistry.getInstance().register("database", new DatabaseHealthCheck()); - HealthCheckRegistry.getInstance().register("kafka-streams", new KafkaStreamsHealthCheck()); + HealthCheckRegistry.getInstance().register("kafka-processors", new KafkaProcessorsHealthCheck()); // TODO: Move this to its own initializer if it turns out to be useful LOGGER.info("Registering extra process metrics"); diff --git a/src/main/webapp/WEB-INF/web.xml b/src/main/webapp/WEB-INF/web.xml index d6b206117..099a11dc1 100644 --- a/src/main/webapp/WEB-INF/web.xml +++ b/src/main/webapp/WEB-INF/web.xml @@ -48,7 +48,7 @@ org.dependencytrack.event.EventSubsystemInitializer - org.dependencytrack.event.kafka.processor.KafkaRecordProcessorInitializer + org.dependencytrack.event.kafka.processor.KafkaProcessorsInitializer org.dependencytrack.event.kafka.streams.KafkaStreamsInitializer