Skip to content

Commit

Permalink
feat: add retry and back-off when creating receivers and senders
Browse files Browse the repository at this point in the history
  • Loading branch information
Hadi-E authored and raschan committed Jun 21, 2024
1 parent 72b66bb commit f8596c8
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 13 deletions.
70 changes: 70 additions & 0 deletions src/service/queue/queue.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,42 @@ describe('QueueService', () => {
const messageControl = getInternallyCreatedMessageControl();
expect(messageControl.accept).toHaveBeenCalled();
});

it('should return an existing receiver if already created', async () => {
const receiver = {} as Receiver;
const source = 'test-queue';
queueService['receivers'].set('default:test-queue', receiver);

const result = await queueService['getReceiver'](source, 1, jest.fn(), 'default');

expect(result).toBe(receiver);
expect(amqpService.createReceiver).not.toHaveBeenCalled();
});

it('should create a new receiver if not already created', async () => {
const receiver = {} as Receiver;
const source = 'test-queue';
const messageHandler = jest.fn();

(amqpService as any).createReceiver.mockResolvedValue(receiver);

const result = await queueService['getReceiver'](source, 1, messageHandler, 'default');

expect(result).toBe(receiver);
expect(amqpService.createReceiver).toHaveBeenCalledWith(source, 1, expect.any(Function), 'default');
});

it('should retry creating a receiver on failure', async () => {
const source = 'test-queue';
const messageHandler = jest.fn();

(amqpService as any).createReceiver.mockRejectedValueOnce(new Error('Test error')).mockResolvedValueOnce({} as Receiver);

const result = await queueService['getReceiver'](source, 1, messageHandler, 'default');

expect(result).toBeDefined();
expect(amqpService.createReceiver).toHaveBeenCalledTimes(2);
});
});
});

Expand Down Expand Up @@ -347,6 +383,40 @@ describe('QueueService', () => {

expect(sender.send).toHaveBeenCalledWith({ body: 'null', message_annotations: { 'x-opt-delivery-delay': delay * 1000 } });
});

it('should return an existing sender if already created', async () => {
const sender = {} as AwaitableSender;
const target = 'test-queue';
queueService['senders'].set('default:test-queue', sender);

const result = await queueService['getSender'](target, 'default');

expect(result).toBe(sender);
expect(amqpService.createSender).not.toHaveBeenCalled();
});

it('should create a new sender if not already created', async () => {
const sender = {} as AwaitableSender;
const target = 'test-queue';

(amqpService as any).createSender.mockResolvedValue(sender);

const result = await queueService['getSender'](target, 'default');

expect(result).toBe(sender);
expect(amqpService.createSender).toHaveBeenCalledWith(target, 'default');
});

it('should retry creating a sender on failure', async () => {
const target = 'test-queue';

(amqpService as any).createSender.mockRejectedValueOnce(new Error('Test error')).mockResolvedValueOnce({} as AwaitableSender);

const result = await queueService['getSender'](target, 'default');

expect(result).toBeDefined();
expect(amqpService.createSender).toHaveBeenCalledTimes(2);
});
});

describe('removeListener()', () => {
Expand Down
36 changes: 23 additions & 13 deletions src/service/queue/queue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import { AwaitableSender, Delivery, EventContext, Message, Receiver, Source } fr

import {
extendObject,
getLoggerContext,
Logger,
sleep,
tryParseJSON,
ValidationNullObjectException,
Logger,
getLoggerContext,
ValidationException,
ValidationNullObjectException,
} from '../../util';
import { MessageControl } from '../../domain';
import { SendState } from '../../enum';
Expand All @@ -29,6 +29,7 @@ const toString = Object.prototype.toString;
export class QueueService {
private readonly receivers: Map<string, Receiver>;
private readonly senders: Map<string, AwaitableSender>;
private readonly reconnectDelay: number = 5000; // 5 seconds

constructor(private readonly amqpService: AMQPService, private readonly objectValidatorService: ObjectValidatorService) {
// this means only one sender and receiver / app / queue
Expand Down Expand Up @@ -334,28 +335,37 @@ export class QueueService {

const receiverToken = this.getLinkToken(sourceToken, connection);

if (!this.receivers.has(receiverToken)) {
if (this.receivers.has(receiverToken)) {
return this.receivers.get(receiverToken);
}

try {
const receiver = await this.amqpService.createReceiver(source, credit, messageHandler.bind(this), connection);
this.receivers.set(receiverToken, receiver);
return receiver;
} catch (error) {
logger.error(`Error creating receiver: ${error.message}`, error.stack);
await sleep(this.reconnectDelay);
return this.getReceiver(source, credit, messageHandler, connection);
}

return this.receivers.get(receiverToken);
}

private async getSender(target: string, connection: string): Promise<AwaitableSender> {
let sender;

const senderToken = this.getLinkToken(target, connection);

if (this.senders.has(senderToken)) {
sender = this.senders.get(senderToken);
} else {
sender = await this.amqpService.createSender(target, connection);
return this.senders.get(senderToken);
}

try {
const sender = await this.amqpService.createSender(target, connection);
this.senders.set(senderToken, sender);
return sender;
} catch (error) {
logger.error(`Error creating sender: ${error.message}`, error.stack);
await sleep(this.reconnectDelay);
return this.getSender(target, connection);
}

return sender;
}

private encodeMessage(message: any): string {
Expand Down

0 comments on commit f8596c8

Please sign in to comment.