Skip to content

Commit

Permalink
feat(queue-service): add removeListener method
Browse files Browse the repository at this point in the history
Closes #70
  • Loading branch information
raschan committed Feb 7, 2024
1 parent db4e035 commit 522ca89
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 21 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,20 @@ the connections defined in the `forRoot` method contains a connection named `Con
> Note: If you leave out the name of the connection, the listener will be attached to the default connection (if exists).

### Removing a listener

If you want to remove a listener, you can use the `removeListener()` method of the `QueueService`. The method's first parameter is the name string or `Source` object of the queue, and the second (optional) parameter is the connection name if you have set it up. Here is an example:

```typescript
// with the default connection
await this.queueService.removeListener('example');
```

```typescript
// with named connection
await this.queueService.removeListener('example', Connections.TEST);
```

### Message control

When a new message arrives at a queue, the assigned method with `@Listen()` decorator receives the transformed and validated message body
Expand Down
30 changes: 20 additions & 10 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 57 additions & 1 deletion src/service/queue/queue.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ describe('QueueService', () => {
{
provide: AMQPService,
useValue: {
createReceiver: jest.fn().mockResolvedValue(jest.fn()),
createReceiver: jest.fn().mockResolvedValue(jest.fn().mockResolvedValue(new EventContextMock().receiver)),
createSender: jest.fn().mockResolvedValue(new EventContextMock().sender),
disconnect: jest.fn().mockResolvedValue(jest.fn()),
getModuleOptions(): QueueModuleOptions {
Expand Down Expand Up @@ -349,6 +349,62 @@ describe('QueueService', () => {
});
});

describe('removeListener()', () => {
it('should remove listener', async () => {
(amqpService.createReceiver as jest.Mock).mockResolvedValue(new EventContextMock().receiver);
await queueService.listen(defaultQueue, () => void 0, {});
expect(queueService['receivers'].size).toBe(1);

const receiver = queueService['receivers'].get(queueService['receivers'].keys().next().value);

const result = await queueService.removeListener(defaultQueue);
expect(receiver.close).toBeCalled();
expect(result).toBe(true);
expect(queueService['receivers'].size).toBe(0);
});

it('should remove listener with connectionName', async () => {
(amqpService.createReceiver as jest.Mock).mockResolvedValue(new EventContextMock().receiver);
const connection = 'test_connection';
await queueService.listen(defaultQueue, () => void 0, {}, connection);

const receiver = queueService['receivers'].get(queueService['receivers'].keys().next().value);

expect(queueService['receivers'].size).toBe(1);

const result = await queueService.removeListener(defaultQueue, connection);
expect(receiver.close).toBeCalled();
expect(result).toBe(true);
expect(queueService['receivers'].size).toBe(0);
});

it('should remove listener with Source object', async () => {
(amqpService.createReceiver as jest.Mock).mockResolvedValue(new EventContextMock().receiver);
const source: Source = {
address: defaultQueue,
filter: filter.selector("((JMSCorrelationID) <> ''"),
};
await queueService.listen(source, () => void 0, {});
expect(queueService['receivers'].size).toBe(1);
const receiver = queueService['receivers'].get(queueService['receivers'].keys().next().value);

const result = await queueService.removeListener(source);
expect(receiver.close).toBeCalled();
expect(result).toBe(true);
expect(queueService['receivers'].size).toBe(0);
});

it('should not do anything with non-existing listener', async () => {
(amqpService.createReceiver as jest.Mock).mockResolvedValue(new EventContextMock().receiver);
await queueService.listen(defaultQueue, () => void 0, {});
expect(queueService['receivers'].size).toBe(1);

const result = await queueService.removeListener('otherQueue');
expect(result).toBe(false);
expect(queueService['receivers'].size).toBe(1);
});
});

it('should shutdown', async () => {
(amqpService.createReceiver as jest.Mock).mockResolvedValue(new EventContextMock().receiver);
await queueService.listen(defaultQueue, () => void 0, {});
Expand Down
38 changes: 29 additions & 9 deletions src/service/queue/queue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class QueueService {
* objects when a new message arrives on the queue. If a receiver is already
* created for the given queue then a new receiver won't be created.
*
* @param {string} source Name of the queue.
* @param {string} source Name or Source object of the queue.
* @param {function(body: T, control: MessageControl, metadata: Omit<Message, 'body'>) => Promise<void>} callback Function what will invoked when message arrives.
* @param {ListenOptions<T>} options Options for message processing.
* @param {string} connection Name of the connection
Expand Down Expand Up @@ -300,27 +300,47 @@ export class QueueService {
this.receivers.clear();
}

/**
* Removes listener from active listeners
*
* @param {string} source Name or Source object of the queue.
* @param {string} connection Name of the connection
*
* @returns {Promise<boolean>} Returns true if listener was removed, otherwise false. If listener was not found, returns false.
*
* @public
*/
public async removeListener(source: string | Source, connection: string = AMQP_DEFAULT_CONNECTION_TOKEN): Promise<boolean> {
const sourceToken = typeof source === 'string' ? source : JSON.stringify(source);
const receiverToken = this.getLinkToken(sourceToken, connection);

if (this.receivers.has(receiverToken)) {

Check failure on line 318 in src/service/queue/queue.service.ts

View workflow job for this annotation

GitHub Actions / Continuous Integration

Delete `⏎`
const receiver = this.receivers.get(receiverToken);
await receiver.close();

return this.receivers.delete(receiverToken);
}

return false;
}

private async getReceiver(
source: string | Source,
credit: number,
messageHandler: (context: EventContext) => Promise<void>,
connection: string,
): Promise<Receiver> {
let receiver;

const sourceToken = typeof source === 'string' ? source : JSON.stringify(source);

const receiverToken = this.getLinkToken(sourceToken, connection);

if (this.receivers.has(receiverToken)) {
receiver = this.receivers.get(receiverToken);
} else {
receiver = await this.amqpService.createReceiver(source, credit, messageHandler.bind(this), connection);

if (!this.receivers.has(receiverToken)) {
const receiver = await this.amqpService.createReceiver(source, credit, messageHandler.bind(this), connection);
this.receivers.set(receiverToken, receiver);
}

return receiver;
return this.receivers.get(receiverToken)!;

Check warning on line 343 in src/service/queue/queue.service.ts

View workflow job for this annotation

GitHub Actions / Continuous Integration

Forbidden non-null assertion
}

private async getSender(target: string, connection: string): Promise<AwaitableSender> {
Expand Down
2 changes: 1 addition & 1 deletion src/test/event-context.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class EventContextMock implements EventContext {
},
credit: 0,
addCredit: jest.fn(),
close: jest.fn().mockResolvedValue(true),
close: jest.fn(() => true),
};
public sender: any = {
send: jest.fn().mockResolvedValue({ sent: true }),
Expand Down

0 comments on commit 522ca89

Please sign in to comment.