Skip to content

Commit

Permalink
reading kafka key and value serializer from config
Browse files Browse the repository at this point in the history
  • Loading branch information
ag060 committed Jan 8, 2025
1 parent 1dc8456 commit 0b24e0c
Show file tree
Hide file tree
Showing 11 changed files with 148 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.akto.kafka.KafkaConfig;
import com.akto.kafka.KafkaConsumerConfig;
import com.akto.kafka.KafkaProducerConfig;
import com.akto.kafka.Serializer;
import com.akto.threat.backend.client.IPLookupClient;
import com.akto.threat.backend.service.MaliciousEventService;
import com.akto.threat.backend.service.ThreatActorService;
Expand Down Expand Up @@ -56,6 +57,8 @@ public static void main(String[] args) throws Exception {
.build())
.setProducerConfig(
KafkaProducerConfig.newBuilder().setBatchSize(100).setLingerMs(1000).build())
.setKeySerializer(Serializer.STRING)
.setValueSerializer(Serializer.STRING)
.build();

IPLookupClient ipLookupClient = new IPLookupClient(getMaxmindFile());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
public class MaliciousEventService {

private final Kafka kafka;
private MongoClient mongoClient;
private IPLookupClient ipLookupClient;
private final MongoClient mongoClient;
private final IPLookupClient ipLookupClient;

public MaliciousEventService(
KafkaConfig kafkaConfig, MongoClient mongoClient, IPLookupClient ipLookupClient) {
Expand All @@ -44,8 +44,6 @@ public MaliciousEventService(
}

public void recordMaliciousEvent(String accountId, RecordMaliciousEventRequest request) {
System.out.println("Received malicious event: " + request);

MaliciousEventMessage evt = request.getMaliciousEvent();
String actor = evt.getActor();
String filterId = evt.getFilterId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FlushMessagesToDB {

Expand All @@ -32,11 +35,22 @@ public class FlushMessagesToDB {

public FlushMessagesToDB(KafkaConfig kafkaConfig, MongoClient mongoClient) {
String kafkaBrokerUrl = kafkaConfig.getBootstrapServers();
String groupId = kafkaConfig.getGroupId();

Properties properties =
Utils.configProperties(
kafkaBrokerUrl, groupId, kafkaConfig.getConsumerConfig().getMaxPollRecords());
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerUrl);
properties.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
kafkaConfig.getKeySerializer().getDeserializer());
properties.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
kafkaConfig.getValueSerializer().getDeserializer());
properties.put(
ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
kafkaConfig.getConsumerConfig().getMaxPollRecords());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

this.kafkaConsumer = new KafkaConsumer<>(properties);
this.kafkaConfig = kafkaConfig;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.akto.kafka.KafkaConfig;
import com.akto.kafka.KafkaConsumerConfig;
import com.akto.kafka.KafkaProducerConfig;
import com.akto.kafka.Serializer;
import com.akto.threat.detection.constants.KafkaTopic;
import com.akto.threat.detection.session_factory.SessionFactoryUtils;
import com.akto.threat.detection.tasks.CleanupTask;
Expand All @@ -19,7 +20,7 @@ public class Main {

private static final String CONSUMER_GROUP_ID = "akto.threat_detection";

public static void main(String[] args) throws Exception {
public static void main(String[] args) {
runMigrations();

SessionFactory sessionFactory = SessionFactoryUtils.createFactory();
Expand All @@ -37,6 +38,8 @@ public static void main(String[] args) throws Exception {
.build())
.setProducerConfig(
KafkaProducerConfig.newBuilder().setBatchSize(100).setLingerMs(100).build())
.setKeySerializer(Serializer.STRING)
.setValueSerializer(Serializer.BYTE_ARRAY)
.build();

KafkaConfig internalKafka =
Expand All @@ -50,6 +53,8 @@ public static void main(String[] args) throws Exception {
.build())
.setProducerConfig(
KafkaProducerConfig.newBuilder().setBatchSize(100).setLingerMs(100).build())
.setKeySerializer(Serializer.STRING)
.setValueSerializer(Serializer.BYTE_ARRAY)
.build();

new MaliciousTrafficDetectorTask(trafficKafka, internalKafka, createRedisClient()).run();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package com.akto.threat.detection.kafka;

import com.akto.kafka.KafkaConfig;
import com.akto.kafka.Serializer;
import com.google.protobuf.Message;
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProtoProducer {
private KafkaProducer<String, byte[]> producer;
private final KafkaProducer<String, byte[]> producer;
public boolean producerReady;

public KafkaProtoProducer(KafkaConfig kafkaConfig) {
this.producer = generateProducer(
kafkaConfig.getBootstrapServers(),
kafkaConfig.getProducerConfig().getLingerMs(),
kafkaConfig.getProducerConfig().getBatchSize());
this.producer =
generateProducer(
kafkaConfig.getBootstrapServers(),
kafkaConfig.getProducerConfig().getLingerMs(),
kafkaConfig.getProducerConfig().getBatchSize());
}

public void send(String topic, Message message) {
Expand All @@ -28,21 +29,21 @@ public void close() {
producer.close(Duration.ofMillis(0)); // close immediately
}

private KafkaProducer<String, byte[]> generateProducer(String brokerIP, int lingerMS, int batchSize) {
if (producer != null)
close(); // close existing producer connection
private KafkaProducer<String, byte[]> generateProducer(
String brokerIP, int lingerMS, int batchSize) {
if (producer != null) close(); // close existing producer connection

int requestTimeoutMs = 5000;
Properties kafkaProps = new Properties();
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerIP);
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.STRING.getSerializer());
kafkaProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.BYTE_ARRAY.getSerializer());
kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMS);
kafkaProps.put(ProducerConfig.RETRIES_CONFIG, 0);
kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, lingerMS + requestTimeoutMs);
return new KafkaProducer<String, byte[]>(kafkaProps);
return new KafkaProducer<>(kafkaProps);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.akto.threat.detection.tasks;

import com.akto.kafka.KafkaConfig;
import com.akto.runtime.utils.Utils;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
Expand All @@ -11,7 +11,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public abstract class AbstractKafkaConsumerTask<V> implements Task {

Expand All @@ -23,15 +22,17 @@ public AbstractKafkaConsumerTask(KafkaConfig kafkaConfig, String kafkaTopic) {
this.kafkaTopic = kafkaTopic;
this.kafkaConfig = kafkaConfig;

String kafkaBrokerUrl = kafkaConfig.getBootstrapServers();
String groupId = kafkaConfig.getGroupId();

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerUrl);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConfig.getConsumerConfig().getMaxPollRecords());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
properties.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
kafkaConfig.getValueSerializer().getDeserializer());
properties.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
kafkaConfig.getValueSerializer().getDeserializer());
properties.put(
ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
kafkaConfig.getConsumerConfig().getMaxPollRecords());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Expand All @@ -49,8 +50,9 @@ public void run() {
() -> {
// Poll data from Kafka topic
while (true) {
ConsumerRecords<String, V> records = kafkaConsumer.poll(
Duration.ofMillis(kafkaConfig.getConsumerConfig().getPollDurationMilli()));
ConsumerRecords<String, V> records =
kafkaConsumer.poll(
Duration.ofMillis(kafkaConfig.getConsumerConfig().getPollDurationMilli()));
if (records.isEmpty()) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.akto.proto.generated.threat_detection.message.sample_request.v1.SampleRequestKafkaEnvelope;
import com.akto.proto.http_response_param.v1.HttpResponseParam;
import com.akto.rules.TestPlugin;
import com.akto.runtime.utils.Utils;
import com.akto.test_editor.execution.VariableResolver;
import com.akto.test_editor.filter.data_operands_impl.ValidationResult;
import com.akto.threat.detection.actor.SourceIPActorGenerator;
Expand All @@ -38,10 +37,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.*;

/*
Class is responsible for consuming traffic data from the Kafka topic.
Expand All @@ -66,13 +62,21 @@ public MaliciousTrafficDetectorTask(
KafkaConfig trafficConfig, KafkaConfig internalConfig, RedisClient redisClient) {
this.kafkaConfig = trafficConfig;

String kafkaBrokerUrl = trafficConfig.getBootstrapServers();
String groupId = trafficConfig.getGroupId();

this.kafkaConsumer =
new KafkaConsumer<>(
Utils.configProperties(
kafkaBrokerUrl, groupId, trafficConfig.getConsumerConfig().getMaxPollRecords()));
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, trafficConfig.getBootstrapServers());
properties.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
trafficConfig.getKeySerializer().getDeserializer());
properties.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
trafficConfig.getValueSerializer().getDeserializer());
properties.put(
ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
trafficConfig.getConsumerConfig().getMaxPollRecords());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, trafficConfig.getGroupId());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
this.kafkaConsumer = new KafkaConsumer<>(properties);

this.httpCallParser = new HttpCallParser(120, 1000);

Expand Down
2 changes: 1 addition & 1 deletion libs/protobuf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>4.28.3</version>
<version>4.29.2</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
Expand Down
29 changes: 22 additions & 7 deletions libs/utils/src/main/java/com/akto/kafka/Kafka.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.akto.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -17,18 +16,29 @@ public Kafka(KafkaConfig kafkaConfig) {
this(
kafkaConfig.getBootstrapServers(),
kafkaConfig.getProducerConfig().getLingerMs(),
kafkaConfig.getProducerConfig().getBatchSize());
kafkaConfig.getProducerConfig().getBatchSize(),
kafkaConfig.getKeySerializer(),
kafkaConfig.getValueSerializer());
}

public Kafka(String brokerIP, int lingerMS, int batchSize) {
public Kafka(
String brokerIP,
int lingerMS,
int batchSize,
Serializer keySerializer,
Serializer valueSerializer) {
producerReady = false;
try {
setProducer(brokerIP, lingerMS, batchSize);
setProducer(brokerIP, lingerMS, batchSize, keySerializer, valueSerializer);
} catch (Exception e) {
e.printStackTrace();
}
}

public Kafka(String brokerIP, int lingerMS, int batchSize) {
this(brokerIP, 0, 0, Serializer.STRING, Serializer.STRING);
}

public void send(String message, String topic) {
if (!this.producerReady) return;

Expand All @@ -41,14 +51,19 @@ public void close() {
producer.close(Duration.ofMillis(0)); // close immediately
}

private void setProducer(String brokerIP, int lingerMS, int batchSize) {
private void setProducer(
String brokerIP,
int lingerMS,
int batchSize,
Serializer keySerializer,
Serializer valueSerializer) {
if (producer != null) close(); // close existing producer connection

int requestTimeoutMs = 5000;
Properties kafkaProps = new Properties();
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerIP);
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getSerializer());
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getSerializer());
kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMS);
kafkaProps.put(ProducerConfig.RETRIES_CONFIG, 0);
Expand Down
Loading

0 comments on commit 0b24e0c

Please sign in to comment.