From c36e260393a666fca16d107a5f2bcb10d766b8d6 Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Thu, 9 Jan 2025 16:13:22 +0100 Subject: [PATCH] fix: Use correct user-defined groupId in Kafka adapter (#3409) --- .../kafka/adapter/KafkaProtocol.java | 8 ++--- .../shared/kafka/KafkaAdapterConfig.java | 32 ------------------- .../shared/kafka/KafkaConfigExtractor.java | 11 ++++--- .../messaging/kafka/SpKafkaConsumer.java | 1 + .../kafka/config/AbstractConfigFactory.java | 2 +- 5 files changed, 12 insertions(+), 42 deletions(-) delete mode 100644 streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java index 780d454e60..0e00e94b01 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/adapter/KafkaProtocol.java @@ -31,7 +31,7 @@ import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor; import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig; -import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaAdapterConfig; +import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaBaseConfig; import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigExtractor; import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider; import org.apache.streampipes.extensions.management.connect.adapter.BrokerEventProcessor; @@ -74,7 +74,7 @@ public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig { private static final Logger LOG = LoggerFactory.getLogger(KafkaProtocol.class); - private KafkaAdapterConfig config; + private KafkaBaseConfig config; public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.kafka"; @@ -88,7 +88,7 @@ private void applyConfiguration(IStaticPropertyExtractor extractor) { this.config = new KafkaConfigExtractor().extractAdapterConfig(extractor, true); } - private Consumer createConsumer(KafkaAdapterConfig kafkaConfig) throws KafkaException { + private Consumer createConsumer(KafkaBaseConfig kafkaConfig) throws KafkaException { final Properties props = new Properties(); kafkaConfig.getConfigAppenders().forEach(c -> c.appendConfig(props)); @@ -96,8 +96,6 @@ private Consumer createConsumer(KafkaAdapterConfig kafkaConfig) props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId()); - props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 6000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java deleted file mode 100644 index e24dccf040..0000000000 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaAdapterConfig.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.extensions.connectors.kafka.shared.kafka; - -public class KafkaAdapterConfig extends KafkaBaseConfig { - - private String groupId; - - public String getGroupId() { - return groupId; - } - - public void setGroupId(String groupId) { - this.groupId = groupId; - } -} diff --git a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java index 0289fb56ca..98c964ef44 100644 --- a/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java +++ b/streampipes-extensions/streampipes-connectors-kafka/src/main/java/org/apache/streampipes/extensions/connectors/kafka/shared/kafka/KafkaConfigExtractor.java @@ -28,6 +28,7 @@ import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslConfigAppender; import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.security.auth.SecurityProtocol; import java.util.ArrayList; @@ -51,10 +52,10 @@ public class KafkaConfigExtractor { - public KafkaAdapterConfig extractAdapterConfig(IStaticPropertyExtractor extractor, + public KafkaBaseConfig extractAdapterConfig(IStaticPropertyExtractor extractor, boolean containsTopic) { - var config = extractCommonConfigs(extractor, new KafkaAdapterConfig()); + var config = extractCommonConfigs(extractor, new KafkaBaseConfig()); var topic = ""; if (containsTopic) { @@ -62,11 +63,13 @@ public KafkaAdapterConfig extractAdapterConfig(IStaticPropertyExtractor extracto } config.setTopic(topic); + var groupId = ""; if (extractor.selectedAlternativeInternalId(CONSUMER_GROUP).equals(RANDOM_GROUP_ID)) { - config.setGroupId("StreamPipesKafkaConsumer" + System.currentTimeMillis()); + groupId = "StreamPipesKafkaConsumer" + System.currentTimeMillis(); } else { - config.setGroupId(extractor.singleValueParameter(GROUP_ID_INPUT, String.class)); + groupId = extractor.singleValueParameter(GROUP_ID_INPUT, String.class); } + config.getConfigAppenders().add(new SimpleConfigAppender(Map.of(ConsumerConfig.GROUP_ID_CONFIG, groupId))); StaticPropertyAlternatives alternatives = extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG, StaticPropertyAlternatives.class); diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java index fcbd38224f..6b8f1bbc23 100644 --- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java +++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java @@ -82,6 +82,7 @@ public void run() { Properties props = makeProperties(protocol, appenders); + LOG.info("Using kafka properties: {}", props.toString()); KafkaConsumer consumer = new KafkaConsumer<>(props); if (!patternTopic) { consumer.subscribe(Collections.singletonList(topic)); diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java index 8225986c6d..4a12c6341c 100644 --- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java +++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java @@ -48,6 +48,6 @@ public Properties buildProperties(List appenders) { Properties props = makeDefaultProperties(); appenders.forEach(appender -> appender.appendConfig(props)); - return props; + return props; } }