Skip to content

Commit

Permalink
RabbitMQ API updates
Browse files Browse the repository at this point in the history
  • Loading branch information
jas88 committed Dec 4, 2024
1 parent a1be780 commit aa37099
Show file tree
Hide file tree
Showing 26 changed files with 142 additions and 173 deletions.
2 changes: 1 addition & 1 deletion src/SmiServices/Common/IMessageBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public interface IMessageBroker

IProducerModel SetupProducer(ProducerOptions producerOptions, bool isBatch);

IModel GetModel(string connectionName);
IChannel GetModel(string connectionName);

void Shutdown(TimeSpan timeout);
public void Wait();
Expand Down
2 changes: 1 addition & 1 deletion src/SmiServices/Common/Messages/IMessageHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public interface IMessageHeader
/// </summary>
Guid[] Parents { get; }

void Populate(IDictionary<string, object> props);
void Populate(IDictionary<string, object?> props);
void Log(ILogger logger, LogLevel level, string message, Exception? ex = null);

bool IsDescendantOf(IMessageHeader other);
Expand Down
2 changes: 1 addition & 1 deletion src/SmiServices/Common/Messages/MessageHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static MessageHeader FromDict(IDictionary<string, object> encodedHeaders,
/// Populates RabbitMQ header properties with the current MessageHeader
/// </summary>
/// <param name="headers"></param>
public void Populate(IDictionary<string, object> headers)
public void Populate(IDictionary<string, object?> headers)
{
headers.Add("MessageGuid", MessageGuid.ToString());
headers.Add("ProducerProcessID", ProducerProcessID);
Expand Down
8 changes: 5 additions & 3 deletions src/SmiServices/Common/Messaging/BatchProducerModel.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using RabbitMQ.Client;
using SmiServices.Common.Messages;
using System;
using System.Threading.Tasks;

namespace SmiServices.Common.Messaging
{
Expand All @@ -13,7 +14,7 @@ public class BatchProducerModel : ProducerModel
{
public BatchProducerModel(
string exchangeName,
IModel model,
IChannel model,
IBasicProperties properties,
int maxPublishAttempts = 1,
IBackoffProvider? backoffProvider = null,
Expand All @@ -26,14 +27,15 @@ public BatchProducerModel(


/// <summary>
/// Sends a message but does not wait for the server to confirm the publish. Manually call ProducerModel.WaitForConfirms()
/// Sends a message but does not wait for the server to confirm the publish. Manually await the Task
/// to check all previously unacknowledged messages have been sent.
/// </summary>
/// <param name="message"></param>
/// <param name="inResponseTo"></param>
/// <param name="routingKey"></param>
/// <returns></returns>
public override IMessageHeader SendMessage(IMessage message, IMessageHeader? inResponseTo = null, string? routingKey = null)
public override Task<IMessageHeader> SendMessage(IMessage message, IMessageHeader? inResponseTo = null,
string? routingKey = null)
{
return SendMessageImpl(message, inResponseTo, routingKey);
}
Expand Down
4 changes: 2 additions & 2 deletions src/SmiServices/Common/Messaging/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class Consumer<TMessage> : IConsumer where TMessage : IMessage
private readonly object _oConsumeLock = new();
private bool _exiting;

protected IModel? Model;
protected IChannel? Model;

public virtual void Shutdown()
{
Expand All @@ -72,7 +72,7 @@ protected Consumer()
}


public void SetModel(IModel model)
public void SetModel(IChannel model)
{
if (model.IsClosed)
throw new ArgumentException("Model is closed");
Expand Down
52 changes: 25 additions & 27 deletions src/SmiServices/Common/Messaging/ControlMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;

namespace SmiServices.Common.Messaging
Expand All @@ -28,7 +29,7 @@ public class ControlMessageConsumer : Consumer<IMessage>
private readonly string _processName;
private readonly string _processId;

private readonly IConnectionFactory _factory;
private readonly IChannel _channel;

private const string ControlQueueBindingKey = "smi.control.all.*";

Expand All @@ -41,24 +42,28 @@ public ControlMessageConsumer(
Action<string> stopEvent)
{
ArgumentNullException.ThrowIfNull(processName);
ArgumentNullException.ThrowIfNull(controlExchangeName);
ArgumentNullException.ThrowIfNull(stopEvent);

if (rabbitOptions.RabbitMqVirtualHost is null || rabbitOptions.RabbitMqUserName is null || rabbitOptions.RabbitMqPassword is null)
throw new InvalidOperationException("RabbitOptions must have all fields set");

_processName = processName.ToLower();
_processId = processId.ToString();

ControlConsumerOptions.QueueName = $"Control.{_processName}{_processId}";

_factory = new ConnectionFactory()
_channel = new ConnectionFactory
{
HostName = rabbitOptions.RabbitMqHostName,
VirtualHost = rabbitOptions.RabbitMqVirtualHost,
Port = rabbitOptions.RabbitMqHostPort,
UserName = rabbitOptions.RabbitMqUserName,
Password = rabbitOptions.RabbitMqPassword
};
}.CreateConnectionAsync(CancellationToken.None).Result.CreateChannelAsync(null, CancellationToken.None).Result;

ArgumentNullException.ThrowIfNull(controlExchangeName);
SetupControlQueueForHost(controlExchangeName);
SetupControlQueueForHost(controlExchangeName).Wait();

ArgumentNullException.ThrowIfNull(stopEvent);
StopHost += () => stopEvent("Control message stop");
}

Expand Down Expand Up @@ -90,7 +95,7 @@ public override void ProcessMessage(BasicDeliverEventArgs e)
string action = split[^1];

// If action contains a numeric and it's not our PID, then ignore
if (action.Any(char.IsDigit) && !action.EndsWith(_processId))
if (action.Any(char.IsDigit) && !action.EndsWith(_processId, StringComparison.Ordinal))
return;

// Ignore any messages not meant for us
Expand All @@ -102,22 +107,15 @@ public override void ProcessMessage(BasicDeliverEventArgs e)

// Handle any general actions - just stop and ping for now

if (action.StartsWith("stop"))
if (action.StartsWith("stop", StringComparison.Ordinal))
{
if (StopHost == null)
{
// This should never really happen
Logger.Info("Received stop command but no stop event registered");
return;
}

Logger.Info("Stop request received, raising StopHost event");
Task.Run(() => StopHost.Invoke());

return;
}

if (action.StartsWith("ping"))
if (action.StartsWith("ping", StringComparison.Ordinal))
{
Logger.Info("Pong!");
return;
Expand Down Expand Up @@ -150,10 +148,9 @@ public override void ProcessMessage(BasicDeliverEventArgs e)
/// </summary>
public override void Shutdown()
{
using IConnection connection = _factory.CreateConnection(_processName + _processId + "-ControlQueueShutdown");
using IModel model = connection.CreateModel();
Logger.Debug($"Deleting control queue: {ControlConsumerOptions.QueueName}");
model.QueueDelete(ControlConsumerOptions.QueueName);
if (ControlConsumerOptions.QueueName != null)
_channel.QueueDeleteAsync(ControlConsumerOptions.QueueName).Wait();
}

// NOTE(rkm 2020-05-12) Not used in this implementation
Expand All @@ -167,13 +164,11 @@ public override void Shutdown()
/// The connection is disposed and StartConsumer(...) can then be called on the parent MessageBroker with ControlConsumerOptions
/// </summary>
/// <param name="controlExchangeName"></param>
private void SetupControlQueueForHost(string controlExchangeName)
private async Task SetupControlQueueForHost(string controlExchangeName)
{
using IConnection connection = _factory.CreateConnection($"{_processName}{_processId}-ControlQueueSetup");
using IModel model = connection.CreateModel();
try
{
model.ExchangeDeclarePassive(controlExchangeName);
await _channel.ExchangeDeclarePassiveAsync(controlExchangeName, CancellationToken.None);
}
catch (OperationInterruptedException e)
{
Expand All @@ -186,17 +181,20 @@ private void SetupControlQueueForHost(string controlExchangeName)
// durable = false (queue will not persist over restarts of the RabbitMq server)
// exclusive = false (queue won't be deleted when THIS connection closes)
// autoDelete = true (queue will be deleted after a consumer connects and then disconnects)
model.QueueDeclare(ControlConsumerOptions.QueueName, durable: false, exclusive: false, autoDelete: true);
await _channel.QueueDeclareAsync(
ControlConsumerOptions.QueueName ??
throw new InvalidOperationException(nameof(ControlConsumerOptions.QueueName)), durable: false,
exclusive: false, autoDelete: true, cancellationToken: CancellationToken.None);

// Binding for any control requests, i.e. "stop"
Logger.Debug($"Creating binding {controlExchangeName}->{ControlConsumerOptions.QueueName} with key {ControlQueueBindingKey}");
model.QueueBind(ControlConsumerOptions.QueueName, controlExchangeName, ControlQueueBindingKey);
await _channel.QueueBindAsync(ControlConsumerOptions.QueueName, controlExchangeName, ControlQueueBindingKey);

// Specific microservice binding key, ignoring the id at the end of the process name
string bindingKey = $"smi.control.{_processName}.*";
var bindingKey = $"smi.control.{_processName}.*";

Logger.Debug($"Creating binding {controlExchangeName}->{ControlConsumerOptions.QueueName} with key {bindingKey}");
model.QueueBind(ControlConsumerOptions.QueueName, controlExchangeName, bindingKey);
await _channel.QueueBindAsync(ControlConsumerOptions.QueueName, controlExchangeName, bindingKey);
}

private static string? GetBodyFromArgs(BasicDeliverEventArgs e)
Expand Down
8 changes: 4 additions & 4 deletions src/SmiServices/Common/Messaging/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ namespace SmiServices.Common.Messaging
public interface IConsumer
{
/// <summary>
/// Set the <see cref="IModel"/> which messages will be processed with
/// Set the <see cref="IChannel"/> which messages will be processed with
/// </summary>
/// <param name="model"></param>
void SetModel(IModel model);
/// <param name="model"></param>
void SetModel(IChannel model);

/// <summary>
/// Process a message received by the adapter.
Expand All @@ -38,7 +38,7 @@ public interface IConsumer
bool HoldUnprocessableMessages { get; set; }

/// <summary>
/// The BasicQos value configured on the <see cref="IModel"/>
/// The BasicQos value configured on the <see cref="IChannel"/>
/// </summary>
int QoSPrefetchCount { get; set; }
}
Expand Down
3 changes: 2 additions & 1 deletion src/SmiServices/Common/Messaging/IProducerModel.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Threading.Tasks;
using SmiServices.Common.Events;
using SmiServices.Common.Messages;

Expand All @@ -15,7 +16,7 @@ public interface IProducerModel
/// <param name="message">Message object to serialise and send.</param>
/// <param name="isInResponseTo">If you are responding to a message, pass that messages header in here (otherwise pass null)</param>
/// <param name="routingKey">Routing key for the exchange to direct the message.</param>
IMessageHeader SendMessage(IMessage message, IMessageHeader? isInResponseTo, string? routingKey);
Task<IMessageHeader> SendMessage(IMessage message, IMessageHeader? isInResponseTo, string? routingKey);

/// <summary>
/// Waits until all sent messages are confirmed by RabbitMQ
Expand Down
Loading

0 comments on commit aa37099

Please sign in to comment.