From d1bb485e20b8de2dce31cbecc87fb1c4a42176a5 Mon Sep 17 00:00:00 2001 From: Kristiyan Tachev Date: Fri, 10 Jan 2025 15:46:42 +0200 Subject: [PATCH] feat(@rhtml/amqp): added additional channel configuration --- packages/amqp/README.md | 8 ++++++++ packages/amqp/src/amqp.constants.ts | 12 +++++++++++- packages/amqp/src/amqp.module.ts | 14 ++++++++++---- 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/packages/amqp/README.md b/packages/amqp/README.md index fecb4be..45eb50b 100644 --- a/packages/amqp/README.md +++ b/packages/amqp/README.md @@ -41,6 +41,14 @@ import { AmqpModule } from '@rhtml/amqp'; username: 'amqp user', // AMQP username (default: 'guest') password: 'amqp password', // AMQP password (default: 'guest') vhost: '', // Virtual host to use (default: empty string) + /** + * Optional + * Set the prefetch count for this channel. + * The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement; + * once there are count messages outstanding, + * the server will not send more messages on this channel until one or more have been acknowledged. + */ + prefetchCount: 1 }), ], }) diff --git a/packages/amqp/src/amqp.constants.ts b/packages/amqp/src/amqp.constants.ts index 653de8f..3e819c9 100644 --- a/packages/amqp/src/amqp.constants.ts +++ b/packages/amqp/src/amqp.constants.ts @@ -1,5 +1,5 @@ import { InjectionToken } from '@rhtml/di'; -import { Channel, Connection } from 'amqplib'; +import { Channel, Connection, Options } from 'amqplib'; /** * Injection for AmqpConnection @@ -12,3 +12,13 @@ export type AmqpConnection = Connection; */ export const AmqpChannel = new InjectionToken(); export type AmqpChannel = Channel; + +export interface ModuleConfig extends Options.Connect { + /** + * Set the prefetch count for this channel. + * The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement; + * once there are count messages outstanding, + * the server will not send more messages on this channel until one or more have been acknowledged. + */ + prefetchCount?: number; +} diff --git a/packages/amqp/src/amqp.module.ts b/packages/amqp/src/amqp.module.ts index 5f6d638..1cc603f 100644 --- a/packages/amqp/src/amqp.module.ts +++ b/packages/amqp/src/amqp.module.ts @@ -1,14 +1,14 @@ import { Module, ModuleWithProviders } from '@rhtml/di'; -import amqpClient, { Connection, Options } from 'amqplib'; +import amqpClient, { Connection } from 'amqplib'; -import { AmqpChannel, AmqpConnection } from './amqp.constants'; +import { AmqpChannel, AmqpConnection, ModuleConfig } from './amqp.constants'; import { AmqpService } from './amqp.service'; @Module({ providers: [AmqpService], }) export class AmqpModule { - public static forRoot(config: Options.Connect): ModuleWithProviders { + public static forRoot(config: ModuleConfig): ModuleWithProviders { return { module: AmqpModule, providers: [ @@ -19,7 +19,13 @@ export class AmqpModule { { provide: AmqpChannel, deps: [AmqpConnection], - useFactory: (connection: Connection) => connection.createChannel(), + useFactory: async (connection: Connection) => { + const channel = await connection.createChannel(); + if (config.prefetchCount) { + await channel.prefetch(config.prefetchCount); + } + return channel; + }, }, ], };