Skip to content

Commit

Permalink
feat(amqp): added more generic way to define connections
Browse files Browse the repository at this point in the history
  • Loading branch information
Stradivario committed Jan 15, 2025
1 parent 62549ff commit 3de3f9c
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 10 deletions.
11 changes: 3 additions & 8 deletions packages/amqp/src/amqp.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,13 @@ export class AmqpModule {
providers: [
{
provide: AmqpConnection,
useFactory: () => amqpClient.connect(config),
useFactory: () => AmqpService.createConnection(config),
},
{
provide: AmqpChannel,
deps: [AmqpConnection],
useFactory: async (connection: Connection) => {
const channel = await connection.createChannel();
if (config.prefetchCount) {
await channel.prefetch(config.prefetchCount);
}
return channel;
},
useFactory: async (connection: Connection) =>
AmqpService.createChannel(connection, config.prefetchCount),
},
],
};
Expand Down
18 changes: 17 additions & 1 deletion packages/amqp/src/amqp.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Inject, Injectable } from '@rhtml/di';
import { Channel, ConsumeMessage, Options } from 'amqplib';
import amqpClient, { Connection } from 'amqplib';

import { AmqpChannel } from './amqp.constants';
import { AmqpChannel, ModuleConfig } from './amqp.constants';

@Injectable()
export class AmqpService {
Expand Down Expand Up @@ -32,4 +33,19 @@ export class AmqpService {
options?.consumeOptions
);
}

public static createConnection(config: ModuleConfig) {
return amqpClient.connect(config);
}

public static async createChannel(
connection: Connection,
prefetchCount?: number
) {
const channel = await connection.createChannel();
if (prefetchCount) {
await channel.prefetch(prefetchCount);
}
return channel;
}
}
2 changes: 1 addition & 1 deletion packages/amqp/src/decorators/subscribe.decorator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export const Subscribe =
await target[memberName].call(this, msg, channel);
} catch (e) {
console.error(
`[Subscription]: queue "${queue}" fail internally inside ${target} ${memberName}`
`[AMQP][Subscribe]: queue "${queue}" failed to handle internally inside subscription "${memberName}" method`
);
}
},
Expand Down

0 comments on commit 3de3f9c

Please sign in to comment.