fix: Use correct user-defined groupId in Kafka adapter (#3409)
dominikriemer authored Jan 9, 2025
1 parent 19eeb1a commit c36e260
Showing 5 changed files with 12 additions and 42 deletions.
@@ -31,7 +31,7 @@
 import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
 import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
 import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
-import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaAdapterConfig;
+import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaBaseConfig;
 import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigExtractor;
 import org.apache.streampipes.extensions.connectors.kafka.shared.kafka.KafkaConfigProvider;
 import org.apache.streampipes.extensions.management.connect.adapter.BrokerEventProcessor;
@@ -74,7 +74,7 @@
 public class KafkaProtocol implements StreamPipesAdapter, SupportsRuntimeConfig {

   private static final Logger LOG = LoggerFactory.getLogger(KafkaProtocol.class);
-  private KafkaAdapterConfig config;
+  private KafkaBaseConfig config;

   public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.kafka";

@@ -88,16 +88,14 @@ private void applyConfiguration(IStaticPropertyExtractor extractor) {
     this.config = new KafkaConfigExtractor().extractAdapterConfig(extractor, true);
   }

-  private Consumer<byte[], byte[]> createConsumer(KafkaAdapterConfig kafkaConfig) throws KafkaException {
+  private Consumer<byte[], byte[]> createConsumer(KafkaBaseConfig kafkaConfig) throws KafkaException {
     final Properties props = new Properties();

     kafkaConfig.getConfigAppenders().forEach(c -> c.appendConfig(props));

     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
         kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort());

-    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
-
     props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 6000);

     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
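Note on the hunk above: createConsumer no longer writes group.id into the properties from a separate getGroupId() field; the consumer group now has to arrive through the config appenders registered by the extractor (see the KafkaConfigExtractor hunk below). The following is a minimal, self-contained sketch of that flow under stated assumptions: ConfigAppenderSketch and buildConsumerProps are illustrative stand-ins, not the actual StreamPipes types, and only ConsumerConfig comes from the real kafka-clients API.

// Sketch only: mirrors the appender-driven property assembly seen in the diff,
// not the actual StreamPipes classes.
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.List;
import java.util.Map;
import java.util.Properties;

interface ConfigAppenderSketch {
  void appendConfig(Properties props);   // mirrors appendConfig(props) used in the diff
}

class CreateConsumerSketch {

  static Properties buildConsumerProps(String host, int port, List<ConfigAppenderSketch> appenders) {
    Properties props = new Properties();

    // Appenders run first and contribute their entries, e.g. group.id or security settings
    appenders.forEach(a -> a.appendConfig(props));

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host + ":" + port);
    props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 6000);
    return props;
  }

  public static void main(String[] args) {
    // Hypothetical appender carrying a user-defined consumer group
    ConfigAppenderSketch groupIdAppender =
        props -> Map.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group").forEach(props::put);

    Properties props = buildConsumerProps("localhost", 9092, List.of(groupIdAppender));
    // Prints the group id contributed by the appender; nothing after the appender loop overwrites it
    System.out.println(props.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
  }
}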

This file was deleted.

@@ -28,6 +28,7 @@
 import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslConfigAppender;
 import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;

+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.security.auth.SecurityProtocol;

 import java.util.ArrayList;
@@ -51,22 +52,24 @@

 public class KafkaConfigExtractor {

-  public KafkaAdapterConfig extractAdapterConfig(IStaticPropertyExtractor extractor,
+  public KafkaBaseConfig extractAdapterConfig(IStaticPropertyExtractor extractor,
                                                  boolean containsTopic) {

-    var config = extractCommonConfigs(extractor, new KafkaAdapterConfig());
+    var config = extractCommonConfigs(extractor, new KafkaBaseConfig());

     var topic = "";
     if (containsTopic) {
       topic = extractor.selectedSingleValue(TOPIC_KEY, String.class);
     }
     config.setTopic(topic);

+    var groupId = "";
     if (extractor.selectedAlternativeInternalId(CONSUMER_GROUP).equals(RANDOM_GROUP_ID)) {
-      config.setGroupId("StreamPipesKafkaConsumer" + System.currentTimeMillis());
+      groupId = "StreamPipesKafkaConsumer" + System.currentTimeMillis();
     } else {
-      config.setGroupId(extractor.singleValueParameter(GROUP_ID_INPUT, String.class));
+      groupId = extractor.singleValueParameter(GROUP_ID_INPUT, String.class);
     }
+    config.getConfigAppenders().add(new SimpleConfigAppender(Map.of(ConsumerConfig.GROUP_ID_CONFIG, groupId)));

     StaticPropertyAlternatives alternatives = extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG,
         StaticPropertyAlternatives.class);
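For clarity, here is a condensed sketch of the group-id handling introduced in the hunk above: the extractor either generates a group id or takes the user-supplied one, then hands it to an appender instead of storing it on the config object. MapAppenderSketch and resolveGroupId are hypothetical stand-ins for SimpleConfigAppender and the surrounding extractor code, written only to illustrate the data flow.

// Sketch only: illustrative stand-ins, not the real StreamPipes classes.
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Map;
import java.util.Properties;

class GroupIdExtractionSketch {

  // Mimics SimpleConfigAppender: copies a fixed map into the consumer properties
  record MapAppenderSketch(Map<String, String> entries) {
    void appendConfig(Properties props) {
      entries.forEach(props::put);
    }
  }

  static MapAppenderSketch resolveGroupId(boolean randomGroupId, String userInput) {
    // Either a generated consumer group or the value the user typed in
    String groupId = randomGroupId
        ? "StreamPipesKafkaConsumer" + System.currentTimeMillis()
        : userInput;
    return new MapAppenderSketch(Map.of(ConsumerConfig.GROUP_ID_CONFIG, groupId));
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    resolveGroupId(false, "orders-adapter-group").appendConfig(props);
    System.out.println(props.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); // orders-adapter-group
  }
}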
@@ -82,6 +82,7 @@ public void run() {

     Properties props = makeProperties(protocol, appenders);

+    LOG.info("Using kafka properties: {}", props.toString());
     KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
     if (!patternTopic) {
       consumer.subscribe(Collections.singletonList(topic));
@@ -48,6 +48,6 @@ public Properties buildProperties(List<KafkaConfigAppender> appenders) {
     Properties props = makeDefaultProperties();
     appenders.forEach(appender -> appender.appendConfig(props));

-    return props;
+    return props;
   }
 }
