diff --git a/packages/amqp/README.md b/packages/amqp/README.md index 45eb50b..d2ac56b 100644 --- a/packages/amqp/README.md +++ b/packages/amqp/README.md @@ -151,6 +151,64 @@ import { FastifyInstance } from 'fastify' ``` +## Define different channels and attach them to subscriptions + +```typescript +import { Channel, AmqpConnection, Connection } from '@rhtml/amqp'; +import { InjectionToken } from '@rhtml/di'; + +/** + * Injection for MyAmqpChannel + */ +export const MyAmqpChannel = new InjectionToken(); + +@Module({ + providers: [ + { + provide: MyAmqpChannel, + deps: [AmqpConnection], + useFactory: async (connection: Connection) => { + const channel = await connection.createChannel(); + await channel.prefetch(1); + return channel; + }, + }, + ], + bootstrap: [WeatherDataController], +}) +export class AppModule {} + +@Controller({ + route: '/', +}) +export class WeatherDataController { + constructor( + private weatherDataService: WeatherDataService, + private amqpService: AmqpService + ) {} + + @Subscribe({ + queue: 'my-queue-with-acknowledgment', + consumeOptions: { + noAck: false, + }, + channel: MyAmqpChannel, + }) + async preParseRequest(message: ConsumeMessage, channel: AmqpChannel) { + // You need to manually acknowledge the message or it is threated as `unacked` in RabbitMQ Dashboard + channel.ack(message); + } + + @Subscribe({ + queue: 'my-queue-without-acknowledgment', + channel: MyAmqpChannel, + }) + async preParseRequest(message: ConsumeMessage, channel: AmqpChannel) { + // Do something here and message will be auto acknowledged + } +} +``` + ## Key Concepts 1. Message Subscription