[#3669] Fix deletion of obsolete metrics in KafkaConsumer. #3672

Merged
@@ -40,6 +40,11 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerDelegate;
import org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
@@ -94,7 +99,13 @@ public class HonoKafkaConsumer<V> implements Lifecycle, ServiceClient {
*/
public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 250;

private static final long OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(30);
/**
* The name of the configuration property that defines the delay after which obsolete metrics for a deleted Kafka
* topic are removed from the metrics of the Kafka consumer.
*/
public static final String CONFIG_HONO_OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS = "hono.obsolete.metrics.removal.delay.millis";

private static final long OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS_DEFAULT = TimeUnit.SECONDS.toMillis(30);
private static final String MSG_CONSUMER_NOT_INITIALIZED_STARTED = "consumer not initialized/started";
private static final Logger LOG = LoggerFactory.getLogger(HonoKafkaConsumer.class);
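
For illustration (not part of this change): a minimal sketch of how client code could override the new removal delay when assembling the consumer configuration, mirroring what the integration test further below does. The broker address, group id and the 5000 ms value are placeholders.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumer;

public class ObsoleteMetricsDelayConfigExample {

    public static Map<String, String> consumerConfig() {
        final Map<String, String> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder group id
        // remove per-topic metrics 5 seconds after a matching topic has been deleted
        // (the value is parsed as a long; without this entry the 30 second default applies)
        config.put(HonoKafkaConsumer.CONFIG_HONO_OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS, "5000");
        return config;
    }
}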

@@ -500,6 +511,16 @@ public final boolean isRecordHandlingAndPollingPaused() {
return pollingPaused.get();
}

/**
* Gets the metrics kept by the consumer.
*
* @return The metrics.
* @throws IllegalStateException if invoked before the KafkaConsumer is set via the {@link #start()} method.
*/
public final Map<MetricName, ? extends Metric> metrics() {
return getUnderlyingConsumer().metrics();
}
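
As an aside, the returned map can be filtered by tag values to find the metrics that belong to a single topic. A minimal sketch of such a helper, essentially the same as the integration test helper added further below (imports assumed: java.util.List, java.util.stream.Collectors, org.apache.kafka.common.MetricName):

// lists the names of all metrics that carry the given topic as a tag value,
// assuming an already started HonoKafkaConsumer
static List<String> topicRelatedMetricNames(final HonoKafkaConsumer<?> consumer, final String topic) {
    return consumer.metrics().keySet().stream()
            .filter(metricName -> metricName.tags().containsValue(topic))
            .map(MetricName::name)
            .collect(Collectors.toList());
}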

/**
* Gets the used vert.x KafkaConsumer.
*
@@ -625,7 +646,7 @@ postponing record handling until consumer has been initialized \
/**
* {@inheritDoc}
* <p>
* This methods triggers the creation of a Kafka consumer in the background. A new attempt to create the
* This method triggers the creation of a Kafka consumer in the background. A new attempt to create the
* consumer is made periodically until creation succeeds or the {@link #stop()} method has been invoked.
* <p>
* Client code may {@linkplain #addOnKafkaConsumerReadyHandler(Handler) register a dedicated handler}
@@ -893,9 +914,13 @@ private void updateSubscribedTopicPatternTopicsAndRemoveMetrics() {
.filter(t -> !subscribedTopicPatternTopics.contains(t))
.collect(Collectors.toSet());
if (!deletedTopics.isEmpty()) {
LOG.debug("deleted topics: {}", deletedTopics);
// actual removal to be done with a delay, as there might still be unprocessed fetch response data
// regarding these topics, in which case metrics would get re-created after they were removed
runOnContext(v -> vertx.setTimer(OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS, tid -> {
final long obsoleteMetricsRemovalDelayMillis = Optional
.ofNullable(consumerConfig.get(CONFIG_HONO_OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS))
.map(Long::parseLong).orElse(OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS_DEFAULT);
runOnContext(v -> vertx.setTimer(obsoleteMetricsRemovalDelayMillis, tid -> {
runOnKafkaWorkerThread(v2 -> {
removeMetricsForDeletedTopics(deletedTopics.stream()
.filter(t -> !subscribedTopicPatternTopics.contains(t)));
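
A side note on the delay lookup above: the property value is read as a string and parsed with Long::parseLong, so a missing entry falls back to the 30 second default while a non-numeric value would cause a NumberFormatException. A standalone sketch of that lookup, with the method name assumed for illustration:

// sketch, assuming a plain String-to-String consumer config map
static long getObsoleteMetricsRemovalDelayMillis(final Map<String, String> consumerConfig) {
    return Optional.ofNullable(consumerConfig.get(CONFIG_HONO_OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS))
            .map(Long::parseLong)                                   // NumberFormatException on non-numeric values
            .orElse(OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS_DEFAULT); // 30 seconds
}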
@@ -985,7 +1010,7 @@ protected void onPartitionsRevokedBlocking(final Set<TopicPartition> partitionsS
* <p>
* This default implementation does nothing. Subclasses may override this method.
*
* @param partitionsSet The list of partitions that are not assigned to this consumer any more.
* @param partitionsSet The list of partitions that are not assigned to this consumer anymore.
*/
protected void onPartitionsLostBlocking(final Set<TopicPartition> partitionsSet) {
// do nothing by default
@@ -1050,7 +1075,7 @@ protected void runOnContext(final Handler<Void> codeToRun) {
/**
* Runs the given handler on the Kafka polling thread.
* <p>
* The invocation of the handler is skipped if the this consumer is already closed.
* The invocation of the handler is skipped if this consumer is already closed.
*
* @param handler The handler to invoke.
* @throws IllegalStateException if the corresponding executor service isn't available because no subscription
@@ -1151,7 +1176,7 @@ public final Future<Void> ensureTopicIsAmongSubscribedTopicPatternTopics(final S
LOG.debug("ensureTopicIsAmongSubscribedTopics: topic is already subscribed [{}]", topic);
return Future.succeededFuture();
}

LOG.debug("ensureTopicIsAmongSubscribedTopics: called for topic [{}]", topic);
synchronized (subscriptionUpdateTrackersForToBeAddedTopics) {
final var tracker = new SubscriptionUpdateTracker(topic);

@@ -1168,7 +1193,7 @@ public final Future<Void> ensureTopicIsAmongSubscribedTopicPatternTopics(final S

private void triggerTopicPatternSubscriptionUpdate() {
if (!subscriptionUpdateTriggered.compareAndSet(false, true)) {
LOG.debug("ensureTopicIsAmongSubscribedTopics: subscription update already triggered");
LOG.debug("triggerTopicPatternSubscriptionUpdate: subscription update already triggered");
return;
}
runOnKafkaWorkerThread(v -> {
@@ -1198,7 +1223,7 @@ private void triggerTopicPatternSubscriptionUpdate() {
try {
LOG.info("triggering refresh of subscribed topic list ...");
getUnderlyingConsumer().subscribe(topicPattern, rebalanceListener);
if (!metadataMaxAge.isPresent() || metadataMaxAge.get() > THRESHOLD_METADATA_MAX_AGE_MS) {
if (metadataMaxAge.isEmpty() || metadataMaxAge.get() > THRESHOLD_METADATA_MAX_AGE_MS) {
// Partitions of newly created topics are being assigned by means of
// a rebalance. We make sure the rebalancing happens during the next poll()
// operation in order not to have to wait for the metadata to become stale
@@ -1217,35 +1242,46 @@ private void triggerTopicPatternSubscriptionUpdate() {
}

private void failAllSubscriptionUpdateTrackers(final Exception failure) {
final List<SubscriptionUpdateTracker> toBeFailedTrackers = new ArrayList<>();
final List<SubscriptionUpdateTracker> toBeFailedTrackers;
synchronized (subscriptionUpdateTrackersForToBeAddedTopics) {
toBeFailedTrackers.addAll(subscriptionUpdateTrackersForToBeAddedTopics.values());
toBeFailedTrackers = new ArrayList<>(subscriptionUpdateTrackersForToBeAddedTopics.values());
subscriptionUpdateTrackersForToBeAddedTopics.clear();
}
toBeFailedTrackers.forEach(tracker -> {
runOnContext(v -> tracker.fail(failure));
});
toBeFailedTrackers.forEach(tracker -> runOnContext(v -> tracker.fail(failure)));
}

private void removeMetricsForDeletedTopics(final Stream<String> deletedTopics) {
final Metrics metrics = getInternalMetricsObject(kafkaConsumer.unwrap());
if (metrics != null) {
deletedTopics.forEach(topic -> {
LOG.debug("removing metrics for deleted topic: {}", topic);
metrics.removeSensor("topic." + topic + ".bytes-fetched");
metrics.removeSensor("topic." + topic + ".records-fetched");
});
}
}

@SuppressWarnings("unchecked")
private Metrics getInternalMetricsObject(final Consumer<String, V> consumer) {
if (consumer instanceof org.apache.kafka.clients.consumer.KafkaConsumer) {
try {
final Field field = org.apache.kafka.clients.consumer.KafkaConsumer.class.getDeclaredField("metrics");
field.setAccessible(true);
return (Metrics) field.get(consumer);
final Field delegateField = org.apache.kafka.clients.consumer.KafkaConsumer.class.getDeclaredField("delegate");
delegateField.setAccessible(true);
final ConsumerDelegate<String, V> delegate = (ConsumerDelegate<String, V>) delegateField.get(consumer);
if (delegate instanceof AsyncKafkaConsumer) {
final Field metricsField = AsyncKafkaConsumer.class.getDeclaredField("metrics");
metricsField.setAccessible(true);
return (Metrics) metricsField.get(delegate);
} else if (delegate instanceof LegacyKafkaConsumer) {
final Field metricsField = LegacyKafkaConsumer.class.getDeclaredField("metrics");
metricsField.setAccessible(true);
return (Metrics) metricsField.get(delegate);
}
} catch (final Exception e) {
LOG.warn("failed to get metrics object", e);
}
} else {
LOG.warn("unsupported consumer type: {}", consumer.getClass().getName());
}
return null;
}
@@ -1300,7 +1336,7 @@ private ExecutorService getKafkaConsumerWorker(final KafkaConsumer<String, V> co
return worker;
}

private final class SubscriptionUpdateTracker {
private static final class SubscriptionUpdateTracker {
private final Promise<Void> outcome = Promise.promise();
private final String topicName;
private final AtomicInteger rebalancesLeft = new AtomicInteger(10);
@@ -21,6 +21,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -30,7 +31,9 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.MetricName;
import org.eclipse.hono.client.kafka.consumer.AsyncHandlingAutoCommitKafkaConsumer;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumer;
import org.eclipse.hono.tests.EnabledIfMessagingSystemConfigured;
import org.eclipse.hono.tests.IntegrationTestSupport;
import org.eclipse.hono.util.MessagingType;
@@ -49,6 +52,7 @@
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.junit5.Timeout;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import io.vertx.kafka.admin.KafkaAdminClient;
@@ -171,13 +175,13 @@ public void testConsumerReadsAllRecordsForDynamicallyCreatedTopics(

final var topicsToPublishTo = IntStream.range(0, numTopicsAndRecords)
.mapToObj(i -> "%s%d".formatted(patternPrefix, i))
.collect(Collectors.toList());
.toList();

// create some matching topics - these shall be deleted after consumer start;
// this shall make sure that topic deletion doesn't influence the test result
final var otherTopics = IntStream.range(0, numTopicsAndRecords)
.mapToObj(i -> "%s%d_other".formatted(patternPrefix, i))
.collect(Collectors.toList());
.toList();

final var recordsReceived = ctx.checkpoint(numTopicsAndRecords);
final String recordKey = "addedAfterStartKey";
@@ -235,6 +239,99 @@
}
}

/**
* Verifies that a topic-pattern based AsyncHandlingAutoCommitKafkaConsumer removes topic-related metrics
* once a topic that matches the topic-pattern gets deleted.
*
* NOTE: The logic for removing the metrics is located in HonoKafkaConsumer, so this behaviour would ideally be
* tested in HonoKafkaConsumerIT. However, that proves difficult with the current integration test Kafka setup,
* in which topic auto-creation is enabled in the broker config. For some reason, even when setting
* 'allow.auto.create.topics=false' for the consumer and ensuring that offsets got committed before the topic
* deletion (along with disabled standard auto-commit), the topic gets auto-created again some time after it was
* deleted, causing the test to fail. With the manual auto-commit handling of the
* AsyncHandlingAutoCommitKafkaConsumer, this isn't the case.
*
* @param partitionAssignmentStrategy The partition assignment strategy to use for the consumer.
* @param ctx The vert.x test context.
*/
@ParameterizedTest(name = IntegrationTestSupport.PARAMETERIZED_TEST_NAME_PATTERN)
@MethodSource("partitionAssignmentStrategies")
@Timeout(value = 10, timeUnit = TimeUnit.SECONDS)
public void testPatternBasedConsumerRemovesMetricsOfDeletedTopics(
final String partitionAssignmentStrategy,
final VertxTestContext ctx) {

// prepare consumer
final var consumerConfig = IntegrationTestSupport.getKafkaConsumerConfig().getConsumerConfig("test");
applyPartitionAssignmentStrategy(consumerConfig, partitionAssignmentStrategy);
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
consumerConfig.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
consumerConfig.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100");
consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
consumerConfig.put(HonoKafkaConsumer.CONFIG_HONO_OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS, "200");

final Promise<Void> recordReceivedPromise = Promise.promise();
final Function<KafkaConsumerRecord<String, Buffer>, Future<Void>> recordHandler = record -> {
LOG.debug("received record: {}", record);
recordReceivedPromise.complete();
return Future.succeededFuture();
};
final String topicPrefix = "test_" + UUID.randomUUID();
final String topic = topicPrefix + "_toBeDeleted";
kafkaConsumer = new AsyncHandlingAutoCommitKafkaConsumer<>(vertx, Pattern.compile(topicPrefix + ".*"),
recordHandler, consumerConfig);
// create topic and start consumer
final Promise<Void> consumerReadyTracker = Promise.promise();
kafkaConsumer.addOnKafkaConsumerReadyHandler(consumerReadyTracker);
adminClient.createTopics(List.of(new NewTopic(topic, 1, REPLICATION_FACTOR)))
.compose(ok -> kafkaConsumer.start())
.compose(ok -> consumerReadyTracker.future())
.compose(ok -> {
ctx.verify(() -> assertThat(recordReceivedPromise.future().isComplete()).isFalse());
LOG.debug("consumer started, publishing record to be received by the consumer...");
return Future.all(
publish(topic, "recordKey", Buffer.buffer("testPayload")),
recordReceivedPromise.future());
})
.compose(ok -> {
LOG.debug("waiting for offset to be committed");
final Promise<Void> offsetCommitCheckTracker = Promise.promise();
final AtomicInteger checkCount = new AtomicInteger(0);
vertx.setPeriodic(100, tid -> {
if (!kafkaConsumer.isOffsetsCommitNeededForTopic(topic)) {
vertx.cancelTimer(tid);
offsetCommitCheckTracker.complete();
} else if (checkCount.incrementAndGet() >= 10) {
vertx.cancelTimer(tid);
offsetCommitCheckTracker.fail("timeout waiting for offset commit");
}
});
return offsetCommitCheckTracker.future();
})
.compose(ok -> {
ctx.verify(() -> assertThat(getTopicRelatedMetricNames(topic)).isNotEmpty());
LOG.debug("delete topic {}", topic);
return adminClient.deleteTopics(List.of(topic));
})
.compose(ok -> {
LOG.debug("waiting for metrics to be removed...");
final Promise<Void> metricCheckTracker = Promise.promise();
final AtomicInteger checkCount = new AtomicInteger(0);
vertx.setPeriodic(200, tid -> {
LOG.debug("topic-related metrics: {}", getTopicRelatedMetricNames(topic));
if (getTopicRelatedMetricNames(topic).isEmpty()) {
vertx.cancelTimer(tid);
metricCheckTracker.complete();
} else if (checkCount.incrementAndGet() >= 40) {
vertx.cancelTimer(tid);
metricCheckTracker.fail("timeout waiting for metrics to be removed");
}
});
return metricCheckTracker.future();
})
.onComplete(ctx.succeeding(v -> ctx.completeNow()));
}

private Future<Void> ensureTopicIsAmongSubscribedTopicPatternTopicsAndPublish(
final VertxTestContext ctx,
final String topic,
@@ -288,5 +385,12 @@ private void applyPartitionAssignmentStrategy(final Map<String, String> consumer
Optional.ofNullable(partitionAssignmentStrategy)
.ifPresent(s -> consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, s));
}

private List<String> getTopicRelatedMetricNames(final String topicName) {
return kafkaConsumer.metrics().keySet().stream()
.filter(metricName -> metricName.tags().containsValue(topicName))
.map(MetricName::name)
.collect(Collectors.toList());
}
}
