Skip to content

Commit

Permalink
feat(@rhtml/amqp): added additional channel configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
Stradivario committed Jan 10, 2025
1 parent 8988bc9 commit d1bb485
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 5 deletions.
8 changes: 8 additions & 0 deletions packages/amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ import { AmqpModule } from '@rhtml/amqp';
username: 'amqp user', // AMQP username (default: 'guest')
password: 'amqp password', // AMQP password (default: 'guest')
vhost: '', // Virtual host to use (default: empty string)
/**
* Optional
* Set the prefetch count for this channel.
* The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement;
* once there are count messages outstanding,
* the server will not send more messages on this channel until one or more have been acknowledged.
*/
prefetchCount: 1
}),
],
})
Expand Down
12 changes: 11 additions & 1 deletion packages/amqp/src/amqp.constants.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { InjectionToken } from '@rhtml/di';
import { Channel, Connection } from 'amqplib';
import { Channel, Connection, Options } from 'amqplib';

/**
* Injection for AmqpConnection
Expand All @@ -12,3 +12,13 @@ export type AmqpConnection = Connection;
*/
export const AmqpChannel = new InjectionToken<Channel>();
export type AmqpChannel = Channel;

export interface ModuleConfig extends Options.Connect {
/**
* Set the prefetch count for this channel.
* The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement;
* once there are count messages outstanding,
* the server will not send more messages on this channel until one or more have been acknowledged.
*/
prefetchCount?: number;
}
14 changes: 10 additions & 4 deletions packages/amqp/src/amqp.module.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { Module, ModuleWithProviders } from '@rhtml/di';
import amqpClient, { Connection, Options } from 'amqplib';
import amqpClient, { Connection } from 'amqplib';

import { AmqpChannel, AmqpConnection } from './amqp.constants';
import { AmqpChannel, AmqpConnection, ModuleConfig } from './amqp.constants';
import { AmqpService } from './amqp.service';

@Module({
providers: [AmqpService],
})
export class AmqpModule {
public static forRoot(config: Options.Connect): ModuleWithProviders {
public static forRoot(config: ModuleConfig): ModuleWithProviders {
return {
module: AmqpModule,
providers: [
Expand All @@ -19,7 +19,13 @@ export class AmqpModule {
{
provide: AmqpChannel,
deps: [AmqpConnection],
useFactory: (connection: Connection) => connection.createChannel(),
useFactory: async (connection: Connection) => {
const channel = await connection.createChannel();
if (config.prefetchCount) {
await channel.prefetch(config.prefetchCount);
}
return channel;
},
},
],
};
Expand Down

0 comments on commit d1bb485

Please sign in to comment.