diff --git a/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/HonoKafkaConsumer.java b/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/HonoKafkaConsumer.java
index 87ff9e50b7..a5f76d4424 100644
--- a/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/HonoKafkaConsumer.java
+++ b/clients/kafka-common/src/main/java/org/eclipse/hono/client/kafka/consumer/HonoKafkaConsumer.java
@@ -40,6 +40,11 @@
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.ConsumerDelegate;
+import org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.eclipse.hono.client.ServerErrorException;
 import org.eclipse.hono.client.kafka.KafkaClientFactory;
@@ -94,7 +99,13 @@ public class HonoKafkaConsumer implements Lifecycle, ServiceClient {
      */
     public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 250;
 
-    private static final long OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(30);
+    /**
+     * The name of the configuration property to set the delay after which obsolete metrics for a deleted Kafka topic
+     * are removed from the metrics of the Kafka consumer.
+     */
+    public static final String CONFIG_HONO_OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS = "hono.obsolete.metrics.removal.delay.millis";
+
+    private static final long OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS_DEFAULT = TimeUnit.SECONDS.toMillis(30);
     private static final String MSG_CONSUMER_NOT_INITIALIZED_STARTED = "consumer not initialized/started";
 
     private static final Logger LOG = LoggerFactory.getLogger(HonoKafkaConsumer.class);
@@ -500,6 +511,16 @@ public final boolean isRecordHandlingAndPollingPaused() {
         return pollingPaused.get();
     }
 
+    /**
+     * Get the metrics kept by the consumer.
+     *
+     * @return The metrics.
+     * @throws IllegalStateException if invoked before the KafkaConsumer is set via the {@link #start()} method.
+     */
+    public final Map<MetricName, ? extends Metric> metrics() {
+        return getUnderlyingConsumer().metrics();
+    }
+
     /**
      * Gets the used vert.x KafkaConsumer.
      *
@@ -625,7 +646,7 @@ postponing record handling until consumer has been initialized \
     /**
      * {@inheritDoc}
      * <p>
-     * This methods triggers the creation of a Kafka consumer in the background. A new attempt to create the
+     * This method triggers the creation of a Kafka consumer in the background. A new attempt to create the
      * consumer is made periodically until creation succeeds or the {@link #stop()} method has been invoked.
      * <p>
      * Client code may {@linkplain #addOnKafkaConsumerReadyHandler(Handler) register a dedicated handler}
@@ -893,9 +914,13 @@ private void updateSubscribedTopicPatternTopicsAndRemoveMetrics() {
                     .filter(t -> !subscribedTopicPatternTopics.contains(t))
                     .collect(Collectors.toSet());
             if (!deletedTopics.isEmpty()) {
+                LOG.debug("deleted topics: {}", deletedTopics);
                 // actual removal to be done with a delay, as there might still be unprocessed fetch response data
                 // regarding these topics, in which case metrics would get re-created after they were removed
-                runOnContext(v -> vertx.setTimer(OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS, tid -> {
+                final long obsoleteMetricsRemovalDelayMillis = Optional
+                        .ofNullable(consumerConfig.get(CONFIG_HONO_OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS))
+                        .map(Long::parseLong).orElse(OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS_DEFAULT);
+                runOnContext(v -> vertx.setTimer(obsoleteMetricsRemovalDelayMillis, tid -> {
                     runOnKafkaWorkerThread(v2 -> {
                         removeMetricsForDeletedTopics(deletedTopics.stream()
                                 .filter(t -> !subscribedTopicPatternTopics.contains(t)));
@@ -985,7 +1010,7 @@ protected void onPartitionsRevokedBlocking(final Set partitionsS
      * <p>
      * This default implementation does nothing. Subclasses may override this method.
      *
-     * @param partitionsSet The list of partitions that are not assigned to this consumer any more.
+     * @param partitionsSet The list of partitions that are not assigned to this consumer anymore.
      */
     protected void onPartitionsLostBlocking(final Set<TopicPartition> partitionsSet) {
         // do nothing by default
@@ -1050,7 +1075,7 @@ protected void runOnContext(final Handler codeToRun) {
     /**
      * Runs the given handler on the Kafka polling thread.
      * <p>
-     * The invocation of the handler is skipped if the this consumer is already closed.
+     * The invocation of the handler is skipped if this consumer is already closed.
      *
      * @param handler The handler to invoke.
      * @throws IllegalStateException if the corresponding executor service isn't available because no subscription
@@ -1151,7 +1176,7 @@ public final Future ensureTopicIsAmongSubscribedTopicPatternTopics(final S
             LOG.debug("ensureTopicIsAmongSubscribedTopics: topic is already subscribed [{}]", topic);
             return Future.succeededFuture();
         }
-
+        LOG.debug("ensureTopicIsAmongSubscribedTopics: called for topic [{}]", topic);
         synchronized (subscriptionUpdateTrackersForToBeAddedTopics) {
             final var tracker = new SubscriptionUpdateTracker(topic);
@@ -1168,7 +1193,7 @@
 
     private void triggerTopicPatternSubscriptionUpdate() {
         if (!subscriptionUpdateTriggered.compareAndSet(false, true)) {
-            LOG.debug("ensureTopicIsAmongSubscribedTopics: subscription update already triggered");
+            LOG.debug("triggerTopicPatternSubscriptionUpdate: subscription update already triggered");
             return;
         }
         runOnKafkaWorkerThread(v -> {
@@ -1198,7 +1223,7 @@ private void triggerTopicPatternSubscriptionUpdate() {
             try {
                 LOG.info("triggering refresh of subscribed topic list ...");
                 getUnderlyingConsumer().subscribe(topicPattern, rebalanceListener);
-                if (!metadataMaxAge.isPresent() || metadataMaxAge.get() > THRESHOLD_METADATA_MAX_AGE_MS) {
+                if (metadataMaxAge.isEmpty() || metadataMaxAge.get() > THRESHOLD_METADATA_MAX_AGE_MS) {
                     // Partitions of newly created topics are being assigned by means of
                     // a rebalance. We make sure the rebalancing happens during the next poll()
                     // operation in order to not having to wait for the metadata to become stale
@@ -1217,35 +1242,46 @@ private void triggerTopicPatternSubscriptionUpdate() {
     }
 
     private void failAllSubscriptionUpdateTrackers(final Exception failure) {
-        final List<SubscriptionUpdateTracker> toBeFailedTrackers = new ArrayList<>();
+        final List<SubscriptionUpdateTracker> toBeFailedTrackers;
         synchronized (subscriptionUpdateTrackersForToBeAddedTopics) {
-            toBeFailedTrackers.addAll(subscriptionUpdateTrackersForToBeAddedTopics.values());
+            toBeFailedTrackers = new ArrayList<>(subscriptionUpdateTrackersForToBeAddedTopics.values());
             subscriptionUpdateTrackersForToBeAddedTopics.clear();
         }
-        toBeFailedTrackers.forEach(tracker -> {
-            runOnContext(v -> tracker.fail(failure));
-        });
+        toBeFailedTrackers.forEach(tracker -> runOnContext(v -> tracker.fail(failure)));
     }
 
     private void removeMetricsForDeletedTopics(final Stream<String> deletedTopics) {
         final Metrics metrics = getInternalMetricsObject(kafkaConsumer.unwrap());
         if (metrics != null) {
             deletedTopics.forEach(topic -> {
+                LOG.debug("removing metrics for deleted topic: {}", topic);
                 metrics.removeSensor("topic." + topic + ".bytes-fetched");
                 metrics.removeSensor("topic." + topic + ".records-fetched");
             });
         }
     }
 
+    @SuppressWarnings("unchecked")
     private Metrics getInternalMetricsObject(final Consumer consumer) {
         if (consumer instanceof org.apache.kafka.clients.consumer.KafkaConsumer) {
             try {
-                final Field field = org.apache.kafka.clients.consumer.KafkaConsumer.class.getDeclaredField("metrics");
-                field.setAccessible(true);
-                return (Metrics) field.get(consumer);
+                final Field delegateField = org.apache.kafka.clients.consumer.KafkaConsumer.class.getDeclaredField("delegate");
+                delegateField.setAccessible(true);
+                final ConsumerDelegate delegate = (ConsumerDelegate) delegateField.get(consumer);
+                if (delegate instanceof AsyncKafkaConsumer) {
+                    final Field metricsField = AsyncKafkaConsumer.class.getDeclaredField("metrics");
+                    metricsField.setAccessible(true);
+                    return (Metrics) metricsField.get(delegate);
+                } else if (delegate instanceof LegacyKafkaConsumer) {
+                    final Field metricsField = LegacyKafkaConsumer.class.getDeclaredField("metrics");
+                    metricsField.setAccessible(true);
+                    return (Metrics) metricsField.get(delegate);
+                }
             } catch (final Exception e) {
                 LOG.warn("failed to get metrics object", e);
             }
+        } else {
+            LOG.warn("unsupported consumer type: {}", consumer.getClass().getName());
         }
         return null;
     }
@@ -1300,7 +1336,7 @@ private ExecutorService getKafkaConsumerWorker(final KafkaConsumer co
         return worker;
     }
 
-    private final class SubscriptionUpdateTracker {
+    private static final class SubscriptionUpdateTracker {
         private final Promise<Void> outcome = Promise.promise();
         private final String topicName;
         private final AtomicInteger rebalancesLeft = new AtomicInteger(10);
diff --git a/tests/src/test/java/org/eclipse/hono/tests/client/AsyncHandlingAutoCommitKafkaConsumerIT.java b/tests/src/test/java/org/eclipse/hono/tests/client/AsyncHandlingAutoCommitKafkaConsumerIT.java
index 30e37679dc..6a60e4e6ca 100644
--- a/tests/src/test/java/org/eclipse/hono/tests/client/AsyncHandlingAutoCommitKafkaConsumerIT.java
+++ b/tests/src/test/java/org/eclipse/hono/tests/client/AsyncHandlingAutoCommitKafkaConsumerIT.java
@@ -21,6 +21,7 @@
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -30,7 +31,9 @@
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.MetricName;
 import org.eclipse.hono.client.kafka.consumer.AsyncHandlingAutoCommitKafkaConsumer;
+import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumer;
 import org.eclipse.hono.tests.EnabledIfMessagingSystemConfigured;
 import org.eclipse.hono.tests.IntegrationTestSupport;
 import org.eclipse.hono.util.MessagingType;
@@ -49,6 +52,7 @@
 import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
+import io.vertx.junit5.Timeout;
 import io.vertx.junit5.VertxExtension;
 import io.vertx.junit5.VertxTestContext;
 import io.vertx.kafka.admin.KafkaAdminClient;
@@ -171,13 +175,13 @@ public void testConsumerReadsAllRecordsForDynamicallyCreatedTopics(
 
         final var topicsToPublishTo = IntStream.range(0, numTopicsAndRecords)
                 .mapToObj(i -> "%s%d".formatted(patternPrefix, i))
-                .collect(Collectors.toList());
+                .toList();
 
         // create some matching topics - these shall be deleted after consumer start;
         // this shall make sure that topic deletion doesn't influence the test result
         final var otherTopics = IntStream.range(0, numTopicsAndRecords)
                 .mapToObj(i -> "%s%d_other".formatted(patternPrefix, i))
-                .collect(Collectors.toList());
+                .toList();
 
         final var recordsReceived = ctx.checkpoint(numTopicsAndRecords);
         final String recordKey = "addedAfterStartKey";
@@ -235,6 +239,99 @@ public void testConsumerReadsAllRecordsForDynamicallyCreatedTopics(
         }
     }
 
+    /**
+     * Verifies that a topic-pattern based AsyncHandlingAutoCommitKafkaConsumer removes topic-related metrics
+     * once a topic that matches the topic-pattern gets deleted.
+     *
+     * NOTE: The logic for removing the metrics is located in HonoKafkaConsumer, therefore this test would better be
+     * placed in HonoKafkaConsumerIT. But this proves to be difficult with the current integration test Kafka setup,
+     * where topic auto-creation is enabled in the broker config. For some reason, even when setting 'allow.auto.create.topics=false'
+     * for the consumer and having ensured that offsets were committed before topic deletion (along with standard
+     * auto-commit being disabled), the topic gets auto-created again some time after it was deleted, causing the test to fail.
+     * With the manual auto-commit handling of the AsyncHandlingAutoCommitKafkaConsumer, this isn't the case.
+     *
+     * @param partitionAssignmentStrategy The partition assignment strategy to use for the consumer.
+     * @param ctx The vert.x test context.
+     */
+    @ParameterizedTest(name = IntegrationTestSupport.PARAMETERIZED_TEST_NAME_PATTERN)
+    @MethodSource("partitionAssignmentStrategies")
+    @Timeout(value = 10, timeUnit = TimeUnit.SECONDS)
+    public void testPatternBasedConsumerRemovesMetricsOfDeletedTopics(
+            final String partitionAssignmentStrategy,
+            final VertxTestContext ctx) {
+
+        // prepare consumer
+        final var consumerConfig = IntegrationTestSupport.getKafkaConsumerConfig().getConsumerConfig("test");
+        applyPartitionAssignmentStrategy(consumerConfig, partitionAssignmentStrategy);
+        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+        consumerConfig.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
+        consumerConfig.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100");
+        consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        consumerConfig.put(HonoKafkaConsumer.CONFIG_HONO_OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS, "200");
+
+        final Promise<Void> recordReceivedPromise = Promise.promise();
+        final Function<KafkaConsumerRecord<String, Buffer>, Future<Void>> recordHandler = record -> {
+            LOG.debug("received record: {}", record);
+            recordReceivedPromise.complete();
+            return Future.succeededFuture();
+        };
+        final String topicPrefix = "test_" + UUID.randomUUID();
+        final String topic = topicPrefix + "_toBeDeleted";
+        kafkaConsumer = new AsyncHandlingAutoCommitKafkaConsumer<>(vertx, Pattern.compile(topicPrefix + ".*"),
+                recordHandler, consumerConfig);
+        // create topic and start consumer
+        final Promise<Void> consumerReadyTracker = Promise.promise();
+        kafkaConsumer.addOnKafkaConsumerReadyHandler(consumerReadyTracker);
+        adminClient.createTopics(List.of(new NewTopic(topic, 1, REPLICATION_FACTOR)))
+                .compose(ok -> kafkaConsumer.start())
+                .compose(ok -> consumerReadyTracker.future())
+                .compose(ok -> {
+                    ctx.verify(() -> assertThat(recordReceivedPromise.future().isComplete()).isFalse());
+                    LOG.debug("consumer started, publishing record to be received by the consumer...");
+                    return Future.all(
+                            publish(topic, "recordKey", Buffer.buffer("testPayload")),
+                            recordReceivedPromise.future());
+                })
+                .compose(ok -> {
+                    LOG.debug("waiting for offset to be committed");
+                    final Promise<Void> offsetCommitCheckTracker = Promise.promise();
+                    final AtomicInteger checkCount = new AtomicInteger(0);
+                    vertx.setPeriodic(100, tid -> {
+                        if (!kafkaConsumer.isOffsetsCommitNeededForTopic(topic)) {
+                            vertx.cancelTimer(tid);
+                            offsetCommitCheckTracker.complete();
+                        } else if (checkCount.incrementAndGet() >= 10) {
+                            vertx.cancelTimer(tid);
+                            offsetCommitCheckTracker.fail("timeout waiting for offset commit");
+                        }
+                    });
+                    return offsetCommitCheckTracker.future();
+                })
+                .compose(ok -> {
+                    ctx.verify(() -> assertThat(getTopicRelatedMetricNames(topic)).isNotEmpty());
+                    LOG.debug("delete topic {}", topic);
+                    return adminClient.deleteTopics(List.of(topic));
+                })
+                .compose(ok -> {
+                    LOG.debug("waiting for metrics to be removed...");
+                    final Promise<Void> metricCheckTracker = Promise.promise();
+                    final AtomicInteger checkCount = new AtomicInteger(0);
+                    vertx.setPeriodic(200, tid -> {
+                        LOG.debug("topic-related metrics: {}", getTopicRelatedMetricNames(topic));
+                        if (getTopicRelatedMetricNames(topic).isEmpty()) {
+                            vertx.cancelTimer(tid);
+                            metricCheckTracker.complete();
+                        } else if (checkCount.incrementAndGet() >= 40) {
+                            vertx.cancelTimer(tid);
+                            metricCheckTracker.fail("timeout waiting for metrics to be removed");
+                        }
+                    });
+                    return metricCheckTracker.future();
+                })
+                .onComplete(ctx.succeeding(v -> ctx.completeNow()));
+    }
+
     private Future<Void> ensureTopicIsAmongSubscribedTopicPatternTopicsAndPublish(
             final VertxTestContext ctx,
             final String topic,
@@ -288,5 +385,12 @@ private void applyPartitionAssignmentStrategy(final Map consumer
 
         Optional.ofNullable(partitionAssignmentStrategy)
                 .ifPresent(s -> consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, s));
     }
+
+    private List<String> getTopicRelatedMetricNames(final String topicName) {
+        return kafkaConsumer.metrics().keySet().stream()
+                .filter(metricName -> metricName.tags().containsValue(topicName))
+                .map(MetricName::name)
+                .collect(Collectors.toList());
+    }
 }
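
For illustration only, a minimal usage sketch that is not part of the patch: it shows how client code might set the new removal-delay property and how the map returned by the new metrics() accessor can be filtered for a given topic, mirroring the helper added in the test. The class and method names below are invented for this example; only the CONFIG_HONO_OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS constant and the metrics() accessor come from the patch.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumer;

// Hypothetical helper class, for illustration only.
public final class ConsumerMetricsExample {

    private ConsumerMetricsExample() {
    }

    /**
     * Lowers the delay after which metrics of deleted topics are removed (the patch's default is 30s).
     * The value is parsed with Long.parseLong(), so it has to be a plain number of milliseconds.
     */
    public static void configureRemovalDelay(final Map<String, String> consumerConfig) {
        consumerConfig.put(HonoKafkaConsumer.CONFIG_HONO_OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS, "5000");
    }

    /**
     * Returns the names of all metrics that carry the given topic in one of their tags.
     * The map is what the consumer's metrics() accessor returns; once the obsolete metrics
     * have been removed, the result is empty for a deleted topic.
     */
    public static List<String> topicRelatedMetricNames(
            final Map<MetricName, ? extends Metric> metrics,
            final String topic) {
        return metrics.keySet().stream()
                .filter(name -> name.tags().containsValue(topic))
                .map(MetricName::name)
                .collect(Collectors.toList());
    }
}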
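The new test waits for asynchronous conditions (the offset commit and the metric removal) by polling with vertx.setPeriodic and a bounded check counter. A generic sketch of that pattern, with invented names and extracted into a reusable helper, is shown below; it is not part of the patch.

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

// Hypothetical helper class, for illustration only.
public final class PeriodicCheck {

    private PeriodicCheck() {
    }

    /**
     * Completes the returned future once the given condition evaluates to true,
     * or fails it after the given number of unsuccessful checks.
     */
    public static Future<Void> await(
            final Vertx vertx,
            final long intervalMillis,
            final int maxChecks,
            final Supplier<Boolean> condition) {

        final Promise<Void> result = Promise.promise();
        final AtomicInteger checkCount = new AtomicInteger(0);
        vertx.setPeriodic(intervalMillis, tid -> {
            if (condition.get()) {
                vertx.cancelTimer(tid);
                result.complete();
            } else if (checkCount.incrementAndGet() >= maxChecks) {
                vertx.cancelTimer(tid);
                result.fail("condition not met after " + maxChecks + " checks");
            }
        });
        return result.future();
    }
}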