From aa370994485036e228783390c3361a69e0546eb8 Mon Sep 17 00:00:00 2001 From: James A Sutherland Date: Wed, 4 Dec 2024 14:45:05 -0600 Subject: [PATCH] RabbitMQ API updates --- src/SmiServices/Common/IMessageBroker.cs | 2 +- .../Common/Messages/IMessageHeader.cs | 2 +- .../Common/Messages/MessageHeader.cs | 2 +- .../Common/Messaging/BatchProducerModel.cs | 8 +- src/SmiServices/Common/Messaging/Consumer.cs | 4 +- .../Messaging/ControlMessageConsumer.cs | 52 ++++++------ src/SmiServices/Common/Messaging/IConsumer.cs | 8 +- .../Common/Messaging/IProducerModel.cs | 3 +- .../Common/Messaging/ProducerModel.cs | 85 +++++++------------ .../Common/Messaging/RabbitMQBroker.cs | 12 +-- .../Processing/IMessageProcessor.cs | 2 +- .../Processing/MessageProcessor.cs | 2 +- .../RequiresExternalService.cs | 35 +++++--- .../RequiresMongoDb.cs | 22 ++--- .../RequiresRabbit.cs | 24 ++---- .../RequiresRelationalDb.cs | 14 +-- .../Common/Messaging/ProducerModelTests.cs | 8 +- .../Common/MicroserviceTester.cs | 4 +- .../ExtractionRequestQueueConsumerTest.cs | 2 +- .../AnonFailedMessageConsumerTests.cs | 4 +- .../AnonVerificationMessageConsumerTests.cs | 2 +- ...tractFileCollectionMessageConsumerTests.cs | 4 +- ...tractionRequestInfoMessageConsumerTests.cs | 4 +- .../DicomAnonymiserConsumerTests.cs | 4 +- .../DicomTagReaderTestHelper.cs | 2 +- .../FileCopier/FileCopyQueueConsumerTest.cs | 4 +- 26 files changed, 142 insertions(+), 173 deletions(-) diff --git a/src/SmiServices/Common/IMessageBroker.cs b/src/SmiServices/Common/IMessageBroker.cs index a2f4318b5..2d1794026 100644 --- a/src/SmiServices/Common/IMessageBroker.cs +++ b/src/SmiServices/Common/IMessageBroker.cs @@ -15,7 +15,7 @@ public interface IMessageBroker IProducerModel SetupProducer(ProducerOptions producerOptions, bool isBatch); - IModel GetModel(string connectionName); + IChannel GetModel(string connectionName); void Shutdown(TimeSpan timeout); public void Wait(); diff --git a/src/SmiServices/Common/Messages/IMessageHeader.cs b/src/SmiServices/Common/Messages/IMessageHeader.cs index 5c2b0df85..0eaf40cc2 100644 --- a/src/SmiServices/Common/Messages/IMessageHeader.cs +++ b/src/SmiServices/Common/Messages/IMessageHeader.cs @@ -19,7 +19,7 @@ public interface IMessageHeader /// Guid[] Parents { get; } - void Populate(IDictionary props); + void Populate(IDictionary props); void Log(ILogger logger, LogLevel level, string message, Exception? ex = null); bool IsDescendantOf(IMessageHeader other); diff --git a/src/SmiServices/Common/Messages/MessageHeader.cs b/src/SmiServices/Common/Messages/MessageHeader.cs index b5c74b52c..b0fb13dae 100644 --- a/src/SmiServices/Common/Messages/MessageHeader.cs +++ b/src/SmiServices/Common/Messages/MessageHeader.cs @@ -88,7 +88,7 @@ public static MessageHeader FromDict(IDictionary encodedHeaders, /// Populates RabbitMQ header properties with the current MessageHeader /// /// - public void Populate(IDictionary headers) + public void Populate(IDictionary headers) { headers.Add("MessageGuid", MessageGuid.ToString()); headers.Add("ProducerProcessID", ProducerProcessID); diff --git a/src/SmiServices/Common/Messaging/BatchProducerModel.cs b/src/SmiServices/Common/Messaging/BatchProducerModel.cs index 206b7457c..d38c78fa6 100644 --- a/src/SmiServices/Common/Messaging/BatchProducerModel.cs +++ b/src/SmiServices/Common/Messaging/BatchProducerModel.cs @@ -1,6 +1,7 @@ using RabbitMQ.Client; using SmiServices.Common.Messages; using System; +using System.Threading.Tasks; namespace SmiServices.Common.Messaging { @@ -13,7 +14,7 @@ public class BatchProducerModel : ProducerModel { public BatchProducerModel( string exchangeName, - IModel model, + IChannel model, IBasicProperties properties, int maxPublishAttempts = 1, IBackoffProvider? backoffProvider = null, @@ -26,14 +27,15 @@ public BatchProducerModel( /// - /// Sends a message but does not wait for the server to confirm the publish. Manually call ProducerModel.WaitForConfirms() + /// Sends a message but does not wait for the server to confirm the publish. Manually await the Task /// to check all previously unacknowledged messages have been sent. /// /// /// /// /// - public override IMessageHeader SendMessage(IMessage message, IMessageHeader? inResponseTo = null, string? routingKey = null) + public override Task SendMessage(IMessage message, IMessageHeader? inResponseTo = null, + string? routingKey = null) { return SendMessageImpl(message, inResponseTo, routingKey); } diff --git a/src/SmiServices/Common/Messaging/Consumer.cs b/src/SmiServices/Common/Messaging/Consumer.cs index 97a3bd2f5..039c69e22 100644 --- a/src/SmiServices/Common/Messaging/Consumer.cs +++ b/src/SmiServices/Common/Messaging/Consumer.cs @@ -45,7 +45,7 @@ public abstract class Consumer : IConsumer where TMessage : IMessage private readonly object _oConsumeLock = new(); private bool _exiting; - protected IModel? Model; + protected IChannel? Model; public virtual void Shutdown() { @@ -72,7 +72,7 @@ protected Consumer() } - public void SetModel(IModel model) + public void SetModel(IChannel model) { if (model.IsClosed) throw new ArgumentException("Model is closed"); diff --git a/src/SmiServices/Common/Messaging/ControlMessageConsumer.cs b/src/SmiServices/Common/Messaging/ControlMessageConsumer.cs index 176d24f10..60154ce4e 100644 --- a/src/SmiServices/Common/Messaging/ControlMessageConsumer.cs +++ b/src/SmiServices/Common/Messaging/ControlMessageConsumer.cs @@ -9,6 +9,7 @@ using System.Linq; using System.Text; using System.Text.RegularExpressions; +using System.Threading; using System.Threading.Tasks; namespace SmiServices.Common.Messaging @@ -28,7 +29,7 @@ public class ControlMessageConsumer : Consumer private readonly string _processName; private readonly string _processId; - private readonly IConnectionFactory _factory; + private readonly IChannel _channel; private const string ControlQueueBindingKey = "smi.control.all.*"; @@ -41,24 +42,28 @@ public ControlMessageConsumer( Action stopEvent) { ArgumentNullException.ThrowIfNull(processName); + ArgumentNullException.ThrowIfNull(controlExchangeName); + ArgumentNullException.ThrowIfNull(stopEvent); + + if (rabbitOptions.RabbitMqVirtualHost is null || rabbitOptions.RabbitMqUserName is null || rabbitOptions.RabbitMqPassword is null) + throw new InvalidOperationException("RabbitOptions must have all fields set"); + _processName = processName.ToLower(); _processId = processId.ToString(); ControlConsumerOptions.QueueName = $"Control.{_processName}{_processId}"; - _factory = new ConnectionFactory() + _channel = new ConnectionFactory { HostName = rabbitOptions.RabbitMqHostName, VirtualHost = rabbitOptions.RabbitMqVirtualHost, Port = rabbitOptions.RabbitMqHostPort, UserName = rabbitOptions.RabbitMqUserName, Password = rabbitOptions.RabbitMqPassword - }; + }.CreateConnectionAsync(CancellationToken.None).Result.CreateChannelAsync(null, CancellationToken.None).Result; - ArgumentNullException.ThrowIfNull(controlExchangeName); - SetupControlQueueForHost(controlExchangeName); + SetupControlQueueForHost(controlExchangeName).Wait(); - ArgumentNullException.ThrowIfNull(stopEvent); StopHost += () => stopEvent("Control message stop"); } @@ -90,7 +95,7 @@ public override void ProcessMessage(BasicDeliverEventArgs e) string action = split[^1]; // If action contains a numeric and it's not our PID, then ignore - if (action.Any(char.IsDigit) && !action.EndsWith(_processId)) + if (action.Any(char.IsDigit) && !action.EndsWith(_processId, StringComparison.Ordinal)) return; // Ignore any messages not meant for us @@ -102,22 +107,15 @@ public override void ProcessMessage(BasicDeliverEventArgs e) // Handle any general actions - just stop and ping for now - if (action.StartsWith("stop")) + if (action.StartsWith("stop", StringComparison.Ordinal)) { - if (StopHost == null) - { - // This should never really happen - Logger.Info("Received stop command but no stop event registered"); - return; - } - Logger.Info("Stop request received, raising StopHost event"); Task.Run(() => StopHost.Invoke()); return; } - if (action.StartsWith("ping")) + if (action.StartsWith("ping", StringComparison.Ordinal)) { Logger.Info("Pong!"); return; @@ -150,10 +148,9 @@ public override void ProcessMessage(BasicDeliverEventArgs e) /// public override void Shutdown() { - using IConnection connection = _factory.CreateConnection(_processName + _processId + "-ControlQueueShutdown"); - using IModel model = connection.CreateModel(); Logger.Debug($"Deleting control queue: {ControlConsumerOptions.QueueName}"); - model.QueueDelete(ControlConsumerOptions.QueueName); + if (ControlConsumerOptions.QueueName != null) + _channel.QueueDeleteAsync(ControlConsumerOptions.QueueName).Wait(); } // NOTE(rkm 2020-05-12) Not used in this implementation @@ -167,13 +164,11 @@ public override void Shutdown() /// The connection is disposed and StartConsumer(...) can then be called on the parent MessageBroker with ControlConsumerOptions /// /// - private void SetupControlQueueForHost(string controlExchangeName) + private async Task SetupControlQueueForHost(string controlExchangeName) { - using IConnection connection = _factory.CreateConnection($"{_processName}{_processId}-ControlQueueSetup"); - using IModel model = connection.CreateModel(); try { - model.ExchangeDeclarePassive(controlExchangeName); + await _channel.ExchangeDeclarePassiveAsync(controlExchangeName, CancellationToken.None); } catch (OperationInterruptedException e) { @@ -186,17 +181,20 @@ private void SetupControlQueueForHost(string controlExchangeName) // durable = false (queue will not persist over restarts of the RabbitMq server) // exclusive = false (queue won't be deleted when THIS connection closes) // autoDelete = true (queue will be deleted after a consumer connects and then disconnects) - model.QueueDeclare(ControlConsumerOptions.QueueName, durable: false, exclusive: false, autoDelete: true); + await _channel.QueueDeclareAsync( + ControlConsumerOptions.QueueName ?? + throw new InvalidOperationException(nameof(ControlConsumerOptions.QueueName)), durable: false, + exclusive: false, autoDelete: true, cancellationToken: CancellationToken.None); // Binding for any control requests, i.e. "stop" Logger.Debug($"Creating binding {controlExchangeName}->{ControlConsumerOptions.QueueName} with key {ControlQueueBindingKey}"); - model.QueueBind(ControlConsumerOptions.QueueName, controlExchangeName, ControlQueueBindingKey); + await _channel.QueueBindAsync(ControlConsumerOptions.QueueName, controlExchangeName, ControlQueueBindingKey); // Specific microservice binding key, ignoring the id at the end of the process name - string bindingKey = $"smi.control.{_processName}.*"; + var bindingKey = $"smi.control.{_processName}.*"; Logger.Debug($"Creating binding {controlExchangeName}->{ControlConsumerOptions.QueueName} with key {bindingKey}"); - model.QueueBind(ControlConsumerOptions.QueueName, controlExchangeName, bindingKey); + await _channel.QueueBindAsync(ControlConsumerOptions.QueueName, controlExchangeName, bindingKey); } private static string? GetBodyFromArgs(BasicDeliverEventArgs e) diff --git a/src/SmiServices/Common/Messaging/IConsumer.cs b/src/SmiServices/Common/Messaging/IConsumer.cs index 42959bb17..63ffcb415 100644 --- a/src/SmiServices/Common/Messaging/IConsumer.cs +++ b/src/SmiServices/Common/Messaging/IConsumer.cs @@ -11,10 +11,10 @@ namespace SmiServices.Common.Messaging public interface IConsumer { /// - /// Set the which messages will be processed with + /// Set the which messages will be processed with /// - /// - void SetModel(IModel model); + /// + void SetModel(IChannel model); /// /// Process a message received by the adapter. @@ -38,7 +38,7 @@ public interface IConsumer bool HoldUnprocessableMessages { get; set; } /// - /// The BasicQos value configured on the + /// The BasicQos value configured on the /// int QoSPrefetchCount { get; set; } } diff --git a/src/SmiServices/Common/Messaging/IProducerModel.cs b/src/SmiServices/Common/Messaging/IProducerModel.cs index bb0ced60c..4e07236ed 100644 --- a/src/SmiServices/Common/Messaging/IProducerModel.cs +++ b/src/SmiServices/Common/Messaging/IProducerModel.cs @@ -1,3 +1,4 @@ +using System.Threading.Tasks; using SmiServices.Common.Events; using SmiServices.Common.Messages; @@ -15,7 +16,7 @@ public interface IProducerModel /// Message object to serialise and send. /// If you are responding to a message, pass that messages header in here (otherwise pass null) /// Routing key for the exchange to direct the message. - IMessageHeader SendMessage(IMessage message, IMessageHeader? isInResponseTo, string? routingKey); + Task SendMessage(IMessage message, IMessageHeader? isInResponseTo, string? routingKey); /// /// Waits until all sent messages are confirmed by RabbitMQ diff --git a/src/SmiServices/Common/Messaging/ProducerModel.cs b/src/SmiServices/Common/Messaging/ProducerModel.cs index 7ff02710e..67087f243 100644 --- a/src/SmiServices/Common/Messaging/ProducerModel.cs +++ b/src/SmiServices/Common/Messaging/ProducerModel.cs @@ -1,4 +1,3 @@ -using Newtonsoft.Json; using NLog; using RabbitMQ.Client; using RabbitMQ.Client.Events; @@ -6,8 +5,8 @@ using SmiServices.Common.Messages; using System; using System.Collections.Generic; -using System.Text; using System.Threading; +using System.Threading.Tasks; namespace SmiServices.Common.Messaging { @@ -20,8 +19,8 @@ public class ProducerModel : IProducerModel private readonly ILogger _logger; - private readonly IModel _model; - private readonly IBasicProperties _messageBasicProperties; + private readonly IChannel _model; + private readonly BasicProperties _messageBasicProperties; private readonly string _exchangeName; @@ -55,8 +54,8 @@ public class ProducerModel : IProducerModel /// /// public ProducerModel( - string exchangeName, IModel model, - IBasicProperties properties, + string exchangeName, IChannel model, + BasicProperties properties, int maxRetryAttempts = 1, IBackoffProvider? backoffProvider = null, string? probeQueueName = null, @@ -82,11 +81,19 @@ public ProducerModel( //TODO Understand this a bit better and investigate whether this also happens on consumer processes // Handle messages 'returned' by RabbitMQ - occurs when a messages published as persistent can't be routed to a queue - _model.BasicReturn += (s, a) => _logger.Warn("BasicReturn for Exchange '{0}' Routing Key '{1}' ReplyCode '{2}' ({3})", a.Exchange, a.RoutingKey, a.ReplyCode, a.ReplyText); - _model.BasicReturn += (s, a) => Fatal(a); + _model.BasicReturnAsync += (s, a) => + { + _logger.Warn("BasicReturn for Exchange '{0}' Routing Key '{1}' ReplyCode '{2}' ({3})", + a.Exchange, a.RoutingKey, a.ReplyCode, a.ReplyText); + Fatal(a); + return Task.CompletedTask; + }; // Handle RabbitMQ putting the queue into flow control mode - _model.FlowControl += (s, a) => _logger.Warn("FlowControl for " + exchangeName); + _model.FlowControlAsync += (s, a) => + { + _logger.Warn($"FlowControl for {exchangeName}"); + }; _backoffProvider = backoffProvider; @@ -97,7 +104,7 @@ public ProducerModel( if (_probeQueueName != null) { - var messageCount = model.MessageCount(_probeQueueName); + var messageCount = model.MessageCountAsync(_probeQueueName).Result; _logger.Debug($"Probe queue has {messageCount} message(s)"); } } @@ -110,49 +117,18 @@ public ProducerModel( /// /// /// - public virtual IMessageHeader SendMessage(IMessage message, IMessageHeader? inResponseTo = null, string? routingKey = null) + public virtual async Task SendMessage(IMessage message, IMessageHeader? inResponseTo = null, + string? routingKey = null) { - IMessageHeader header = SendMessageImpl(message, inResponseTo, routingKey); - WaitForConfirms(); - header.Log(_logger, LogLevel.Trace, "Sent " + header.MessageGuid + " to " + _exchangeName); - + var header = await SendMessageImpl(message, inResponseTo, routingKey); + header.Log(_logger, LogLevel.Trace, $"Sent {header.MessageGuid} to {_exchangeName}"); return header; } + [Obsolete("RabbitMQ handles waiting and retry now", true)] public void WaitForConfirms() { // Attempt to get a publish confirmation from RabbitMQ, with some retry/timeout - - var keepTrying = true; - var numAttempts = 0; - - while (keepTrying) - { - if (_model.WaitForConfirms(TimeSpan.FromMilliseconds(ConfirmTimeoutMs), out var timedOut)) - { - _backoffProvider?.Reset(); - return; - } - - if (timedOut) - { - keepTrying = (++numAttempts < _maxRetryAttempts); - _logger.Warn($"RabbitMQ WaitForConfirms timed out. numAttempts: {numAttempts}"); - - TimeSpan? backoff = _backoffProvider?.GetNextBackoff(); - if (backoff.HasValue) - { - _logger.Warn($"Backing off for {backoff}"); - Thread.Sleep(backoff.Value); - } - - continue; - } - - throw new ApplicationException("RabbitMQ got a Nack"); - } - - throw new ApplicationException("Could not confirm message published after timeout"); } /// @@ -162,33 +138,34 @@ public void WaitForConfirms() /// /// /// - protected IMessageHeader SendMessageImpl(IMessage message, IMessageHeader? inResponseTo = null, string? routingKey = null) + protected async Task SendMessageImpl(IMessage message, IMessageHeader? inResponseTo = null, string? routingKey = null) { lock (_oSendLock) { - byte[] body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)); + var body = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(message); _messageBasicProperties.Timestamp = new AmqpTimestamp(MessageHeader.UnixTimeNow()); - _messageBasicProperties.Headers = new Dictionary(); + _messageBasicProperties.Headers = new Dictionary(); IMessageHeader header = inResponseTo != null ? new MessageHeader(inResponseTo) : new MessageHeader(); header.Populate(_messageBasicProperties.Headers); if (_probeQueueName != null && _probeMessageCounter >= _probeCounterLimit) { - while (_model.MessageCount(_probeQueueName) >= _probeQueueLimit) + while (_model.MessageCountAsync(_probeQueueName).Result >= _probeQueueLimit) { _logger.Warn($"Probe queue ({_probeQueueName}) over message limit ({_probeCounterLimit}). Sleeping for {_probeTimeout}"); Thread.Sleep(_probeTimeout); } + _probeMessageCounter = 0; } + } - _model.BasicPublish(_exchangeName, routingKey ?? "", true, _messageBasicProperties, body); - ++_probeMessageCounter; + await _model.BasicPublishAsync(_exchangeName, routingKey ?? "", true, _messageBasicProperties, body); + ++_probeMessageCounter; - return header; - } + return header; } private void Fatal(BasicReturnEventArgs a) diff --git a/src/SmiServices/Common/Messaging/RabbitMQBroker.cs b/src/SmiServices/Common/Messaging/RabbitMQBroker.cs index 3fb9ef200..77beb4bc1 100644 --- a/src/SmiServices/Common/Messaging/RabbitMQBroker.cs +++ b/src/SmiServices/Common/Messaging/RabbitMQBroker.cs @@ -273,14 +273,14 @@ public IProducerModel SetupProducer(ProducerOptions producerOptions, bool isBatc /// /// /// - public IModel GetModel(string connectionName) + public IChannel GetModel(string connectionName) { //TODO This method has no callback available for fatal errors if (ShutdownCalled) throw new ApplicationException("Adapter has been shut down"); - var model = _connection.CreateModel(); + var model = _connection.CreateChannelAsync().Result; lock (_oResourceLock) { @@ -344,13 +344,13 @@ private void CheckValidServerSettings() private class RabbitResources : IDisposable { - public IModel Model { get; } + public IChannel Model { get; } protected readonly object OResourceLock = new(); protected readonly ILogger Logger; - public RabbitResources(IModel model) + public RabbitResources(IChannel model) { Logger = LogManager.GetLogger(GetType().Name); Model = model; @@ -367,7 +367,7 @@ private class ProducerResources : RabbitResources { public IProducerModel? ProducerModel { get; set; } - public ProducerResources(IModel model, IProducerModel ipm) : base(model) + public ProducerResources(IChannel model, IProducerModel ipm) : base(model) { ProducerModel = ipm; } @@ -390,7 +390,7 @@ public override void Dispose() Model.Dispose(); } - internal ConsumerResources(EventingBasicConsumer ebc, string q, IModel model) : base(model) + internal ConsumerResources(EventingBasicConsumer ebc, string q, IChannel model) : base(model) { this.ebc = ebc; this.QueueName = q; diff --git a/src/SmiServices/Microservices/MongoDBPopulator/Processing/IMessageProcessor.cs b/src/SmiServices/Microservices/MongoDBPopulator/Processing/IMessageProcessor.cs index 51a5dee8e..5c6f5a256 100644 --- a/src/SmiServices/Microservices/MongoDBPopulator/Processing/IMessageProcessor.cs +++ b/src/SmiServices/Microservices/MongoDBPopulator/Processing/IMessageProcessor.cs @@ -28,7 +28,7 @@ public interface IMessageProcessor /// /// Model to acknowledge messages on /// - IModel? Model { get; set; } + IChannel? Model { get; set; } /// /// Count of the total number of acknowledged messages during this processors lifetime diff --git a/src/SmiServices/Microservices/MongoDBPopulator/Processing/MessageProcessor.cs b/src/SmiServices/Microservices/MongoDBPopulator/Processing/MessageProcessor.cs index 39a521c16..c3f4d6935 100644 --- a/src/SmiServices/Microservices/MongoDBPopulator/Processing/MessageProcessor.cs +++ b/src/SmiServices/Microservices/MongoDBPopulator/Processing/MessageProcessor.cs @@ -38,7 +38,7 @@ public abstract class MessageProcessor : IMessageProcessor where T : IMess /// /// Model to use when sending ACK for messages /// - public IModel? Model { get; set; } + public IChannel? Model { get; set; } /// /// diff --git a/tests/SmiServices.IntegrationTests/RequiresExternalService.cs b/tests/SmiServices.IntegrationTests/RequiresExternalService.cs index 288e6aa6e..7a9fe4bf6 100644 --- a/tests/SmiServices.IntegrationTests/RequiresExternalService.cs +++ b/tests/SmiServices.IntegrationTests/RequiresExternalService.cs @@ -6,32 +6,45 @@ namespace SmiServices.IntegrationTests { - public class RequiresExternalService : CategoryAttribute, IApplyToContext + public abstract class RequiresExternalService : CategoryAttribute, IApplyToContext { - protected readonly bool FailIfUnavailable; - private readonly bool IgnoreIfWinCiSkip; + private static readonly bool _failIfUnavailable; + private static readonly bool _ignoreIfWinCiSkip; + private static bool _cached = false; + private static string? _cache = null; - public RequiresExternalService() + static RequiresExternalService() { - string? ci = Environment.GetEnvironmentVariable("CI"); - if (!string.IsNullOrWhiteSpace(ci) && (ci == "1" || ci.Equals("TRUE", StringComparison.CurrentCultureIgnoreCase))) - FailIfUnavailable = true; + var ci = Environment.GetEnvironmentVariable("CI"); + if (!string.IsNullOrWhiteSpace(ci) && (ci == "1" || ci.Equals("TRUE", StringComparison.OrdinalIgnoreCase))) + _failIfUnavailable = true; if ( Environment.GetEnvironmentVariable("CI_SKIP_WIN_SERVICES") == "1" && RuntimeInformation.IsOSPlatform(System.Runtime.InteropServices.OSPlatform.Windows) ) - IgnoreIfWinCiSkip = true; + _ignoreIfWinCiSkip = true; } public void ApplyToContext(TestExecutionContext context) { - if (IgnoreIfWinCiSkip) + if (_ignoreIfWinCiSkip) Assert.Ignore("CI_SKIP_WIN_SERVICES"); - ApplyToContextImpl(context); + if (!_cached) + { + _cached = true; + _cache = ApplyToContextImpl(); + } + + if (_cache is null) return; + + if (_failIfUnavailable) + Assert.Fail(_cache); + else + Assert.Ignore(_cache); } - protected virtual void ApplyToContextImpl(TestExecutionContext context) { } + protected abstract string? ApplyToContextImpl(); } } diff --git a/tests/SmiServices.IntegrationTests/RequiresMongoDb.cs b/tests/SmiServices.IntegrationTests/RequiresMongoDb.cs index 037c6d64a..41170c8bd 100644 --- a/tests/SmiServices.IntegrationTests/RequiresMongoDb.cs +++ b/tests/SmiServices.IntegrationTests/RequiresMongoDb.cs @@ -2,7 +2,6 @@ using MongoDB.Bson; using MongoDB.Driver; using NUnit.Framework; -using NUnit.Framework.Internal; using System; using System.IO; using YamlDotNet.Serialization; @@ -12,9 +11,9 @@ namespace SmiServices.IntegrationTests [AttributeUsage(AttributeTargets.Method | AttributeTargets.Class | AttributeTargets.Interface | AttributeTargets.Assembly, AllowMultiple = true)] public class RequiresMongoDb : RequiresExternalService { - protected override void ApplyToContextImpl(TestExecutionContext context) + protected override string? ApplyToContextImpl() { - MongoClientSettings address = GetMongoClientSettings(); + var address = GetMongoClientSettings(); Console.WriteLine("Checking the following configuration:" + Environment.NewLine + address); @@ -26,23 +25,18 @@ protected override void ApplyToContextImpl(TestExecutionContext context) } catch (Exception e) { - string msg = + return e is MongoNotPrimaryException - ? "Connected to non-primary MongoDB server. Check replication is enabled" - : $"Could not connect to MongoDB at {address}"; - - msg += $": {e}"; - - if (FailIfUnavailable) - Assert.Fail(msg); - else - Assert.Ignore(msg); + ? "Connected to non-primary MongoDB server. Check replication is enabled" + : $"Could not connect to MongoDB at {address}: {e}"; } + + return null; } public static MongoClientSettings GetMongoClientSettings() { - IDeserializer deserializer = new DeserializerBuilder() + var deserializer = new DeserializerBuilder() .IgnoreUnmatchedProperties() .Build(); diff --git a/tests/SmiServices.IntegrationTests/RequiresRabbit.cs b/tests/SmiServices.IntegrationTests/RequiresRabbit.cs index 7aae294be..ca112c85f 100644 --- a/tests/SmiServices.IntegrationTests/RequiresRabbit.cs +++ b/tests/SmiServices.IntegrationTests/RequiresRabbit.cs @@ -1,6 +1,5 @@ using NUnit.Framework; -using NUnit.Framework.Internal; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; using System; @@ -14,9 +13,8 @@ namespace SmiServices.IntegrationTests AttributeTargets.Assembly, AllowMultiple = true)] public class RequiresRabbit : RequiresExternalService { - protected override void ApplyToContextImpl(TestExecutionContext context) + protected override string? ApplyToContextImpl() { - var factory = GetConnectionFactory(); factory.ContinuationTimeout = TimeSpan.FromSeconds(5); factory.HandshakeContinuationTimeout = TimeSpan.FromSeconds(5); @@ -26,9 +24,10 @@ protected override void ApplyToContextImpl(TestExecutionContext context) try { - using var conn = factory.CreateConnection(); - using var model = conn.CreateModel(); - model.ExchangeDeclare("TEST.ControlExchange", ExchangeType.Topic, durable: true); + using var conn = factory.CreateConnectionAsync().Result; + using var model = conn.CreateChannelAsync().Result; + model.ExchangeDeclareAsync("TEST.ControlExchange", ExchangeType.Topic, durable: true).Wait(); + return null; } catch (BrokerUnreachableException e) { @@ -40,26 +39,17 @@ protected override void ApplyToContextImpl(TestExecutionContext context) sb.AppendLine($"UserName: {factory.UserName}"); sb.AppendLine($"Port: {factory.Port}"); - string msg = $"Could not connect to RabbitMQ {Environment.NewLine}{sb}{Environment.NewLine}{e.Message}"; - - // NOTE(rkm 2021-01-30) Don't fail for Windows CI builds - bool shouldFail = FailIfUnavailable && !Environment.OSVersion.ToString().Contains("windows", StringComparison.CurrentCultureIgnoreCase); - - if (shouldFail) - Assert.Fail(msg); - else - Assert.Ignore(msg); + return $"Could not connect to RabbitMQ {Environment.NewLine}{sb}{Environment.NewLine}{e.Message}"; } } public static ConnectionFactory GetConnectionFactory() { - IDeserializer deserializer = new DeserializerBuilder() + var deserializer = new DeserializerBuilder() .IgnoreUnmatchedProperties() .Build(); return deserializer.Deserialize(new StreamReader(Path.Combine(TestContext.CurrentContext.TestDirectory, "Rabbit.yaml"))); } - } } diff --git a/tests/SmiServices.IntegrationTests/RequiresRelationalDb.cs b/tests/SmiServices.IntegrationTests/RequiresRelationalDb.cs index b2f5d39fc..d48da9f03 100644 --- a/tests/SmiServices.IntegrationTests/RequiresRelationalDb.cs +++ b/tests/SmiServices.IntegrationTests/RequiresRelationalDb.cs @@ -1,7 +1,6 @@ using FAnsi; using FAnsi.Discovery; using NUnit.Framework; -using NUnit.Framework.Internal; using SmiServices.Common; using System; using System.IO; @@ -21,21 +20,16 @@ public RequiresRelationalDb(DatabaseType type) _type = type; } - protected override void ApplyToContextImpl(TestExecutionContext context) + protected override string? ApplyToContextImpl() { FansiImplementations.Load(); var connectionStrings = GetRelationalDatabaseConnectionStrings(); var server = connectionStrings.GetServer(_type); - if (server.Exists()) - return; - - string msg = $"Could not connect to {_type} at '{server.Name}' with the provided connection options"; - if (FailIfUnavailable) - Assert.Fail(msg); - else - Assert.Ignore(msg); + return server.Exists() + ? null + : $"Could not connect to {_type} at '{server.Name}' with the provided connection options"; } public static ConStrs GetRelationalDatabaseConnectionStrings() diff --git a/tests/SmiServices.UnitTests/Common/Messaging/ProducerModelTests.cs b/tests/SmiServices.UnitTests/Common/Messaging/ProducerModelTests.cs index 0d711e697..0284abfa5 100644 --- a/tests/SmiServices.UnitTests/Common/Messaging/ProducerModelTests.cs +++ b/tests/SmiServices.UnitTests/Common/Messaging/ProducerModelTests.cs @@ -23,7 +23,7 @@ public void SendMessage_HappyPath() { // Arrange bool timedOut = false; - var mockModel = new Mock(MockBehavior.Strict); + var mockModel = new Mock(MockBehavior.Strict); mockModel.Setup(x => x.BasicPublish("Exchange", "", true, It.IsAny(), It.IsAny>())); mockModel.Setup(x => x.WaitForConfirms(It.IsAny(), out timedOut)).Returns(true); @@ -49,7 +49,7 @@ public void SendMessage_ThrowsException_OnTimeout() { // Arrange bool timedOut = true; - var mockModel = new Mock(MockBehavior.Strict); + var mockModel = new Mock(MockBehavior.Strict); mockModel.Setup(x => x.BasicPublish("Exchange", "", true, It.IsAny(), It.IsAny>())); mockModel.Setup(x => x.WaitForConfirms(It.IsAny(), out timedOut)).Returns(false); @@ -74,7 +74,7 @@ public void SendMessage_WithSenseQueue_HappyPath() { // Arrange bool timedOut = false; - var mockModel = new Mock(MockBehavior.Strict); + var mockModel = new Mock(MockBehavior.Strict); mockModel.Setup(x => x.BasicPublish("Exchange", "", true, It.IsAny(), It.IsAny>())); mockModel.Setup(x => x.WaitForConfirms(It.IsAny(), out timedOut)).Returns(true); mockModel.Setup(x => x.MessageCount("ProbeQueue")).Returns(0); @@ -101,7 +101,7 @@ public void SendMessage_WithSenseQueue_OverLimit() { // Arrange bool timedOut = false; - var mockModel = new Mock(MockBehavior.Strict); + var mockModel = new Mock(MockBehavior.Strict); mockModel.Setup(x => x.BasicPublish("Exchange", "", true, It.IsAny(), It.IsAny>())); mockModel.Setup(x => x.WaitForConfirms(It.IsAny(), out timedOut)).Returns(true); mockModel.SetupSequence(x => x.MessageCount("ProbeQueue")) diff --git a/tests/SmiServices.UnitTests/Common/MicroserviceTester.cs b/tests/SmiServices.UnitTests/Common/MicroserviceTester.cs index 043df3ca0..c13d07d06 100644 --- a/tests/SmiServices.UnitTests/Common/MicroserviceTester.cs +++ b/tests/SmiServices.UnitTests/Common/MicroserviceTester.cs @@ -156,7 +156,7 @@ public void CreateExchange(string exchangeName, string? queueName = null, bool i /// public IEnumerable> ConsumeMessages(string queueName) where T : IMessage { - IModel model = Broker.GetModel($"ConsumeMessages-{queueName}"); + IChannel model = Broker.GetModel($"ConsumeMessages-{queueName}"); while (true) { @@ -187,7 +187,7 @@ public void Dispose() if (CleanUpAfterTest) { - using IModel model = Broker.GetModel(nameof(Dispose)); + using IChannel model = Broker.GetModel(nameof(Dispose)); _declaredExchanges.ForEach(x => model.ExchangeDelete(x)); _declaredQueues.ForEach(x => model.QueueDelete(x)); } diff --git a/tests/SmiServices.UnitTests/Microservices/CohortExtractor/Messaging/ExtractionRequestQueueConsumerTest.cs b/tests/SmiServices.UnitTests/Microservices/CohortExtractor/Messaging/ExtractionRequestQueueConsumerTest.cs index 32f208831..9cf74f8a2 100644 --- a/tests/SmiServices.UnitTests/Microservices/CohortExtractor/Messaging/ExtractionRequestQueueConsumerTest.cs +++ b/tests/SmiServices.UnitTests/Microservices/CohortExtractor/Messaging/ExtractionRequestQueueConsumerTest.cs @@ -118,7 +118,7 @@ private void AssertMessagePublishedWithSpecifiedKey(GlobalOptions globals, bool fatalErrorEventArgs = args; }; - var mockModel = new Mock(MockBehavior.Strict); + var mockModel = new Mock(MockBehavior.Strict); mockModel.Setup(x => x.IsClosed).Returns(false); mockModel.Setup(x => x.BasicAck(It.IsAny(), It.IsAny())).Verifiable(); diff --git a/tests/SmiServices.UnitTests/Microservices/CohortPackager/Messaging/AnonFailedMessageConsumerTests.cs b/tests/SmiServices.UnitTests/Microservices/CohortPackager/Messaging/AnonFailedMessageConsumerTests.cs index d41c4c7df..111d93f6c 100644 --- a/tests/SmiServices.UnitTests/Microservices/CohortPackager/Messaging/AnonFailedMessageConsumerTests.cs +++ b/tests/SmiServices.UnitTests/Microservices/CohortPackager/Messaging/AnonFailedMessageConsumerTests.cs @@ -46,7 +46,7 @@ public void ProcessMessageImpl_HappyPath() jobStoreMock.Setup(x => x.PersistMessageToStore(It.IsAny(), It.IsAny())); var consumer = new AnonFailedMessageConsumer(jobStoreMock.Object); - consumer.SetModel(new Mock(MockBehavior.Loose).Object); + consumer.SetModel(new Mock(MockBehavior.Loose).Object); var message = new ExtractedFileStatusMessage { @@ -80,7 +80,7 @@ public void ProcessMessageImpl_HandlesApplicationException() .Throws(new ApplicationException("Some error...")); var consumer = new AnonFailedMessageConsumer(jobStoreMock.Object); - consumer.SetModel(new Mock(MockBehavior.Loose).Object); + consumer.SetModel(new Mock(MockBehavior.Loose).Object); var message = new ExtractedFileStatusMessage { diff --git a/tests/SmiServices.UnitTests/Microservices/CohortPackager/Messaging/AnonVerificationMessageConsumerTests.cs b/tests/SmiServices.UnitTests/Microservices/CohortPackager/Messaging/AnonVerificationMessageConsumerTests.cs index 766891e28..f91dc4717 100644 --- a/tests/SmiServices.UnitTests/Microservices/CohortPackager/Messaging/AnonVerificationMessageConsumerTests.cs +++ b/tests/SmiServices.UnitTests/Microservices/CohortPackager/Messaging/AnonVerificationMessageConsumerTests.cs @@ -65,7 +65,7 @@ public void TearDown() { } private AnonVerificationMessageConsumer NewConsumer(bool processBatches, int maxUnacknowledgedMessages, TimeSpan verificationMessageQueueFlushTime) { var consumer = new AnonVerificationMessageConsumer(_mockJobStore.Object, processBatches, maxUnacknowledgedMessages, verificationMessageQueueFlushTime); - consumer.SetModel(new Mock(MockBehavior.Loose).Object); + consumer.SetModel(new Mock(MockBehavior.Loose).Object); return consumer; } diff --git a/tests/SmiServices.UnitTests/Microservices/CohortPackager/Messaging/ExtractFileCollectionMessageConsumerTests.cs b/tests/SmiServices.UnitTests/Microservices/CohortPackager/Messaging/ExtractFileCollectionMessageConsumerTests.cs index f588c0f93..4727dd663 100644 --- a/tests/SmiServices.UnitTests/Microservices/CohortPackager/Messaging/ExtractFileCollectionMessageConsumerTests.cs +++ b/tests/SmiServices.UnitTests/Microservices/CohortPackager/Messaging/ExtractFileCollectionMessageConsumerTests.cs @@ -46,7 +46,7 @@ public void ProcessMessageImpl_HappyPath() jobStoreMock.Setup(x => x.PersistMessageToStore(It.IsAny(), It.IsAny())); var consumer = new ExtractFileCollectionMessageConsumer(jobStoreMock.Object); - consumer.SetModel(new Mock(MockBehavior.Loose).Object); + consumer.SetModel(new Mock(MockBehavior.Loose).Object); var message = new ExtractFileCollectionInfoMessage(); @@ -74,7 +74,7 @@ public void ProcessMessageImpl_HandlesApplicationException() .Throws(new ApplicationException("Some error...")); var consumer = new ExtractFileCollectionMessageConsumer(jobStoreMock.Object); - consumer.SetModel(new Mock(MockBehavior.Loose).Object); + consumer.SetModel(new Mock(MockBehavior.Loose).Object); var message = new ExtractFileCollectionInfoMessage(); diff --git a/tests/SmiServices.UnitTests/Microservices/CohortPackager/Messaging/ExtractionRequestInfoMessageConsumerTests.cs b/tests/SmiServices.UnitTests/Microservices/CohortPackager/Messaging/ExtractionRequestInfoMessageConsumerTests.cs index b4238cd51..867a698bb 100644 --- a/tests/SmiServices.UnitTests/Microservices/CohortPackager/Messaging/ExtractionRequestInfoMessageConsumerTests.cs +++ b/tests/SmiServices.UnitTests/Microservices/CohortPackager/Messaging/ExtractionRequestInfoMessageConsumerTests.cs @@ -46,7 +46,7 @@ public void ProcessMessageImpl_HappyPath() jobStoreMock.Setup(x => x.PersistMessageToStore(It.IsAny(), It.IsAny())); var consumer = new ExtractionRequestInfoMessageConsumer(jobStoreMock.Object); - consumer.SetModel(new Mock(MockBehavior.Loose).Object); + consumer.SetModel(new Mock(MockBehavior.Loose).Object); var message = new ExtractionRequestInfoMessage(); @@ -74,7 +74,7 @@ public void ProcessMessageImpl_HandlesApplicationException() .Throws(new ApplicationException("Some error...")); var consumer = new ExtractionRequestInfoMessageConsumer(jobStoreMock.Object); - consumer.SetModel(new Mock(MockBehavior.Loose).Object); + consumer.SetModel(new Mock(MockBehavior.Loose).Object); var message = new ExtractionRequestInfoMessage(); diff --git a/tests/SmiServices.UnitTests/Microservices/DicomAnonymiser/DicomAnonymiserConsumerTests.cs b/tests/SmiServices.UnitTests/Microservices/DicomAnonymiser/DicomAnonymiserConsumerTests.cs index e99dcac6e..c6a7f7684 100644 --- a/tests/SmiServices.UnitTests/Microservices/DicomAnonymiser/DicomAnonymiserConsumerTests.cs +++ b/tests/SmiServices.UnitTests/Microservices/DicomAnonymiser/DicomAnonymiserConsumerTests.cs @@ -30,7 +30,7 @@ public class DicomAnonymiserConsumerTests private string _sourceDcmPathAbs = null!; private ExtractFileMessage _extractFileMessage = null!; private DicomAnonymiserOptions _options = null!; - private Mock _mockModel = null!; + private Mock _mockModel = null!; [OneTimeSetUp] public void OneTimeSetUp() @@ -96,7 +96,7 @@ public void SetUp() RoutingKeyFailure = "nay" }; - _mockModel = new Mock(MockBehavior.Strict); + _mockModel = new Mock(MockBehavior.Strict); _mockModel.Setup(x => x.IsClosed).Returns(false); _mockModel.Setup(x => x.BasicAck(It.IsAny(), It.IsAny())); _mockModel.Setup(x => x.BasicNack(It.IsAny(), It.IsAny(), It.IsAny())); diff --git a/tests/SmiServices.UnitTests/Microservices/DicomTagReader/DicomTagReaderTestHelper.cs b/tests/SmiServices.UnitTests/Microservices/DicomTagReader/DicomTagReaderTestHelper.cs index 4b4fed64a..5e708ff50 100644 --- a/tests/SmiServices.UnitTests/Microservices/DicomTagReader/DicomTagReaderTestHelper.cs +++ b/tests/SmiServices.UnitTests/Microservices/DicomTagReader/DicomTagReaderTestHelper.cs @@ -24,7 +24,7 @@ public class DicomTagReaderTestHelper public AccessionDirectoryMessage TestAccessionDirectoryMessage = null!; private IConnection _testConnection = null!; - private IModel _testModel = null!; + private IChannel _testModel = null!; public Mock TestSeriesModel = null!; public Mock TestImageModel = null!; diff --git a/tests/SmiServices.UnitTests/Microservices/FileCopier/FileCopyQueueConsumerTest.cs b/tests/SmiServices.UnitTests/Microservices/FileCopier/FileCopyQueueConsumerTest.cs index d1fd55d15..423a57dd8 100644 --- a/tests/SmiServices.UnitTests/Microservices/FileCopier/FileCopyQueueConsumerTest.cs +++ b/tests/SmiServices.UnitTests/Microservices/FileCopier/FileCopyQueueConsumerTest.cs @@ -16,7 +16,7 @@ public class FileCopyQueueConsumerTest #region Fixture Methods private ExtractFileMessage _message = null!; - private Mock _mockModel = null!; + private Mock _mockModel = null!; private Mock _mockFileCopier = null!; [OneTimeSetUp] @@ -45,7 +45,7 @@ public void SetUp() IsIdentifiableExtraction = true, OutputPath = "bar", }; - _mockModel = new Mock(MockBehavior.Strict); + _mockModel = new Mock(MockBehavior.Strict); _mockModel.Setup(x => x.IsClosed).Returns(false); _mockModel.Setup(x => x.BasicAck(It.IsAny(), It.IsAny())); _mockModel.Setup(x => x.BasicNack(It.IsAny(), It.IsAny(), It.IsAny()));