Skip to content

Commit

Permalink
feat: introduce support for SASL_SSL Kafka authentication
Browse files Browse the repository at this point in the history
Co-authored-by: Maya Sastges <[email protected]>
  • Loading branch information
SvenKube and ironmaya committed Jan 24, 2024
1 parent 76931ed commit 7b4669f
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 17 deletions.
8 changes: 8 additions & 0 deletions service.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ service {

kafka {
bootstrap-servers = "localhost:9092"

// Optionally set the security protocol to SASL_SSL and provide a user and password to enable Kafka authentication
// security {
// protocol = "SASL_SSL"
// user = ""
// password = ""
// }

consumer-group = "ari-proxy" // optional, default: ari-proxy
commands-topic = "ari-commands-topic"
events-and-responses-topic = "ari-eventsandresponses-topic"
Expand Down
36 changes: 23 additions & 13 deletions src/main/java/io/retel/ariproxy/KafkaConsumerActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,18 @@
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -95,19 +101,23 @@ public Receive<Object> createReceive() {
}

private Consumer<String, String> createConsumer() {
  // Base consumer configuration. Offsets are committed by the parallel
  // consumer wrapper, so Kafka's auto-commit must stay disabled.
  final Map<String, Object> config = new HashMap<>();
  config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
  config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getString("bootstrap-servers"));
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  config.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getString("consumer-group"));

  // Optional SASL_SSL + SCRAM-SHA-256 authentication. Any other value of
  // kafka.security.protocol leaves the consumer on the Kafka client's
  // default (PLAINTEXT, no authentication).
  if ("SASL_SSL".equals(kafkaConfig.getString("security.protocol"))) {
    config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name());
    config.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_256.mechanismName());
    // NOTE(review): credentials containing a double quote would break this
    // JAAS line — assumed not to occur; confirm with the deployment docs.
    config.put(
        SaslConfigs.SASL_JAAS_CONFIG,
        "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"
            .formatted(
                kafkaConfig.getString("security.user"),
                kafkaConfig.getString("security.password")));
  }

  return new KafkaConsumer<>(config);
}
Expand Down
39 changes: 35 additions & 4 deletions src/main/java/io/retel/ariproxy/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,6 +48,9 @@ public class Main {
private static final String NAME = "name";
private static final String HTTPPORT = "httpport";
public static final String KAFKA = "kafka";
public static final String KAFKA_SECURITY_PROTOCOL = "security.protocol";
public static final String KAFKA_SECURITY_USER = "security.user";
public static final String KAFKA_SECURITY_PASSWORD = "security.password";
public static final String REST = "rest";
private static final Duration HEALTH_REPORT_TIMEOUT = Duration.ofMillis(100);
private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
Expand Down Expand Up @@ -135,8 +143,8 @@ public static void main(String[] args) {
/**
 * Creates an Akka Streams sink that publishes producer records to Kafka, honoring the optional
 * SASL_SSL security settings carried by {@code kafkaConfig}.
 *
 * @param kafkaConfig the {@code kafka} section of the service configuration
 * @param system the actor system used to materialize the producer
 * @return a sink that completes its {@link CompletionStage} when the stream terminates
 */
private static Sink<ProducerRecord<String, String>, CompletionStage<Done>> createProducerSink(
    final Config kafkaConfig, final ActorSystem<Void> system) {
  final ProducerSettings<String, String> producerSettings =
      getKafkaProducerSettings(kafkaConfig, system);

  // Pre-create the producer so the sink reuses one Kafka producer instance.
  return Producer.plainSink(
      producerSettings.withProducer(producerSettings.createKafkaProducer()));
}
Expand All @@ -159,8 +167,7 @@ private static void runAriEventProcessor(
Source.<Message>maybe().viaMat(restartWebsocketFlow, Keep.right());

final ProducerSettings<String, String> producerSettings =
ProducerSettings.create(system, new StringSerializer(), new StringSerializer())
.withBootstrapServers(serviceConfig.getConfig(KAFKA).getString(BOOTSTRAP_SERVERS));
getKafkaProducerSettings(serviceConfig.getConfig(KAFKA), system);

final Sink<ProducerRecord<String, String>, NotUsed> sink =
Producer.plainSink(producerSettings).mapMaterializedValue(done -> NotUsed.getInstance());
Expand All @@ -178,6 +185,30 @@ private static void runAriEventProcessor(
}
}

/**
 * Builds the Akka Kafka producer settings from the given configuration, enabling SASL_SSL with
 * SCRAM-SHA-256 authentication when {@code kafka.security.protocol} is set to {@code "SASL_SSL"}.
 *
 * @param kafkaConfig the {@code kafka} section of the service configuration
 * @param system the actor system the producer settings are created against
 * @return producer settings, with SASL properties applied only for SASL_SSL deployments
 */
private static ProducerSettings<String, String> getKafkaProducerSettings(
    final Config kafkaConfig, final ActorSystem<?> system) {
  final ProducerSettings<String, String> baseSettings =
      ProducerSettings.create(system, new StringSerializer(), new StringSerializer())
          .withBootstrapServers(kafkaConfig.getString(BOOTSTRAP_SERVERS));

  // Anything other than SASL_SSL means "no authentication": use the settings as-is.
  if (!"SASL_SSL".equals(kafkaConfig.getString(KAFKA_SECURITY_PROTOCOL))) {
    return baseSettings;
  }

  // JAAS line carrying the SCRAM credentials from kafka.security.{user,password}.
  final String jaasConfig =
      "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"
          .formatted(
              kafkaConfig.getString(KAFKA_SECURITY_USER),
              kafkaConfig.getString(KAFKA_SECURITY_PASSWORD));

  return baseSettings
      .withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name())
      .withProperty(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_256.mechanismName())
      .withProperty(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
}

// NOTE: We need this method because the resulting flow can only be materialized once;
// see:
// https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html#websocketclientflow
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/io/retel/ariproxy/health/KafkaConnectionCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConnectionCheck {
Expand Down Expand Up @@ -59,6 +63,20 @@ private static KafkaConsumer<String, String> createKafkaConsumer(final Config ka
ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getString(CONSUMER_GROUP));
kafkaProperties.setProperty(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getString(BOOTSTRAP_SERVERS));

// Mirror the optional SASL_SSL + SCRAM-SHA-256 authentication used by the
// main consumer/producer so the health check can reach a secured broker.
// NOTE(review): this setup is duplicated in KafkaConsumerActor and Main —
// consider extracting a shared helper.
if ("SASL_SSL".equals(kafkaConfig.getString("security.protocol"))) {
kafkaProperties.setProperty(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name());
kafkaProperties.setProperty(
SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_256.mechanismName());
// JAAS line carrying the SCRAM credentials from kafka.security.{user,password}.
kafkaProperties.setProperty(
SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"
.formatted(
kafkaConfig.getString("security.user"),
kafkaConfig.getString("security.password")));
}

return new KafkaConsumer<>(kafkaProperties);
}

Expand Down
6 changes: 6 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ service {
}

kafka {
security {
// Set to "SASL_SSL" to enable SCRAM-SHA-256 authentication; any other value
// leaves authentication disabled. Note: "PLAINTEXT" is the canonical Kafka
// "no security" protocol name ("PLAIN" is a SASL mechanism, not a protocol).
protocol = "PLAINTEXT"
user = ""
password = ""
}

auto-offset-reset = "earliest"
parallel-consumer-max-concurrency = 1
}
Expand Down

0 comments on commit 7b4669f

Please sign in to comment.