From 66fe8b8a9e08e9e0013274d2885488ae6adc2526 Mon Sep 17 00:00:00 2001 From: Hadi Date: Mon, 17 Jun 2024 15:15:12 +0100 Subject: [PATCH 1/2] feat: add retry and back-off when creating receivers and senders --- src/service/queue/queue.service.spec.ts | 70 +++++++++++++++++++++++++ src/service/queue/queue.service.ts | 36 ++++++++----- 2 files changed, 93 insertions(+), 13 deletions(-) diff --git a/src/service/queue/queue.service.spec.ts b/src/service/queue/queue.service.spec.ts index 4b00821..d99464f 100644 --- a/src/service/queue/queue.service.spec.ts +++ b/src/service/queue/queue.service.spec.ts @@ -241,6 +241,42 @@ describe('QueueService', () => { const messageControl = getInternallyCreatedMessageControl(); expect(messageControl.accept).toHaveBeenCalled(); }); + + it('should return an existing receiver if already created', async () => { + const receiver = {} as Receiver; + const source = 'test-queue'; + queueService['receivers'].set('default:test-queue', receiver); + + const result = await queueService['getReceiver'](source, 1, jest.fn(), 'default'); + + expect(result).toBe(receiver); + expect(amqpService.createReceiver).not.toHaveBeenCalled(); + }); + + it('should create a new receiver if not already created', async () => { + const receiver = {} as Receiver; + const source = 'test-queue'; + const messageHandler = jest.fn(); + + (amqpService as any).createReceiver.mockResolvedValue(receiver); + + const result = await queueService['getReceiver'](source, 1, messageHandler, 'default'); + + expect(result).toBe(receiver); + expect(amqpService.createReceiver).toHaveBeenCalledWith(source, 1, expect.any(Function), 'default'); + }); + + it('should retry creating a receiver on failure', async () => { + const source = 'test-queue'; + const messageHandler = jest.fn(); + + (amqpService as any).createReceiver.mockRejectedValueOnce(new Error('Test error')).mockResolvedValueOnce({} as Receiver); + + const result = await queueService['getReceiver'](source, 1, messageHandler, 'default'); + + expect(result).toBeDefined(); + expect(amqpService.createReceiver).toHaveBeenCalledTimes(2); + }); }); }); @@ -347,6 +383,40 @@ describe('QueueService', () => { expect(sender.send).toHaveBeenCalledWith({ body: 'null', message_annotations: { 'x-opt-delivery-delay': delay * 1000 } }); }); + + it('should return an existing sender if already created', async () => { + const sender = {} as AwaitableSender; + const target = 'test-queue'; + queueService['senders'].set('default:test-queue', sender); + + const result = await queueService['getSender'](target, 'default'); + + expect(result).toBe(sender); + expect(amqpService.createSender).not.toHaveBeenCalled(); + }); + + it('should create a new sender if not already created', async () => { + const sender = {} as AwaitableSender; + const target = 'test-queue'; + + (amqpService as any).createSender.mockResolvedValue(sender); + + const result = await queueService['getSender'](target, 'default'); + + expect(result).toBe(sender); + expect(amqpService.createSender).toHaveBeenCalledWith(target, 'default'); + }); + + it('should retry creating a sender on failure', async () => { + const target = 'test-queue'; + + (amqpService as any).createSender.mockRejectedValueOnce(new Error('Test error')).mockResolvedValueOnce({} as AwaitableSender); + + const result = await queueService['getSender'](target, 'default'); + + expect(result).toBeDefined(); + expect(amqpService.createSender).toHaveBeenCalledTimes(2); + }); }); describe('removeListener()', () => { diff --git a/src/service/queue/queue.service.ts b/src/service/queue/queue.service.ts index 8c4c24b..b5898cd 100644 --- a/src/service/queue/queue.service.ts +++ b/src/service/queue/queue.service.ts @@ -4,12 +4,12 @@ import { AwaitableSender, Delivery, EventContext, Message, Receiver, Source } fr import { extendObject, + getLoggerContext, + Logger, sleep, tryParseJSON, - ValidationNullObjectException, - Logger, - getLoggerContext, ValidationException, + ValidationNullObjectException, } from '../../util'; import { MessageControl } from '../../domain'; import { SendState } from '../../enum'; @@ -29,6 +29,7 @@ const toString = Object.prototype.toString; export class QueueService { private readonly receivers: Map; private readonly senders: Map; + private readonly reconnectDelay: number = 5000; // 5 seconds constructor(private readonly amqpService: AMQPService, private readonly objectValidatorService: ObjectValidatorService) { // this means only one sender and receiver / app / queue @@ -334,28 +335,37 @@ export class QueueService { const receiverToken = this.getLinkToken(sourceToken, connection); - if (!this.receivers.has(receiverToken)) { + if (this.receivers.has(receiverToken)) { + return this.receivers.get(receiverToken); + } + + try { const receiver = await this.amqpService.createReceiver(source, credit, messageHandler.bind(this), connection); this.receivers.set(receiverToken, receiver); + return receiver; + } catch (error) { + logger.error(`Error creating receiver: ${error.message}`, error.stack); + await sleep(this.reconnectDelay); + return this.getReceiver(source, credit, messageHandler, connection); } - - return this.receivers.get(receiverToken); } private async getSender(target: string, connection: string): Promise { - let sender; - const senderToken = this.getLinkToken(target, connection); if (this.senders.has(senderToken)) { - sender = this.senders.get(senderToken); - } else { - sender = await this.amqpService.createSender(target, connection); + return this.senders.get(senderToken); + } + try { + const sender = await this.amqpService.createSender(target, connection); this.senders.set(senderToken, sender); + return sender; + } catch (error) { + logger.error(`Error creating sender: ${error.message}`, error.stack); + await sleep(this.reconnectDelay); + return this.getSender(target, connection); } - - return sender; } private encodeMessage(message: any): string { From 602c2220d002fc834fe890a1251c22275de7a9dc Mon Sep 17 00:00:00 2001 From: Hadi Date: Tue, 18 Jun 2024 17:14:21 +0100 Subject: [PATCH 2/2] feat: expose retry in connection option configuration --- .../queue-module-options.interface.ts | 13 +++ src/service/queue/queue.service.spec.ts | 95 +++++++++++++++++++ src/service/queue/queue.service.ts | 68 +++++++++---- 3 files changed, 156 insertions(+), 20 deletions(-) diff --git a/src/interface/queue-module-options.interface.ts b/src/interface/queue-module-options.interface.ts index 0c45643..34c8389 100644 --- a/src/interface/queue-module-options.interface.ts +++ b/src/interface/queue-module-options.interface.ts @@ -71,4 +71,17 @@ export interface AMQPConnectionOptions { * Connection options directly used by `rhea` */ connectionOptions?: ConnectionOptions; + + /** + * Retry configuration for senders and receivers + */ + retryConnection?: { + receiver?: RetryConfig; + sender?: RetryConfig; + }; +} + +export interface RetryConfig { + retryDelay?: number; + maxRetryAttempts?: number; } diff --git a/src/service/queue/queue.service.spec.ts b/src/service/queue/queue.service.spec.ts index d99464f..5f92933 100644 --- a/src/service/queue/queue.service.spec.ts +++ b/src/service/queue/queue.service.spec.ts @@ -64,6 +64,19 @@ describe('QueueService', () => { createReceiver: jest.fn().mockResolvedValue(jest.fn().mockResolvedValue(new EventContextMock().receiver)), createSender: jest.fn().mockResolvedValue(new EventContextMock().sender), disconnect: jest.fn().mockResolvedValue(jest.fn()), + getConnectionOptions: jest.fn(() => ({ + connectionUri: 'amqp://test', + retryConnection: { + receiver: { + retryDelay: 1000, + maxRetryAttempts: 3, + }, + sender: { + retryDelay: 1000, + maxRetryAttempts: 3, + }, + }, + })), getModuleOptions(): QueueModuleOptions { return moduleOptions; }, @@ -277,6 +290,47 @@ describe('QueueService', () => { expect(result).toBeDefined(); expect(amqpService.createReceiver).toHaveBeenCalledTimes(2); }); + + it('should not retry creating a receiver if maxRetryAttempts is 1', async () => { + (amqpService as any).getConnectionOptions.mockReturnValueOnce({ + retryConnection: { + receiver: { + retryDelay: 1000, + maxRetryAttempts: 1, + }, + }, + }); + + const source = 'test-queue'; + const messageHandler = jest.fn(); + + (amqpService as any).createReceiver.mockRejectedValue(new Error('Test error')); + + await expect(queueService['getReceiver'](source, 1, messageHandler, 'default')).rejects.toThrow('Test error'); + + expect(amqpService.createReceiver).toHaveBeenCalledTimes(1); + }); + + it('should not retry if both retryDelay and maxRetryAttempts are set to zero', async () => { + (amqpService.getConnectionOptions as jest.Mock).mockReturnValueOnce({ + connectionUri: 'amqp://test', + retryConnection: { + receiver: { + retryDelay: 0, + maxRetryAttempts: 0, + }, + }, + }); + + const source = 'test-queue'; + const messageHandler = jest.fn(); + + (amqpService.createReceiver as jest.Mock).mockRejectedValue(new Error('Test error')); + + await expect(queueService['getReceiver'](source, 1, messageHandler, 'default')).rejects.toThrow('Test error'); + + expect(amqpService.createReceiver).toHaveBeenCalledTimes(1); + }); }); }); @@ -417,6 +471,47 @@ describe('QueueService', () => { expect(result).toBeDefined(); expect(amqpService.createSender).toHaveBeenCalledTimes(2); }); + + it('should retry creating a sender with custom retry configuration', async () => { + (amqpService.getConnectionOptions as jest.Mock).mockReturnValueOnce({ + connectionUri: 'amqp://test', + retryConnection: { + sender: { + retryDelay: 500, + maxRetryAttempts: 2, + }, + }, + }); + + const target = 'test-queue'; + + (amqpService.createSender as jest.Mock).mockRejectedValueOnce(new Error('Test error')).mockResolvedValueOnce({} as AwaitableSender); + + const result = await queueService['getSender'](target, 'default'); + + expect(result).toBeDefined(); + expect(amqpService.createSender).toHaveBeenCalledTimes(2); + }); + + it('should not retry creating a sender if maxRetryAttempts is 1', async () => { + (amqpService.getConnectionOptions as jest.Mock).mockReturnValueOnce({ + connectionUri: 'amqp://test', + retryConnection: { + sender: { + retryDelay: 1000, + maxRetryAttempts: 1, + }, + }, + }); + + const target = 'test-queue'; + + (amqpService.createSender as jest.Mock).mockRejectedValue(new Error('Test error')); + + await expect(queueService['getSender'](target, 'default')).rejects.toThrow('Test error'); + + expect(amqpService.createSender).toHaveBeenCalledTimes(1); + }); }); describe('removeListener()', () => { diff --git a/src/service/queue/queue.service.ts b/src/service/queue/queue.service.ts index b5898cd..98b07cf 100644 --- a/src/service/queue/queue.service.ts +++ b/src/service/queue/queue.service.ts @@ -29,7 +29,6 @@ const toString = Object.prototype.toString; export class QueueService { private readonly receivers: Map; private readonly senders: Map; - private readonly reconnectDelay: number = 5000; // 5 seconds constructor(private readonly amqpService: AMQPService, private readonly objectValidatorService: ObjectValidatorService) { // this means only one sender and receiver / app / queue @@ -332,22 +331,36 @@ export class QueueService { connection: string, ): Promise { const sourceToken = typeof source === 'string' ? source : JSON.stringify(source); - const receiverToken = this.getLinkToken(sourceToken, connection); if (this.receivers.has(receiverToken)) { return this.receivers.get(receiverToken); } - try { - const receiver = await this.amqpService.createReceiver(source, credit, messageHandler.bind(this), connection); - this.receivers.set(receiverToken, receiver); - return receiver; - } catch (error) { - logger.error(`Error creating receiver: ${error.message}`, error.stack); - await sleep(this.reconnectDelay); - return this.getReceiver(source, credit, messageHandler, connection); - } + const connectionOptions = this.amqpService.getConnectionOptions(connection); + const retryDelay = connectionOptions.retryConnection?.receiver?.retryDelay ?? 0; + const maxRetryAttempts = connectionOptions.retryConnection?.receiver?.maxRetryAttempts ?? 1; + + let attempt = 0; + + do { + try { + const receiver = await this.amqpService.createReceiver(source, credit, messageHandler.bind(this), connection); + this.receivers.set(receiverToken, receiver); + return receiver; + } catch (error) { + logger.error(`Error creating receiver (attempt ${attempt + 1}): ${error.message}`, error.stack); + + attempt = attempt + 1; + if (attempt >= maxRetryAttempts) { + throw new Error(`Max retry attempts reached for creating receiver: ${error.message}`); + } + + if (retryDelay > 0) { + await sleep(retryDelay); + } + } + } while (attempt < maxRetryAttempts); } private async getSender(target: string, connection: string): Promise { @@ -357,15 +370,30 @@ export class QueueService { return this.senders.get(senderToken); } - try { - const sender = await this.amqpService.createSender(target, connection); - this.senders.set(senderToken, sender); - return sender; - } catch (error) { - logger.error(`Error creating sender: ${error.message}`, error.stack); - await sleep(this.reconnectDelay); - return this.getSender(target, connection); - } + const connectionOptions = this.amqpService.getConnectionOptions(connection); + const retryDelay = connectionOptions.retryConnection?.sender?.retryDelay ?? 0; + const maxRetryAttempts = connectionOptions.retryConnection?.sender?.maxRetryAttempts ?? 1; + + let attempt = 0; + + do { + try { + const sender = await this.amqpService.createSender(target, connection); + this.senders.set(senderToken, sender); + return sender; + } catch (error) { + logger.error(`Error creating sender (attempt ${attempt + 1}): ${error.message}`, error.stack); + + attempt++; + if (attempt >= maxRetryAttempts) { + throw new Error(`Max retry attempts reached for creating sender: ${error.message}`); + } + + if (retryDelay > 0) { + await sleep(retryDelay); + } + } + } while (attempt < maxRetryAttempts); } private encodeMessage(message: any): string {