Skip to content

Commit

Permalink
Add JmsEventAggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
meghasemim1999 committed Oct 23, 2023
1 parent c3a367a commit 6da1e0c
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 4 deletions.
2 changes: 2 additions & 0 deletions Source/BSN.Commons/BSN.Commons.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Apache.NMS" Version="1.8.0" />
<PackageReference Include="Apache.NMS.ActiveMQ" Version="1.7.2" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.32" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.32" />
<PackageReference Include="Confluent.Kafka" Version="2.2.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ namespace BSN.Commons.Infrastructure.MessageBroker.EventContracts.EventAggregato
public interface IEvent<T> : IEvent where T : IEventDataModel
{
/// <summary>
/// Gets the data model associated with the event. The data model serves as a container for structured
/// Gets or sets the data model associated with the event. The data model serves as a container for structured
/// information, allowing events to convey specific details or context. By including data models in events,
/// components can make use of this information for various purposes, such as processing, logging, or decision-making.
/// </summary>
T DataModel { get; }
T DataModel { get; set; }
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public interface ISubscriber
/// <typeparam name="TEvent">The type of event to subscribe to.</typeparam>
/// <typeparam name="TEventDataModel">The type of data model associated with the event.</typeparam>
/// <param name="eventReceiver">The event receiver to subscribe.</param>
void Subscribe<TEvent, TEventDataModel>(IEventReceiver eventReceiver) where TEvent : IEvent<TEventDataModel> where TEventDataModel : IEventDataModel;
void Subscribe<TEvent, TEventDataModel>(IEventReceiver eventReceiver) where TEvent : IEvent<TEventDataModel>, new() where TEventDataModel : IEventDataModel;

/// <summary>
/// Unsubscribes an event receiver from handling events of a specific type with associated data model.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace BSN.Commons.Infrastructure.MessageBroker.Jms
{
/// <summary>
/// Represents a class containing options for configuring a JMS connection.
/// </summary>
public class JmsConnectionOptions
{
/// <summary>
/// For example: tcp://localhost:61616
/// </summary>
public string BrokerUri { get; set; }

/// <summary>
/// For example: testQueue
/// </summary>
public string QueueName { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using BSN.Commons.Infrastructure.MessageBroker.EventAggregator;
using BSN.Commons.Infrastructure.MessageBroker.EventContracts.EventAggregator;
using BSN.Commons.Infrastructure.MessageBroker.EventContracts.EventAggregator.Contracts;
using BSN.Commons.Infrastructure.MessageBroker.EventContracts.EventAggregator.EventModels;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.Util;
using Newtonsoft.Json;

namespace BSN.Commons.Infrastructure.MessageBroker.Jms
{
/// <summary>
/// Represents an implementation of <see cref="IEventAggregator"/>.
/// using JMS as the underlying message broker.
/// </summary>
public class JmsEventAggregator : IEventAggregator
{
/// <summary>
/// Initializes a new instance of the <see cref="JmsEventAggregator"/> class.
/// </summary>
public JmsEventAggregator(
JmsConnectionOptions jmsConnectionOptions,
IEventAggregatorSubscriptionManager eventAggregatorSubscriptionManager
)
{
_subscriptionManager = eventAggregatorSubscriptionManager ?? new InMemoryEventAggregatorSubscriptionManager();

IConnectionFactory factory = new ConnectionFactory(jmsConnectionOptions.BrokerUri);
_connection = factory.CreateConnection();
_connection.Start();
_session = _connection.CreateSession();
_destination = SessionUtil.GetDestination(_session, jmsConnectionOptions.QueueName);

_producer = _session.CreateProducer(_destination);
_consumers = new Dictionary<string, IMessageConsumer>();
}

/// <inheritdoc />
public void Publish<TEventModel>(IEvent<TEventModel> @event) where TEventModel : IEventDataModel
{
string serializedEvent = JsonConvert.SerializeObject(@event.DataModel);

ITextMessage message = _session.CreateTextMessage(serializedEvent);

_producer.Send(message);
}

/// <inheritdoc />
public Task PublishAsync<TEventModel>(IEvent<TEventModel> @event) where TEventModel : IEventDataModel
{
return Task.Run(() => Publish(@event));
}

/// <inheritdoc />
public bool HasSubscriptionForEvent<TEvent, TEventDataModel>() where TEvent : IEvent<TEventDataModel> where TEventDataModel : IEventDataModel
{
return _subscriptionManager.HasSubscriptionsForEvent<TEvent>();
}

/// <inheritdoc />
public void Subscribe<TEvent, TEventDataModel>(IEventReceiver eventReceiver) where TEvent : IEvent<TEventDataModel>, new() where TEventDataModel : IEventDataModel
{
string eventName = typeof(TEvent).Name;

_subscriptionManager.AddSubscription<TEvent>(eventReceiver);

var consumer = _session.CreateConsumer(_destination);

consumer.Listener += message =>
{
if (message is ITextMessage textMessage)
{
string serializedEvent = textMessage.Text;

TEventDataModel eventDataModel = JsonConvert.DeserializeObject<TEventDataModel>(serializedEvent);

TEvent @event = new TEvent
{
DataModel = eventDataModel
};

eventReceiver.Handle(@event);
}
};

_consumers.Add(eventName, consumer);
}

/// <inheritdoc />
public void UnSubscribe<TEvent, TEventDataModel>(IEventReceiver eventReceiver) where TEvent : IEvent<TEventDataModel> where TEventDataModel : IEventDataModel
{
string eventName = typeof(TEvent).Name;

_subscriptionManager.RemoveSubscription(eventName);

if (_consumers.ContainsKey(eventName))
{
_consumers[eventName].Dispose();
_consumers.Remove(eventName);
}
}

/// <inheritdoc />
public void Dispose()
{
_producer?.Dispose();

foreach (var consumer in _consumers)
{
consumer.Value.Dispose();
}

_destination?.Dispose();
_session?.Dispose();
_connection?.Dispose();
}

private readonly IEventAggregatorSubscriptionManager _subscriptionManager;
private readonly IConnection _connection;
private readonly ISession _session;
private readonly IDestination _destination;
private readonly IMessageProducer _producer;
private readonly Dictionary<string, IMessageConsumer> _consumers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public bool HasSubscriptionForEvent<TEvent, TEventDataModel>() where TEvent : IE
}

/// <inheritdoc />
public void Subscribe<TEvent, TEventDataModel>(IEventReceiver eventReceiver) where TEvent : IEvent<TEventDataModel> where TEventDataModel : IEventDataModel
public void Subscribe<TEvent, TEventDataModel>(IEventReceiver eventReceiver) where TEvent : IEvent<TEventDataModel>, new() where TEventDataModel : IEventDataModel
{
string eventName = _subscriptionsManager.GetEventKey<TEvent>();

Expand Down

0 comments on commit 6da1e0c

Please sign in to comment.