diff --git a/README.md b/README.md index 06d03d43..185340c0 100644 --- a/README.md +++ b/README.md @@ -198,3 +198,52 @@ SQS queues are built in a way that every message is only consumed once, and then Both publishers and consumers accept a queue name and configuration as parameters. If the referenced queue (or SNS topic) does not exist at the moment the publisher or the consumer is instantiated, it is automatically created. Similarly, if the referenced topic does not exist during instantiation, it is also automatically created. If you do not want to create a new queue/topic, you can set `queueLocator` field for `queueConfiguration`. In that case `message-queue-toolkit` will not attempt to create a new queue or topic, and instead throw an error if they don't already exist. + +## Automatic Reconnects (RabbitMQ) + +`message-queue-toolkit` automatically reestablishes connections for all publishers and consumers via `AmqpConnectionManager` mechanism. + +Example: + +```ts +export const TEST_AMQP_CONFIG: AmqpConfig = { + vhost: '', + hostname: 'localhost', + username: 'guest', + password: 'guest', + port: 5672, + useTls: false, +} + +const amqpConnectionManager = new AmqpConnectionManager(config, logger) +await amqpConnectionManager.init() + +const publisher = new TestAmqpPublisher( + { amqpConnectionManager }, + { + /// other amqp options + }) +await publisher.init() + +const consumer = new TestAmqpConsumer( + { amqpConnectionManager }, + { + /// other amqp options + }) +await consumer.start() + +// break connection, to simulate unexpected disconnection in production +await (await amqpConnectionManager.getConnection()).close() + +const message = { + // some test message +} + +// This will fail, but will trigger reconnection within amqpConnectionManager +publisher.publish(message) + +// eventually connection is reestablished and propagated across all the AMQP services that use same amqpConnectionManager + +// This will succeed and consumer, which also received new connection, will be able to consume it +publisher.publish(message) +``` diff --git a/packages/amqp/index.ts b/packages/amqp/index.ts index 096e4a92..7b1078bf 100644 --- a/packages/amqp/index.ts +++ b/packages/amqp/index.ts @@ -2,14 +2,16 @@ export type { CommonMessage } from './lib/types/MessageTypes' export type { AMQPQueueConfig } from './lib/AbstractAmqpService' -export { AbstractAmqpConsumer } from './lib/AbstractAmqpConsumer' +export { AbstractAmqpConsumerMonoSchema } from './lib/AbstractAmqpConsumerMonoSchema' export { AbstractAmqpConsumerMultiSchema } from './lib/AbstractAmqpConsumerMultiSchema' export { AmqpConsumerErrorResolver } from './lib/errors/AmqpConsumerErrorResolver' -export { AbstractAmqpPublisher } from './lib/AbstractAmqpPublisher' +export { AbstractAmqpPublisherMonoSchema } from './lib/AbstractAmqpPublisherMonoSchema' export { AbstractAmqpPublisherMultiSchema } from './lib/AbstractAmqpPublisherMultiSchema' export type { AmqpConfig } from './lib/amqpConnectionResolver' export { resolveAmqpConnection } from './lib/amqpConnectionResolver' +export { AmqpConnectionManager } from './lib/AmqpConnectionManager' +export type { ConnectionReceiver } from './lib/AmqpConnectionManager' export { deserializeAmqpMessage } from './lib/amqpMessageDeserializer' diff --git a/packages/amqp/lib/AbstractAmqpBaseConsumer.ts b/packages/amqp/lib/AbstractAmqpBaseConsumer.ts index d2dbc11d..096e9c0c 100644 --- a/packages/amqp/lib/AbstractAmqpBaseConsumer.ts +++ b/packages/amqp/lib/AbstractAmqpBaseConsumer.ts @@ -6,7 +6,7 @@ import type { ExistingQueueOptions, } from '@message-queue-toolkit/core' import { isMessageError, parseMessage } from '@message-queue-toolkit/core' -import type { Message } from 'amqplib' +import type { Connection, Message } from 'amqplib' import type { AMQPConsumerDependencies, CreateAMQPQueueOptions } from './AbstractAmqpService' import { AbstractAmqpService } from './AbstractAmqpService' @@ -108,12 +108,12 @@ export abstract class AbstractAmqpBaseConsumer { + await super.receiveNewConnection(connection) + await this.consume() + } + private async consume() { await this.channel.consume(this.queueName, (message) => { if (message === null) { return @@ -158,6 +158,15 @@ export abstract class AbstractAmqpBaseConsumer extends AbstractAmqpService { + protected sendToQueue(message: MessagePayloadType): void { + try { + this.channel.sendToQueue(this.queueName, objectToBuffer(message)) + } catch (err) { + // Unfortunately, reliable retry mechanism can't be implemented with try-catch block, + // as not all failures end up here. If connection is closed programmatically, it works fine, + // but if server closes connection unexpectedly (e. g. RabbitMQ is shut down), then we don't land here + // @ts-ignore + if (err.message === 'Channel closed') { + this.logger.error(`AMQP channel closed`) + void this.reconnect() + } else { + throw err + } + } + } + + /* c8 ignore start */ + protected resolveMessage(): Either { + throw new Error('Not implemented for publisher') + } + /* c8 ignore stop */ +} diff --git a/packages/amqp/lib/AbstractAmqpConsumer.ts b/packages/amqp/lib/AbstractAmqpConsumerMonoSchema.ts similarity index 93% rename from packages/amqp/lib/AbstractAmqpConsumer.ts rename to packages/amqp/lib/AbstractAmqpConsumerMonoSchema.ts index f697c9aa..da5f48fc 100644 --- a/packages/amqp/lib/AbstractAmqpConsumer.ts +++ b/packages/amqp/lib/AbstractAmqpConsumerMonoSchema.ts @@ -9,7 +9,7 @@ import type { import { AbstractAmqpBaseConsumer } from './AbstractAmqpBaseConsumer' import type { AMQPConsumerDependencies } from './AbstractAmqpService' -export abstract class AbstractAmqpConsumer +export abstract class AbstractAmqpConsumerMonoSchema extends AbstractAmqpBaseConsumer implements QueueConsumer { diff --git a/packages/amqp/lib/AbstractAmqpPublisher.ts b/packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts similarity index 70% rename from packages/amqp/lib/AbstractAmqpPublisher.ts rename to packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts index c8ea4439..05f2aea4 100644 --- a/packages/amqp/lib/AbstractAmqpPublisher.ts +++ b/packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts @@ -1,21 +1,18 @@ import type { Either } from '@lokalise/node-core' import type { ExistingQueueOptions, - MessageInvalidFormatError, - MessageValidationError, MonoSchemaQueueOptions, NewQueueOptions, SyncPublisher, } from '@message-queue-toolkit/core' -import { objectToBuffer } from '@message-queue-toolkit/core' import type { ZodSchema } from 'zod' import type { AMQPLocatorType } from './AbstractAmqpBaseConsumer' -import { AbstractAmqpService } from './AbstractAmqpService' +import { AbstractAmqpBasePublisher } from './AbstractAmqpBasePublisher' import type { AMQPDependencies, CreateAMQPQueueOptions } from './AbstractAmqpService' -export abstract class AbstractAmqpPublisher - extends AbstractAmqpService +export abstract class AbstractAmqpPublisherMonoSchema + extends AbstractAmqpBasePublisher implements SyncPublisher { private readonly messageSchema: ZodSchema @@ -39,17 +36,12 @@ export abstract class AbstractAmqpPublisher this.logMessage(resolvedLogMessage) } - this.channel.sendToQueue(this.queueName, objectToBuffer(message)) + this.sendToQueue(message) } /* c8 ignore start */ - protected resolveMessage(): Either { - throw new Error('Not implemented for publisher') - } - protected override resolveSchema(): Either> { throw new Error('Not implemented for publisher') } - /* c8 ignore stop */ } diff --git a/packages/amqp/lib/AbstractAmqpPublisherMultiSchema.ts b/packages/amqp/lib/AbstractAmqpPublisherMultiSchema.ts index 5464cc38..e196d132 100644 --- a/packages/amqp/lib/AbstractAmqpPublisherMultiSchema.ts +++ b/packages/amqp/lib/AbstractAmqpPublisherMultiSchema.ts @@ -1,21 +1,19 @@ import type { Either } from '@lokalise/node-core' import type { ExistingQueueOptions, - MessageInvalidFormatError, - MessageValidationError, NewQueueOptions, SyncPublisher, MultiSchemaPublisherOptions, } from '@message-queue-toolkit/core' -import { MessageSchemaContainer, objectToBuffer } from '@message-queue-toolkit/core' +import { MessageSchemaContainer } from '@message-queue-toolkit/core' import type { ZodSchema } from 'zod' import type { AMQPLocatorType } from './AbstractAmqpBaseConsumer' -import { AbstractAmqpService } from './AbstractAmqpService' +import { AbstractAmqpBasePublisher } from './AbstractAmqpBasePublisher' import type { AMQPDependencies, CreateAMQPQueueOptions } from './AbstractAmqpService' export abstract class AbstractAmqpPublisherMultiSchema - extends AbstractAmqpService + extends AbstractAmqpBasePublisher implements SyncPublisher { private readonly messageSchemaContainer: MessageSchemaContainer @@ -47,16 +45,9 @@ export abstract class AbstractAmqpPublisherMultiSchema { - throw new Error('Not implemented for publisher') - } - - /* c8 ignore stop */ - protected override resolveSchema( message: MessagePayloadType, ): Either> { diff --git a/packages/amqp/lib/AbstractAmqpService.ts b/packages/amqp/lib/AbstractAmqpService.ts index 1fa378e8..03f1b5bb 100644 --- a/packages/amqp/lib/AbstractAmqpService.ts +++ b/packages/amqp/lib/AbstractAmqpService.ts @@ -9,10 +9,11 @@ import type { Channel, Connection, Message } from 'amqplib' import type { Options } from 'amqplib/properties' import type { AMQPLocatorType } from './AbstractAmqpBaseConsumer' +import type { AmqpConnectionManager, ConnectionReceiver } from './AmqpConnectionManager' import { deleteAmqp } from './utils/amqpInitter' export type AMQPDependencies = QueueDependencies & { - amqpConnection: Connection + amqpConnectionManager: AmqpConnectionManager } export type AMQPConsumerDependencies = AMQPDependencies & QueueConsumerDependencies @@ -28,17 +29,21 @@ export type AMQPQueueLocatorType = { } export abstract class AbstractAmqpService< - MessagePayloadType extends object, - DependenciesType extends AMQPDependencies = AMQPDependencies, -> extends AbstractQueueService< - MessagePayloadType, - Message, - DependenciesType, - CreateAMQPQueueOptions, - AMQPQueueLocatorType, - NewQueueOptions | ExistingQueueOptions -> { - protected readonly connection: Connection + MessagePayloadType extends object, + DependenciesType extends AMQPDependencies = AMQPDependencies, + > + extends AbstractQueueService< + MessagePayloadType, + Message, + DependenciesType, + CreateAMQPQueueOptions, + AMQPQueueLocatorType, + NewQueueOptions | ExistingQueueOptions + > + implements ConnectionReceiver +{ + protected connection?: Connection + private connectionManager: AmqpConnectionManager // @ts-ignore protected channel: Channel private isShuttingDown: boolean @@ -53,33 +58,38 @@ export abstract class AbstractAmqpService< this.queueName = options.locatorConfig ? options.locatorConfig.queueName : options.creationConfig?.queueName - this.connection = dependencies.amqpConnection this.isShuttingDown = false + this.connectionManager = dependencies.amqpConnectionManager + this.connection = this.connectionManager.getConnectionSync() + this.connectionManager.subscribeConnectionReceiver(this) } - private async destroyConnection(): Promise { - if (this.channel) { - try { - await this.channel.close() - } finally { - // @ts-ignore - this.channel = undefined - } - } - } + async receiveNewConnection(connection: Connection) { + this.connection = connection - public async init() { this.isShuttingDown = false + // If channel already exists, recreate it + const oldChannel = this.channel + + try { + this.channel = await this.connection.createChannel() + } catch (err) { + // @ts-ignore + this.logger.error(`Error creating channel: ${err.message}`) + await this.connectionManager.reconnect() + return + } - // If channel exists, recreate it - if (this.channel) { + if (oldChannel) { this.isShuttingDown = true - await this.destroyConnection() + try { + await oldChannel.close() + } catch { + // errors are ok + } this.isShuttingDown = false } - this.channel = await this.connection.createChannel() - if (this.deletionConfig && this.creationConfig) { await deleteAmqp(this.channel, this.deletionConfig, this.creationConfig) } @@ -87,7 +97,7 @@ export abstract class AbstractAmqpService< this.channel.on('close', () => { if (!this.isShuttingDown) { this.logger.error(`AMQP connection lost!`) - this.init().catch((err) => { + this.reconnect().catch((err) => { this.handleError(err) throw err }) @@ -103,22 +113,50 @@ export abstract class AbstractAmqpService< this.creationConfig.queueOptions, ) } else { - // queue check breaks channel if not successful - const checkChannel = await this.connection.createChannel() - checkChannel.on('error', () => { - // it's OK - }) + await this.checkQueueExists() + } + } + + private async checkQueueExists() { + // queue check breaks channel if not successful + const checkChannel = await this.connection!.createChannel() + checkChannel.on('error', () => { + // it's OK + }) + try { + await checkChannel.checkQueue(this.locatorConfig!.queueName) + await checkChannel.close() + } catch (err) { + throw new Error(`Queue with queueName ${this.locatorConfig!.queueName} does not exist.`) + } + } + + private async destroyChannel(): Promise { + if (this.channel) { try { - await checkChannel.checkQueue(this.locatorConfig!.queueName) - await checkChannel.close() + await this.channel.close() } catch (err) { - throw new Error(`Queue with queueName ${this.locatorConfig!.queueName} does not exist.`) + // We don't care about connection closing errors + } finally { + // @ts-ignore + this.channel = undefined } } } + public async init() { + // if we don't have connection yet, it's fine, we'll wait for a later receiveNewConnection() call + if (this.connection) { + await this.receiveNewConnection(this.connection) + } + } + + public async reconnect() { + await this.connectionManager.reconnect() + } + async close(): Promise { this.isShuttingDown = true - await this.destroyConnection() + await this.destroyChannel() } } diff --git a/packages/amqp/lib/AmqpConnectionManager.ts b/packages/amqp/lib/AmqpConnectionManager.ts new file mode 100644 index 00000000..31349bb8 --- /dev/null +++ b/packages/amqp/lib/AmqpConnectionManager.ts @@ -0,0 +1,116 @@ +import type { Logger } from '@message-queue-toolkit/core' +import type { Connection } from 'amqplib' + +import type { AmqpConfig } from './amqpConnectionResolver' +import { resolveAmqpConnection } from './amqpConnectionResolver' + +export type ConnectionReceiver = { + receiveNewConnection(connection: Connection): Promise + close(): Promise +} + +export class AmqpConnectionManager { + private readonly config: AmqpConfig + private readonly logger: Logger + private readonly connectionReceivers: ConnectionReceiver[] + private connection?: Connection + public reconnectsActive: boolean + + public isReconnecting: boolean + + constructor(config: AmqpConfig, logger: Logger) { + this.config = config + this.connectionReceivers = [] + this.reconnectsActive = true + this.isReconnecting = false + this.logger = logger + } + + private async createConnection() { + const connection = await resolveAmqpConnection(this.config) + connection.on('error', (err) => { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + this.logger.error(`AmqpConnectionManager: Connection error: ${err.message}`) + this.connection = undefined + if (this.reconnectsActive && !this.isReconnecting) { + void this.reconnect() + } + }) + connection.on('close', () => { + if (this.reconnectsActive && !this.isReconnecting) { + this.logger.error(`AmqpConnectionManager: Connection closed unexpectedly`) + if (this.reconnectsActive) { + void this.reconnect() + } + } + }) + + const promises: Promise[] = [] + + this.logger.info( + `Propagating new connection across ${this.connectionReceivers.length} receivers`, + ) + for (const receiver of this.connectionReceivers) { + promises.push(receiver.receiveNewConnection(connection)) + } + await Promise.all(promises) + + return connection + } + + getConnectionSync() { + return this.connection + } + + async getConnection(): Promise { + if (!this.connection) { + this.connection = await this.createConnection() + } + return this.connection + } + + async reconnect() { + if (this.isReconnecting) { + return + } + this.logger.info('AmqpConnectionManager: Start reconnecting') + + this.isReconnecting = true + const oldConnection = this.connection + this.connection = await this.createConnection() + if (oldConnection) { + try { + await oldConnection.close() + } catch { + // this can fail + } + } + this.isReconnecting = false + + this.logger.info('AmqpConnectionManager: Reconnect complete') + } + + async init() { + this.reconnectsActive = true + await this.getConnection() + } + + async close() { + this.reconnectsActive = false + + for (const receiver of this.connectionReceivers) { + await receiver.close() + } + + try { + await this.connection?.close() + } catch { + // it's OK + } + this.connection = undefined + } + + subscribeConnectionReceiver(connectionReceiver: ConnectionReceiver) { + this.connectionReceivers.push(connectionReceiver) + } +} diff --git a/packages/amqp/lib/amqpConnectionResolver.ts b/packages/amqp/lib/amqpConnectionResolver.ts index ff50206e..56a5c914 100644 --- a/packages/amqp/lib/amqpConnectionResolver.ts +++ b/packages/amqp/lib/amqpConnectionResolver.ts @@ -4,7 +4,7 @@ import { globalLogger } from '@lokalise/node-core' import { connect } from 'amqplib' const CONNECT_RETRY_SECONDS = 10 -const MAX_RETRY_ATTEMPTS = 5 +const MAX_RETRY_ATTEMPTS = 10 export type AmqpConfig = { hostname: string @@ -22,14 +22,18 @@ export async function resolveAmqpConnection(config: AmqpConfig) { while (true) { const url = `${protocol}://${config.username}:${config.password}@${config.hostname}:${config.port}/${config.vhost}` + const retryTime = CONNECT_RETRY_SECONDS * 1000 * (counter + 1) try { - return await connect(url) + const connection = await connect(url) + return connection } catch (e) { globalLogger.error( - `Failed to connect to AMQP broker at ${config.hostname}:${config.port}. Retrying in ${CONNECT_RETRY_SECONDS} seconds...`, + `Failed to connect to AMQP broker at ${config.hostname}:${config.port}. Retrying in ${ + retryTime / 1000 + } seconds...`, ) } - await setTimeout(CONNECT_RETRY_SECONDS * 1000) + await setTimeout(retryTime) counter++ if (counter > MAX_RETRY_ATTEMPTS) { diff --git a/packages/amqp/test/consumers/AmqpPermissionConsumer.ts b/packages/amqp/test/consumers/AmqpPermissionConsumer.ts index 6bce12d2..15956ffe 100644 --- a/packages/amqp/test/consumers/AmqpPermissionConsumer.ts +++ b/packages/amqp/test/consumers/AmqpPermissionConsumer.ts @@ -1,13 +1,13 @@ import type { Either } from '@lokalise/node-core' -import { AbstractAmqpConsumer } from '../../lib/AbstractAmqpConsumer' +import { AbstractAmqpConsumerMonoSchema } from '../../lib/AbstractAmqpConsumerMonoSchema' import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService' import { userPermissionMap } from '../repositories/PermissionRepository' import type { PERMISSIONS_MESSAGE_TYPE } from './userConsumerSchemas' import { PERMISSIONS_MESSAGE_SCHEMA } from './userConsumerSchemas' -export class AmqpPermissionConsumer extends AbstractAmqpConsumer { +export class AmqpPermissionConsumer extends AbstractAmqpConsumerMonoSchema { public static QUEUE_NAME = 'user_permissions' constructor(dependencies: AMQPConsumerDependencies) { diff --git a/packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts b/packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts index a0724b2a..abeddbe0 100644 --- a/packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts +++ b/packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts @@ -6,6 +6,7 @@ import { describe, beforeEach, afterEach, expect, it } from 'vitest' import { objectToBuffer } from '../../../core/lib/utils/queueUtils' import { waitAndRetry } from '../../../core/lib/utils/waitUtils' import { FakeConsumerErrorResolver } from '../fakes/FakeConsumerErrorResolver' +import type { AmqpPermissionPublisher } from '../publishers/AmqpPermissionPublisher' import { userPermissionMap } from '../repositories/PermissionRepository' import { TEST_AMQP_CONFIG } from '../utils/testAmqpConfig' import type { Dependencies } from '../utils/testContext' @@ -17,25 +18,29 @@ import type { PERMISSIONS_MESSAGE_TYPE } from './userConsumerSchemas' const userIds = [100, 200, 300] const perms: [string, ...string[]] = ['perm1', 'perm2'] -async function waitForPermissions(userIds: number[]) { - return await waitAndRetry(async () => { - const usersPerms = userIds.reduce((acc, userId) => { - if (userPermissionMap[userId]) { - acc.push(userPermissionMap[userId]) - } - return acc - }, [] as string[][]) +function checkPermissions(userIds: number[]) { + const usersPerms = userIds.reduce((acc, userId) => { + if (userPermissionMap[userId]) { + acc.push(userPermissionMap[userId]) + } + return acc + }, [] as string[][]) + + if (usersPerms && usersPerms.length !== userIds.length) { + return null + } - if (usersPerms && usersPerms.length !== userIds.length) { + for (const userPerms of usersPerms) + if (userPerms.length !== perms.length) { return null } - for (const userPerms of usersPerms) - if (userPerms.length !== perms.length) { - return null - } + return usersPerms +} - return usersPerms +async function waitForPermissions(userIds: number[]) { + return await waitAndRetry(async () => { + return checkPermissions(userIds) }) } @@ -43,6 +48,8 @@ describe('PermissionsConsumer', () => { describe('consume', () => { let diContainer: AwilixContainer let channel: Channel + let publisher: AmqpPermissionPublisher + let consumer: AmqpPermissionConsumer beforeEach(async () => { delete userPermissionMap[100] delete userPermissionMap[200] @@ -51,11 +58,16 @@ describe('PermissionsConsumer', () => { consumerErrorResolver: asClass(FakeConsumerErrorResolver, SINGLETON_CONFIG), }) - channel = await diContainer.cradle.amqpConnection.createChannel() + channel = await diContainer.cradle.amqpConnectionManager.getConnectionSync()!.createChannel() + publisher = diContainer.cradle.permissionPublisher + consumer = diContainer.cradle.permissionConsumer await diContainer.cradle.permissionConsumer.start() }) afterEach(async () => { + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + channel = await connection.createChannel() + await channel.deleteQueue(AmqpPermissionConsumer.QUEUE_NAME) await channel.close() const { awilixManager } = diContainer.cradle @@ -90,6 +102,47 @@ describe('PermissionsConsumer', () => { expect(updatedUsersPermissions[0]).toHaveLength(2) }) + it('Reconnects if connection is lost', async () => { + await (await diContainer.cradle.amqpConnectionManager.getConnection()).close() + + const users = Object.values(userPermissionMap) + expect(users).toHaveLength(0) + + userPermissionMap[100] = [] + userPermissionMap[200] = [] + userPermissionMap[300] = [] + + publisher.publish({ + messageType: 'add', + userIds, + permissions: perms, + }) + + const updatedUsersPermissions = await waitAndRetry(() => { + const checkResult = checkPermissions(userIds) + if (checkResult) { + return checkResult + } + + publisher.publish({ + messageType: 'add', + userIds, + permissions: perms, + }) + return null + }, 50) + + if (null === updatedUsersPermissions) { + throw new Error('Users permissions unexpectedly null') + } + + expect(updatedUsersPermissions).toBeDefined() + expect(updatedUsersPermissions[0]).toHaveLength(2) + + await publisher.close() + await consumer.close() + }) + it('Wait for users to be created and then create permissions', async () => { const users = Object.values(userPermissionMap) expect(users).toHaveLength(0) diff --git a/packages/amqp/test/consumers/AmqpPermissionsConsumerMultiSchema.spec.ts b/packages/amqp/test/consumers/AmqpPermissionsConsumerMultiSchema.spec.ts index 4e72059b..c0481ab4 100644 --- a/packages/amqp/test/consumers/AmqpPermissionsConsumerMultiSchema.spec.ts +++ b/packages/amqp/test/consumers/AmqpPermissionsConsumerMultiSchema.spec.ts @@ -37,10 +37,12 @@ describe('PermissionsConsumerMultiSchema', () => { }) await waitAndRetry(() => { - return logger.loggedMessages.length === 1 + return logger.loggedMessages.length === 3 }) - expect(logger.loggedMessages.length).toBe(1) + expect(logger.loggedMessages.length).toBe(3) + expect(logger.loggedMessages[1]).toEqual({ messageType: 'add' }) + expect(logger.loggedMessages[2]).toEqual({ messageType: 'add' }) }) }) @@ -140,5 +142,23 @@ describe('PermissionsConsumerMultiSchema', () => { expect(consumer.addCounter).toBe(1) expect(consumer.removeCounter).toBe(2) }) + + it('Reconnects if connection is lost', async () => { + await (await diContainer.cradle.amqpConnectionManager.getConnection()).close() + publisher.publish({ + messageType: 'add', + }) + + await waitAndRetry(() => { + publisher.publish({ + messageType: 'add', + }) + + return consumer.addCounter > 0 + }) + + expect(consumer.addCounter > 0).toBe(true) + await consumer.close() + }) }) }) diff --git a/packages/amqp/test/fakes/FakeConsumer.ts b/packages/amqp/test/fakes/FakeConsumer.ts index 726173aa..4e1f4449 100644 --- a/packages/amqp/test/fakes/FakeConsumer.ts +++ b/packages/amqp/test/fakes/FakeConsumer.ts @@ -1,11 +1,11 @@ import type { Either } from '@lokalise/node-core' import type { ZodType } from 'zod' -import { AbstractAmqpConsumer } from '../../lib/AbstractAmqpConsumer' +import { AbstractAmqpConsumerMonoSchema } from '../../lib/AbstractAmqpConsumerMonoSchema' import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService' import type { CommonMessage } from '../../lib/types/MessageTypes' -export class FakeConsumer extends AbstractAmqpConsumer { +export class FakeConsumer extends AbstractAmqpConsumerMonoSchema { constructor(dependencies: AMQPConsumerDependencies, queueName = 'dummy', messageSchema: ZodType) { super(dependencies, { creationConfig: { diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts index 1c52ce64..f3b19941 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts @@ -11,6 +11,7 @@ import { PERMISSIONS_MESSAGE_SCHEMA } from '../consumers/userConsumerSchemas' import { FakeConsumer } from '../fakes/FakeConsumer' import { FakeConsumerErrorResolver } from '../fakes/FakeConsumerErrorResolver' import { FakeLogger } from '../fakes/FakeLogger' +import { userPermissionMap } from '../repositories/PermissionRepository' import { TEST_AMQP_CONFIG } from '../utils/testAmqpConfig' import type { Dependencies } from '../utils/testContext' import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext' @@ -21,6 +22,30 @@ import type { AmqpPermissionPublisherMultiSchema } from './AmqpPermissionPublish const perms: [string, ...string[]] = ['perm1', 'perm2'] const userIds = [100, 200, 300] +function checkPermissions(userIds: number[]) { + const usersPerms = userIds.reduce((acc, userId) => { + if (userPermissionMap[userId]) { + acc.push(userPermissionMap[userId]) + } + return acc + }, [] as string[][]) + + if (usersPerms.length > userIds.length) { + return usersPerms.slice(0, userIds.length - 1) + } + + if (usersPerms && usersPerms.length !== userIds.length) { + return null + } + + for (const userPerms of usersPerms) + if (userPerms.length !== perms.length) { + return null + } + + return usersPerms +} + describe('PermissionPublisher', () => { describe('logging', () => { let logger: FakeLogger @@ -45,10 +70,14 @@ describe('PermissionPublisher', () => { publisher.publish(message) await waitAndRetry(() => { - return logger.loggedMessages.length === 1 + return logger.loggedMessages.length === 2 }) - expect(logger.loggedMessages.length).toBe(1) + expect(logger.loggedMessages[1]).toEqual({ + messageType: 'add', + permissions: ['perm1', 'perm2'], + userIds: [100, 200, 300], + }) }) }) @@ -68,7 +97,8 @@ describe('PermissionPublisher', () => { }) beforeEach(async () => { - channel = await diContainer.cradle.amqpConnection.createChannel() + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + channel = await connection.createChannel() }) afterEach(async () => { @@ -106,20 +136,17 @@ describe('PermissionPublisher', () => { beforeAll(async () => { diContainer = await registerDependencies(TEST_AMQP_CONFIG, { consumerErrorResolver: asClass(FakeConsumerErrorResolver, SINGLETON_CONFIG), - permissionConsumer: asClass(FakeConsumer, { - lifetime: Lifetime.SINGLETON, - asyncInit: 'start', - asyncDispose: 'close', - asyncDisposePriority: 10, - }), }) }) beforeEach(async () => { - channel = await diContainer.cradle.amqpConnection.createChannel() + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + channel = await connection.createChannel() }) afterEach(async () => { + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + channel = await connection.createChannel() await channel.deleteQueue(AmqpPermissionConsumer.QUEUE_NAME) await channel.close() }) @@ -154,13 +181,9 @@ describe('PermissionPublisher', () => { permissionPublisher.publish(message) - await waitAndRetry( - () => { - return receivedMessage !== null - }, - 40, - 30, - ) + await waitAndRetry(() => { + return receivedMessage !== null + }) expect(receivedMessage).toEqual({ messageType: 'add', @@ -168,5 +191,42 @@ describe('PermissionPublisher', () => { userIds: [100, 200, 300], }) }) + + it('reconnects on lost connection', async () => { + const users = Object.values(userPermissionMap) + expect(users).toHaveLength(0) + + userPermissionMap[100] = [] + userPermissionMap[200] = [] + userPermissionMap[300] = [] + + const { permissionPublisher, permissionConsumer } = diContainer.cradle + await permissionConsumer.start() + + const message = { + userIds, + messageType: 'add', + permissions: perms, + } satisfies PERMISSIONS_MESSAGE_TYPE + + await diContainer.cradle.amqpConnectionManager.getConnectionSync()!.close() + + const updatedUsersPermissions = await waitAndRetry( + () => { + permissionPublisher.publish(message) + + return checkPermissions(userIds) + }, + 100, + 20, + ) + + if (null === updatedUsersPermissions) { + throw new Error('Users permissions unexpectedly null') + } + + expect(updatedUsersPermissions).toBeDefined() + expect(updatedUsersPermissions[0]).toHaveLength(2) + }) }) }) diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.ts index 4445a90e..206ddffa 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.ts @@ -2,12 +2,12 @@ import type { ExistingAMQPConsumerOptions, NewAMQPConsumerOptions, } from '../../lib/AbstractAmqpBaseConsumer' -import { AbstractAmqpPublisher } from '../../lib/AbstractAmqpPublisher' +import { AbstractAmqpPublisherMonoSchema } from '../../lib/AbstractAmqpPublisherMonoSchema' import type { AMQPDependencies } from '../../lib/AbstractAmqpService' import type { PERMISSIONS_MESSAGE_TYPE } from '../consumers/userConsumerSchemas' import { PERMISSIONS_MESSAGE_SCHEMA } from '../consumers/userConsumerSchemas' -export class AmqpPermissionPublisher extends AbstractAmqpPublisher { +export class AmqpPermissionPublisher extends AbstractAmqpPublisherMonoSchema { public static QUEUE_NAME = 'user_permissions' constructor( diff --git a/packages/amqp/test/utils/testContext.ts b/packages/amqp/test/utils/testContext.ts index f21bbad7..eb3b171c 100644 --- a/packages/amqp/test/utils/testContext.ts +++ b/packages/amqp/test/utils/testContext.ts @@ -1,12 +1,11 @@ import type { ErrorReporter, ErrorResolver } from '@lokalise/node-core' import type { Logger, TransactionObservabilityManager } from '@message-queue-toolkit/core' -import type { Connection } from 'amqplib' import type { Resolver } from 'awilix' import { asClass, asFunction, createContainer, Lifetime } from 'awilix' import { AwilixManager } from 'awilix-manager' +import { AmqpConnectionManager } from '../../lib/AmqpConnectionManager' import type { AmqpConfig } from '../../lib/amqpConnectionResolver' -import { resolveAmqpConnection } from '../../lib/amqpConnectionResolver' import { AmqpConsumerErrorResolver } from '../../lib/errors/AmqpConsumerErrorResolver' import { AmqpPermissionConsumer } from '../consumers/AmqpPermissionConsumer' import { AmqpPermissionConsumerMultiSchema } from '../consumers/AmqpPermissionConsumerMultiSchema' @@ -27,7 +26,6 @@ export async function registerDependencies( const diContainer = createContainer({ injectionMode: 'PROXY', }) - const amqpConnection = await resolveAmqpConnection(config) const awilixManager = new AwilixManager({ diContainer, asyncDispose: true, @@ -42,15 +40,15 @@ export async function registerDependencies( awilixManager: asFunction(() => { return awilixManager }, SINGLETON_CONFIG), - amqpConnection: asFunction( - () => { - return amqpConnection + amqpConnectionManager: asFunction( + ({ logger }: Dependencies) => { + return new AmqpConnectionManager(config, logger) }, { lifetime: Lifetime.SINGLETON, - dispose: (connection) => { - return connection.close() - }, + asyncInit: 'init', + asyncDispose: 'close', + asyncDisposePriority: 1, }, ), consumerErrorResolver: asFunction(() => { @@ -109,7 +107,7 @@ type DiConfig = Record> export interface Dependencies { logger: Logger - amqpConnection: Connection + amqpConnectionManager: AmqpConnectionManager awilixManager: AwilixManager // vendor-specific dependencies diff --git a/packages/amqp/vitest.config.ts b/packages/amqp/vitest.config.ts index fed29f4b..916d1853 100644 --- a/packages/amqp/vitest.config.ts +++ b/packages/amqp/vitest.config.ts @@ -12,10 +12,10 @@ export default defineConfig({ exclude: ['lib/**/*.spec.ts', 'lib/**/*.test.ts', 'test/**/*.*', 'lib/types/**/*.*'], reporter: ['text'], all: true, - lines: 85, + lines: 90, functions: 100, - branches: 75, - statements: 85, + branches: 80, + statements: 90, }, }, })