From 6da1e0c5f07a6f2f7c052a13776643525a1e7f57 Mon Sep 17 00:00:00 2001 From: Ebrahim Date: Mon, 23 Oct 2023 08:33:04 +0330 Subject: [PATCH] Add JmsEventAggregator --- Source/BSN.Commons/BSN.Commons.csproj | 2 + .../EventAggregator/Contracts/IEvent.cs | 4 +- .../EventAggregator/Contracts/ISubscriber.cs | 2 +- .../MessageBroker/Jms/JmsConnectionOptions.cs | 18 +++ .../MessageBroker/Jms/JmsEventAggregator.cs | 127 ++++++++++++++++++ .../RabbitMqEventAggregator.cs | 2 +- 6 files changed, 151 insertions(+), 4 deletions(-) create mode 100644 Source/BSN.Commons/Infrastructure/MessageBroker/Jms/JmsConnectionOptions.cs create mode 100644 Source/BSN.Commons/Infrastructure/MessageBroker/Jms/JmsEventAggregator.cs diff --git a/Source/BSN.Commons/BSN.Commons.csproj b/Source/BSN.Commons/BSN.Commons.csproj index 6a871b5..c3c1afc 100644 --- a/Source/BSN.Commons/BSN.Commons.csproj +++ b/Source/BSN.Commons/BSN.Commons.csproj @@ -51,6 +51,8 @@ + + diff --git a/Source/BSN.Commons/Infrastructure/MessageBroker/EventContracts/EventAggregator/Contracts/IEvent.cs b/Source/BSN.Commons/Infrastructure/MessageBroker/EventContracts/EventAggregator/Contracts/IEvent.cs index 7a66be9..de1e553 100644 --- a/Source/BSN.Commons/Infrastructure/MessageBroker/EventContracts/EventAggregator/Contracts/IEvent.cs +++ b/Source/BSN.Commons/Infrastructure/MessageBroker/EventContracts/EventAggregator/Contracts/IEvent.cs @@ -18,11 +18,11 @@ namespace BSN.Commons.Infrastructure.MessageBroker.EventContracts.EventAggregato public interface IEvent : IEvent where T : IEventDataModel { /// - /// Gets the data model associated with the event. The data model serves as a container for structured + /// Gets or sets the data model associated with the event. The data model serves as a container for structured /// information, allowing events to convey specific details or context. By including data models in events, /// components can make use of this information for various purposes, such as processing, logging, or decision-making. /// - T DataModel { get; } + T DataModel { get; set; } } /// diff --git a/Source/BSN.Commons/Infrastructure/MessageBroker/EventContracts/EventAggregator/Contracts/ISubscriber.cs b/Source/BSN.Commons/Infrastructure/MessageBroker/EventContracts/EventAggregator/Contracts/ISubscriber.cs index 368ca06..422fb57 100644 --- a/Source/BSN.Commons/Infrastructure/MessageBroker/EventContracts/EventAggregator/Contracts/ISubscriber.cs +++ b/Source/BSN.Commons/Infrastructure/MessageBroker/EventContracts/EventAggregator/Contracts/ISubscriber.cs @@ -22,7 +22,7 @@ public interface ISubscriber /// The type of event to subscribe to. /// The type of data model associated with the event. /// The event receiver to subscribe. - void Subscribe(IEventReceiver eventReceiver) where TEvent : IEvent where TEventDataModel : IEventDataModel; + void Subscribe(IEventReceiver eventReceiver) where TEvent : IEvent, new() where TEventDataModel : IEventDataModel; /// /// Unsubscribes an event receiver from handling events of a specific type with associated data model. diff --git a/Source/BSN.Commons/Infrastructure/MessageBroker/Jms/JmsConnectionOptions.cs b/Source/BSN.Commons/Infrastructure/MessageBroker/Jms/JmsConnectionOptions.cs new file mode 100644 index 0000000..d57665d --- /dev/null +++ b/Source/BSN.Commons/Infrastructure/MessageBroker/Jms/JmsConnectionOptions.cs @@ -0,0 +1,18 @@ +namespace BSN.Commons.Infrastructure.MessageBroker.Jms +{ + /// + /// Represents a class containing options for configuring a JMS connection. + /// + public class JmsConnectionOptions + { + /// + /// For example: tcp://localhost:61616 + /// + public string BrokerUri { get; set; } + + /// + /// For example: testQueue + /// + public string QueueName { get; set; } + } +} \ No newline at end of file diff --git a/Source/BSN.Commons/Infrastructure/MessageBroker/Jms/JmsEventAggregator.cs b/Source/BSN.Commons/Infrastructure/MessageBroker/Jms/JmsEventAggregator.cs new file mode 100644 index 0000000..c3c22e8 --- /dev/null +++ b/Source/BSN.Commons/Infrastructure/MessageBroker/Jms/JmsEventAggregator.cs @@ -0,0 +1,127 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using BSN.Commons.Infrastructure.MessageBroker.EventAggregator; +using BSN.Commons.Infrastructure.MessageBroker.EventContracts.EventAggregator; +using BSN.Commons.Infrastructure.MessageBroker.EventContracts.EventAggregator.Contracts; +using BSN.Commons.Infrastructure.MessageBroker.EventContracts.EventAggregator.EventModels; +using Apache.NMS; +using Apache.NMS.ActiveMQ; +using Apache.NMS.Util; +using Newtonsoft.Json; + +namespace BSN.Commons.Infrastructure.MessageBroker.Jms +{ + /// + /// Represents an implementation of . + /// using JMS as the underlying message broker. + /// + public class JmsEventAggregator : IEventAggregator + { + /// + /// Initializes a new instance of the class. + /// + public JmsEventAggregator( + JmsConnectionOptions jmsConnectionOptions, + IEventAggregatorSubscriptionManager eventAggregatorSubscriptionManager + ) + { + _subscriptionManager = eventAggregatorSubscriptionManager ?? new InMemoryEventAggregatorSubscriptionManager(); + + IConnectionFactory factory = new ConnectionFactory(jmsConnectionOptions.BrokerUri); + _connection = factory.CreateConnection(); + _connection.Start(); + _session = _connection.CreateSession(); + _destination = SessionUtil.GetDestination(_session, jmsConnectionOptions.QueueName); + + _producer = _session.CreateProducer(_destination); + _consumers = new Dictionary(); + } + + /// + public void Publish(IEvent @event) where TEventModel : IEventDataModel + { + string serializedEvent = JsonConvert.SerializeObject(@event.DataModel); + + ITextMessage message = _session.CreateTextMessage(serializedEvent); + + _producer.Send(message); + } + + /// + public Task PublishAsync(IEvent @event) where TEventModel : IEventDataModel + { + return Task.Run(() => Publish(@event)); + } + + /// + public bool HasSubscriptionForEvent() where TEvent : IEvent where TEventDataModel : IEventDataModel + { + return _subscriptionManager.HasSubscriptionsForEvent(); + } + + /// + public void Subscribe(IEventReceiver eventReceiver) where TEvent : IEvent, new() where TEventDataModel : IEventDataModel + { + string eventName = typeof(TEvent).Name; + + _subscriptionManager.AddSubscription(eventReceiver); + + var consumer = _session.CreateConsumer(_destination); + + consumer.Listener += message => + { + if (message is ITextMessage textMessage) + { + string serializedEvent = textMessage.Text; + + TEventDataModel eventDataModel = JsonConvert.DeserializeObject(serializedEvent); + + TEvent @event = new TEvent + { + DataModel = eventDataModel + }; + + eventReceiver.Handle(@event); + } + }; + + _consumers.Add(eventName, consumer); + } + + /// + public void UnSubscribe(IEventReceiver eventReceiver) where TEvent : IEvent where TEventDataModel : IEventDataModel + { + string eventName = typeof(TEvent).Name; + + _subscriptionManager.RemoveSubscription(eventName); + + if (_consumers.ContainsKey(eventName)) + { + _consumers[eventName].Dispose(); + _consumers.Remove(eventName); + } + } + + /// + public void Dispose() + { + _producer?.Dispose(); + + foreach (var consumer in _consumers) + { + consumer.Value.Dispose(); + } + + _destination?.Dispose(); + _session?.Dispose(); + _connection?.Dispose(); + } + + private readonly IEventAggregatorSubscriptionManager _subscriptionManager; + private readonly IConnection _connection; + private readonly ISession _session; + private readonly IDestination _destination; + private readonly IMessageProducer _producer; + private readonly Dictionary _consumers; + } +} \ No newline at end of file diff --git a/Source/BSN.Commons/Infrastructure/MessageBroker/RabbitMqEventAggregator/RabbitMqEventAggregator.cs b/Source/BSN.Commons/Infrastructure/MessageBroker/RabbitMqEventAggregator/RabbitMqEventAggregator.cs index c9470b8..da641c0 100644 --- a/Source/BSN.Commons/Infrastructure/MessageBroker/RabbitMqEventAggregator/RabbitMqEventAggregator.cs +++ b/Source/BSN.Commons/Infrastructure/MessageBroker/RabbitMqEventAggregator/RabbitMqEventAggregator.cs @@ -50,7 +50,7 @@ public bool HasSubscriptionForEvent() where TEvent : IE } /// - public void Subscribe(IEventReceiver eventReceiver) where TEvent : IEvent where TEventDataModel : IEventDataModel + public void Subscribe(IEventReceiver eventReceiver) where TEvent : IEvent, new() where TEventDataModel : IEventDataModel { string eventName = _subscriptionsManager.GetEventKey();