Skip to content

Commit

Permalink
Resolve Comments
Browse files Browse the repository at this point in the history
  • Loading branch information
meghasemim1999 committed Oct 23, 2023
1 parent 81f5a4f commit aed72cc
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 51 deletions.
2 changes: 1 addition & 1 deletion Source/BSN.Commons/Infrastructure/Kafka/IKafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace BSN.Commons.Infrastructure.Kafka
/// Represents A Kafka Api compatible consumer class
/// </summary>
/// <typeparam name="T">The object type that is being consumed</typeparam>
public interface IKafkaConsumer<T> : IDisposable
public interface IKafkaConsumer<T>
{
/// <summary>
/// Consumes the Kafka api
Expand Down
2 changes: 1 addition & 1 deletion Source/BSN.Commons/Infrastructure/Kafka/IKafkaProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public KafkaProduceException(string message) : base(message) {}
/// Represents A Kafka Api compatible producer class
/// </summary>
/// <typeparam name="T">The object type that is being produced</typeparam>
public interface IKafkaProducer<T> : IDisposable
public interface IKafkaProducer<T>
{
/// <summary>
/// Produces the Kafka api, raises exceptions for errors
Expand Down
6 changes: 0 additions & 6 deletions Source/BSN.Commons/Infrastructure/Kafka/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,5 @@ public Task<T> ConsumeAsync(CancellationToken ct = default)
}

private readonly IConsumer<Null, T> _consumer;

/// <inheritdoc />
public void Dispose()
{
_consumer?.Dispose();
}
}
}
48 changes: 31 additions & 17 deletions Source/BSN.Commons/Infrastructure/Kafka/KafkaConsumerFactory.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using System.Collections.Generic;

namespace BSN.Commons.Infrastructure.Kafka
{
Expand All @@ -9,36 +11,48 @@ public class KafkaConsumerFactory<T> : IKafkaConsumerFactory<T>
/// <param name="options">Default Options for KafkaConsumers</param>
public KafkaConsumerFactory(IKafkaConsumerOptions options)
{
_defaultConsumerConfig = new ConsumerConfig
{
BootstrapServers = options.BootstrapServers,
AutoOffsetReset = AutoOffsetReset.Earliest
};

// Here we did this because the ReceiveMessageMaxBytes in ProducerConfig type
// is int and can not accept high values that we expect
_defaultConsumerConfig.Set("receive.message.max.bytes", options.ReceiveMessageMaxBytes);
_defaultConsumerOptions = options;
}

/// <inheritdoc />
public IKafkaConsumer<T> Create(string topic, string groupId)
{
lock (this)
var consumerKey = topic + ":" + groupId;

if (_consumers.ContainsKey(consumerKey))
{
var config = new ConsumerConfig(_defaultConsumerConfig) { GroupId = groupId };
var consumer = new ConsumerBuilder<Null, T>(config).Build();
consumer.Subscribe(topic);

return new KafkaConsumer<T>(consumer);
return new KafkaConsumer<T>(_consumers[consumerKey]);
}

var config = new ConsumerConfig()
{
BootstrapServers = _defaultConsumerOptions.BootstrapServers,
AutoOffsetReset = AutoOffsetReset.Earliest,
};


// Here we did this because the ReceiveMessageMaxBytes in ProducerConfig type
// is int and can not accept high values that we expect
config.Set("receive.message.max.bytes", _defaultConsumerOptions.ReceiveMessageMaxBytes);

var consumer = new ConsumerBuilder<Null, T>(config).Build();
consumer.Subscribe(topic);

_consumers.Add(consumerKey, consumer);

return new KafkaConsumer<T>(consumer);
}

private readonly ConsumerConfig _defaultConsumerConfig;
private readonly Dictionary<string, IConsumer<Null, T>> _consumers;
private IKafkaConsumerOptions _defaultConsumerOptions;

/// <inheritdoc />
public void Dispose()
{
// TODO release managed resources here
foreach (var consumer in _consumers)
{
consumer.Value.Dispose();
}
}
}
}
15 changes: 13 additions & 2 deletions Source/BSN.Commons/Infrastructure/Kafka/KafkaConsumerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,21 @@
/// <inheritdoc />
public class KafkaConsumerOptions : IKafkaConsumerOptions
{
/// <summary>
/// Initializes a new instance of the <see cref="KafkaConsumerOptions"/> class.
/// </summary>
/// <param name="bootstrapServers">List of kafka bootstrap servers seperated by ;</param>
/// <param name="receiveMessageMaxBytes">Represents the Size that a Receiving message can have in Bytes</param>
public KafkaConsumerOptions(string bootstrapServers, string receiveMessageMaxBytes)
{
BootstrapServers = bootstrapServers;
ReceiveMessageMaxBytes = receiveMessageMaxBytes;
}

/// <inheritdoc />
public string BootstrapServers { get; set; }
public string BootstrapServers { get; }

/// <inheritdoc />
public string ReceiveMessageMaxBytes { get; set; }
public string ReceiveMessageMaxBytes { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,10 @@ ILogger logger
{
BootstrapServers = _kafkaConnectionOptions.BootstrapServers
});

_consumerFactory = new KafkaConsumerFactory<string>(
new KafkaConsumerOptions
{
BootstrapServers = _kafkaConnectionOptions.BootstrapServers,
ReceiveMessageMaxBytes = _kafkaConnectionOptions.ReceiveMessageMaxBytes,
});
new KafkaConsumerOptions(bootstrapServers: _kafkaConnectionOptions.BootstrapServers, receiveMessageMaxBytes: _kafkaConnectionOptions.ReceiveMessageMaxBytes)
);
}

/// <summary>
Expand Down Expand Up @@ -151,27 +148,12 @@ public void UnSubscribe<TEvent, TEventDataModel>(IEventReceiver eventReceiver) w

/// <inheritdoc />
public void Dispose()
{
foreach (var producer in _producers)
{
producer.Value.Dispose();
}

foreach (var consumer in _consumers)
{
foreach (var ct in _consumersCancellationTokenSources)
{
_consumersCancellationTokenSources[consumer.Key].Cancel();

consumer.Value.Dispose();
ct.Value.Cancel();
}

_producers.Clear();

_consumers.Clear();

_consumersCancellationTokenSources.Clear();

_subscriptionManager.Clear();

_producerFactory.Dispose();

_consumerFactory.Dispose();
Expand Down

0 comments on commit aed72cc

Please sign in to comment.