From f0f58b7d8ebbbdb5ce185ed7d67280f27df456f8 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Wed, 27 Sep 2023 22:08:30 +0300 Subject: [PATCH 01/16] Implement automatic reconnection propagation --- README.md | 49 ++++++++ packages/amqp/index.ts | 4 +- packages/amqp/lib/AbstractAmqpBaseConsumer.ts | 21 +++- ...r.ts => AbstractAmqpConsumerMonoSchema.ts} | 2 +- ....ts => AbstractAmqpPublisherMonoSchema.ts} | 11 +- .../lib/AbstractAmqpPublisherMultiSchema.ts | 9 +- packages/amqp/lib/AbstractAmqpService.ts | 106 ++++++++++++------ packages/amqp/lib/AmqpConnectionManager.ts | 106 ++++++++++++++++++ packages/amqp/lib/amqpConnectionResolver.ts | 12 +- .../test/consumers/AmqpPermissionConsumer.ts | 4 +- .../consumers/AmqpPermissionsConsumer.spec.ts | 78 ++++++++++--- ...AmqpPermissionsConsumerMultiSchema.spec.ts | 28 ++++- packages/amqp/test/fakes/FakeConsumer.ts | 4 +- .../AmqpPermissionPublisher.spec.ts | 67 +++++++++-- .../publishers/AmqpPermissionPublisher.ts | 4 +- packages/amqp/test/utils/testContext.ts | 18 ++- packages/amqp/vitest.config.ts | 6 +- 17 files changed, 427 insertions(+), 102 deletions(-) rename packages/amqp/lib/{AbstractAmqpConsumer.ts => AbstractAmqpConsumerMonoSchema.ts} (93%) rename packages/amqp/lib/{AbstractAmqpPublisher.ts => AbstractAmqpPublisherMonoSchema.ts} (84%) create mode 100644 packages/amqp/lib/AmqpConnectionManager.ts diff --git a/README.md b/README.md index 06d03d43..185340c0 100644 --- a/README.md +++ b/README.md @@ -198,3 +198,52 @@ SQS queues are built in a way that every message is only consumed once, and then Both publishers and consumers accept a queue name and configuration as parameters. If the referenced queue (or SNS topic) does not exist at the moment the publisher or the consumer is instantiated, it is automatically created. Similarly, if the referenced topic does not exist during instantiation, it is also automatically created. If you do not want to create a new queue/topic, you can set `queueLocator` field for `queueConfiguration`. In that case `message-queue-toolkit` will not attempt to create a new queue or topic, and instead throw an error if they don't already exist. + +## Automatic Reconnects (RabbitMQ) + +`message-queue-toolkit` automatically reestablishes connections for all publishers and consumers via `AmqpConnectionManager` mechanism. + +Example: + +```ts +export const TEST_AMQP_CONFIG: AmqpConfig = { + vhost: '', + hostname: 'localhost', + username: 'guest', + password: 'guest', + port: 5672, + useTls: false, +} + +const amqpConnectionManager = new AmqpConnectionManager(config, logger) +await amqpConnectionManager.init() + +const publisher = new TestAmqpPublisher( + { amqpConnectionManager }, + { + /// other amqp options + }) +await publisher.init() + +const consumer = new TestAmqpConsumer( + { amqpConnectionManager }, + { + /// other amqp options + }) +await consumer.start() + +// break connection, to simulate unexpected disconnection in production +await (await amqpConnectionManager.getConnection()).close() + +const message = { + // some test message +} + +// This will fail, but will trigger reconnection within amqpConnectionManager +publisher.publish(message) + +// eventually connection is reestablished and propagated across all the AMQP services that use same amqpConnectionManager + +// This will succeed and consumer, which also received new connection, will be able to consume it +publisher.publish(message) +``` diff --git a/packages/amqp/index.ts b/packages/amqp/index.ts index 096e4a92..03fd714d 100644 --- a/packages/amqp/index.ts +++ b/packages/amqp/index.ts @@ -2,11 +2,11 @@ export type { CommonMessage } from './lib/types/MessageTypes' export type { AMQPQueueConfig } from './lib/AbstractAmqpService' -export { AbstractAmqpConsumer } from './lib/AbstractAmqpConsumer' +export { AbstractAmqpConsumerMonoSchema } from './lib/AbstractAmqpConsumerMonoSchema' export { AbstractAmqpConsumerMultiSchema } from './lib/AbstractAmqpConsumerMultiSchema' export { AmqpConsumerErrorResolver } from './lib/errors/AmqpConsumerErrorResolver' -export { AbstractAmqpPublisher } from './lib/AbstractAmqpPublisher' +export { AbstractAmqpPublisherMonoSchema } from './lib/AbstractAmqpPublisherMonoSchema' export { AbstractAmqpPublisherMultiSchema } from './lib/AbstractAmqpPublisherMultiSchema' export type { AmqpConfig } from './lib/amqpConnectionResolver' diff --git a/packages/amqp/lib/AbstractAmqpBaseConsumer.ts b/packages/amqp/lib/AbstractAmqpBaseConsumer.ts index d2dbc11d..096e9c0c 100644 --- a/packages/amqp/lib/AbstractAmqpBaseConsumer.ts +++ b/packages/amqp/lib/AbstractAmqpBaseConsumer.ts @@ -6,7 +6,7 @@ import type { ExistingQueueOptions, } from '@message-queue-toolkit/core' import { isMessageError, parseMessage } from '@message-queue-toolkit/core' -import type { Message } from 'amqplib' +import type { Connection, Message } from 'amqplib' import type { AMQPConsumerDependencies, CreateAMQPQueueOptions } from './AbstractAmqpService' import { AbstractAmqpService } from './AbstractAmqpService' @@ -108,12 +108,12 @@ export abstract class AbstractAmqpBaseConsumer { + await super.receiveNewConnection(connection) + await this.consume() + } + private async consume() { await this.channel.consume(this.queueName, (message) => { if (message === null) { return @@ -158,6 +158,15 @@ export abstract class AbstractAmqpBaseConsumer +export abstract class AbstractAmqpConsumerMonoSchema extends AbstractAmqpBaseConsumer implements QueueConsumer { diff --git a/packages/amqp/lib/AbstractAmqpPublisher.ts b/packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts similarity index 84% rename from packages/amqp/lib/AbstractAmqpPublisher.ts rename to packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts index c8ea4439..a18db33b 100644 --- a/packages/amqp/lib/AbstractAmqpPublisher.ts +++ b/packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts @@ -14,7 +14,7 @@ import type { AMQPLocatorType } from './AbstractAmqpBaseConsumer' import { AbstractAmqpService } from './AbstractAmqpService' import type { AMQPDependencies, CreateAMQPQueueOptions } from './AbstractAmqpService' -export abstract class AbstractAmqpPublisher +export abstract class AbstractAmqpPublisherMonoSchema extends AbstractAmqpService implements SyncPublisher { @@ -39,7 +39,14 @@ export abstract class AbstractAmqpPublisher this.logMessage(resolvedLogMessage) } - this.channel.sendToQueue(this.queueName, objectToBuffer(message)) + try { + this.channel.sendToQueue(this.queueName, objectToBuffer(message)) + } catch (err) { + // @ts-ignore + if (err.message === 'Channel closed') { + void this.reconnect() + } + } } /* c8 ignore start */ diff --git a/packages/amqp/lib/AbstractAmqpPublisherMultiSchema.ts b/packages/amqp/lib/AbstractAmqpPublisherMultiSchema.ts index 5464cc38..426e7067 100644 --- a/packages/amqp/lib/AbstractAmqpPublisherMultiSchema.ts +++ b/packages/amqp/lib/AbstractAmqpPublisherMultiSchema.ts @@ -47,7 +47,14 @@ export abstract class AbstractAmqpPublisherMultiSchema extends AbstractQueueService< - MessagePayloadType, - Message, - DependenciesType, - CreateAMQPQueueOptions, - AMQPQueueLocatorType, - NewQueueOptions | ExistingQueueOptions -> { - protected readonly connection: Connection + MessagePayloadType extends object, + DependenciesType extends AMQPDependencies = AMQPDependencies, + > + extends AbstractQueueService< + MessagePayloadType, + Message, + DependenciesType, + CreateAMQPQueueOptions, + AMQPQueueLocatorType, + NewQueueOptions | ExistingQueueOptions + > + implements ConnectionReceiver +{ + protected connection?: Connection + private connectionManager: AmqpConnectionManager // @ts-ignore protected channel: Channel private isShuttingDown: boolean @@ -53,32 +58,31 @@ export abstract class AbstractAmqpService< this.queueName = options.locatorConfig ? options.locatorConfig.queueName : options.creationConfig?.queueName - this.connection = dependencies.amqpConnection this.isShuttingDown = false + this.connectionManager = dependencies.amqpConnectionManager + this.connection = this.connectionManager.getConnectionSync() + this.connectionManager.subscribeConnectionReceiver(this) } - private async destroyConnection(): Promise { - if (this.channel) { - try { - await this.channel.close() - } finally { - // @ts-ignore - this.channel = undefined - } - } - } + async receiveNewConnection(connection: Connection) { + this.connection = connection - public async init() { this.isShuttingDown = false - // If channel exists, recreate it if (this.channel) { this.isShuttingDown = true - await this.destroyConnection() + await this.destroyChannel() this.isShuttingDown = false } - this.channel = await this.connection.createChannel() + try { + this.channel = await this.connection.createChannel() + } catch (err) { + // @ts-ignore + this.logger.error(`Error creating channel: ${err.message}`) + await this.connectionManager.reconnect() + return + } if (this.deletionConfig && this.creationConfig) { await deleteAmqp(this.channel, this.deletionConfig, this.creationConfig) @@ -87,7 +91,7 @@ export abstract class AbstractAmqpService< this.channel.on('close', () => { if (!this.isShuttingDown) { this.logger.error(`AMQP connection lost!`) - this.init().catch((err) => { + this.reconnect().catch((err) => { this.handleError(err) throw err }) @@ -103,22 +107,50 @@ export abstract class AbstractAmqpService< this.creationConfig.queueOptions, ) } else { - // queue check breaks channel if not successful - const checkChannel = await this.connection.createChannel() - checkChannel.on('error', () => { - // it's OK - }) + await this.checkQueueExists() + } + } + + private async checkQueueExists() { + // queue check breaks channel if not successful + const checkChannel = await this.connection!.createChannel() + checkChannel.on('error', () => { + // it's OK + }) + try { + await checkChannel.checkQueue(this.locatorConfig!.queueName) + await checkChannel.close() + } catch (err) { + throw new Error(`Queue with queueName ${this.locatorConfig!.queueName} does not exist.`) + } + } + + private async destroyChannel(): Promise { + if (this.channel) { try { - await checkChannel.checkQueue(this.locatorConfig!.queueName) - await checkChannel.close() + await this.channel.close() } catch (err) { - throw new Error(`Queue with queueName ${this.locatorConfig!.queueName} does not exist.`) + // We don't care about connection closing errors + } finally { + // @ts-ignore + this.channel = undefined } } } + public async init() { + // if we don't have connection yet, it's fine, we'll wait for a later receiveNewConnection() call + if (this.connection) { + await this.receiveNewConnection(this.connection) + } + } + + public async reconnect() { + await this.connectionManager.reconnect() + } + async close(): Promise { this.isShuttingDown = true - await this.destroyConnection() + await this.destroyChannel() } } diff --git a/packages/amqp/lib/AmqpConnectionManager.ts b/packages/amqp/lib/AmqpConnectionManager.ts new file mode 100644 index 00000000..2bf4b9dc --- /dev/null +++ b/packages/amqp/lib/AmqpConnectionManager.ts @@ -0,0 +1,106 @@ +import type { Logger } from '@message-queue-toolkit/core' +import type { Connection } from 'amqplib' + +import type { AmqpConfig } from './amqpConnectionResolver' +import { resolveAmqpConnection } from './amqpConnectionResolver' + +export type ConnectionReceiver = { + receiveNewConnection(connection: Connection): Promise +} + +export class AmqpConnectionManager { + private readonly config: AmqpConfig + private readonly logger: Logger + private readonly connectionReceivers: ConnectionReceiver[] + private connection?: Connection + public reconnectsActive: boolean + + public isReconnecting: boolean + + constructor(config: AmqpConfig, logger: Logger) { + this.config = config + this.connectionReceivers = [] + this.reconnectsActive = true + this.isReconnecting = false + this.logger = logger + } + + private async createConnection() { + const connection = await resolveAmqpConnection(this.config) + connection.on('error', (err) => { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + this.logger.error(`AmqpConnectionManager: Connection error: ${err.message}`) + this.connection = undefined + if (this.reconnectsActive) { + void this.reconnect() + } + }) + connection.on('close', () => { + if (this.reconnectsActive && !this.isReconnecting) { + this.logger.error(`AmqpConnectionManager: Connection closed unexpectedly`) + } + if (this.reconnectsActive) { + void this.reconnect() + } + }) + + const promises: Promise[] = [] + + this.logger.info( + `Propagating new connection across ${this.connectionReceivers.length} receivers`, + ) + for (const receiver of this.connectionReceivers) { + promises.push(receiver.receiveNewConnection(connection)) + } + await Promise.all(promises) + + return connection + } + + getConnectionSync() { + return this.connection + } + + async getConnection(): Promise { + if (!this.connection) { + this.connection = await this.createConnection() + } + return this.connection + } + + async reconnect() { + if (this.isReconnecting) { + return + } + this.logger.info('AmqpConnectionManager: Start reconnecting') + + this.isReconnecting = true + const oldConnection = this.connection + this.connection = await this.createConnection() + if (oldConnection) { + try { + await oldConnection.close() + } catch { + // this can fail + } + } + this.isReconnecting = false + + console.log('Finish reconnecting') + } + + async init() { + this.reconnectsActive = true + await this.getConnection() + } + + async close() { + this.reconnectsActive = false + await this.connection?.close() + this.connection = undefined + } + + subscribeConnectionReceiver(connectionReceiver: ConnectionReceiver) { + this.connectionReceivers.push(connectionReceiver) + } +} diff --git a/packages/amqp/lib/amqpConnectionResolver.ts b/packages/amqp/lib/amqpConnectionResolver.ts index ff50206e..56a5c914 100644 --- a/packages/amqp/lib/amqpConnectionResolver.ts +++ b/packages/amqp/lib/amqpConnectionResolver.ts @@ -4,7 +4,7 @@ import { globalLogger } from '@lokalise/node-core' import { connect } from 'amqplib' const CONNECT_RETRY_SECONDS = 10 -const MAX_RETRY_ATTEMPTS = 5 +const MAX_RETRY_ATTEMPTS = 10 export type AmqpConfig = { hostname: string @@ -22,14 +22,18 @@ export async function resolveAmqpConnection(config: AmqpConfig) { while (true) { const url = `${protocol}://${config.username}:${config.password}@${config.hostname}:${config.port}/${config.vhost}` + const retryTime = CONNECT_RETRY_SECONDS * 1000 * (counter + 1) try { - return await connect(url) + const connection = await connect(url) + return connection } catch (e) { globalLogger.error( - `Failed to connect to AMQP broker at ${config.hostname}:${config.port}. Retrying in ${CONNECT_RETRY_SECONDS} seconds...`, + `Failed to connect to AMQP broker at ${config.hostname}:${config.port}. Retrying in ${ + retryTime / 1000 + } seconds...`, ) } - await setTimeout(CONNECT_RETRY_SECONDS * 1000) + await setTimeout(retryTime) counter++ if (counter > MAX_RETRY_ATTEMPTS) { diff --git a/packages/amqp/test/consumers/AmqpPermissionConsumer.ts b/packages/amqp/test/consumers/AmqpPermissionConsumer.ts index 6bce12d2..15956ffe 100644 --- a/packages/amqp/test/consumers/AmqpPermissionConsumer.ts +++ b/packages/amqp/test/consumers/AmqpPermissionConsumer.ts @@ -1,13 +1,13 @@ import type { Either } from '@lokalise/node-core' -import { AbstractAmqpConsumer } from '../../lib/AbstractAmqpConsumer' +import { AbstractAmqpConsumerMonoSchema } from '../../lib/AbstractAmqpConsumerMonoSchema' import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService' import { userPermissionMap } from '../repositories/PermissionRepository' import type { PERMISSIONS_MESSAGE_TYPE } from './userConsumerSchemas' import { PERMISSIONS_MESSAGE_SCHEMA } from './userConsumerSchemas' -export class AmqpPermissionConsumer extends AbstractAmqpConsumer { +export class AmqpPermissionConsumer extends AbstractAmqpConsumerMonoSchema { public static QUEUE_NAME = 'user_permissions' constructor(dependencies: AMQPConsumerDependencies) { diff --git a/packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts b/packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts index a0724b2a..f9089e92 100644 --- a/packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts +++ b/packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts @@ -6,6 +6,7 @@ import { describe, beforeEach, afterEach, expect, it } from 'vitest' import { objectToBuffer } from '../../../core/lib/utils/queueUtils' import { waitAndRetry } from '../../../core/lib/utils/waitUtils' import { FakeConsumerErrorResolver } from '../fakes/FakeConsumerErrorResolver' +import type { AmqpPermissionPublisher } from '../publishers/AmqpPermissionPublisher' import { userPermissionMap } from '../repositories/PermissionRepository' import { TEST_AMQP_CONFIG } from '../utils/testAmqpConfig' import type { Dependencies } from '../utils/testContext' @@ -17,25 +18,29 @@ import type { PERMISSIONS_MESSAGE_TYPE } from './userConsumerSchemas' const userIds = [100, 200, 300] const perms: [string, ...string[]] = ['perm1', 'perm2'] -async function waitForPermissions(userIds: number[]) { - return await waitAndRetry(async () => { - const usersPerms = userIds.reduce((acc, userId) => { - if (userPermissionMap[userId]) { - acc.push(userPermissionMap[userId]) - } - return acc - }, [] as string[][]) +function checkPermissions(userIds: number[]) { + const usersPerms = userIds.reduce((acc, userId) => { + if (userPermissionMap[userId]) { + acc.push(userPermissionMap[userId]) + } + return acc + }, [] as string[][]) + + if (usersPerms && usersPerms.length !== userIds.length) { + return null + } - if (usersPerms && usersPerms.length !== userIds.length) { + for (const userPerms of usersPerms) + if (userPerms.length !== perms.length) { return null } - for (const userPerms of usersPerms) - if (userPerms.length !== perms.length) { - return null - } + return usersPerms +} - return usersPerms +async function waitForPermissions(userIds: number[]) { + return await waitAndRetry(async () => { + return checkPermissions(userIds) }) } @@ -43,6 +48,8 @@ describe('PermissionsConsumer', () => { describe('consume', () => { let diContainer: AwilixContainer let channel: Channel + let publisher: AmqpPermissionPublisher + let consumer: AmqpPermissionConsumer beforeEach(async () => { delete userPermissionMap[100] delete userPermissionMap[200] @@ -51,11 +58,16 @@ describe('PermissionsConsumer', () => { consumerErrorResolver: asClass(FakeConsumerErrorResolver, SINGLETON_CONFIG), }) - channel = await diContainer.cradle.amqpConnection.createChannel() + channel = await diContainer.cradle.amqpConnectionManager.getConnectionSync()!.createChannel() + publisher = diContainer.cradle.permissionPublisher + consumer = diContainer.cradle.permissionConsumer await diContainer.cradle.permissionConsumer.start() }) afterEach(async () => { + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + channel = await connection.createChannel() + await channel.deleteQueue(AmqpPermissionConsumer.QUEUE_NAME) await channel.close() const { awilixManager } = diContainer.cradle @@ -90,6 +102,42 @@ describe('PermissionsConsumer', () => { expect(updatedUsersPermissions[0]).toHaveLength(2) }) + it('Reconnects if connection is lost', async () => { + await (await diContainer.cradle.amqpConnectionManager.getConnection()).close() + + const users = Object.values(userPermissionMap) + expect(users).toHaveLength(0) + + userPermissionMap[100] = [] + userPermissionMap[200] = [] + userPermissionMap[300] = [] + + publisher.publish({ + messageType: 'add', + userIds, + permissions: perms, + }) + + const updatedUsersPermissions = await waitAndRetry(() => { + publisher.publish({ + messageType: 'add', + userIds, + permissions: perms, + }) + + return checkPermissions(userIds) + }) + + if (null === updatedUsersPermissions) { + throw new Error('Users permissions unexpectedly null') + } + + expect(updatedUsersPermissions).toBeDefined() + expect(updatedUsersPermissions[0]).toHaveLength(2) + + await consumer.close() + }) + it('Wait for users to be created and then create permissions', async () => { const users = Object.values(userPermissionMap) expect(users).toHaveLength(0) diff --git a/packages/amqp/test/consumers/AmqpPermissionsConsumerMultiSchema.spec.ts b/packages/amqp/test/consumers/AmqpPermissionsConsumerMultiSchema.spec.ts index 4e72059b..448deba0 100644 --- a/packages/amqp/test/consumers/AmqpPermissionsConsumerMultiSchema.spec.ts +++ b/packages/amqp/test/consumers/AmqpPermissionsConsumerMultiSchema.spec.ts @@ -37,10 +37,12 @@ describe('PermissionsConsumerMultiSchema', () => { }) await waitAndRetry(() => { - return logger.loggedMessages.length === 1 + return logger.loggedMessages.length === 3 }) - expect(logger.loggedMessages.length).toBe(1) + expect(logger.loggedMessages.length).toBe(3) + expect(logger.loggedMessages[1]).toEqual({ messageType: 'add' }) + expect(logger.loggedMessages[2]).toEqual({ messageType: 'add' }) }) }) @@ -114,13 +116,13 @@ describe('PermissionsConsumerMultiSchema', () => { publisher = diContainer.cradle.permissionPublisherMultiSchema consumer = diContainer.cradle.permissionConsumerMultiSchema - }) + }, 99999) afterEach(async () => { const { awilixManager } = diContainer.cradle await awilixManager.executeDispose() await diContainer.dispose() - }) + }, 99999) it('Processes messages', async () => { publisher.publish({ @@ -140,5 +142,23 @@ describe('PermissionsConsumerMultiSchema', () => { expect(consumer.addCounter).toBe(1) expect(consumer.removeCounter).toBe(2) }) + + it('Reconnects if connection is lost', async () => { + await (await diContainer.cradle.amqpConnectionManager.getConnection()).close() + publisher.publish({ + messageType: 'add', + }) + + await waitAndRetry(() => { + publisher.publish({ + messageType: 'add', + }) + + return consumer.addCounter > 0 + }) + + expect(consumer.addCounter > 0).toBe(true) + await consumer.close() + }) }) }) diff --git a/packages/amqp/test/fakes/FakeConsumer.ts b/packages/amqp/test/fakes/FakeConsumer.ts index 726173aa..4e1f4449 100644 --- a/packages/amqp/test/fakes/FakeConsumer.ts +++ b/packages/amqp/test/fakes/FakeConsumer.ts @@ -1,11 +1,11 @@ import type { Either } from '@lokalise/node-core' import type { ZodType } from 'zod' -import { AbstractAmqpConsumer } from '../../lib/AbstractAmqpConsumer' +import { AbstractAmqpConsumerMonoSchema } from '../../lib/AbstractAmqpConsumerMonoSchema' import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService' import type { CommonMessage } from '../../lib/types/MessageTypes' -export class FakeConsumer extends AbstractAmqpConsumer { +export class FakeConsumer extends AbstractAmqpConsumerMonoSchema { constructor(dependencies: AMQPConsumerDependencies, queueName = 'dummy', messageSchema: ZodType) { super(dependencies, { creationConfig: { diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts index 1c52ce64..fcc86036 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts @@ -45,10 +45,14 @@ describe('PermissionPublisher', () => { publisher.publish(message) await waitAndRetry(() => { - return logger.loggedMessages.length === 1 + return logger.loggedMessages.length === 2 }) - expect(logger.loggedMessages.length).toBe(1) + expect(logger.loggedMessages[1]).toEqual({ + messageType: 'add', + permissions: ['perm1', 'perm2'], + userIds: [100, 200, 300], + }) }) }) @@ -68,7 +72,8 @@ describe('PermissionPublisher', () => { }) beforeEach(async () => { - channel = await diContainer.cradle.amqpConnection.createChannel() + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + channel = await connection.createChannel() }) afterEach(async () => { @@ -116,10 +121,13 @@ describe('PermissionPublisher', () => { }) beforeEach(async () => { - channel = await diContainer.cradle.amqpConnection.createChannel() + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + channel = await connection.createChannel() }) afterEach(async () => { + const connection = await diContainer.cradle.amqpConnectionManager.getConnection() + channel = await connection.createChannel() await channel.deleteQueue(AmqpPermissionConsumer.QUEUE_NAME) await channel.close() }) @@ -154,13 +162,50 @@ describe('PermissionPublisher', () => { permissionPublisher.publish(message) - await waitAndRetry( - () => { - return receivedMessage !== null - }, - 40, - 30, - ) + await waitAndRetry(() => { + return receivedMessage !== null + }) + + expect(receivedMessage).toEqual({ + messageType: 'add', + permissions: ['perm1', 'perm2'], + userIds: [100, 200, 300], + }) + }) + + it('reconnects on lost connection', async () => { + const { permissionPublisher } = diContainer.cradle + + const message = { + userIds, + messageType: 'add', + permissions: perms, + } satisfies PERMISSIONS_MESSAGE_TYPE + + permissionPublisher.publish(message) + + await diContainer.cradle.amqpConnectionManager.close() + await diContainer.cradle.amqpConnectionManager.init() + + let receivedMessage: PERMISSIONS_MESSAGE_TYPE | null = null + channel = await diContainer.cradle.amqpConnectionManager.getConnectionSync()!.createChannel() + await channel.consume(AmqpPermissionPublisher.QUEUE_NAME, (message) => { + if (message === null) { + return + } + const decodedMessage = deserializeAmqpMessage( + message, + PERMISSIONS_MESSAGE_SCHEMA, + new FakeConsumerErrorResolver(), + ) + receivedMessage = decodedMessage.result! + }) + + permissionPublisher.publish(message) + + await waitAndRetry(() => { + return receivedMessage !== null + }) expect(receivedMessage).toEqual({ messageType: 'add', diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.ts index 4445a90e..206ddffa 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.ts @@ -2,12 +2,12 @@ import type { ExistingAMQPConsumerOptions, NewAMQPConsumerOptions, } from '../../lib/AbstractAmqpBaseConsumer' -import { AbstractAmqpPublisher } from '../../lib/AbstractAmqpPublisher' +import { AbstractAmqpPublisherMonoSchema } from '../../lib/AbstractAmqpPublisherMonoSchema' import type { AMQPDependencies } from '../../lib/AbstractAmqpService' import type { PERMISSIONS_MESSAGE_TYPE } from '../consumers/userConsumerSchemas' import { PERMISSIONS_MESSAGE_SCHEMA } from '../consumers/userConsumerSchemas' -export class AmqpPermissionPublisher extends AbstractAmqpPublisher { +export class AmqpPermissionPublisher extends AbstractAmqpPublisherMonoSchema { public static QUEUE_NAME = 'user_permissions' constructor( diff --git a/packages/amqp/test/utils/testContext.ts b/packages/amqp/test/utils/testContext.ts index f21bbad7..ae54a558 100644 --- a/packages/amqp/test/utils/testContext.ts +++ b/packages/amqp/test/utils/testContext.ts @@ -1,12 +1,11 @@ import type { ErrorReporter, ErrorResolver } from '@lokalise/node-core' import type { Logger, TransactionObservabilityManager } from '@message-queue-toolkit/core' -import type { Connection } from 'amqplib' import type { Resolver } from 'awilix' import { asClass, asFunction, createContainer, Lifetime } from 'awilix' import { AwilixManager } from 'awilix-manager' +import { AmqpConnectionManager } from '../../lib/AmqpConnectionManager' import type { AmqpConfig } from '../../lib/amqpConnectionResolver' -import { resolveAmqpConnection } from '../../lib/amqpConnectionResolver' import { AmqpConsumerErrorResolver } from '../../lib/errors/AmqpConsumerErrorResolver' import { AmqpPermissionConsumer } from '../consumers/AmqpPermissionConsumer' import { AmqpPermissionConsumerMultiSchema } from '../consumers/AmqpPermissionConsumerMultiSchema' @@ -27,7 +26,6 @@ export async function registerDependencies( const diContainer = createContainer({ injectionMode: 'PROXY', }) - const amqpConnection = await resolveAmqpConnection(config) const awilixManager = new AwilixManager({ diContainer, asyncDispose: true, @@ -42,15 +40,15 @@ export async function registerDependencies( awilixManager: asFunction(() => { return awilixManager }, SINGLETON_CONFIG), - amqpConnection: asFunction( - () => { - return amqpConnection + amqpConnectionManager: asFunction( + ({ logger }: Dependencies) => { + return new AmqpConnectionManager(config, logger) }, { lifetime: Lifetime.SINGLETON, - dispose: (connection) => { - return connection.close() - }, + asyncInit: 'init', + asyncDispose: 'close', + asyncDisposePriority: 100, }, ), consumerErrorResolver: asFunction(() => { @@ -109,7 +107,7 @@ type DiConfig = Record> export interface Dependencies { logger: Logger - amqpConnection: Connection + amqpConnectionManager: AmqpConnectionManager awilixManager: AwilixManager // vendor-specific dependencies diff --git a/packages/amqp/vitest.config.ts b/packages/amqp/vitest.config.ts index fed29f4b..916d1853 100644 --- a/packages/amqp/vitest.config.ts +++ b/packages/amqp/vitest.config.ts @@ -12,10 +12,10 @@ export default defineConfig({ exclude: ['lib/**/*.spec.ts', 'lib/**/*.test.ts', 'test/**/*.*', 'lib/types/**/*.*'], reporter: ['text'], all: true, - lines: 85, + lines: 90, functions: 100, - branches: 75, - statements: 85, + branches: 80, + statements: 90, }, }, }) From a744c04e7378ae0e1b0a26baf91017e9536e531f Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Wed, 27 Sep 2023 22:12:56 +0300 Subject: [PATCH 02/16] Add explanation comment --- packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts | 3 +++ packages/amqp/lib/AbstractAmqpPublisherMultiSchema.ts | 3 +++ 2 files changed, 6 insertions(+) diff --git a/packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts b/packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts index a18db33b..993c2830 100644 --- a/packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts +++ b/packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts @@ -42,6 +42,9 @@ export abstract class AbstractAmqpPublisherMonoSchema Date: Wed, 27 Sep 2023 22:19:54 +0300 Subject: [PATCH 03/16] Cleanup --- packages/amqp/lib/AmqpConnectionManager.ts | 8 ++++---- .../consumers/AmqpPermissionsConsumerMultiSchema.spec.ts | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/amqp/lib/AmqpConnectionManager.ts b/packages/amqp/lib/AmqpConnectionManager.ts index 2bf4b9dc..c940668a 100644 --- a/packages/amqp/lib/AmqpConnectionManager.ts +++ b/packages/amqp/lib/AmqpConnectionManager.ts @@ -31,16 +31,16 @@ export class AmqpConnectionManager { // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access this.logger.error(`AmqpConnectionManager: Connection error: ${err.message}`) this.connection = undefined - if (this.reconnectsActive) { + if (this.reconnectsActive && !this.isReconnecting) { void this.reconnect() } }) connection.on('close', () => { if (this.reconnectsActive && !this.isReconnecting) { this.logger.error(`AmqpConnectionManager: Connection closed unexpectedly`) - } - if (this.reconnectsActive) { - void this.reconnect() + if (this.reconnectsActive) { + void this.reconnect() + } } }) diff --git a/packages/amqp/test/consumers/AmqpPermissionsConsumerMultiSchema.spec.ts b/packages/amqp/test/consumers/AmqpPermissionsConsumerMultiSchema.spec.ts index 448deba0..c0481ab4 100644 --- a/packages/amqp/test/consumers/AmqpPermissionsConsumerMultiSchema.spec.ts +++ b/packages/amqp/test/consumers/AmqpPermissionsConsumerMultiSchema.spec.ts @@ -116,13 +116,13 @@ describe('PermissionsConsumerMultiSchema', () => { publisher = diContainer.cradle.permissionPublisherMultiSchema consumer = diContainer.cradle.permissionConsumerMultiSchema - }, 99999) + }) afterEach(async () => { const { awilixManager } = diContainer.cradle await awilixManager.executeDispose() await diContainer.dispose() - }, 99999) + }) it('Processes messages', async () => { publisher.publish({ From decb5e20cdd52a7f5ab4b1e520c304e4b10d2d31 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Wed, 27 Sep 2023 22:21:57 +0300 Subject: [PATCH 04/16] Remove console.log --- packages/amqp/lib/AmqpConnectionManager.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/amqp/lib/AmqpConnectionManager.ts b/packages/amqp/lib/AmqpConnectionManager.ts index c940668a..82a6ecf7 100644 --- a/packages/amqp/lib/AmqpConnectionManager.ts +++ b/packages/amqp/lib/AmqpConnectionManager.ts @@ -86,7 +86,7 @@ export class AmqpConnectionManager { } this.isReconnecting = false - console.log('Finish reconnecting') + this.logger.info('AmqpConnectionManager: Finish reconnecting') } async init() { From be376ab51e18dbe022540228a1b21503056d6ad6 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Wed, 27 Sep 2023 22:31:31 +0300 Subject: [PATCH 05/16] Cleanup --- packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts | 3 +++ packages/amqp/lib/AbstractAmqpPublisherMultiSchema.ts | 3 +++ .../test/consumers/AmqpPermissionsConsumer.spec.ts | 10 +++++++--- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts b/packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts index 993c2830..ea7d66c2 100644 --- a/packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts +++ b/packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts @@ -47,7 +47,10 @@ export abstract class AbstractAmqpPublisherMonoSchema { }) const updatedUsersPermissions = await waitAndRetry(() => { + const checkResult = checkPermissions(userIds) + if (checkResult) { + return checkResult + } + publisher.publish({ messageType: 'add', userIds, permissions: perms, }) - - return checkPermissions(userIds) - }) + return false + }, 50) if (null === updatedUsersPermissions) { throw new Error('Users permissions unexpectedly null') From 27a52beb5af6e59c6c9dd661fb6017e8416cde00 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Wed, 27 Sep 2023 22:34:53 +0300 Subject: [PATCH 06/16] Fix build --- packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts b/packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts index 00d467ec..28589d98 100644 --- a/packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts +++ b/packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts @@ -129,7 +129,7 @@ describe('PermissionsConsumer', () => { userIds, permissions: perms, }) - return false + return null }, 50) if (null === updatedUsersPermissions) { From f9e1bc686f74aecfe0dc343592005e3644c462e5 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Wed, 27 Sep 2023 22:39:33 +0300 Subject: [PATCH 07/16] Try closing publisher --- packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts b/packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts index 28589d98..abeddbe0 100644 --- a/packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts +++ b/packages/amqp/test/consumers/AmqpPermissionsConsumer.spec.ts @@ -139,6 +139,7 @@ describe('PermissionsConsumer', () => { expect(updatedUsersPermissions).toBeDefined() expect(updatedUsersPermissions[0]).toHaveLength(2) + await publisher.close() await consumer.close() }) From 64aa1df94ab29d52e2f01c82ad1ffb01a2f34679 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Wed, 27 Sep 2023 22:49:16 +0300 Subject: [PATCH 08/16] Defer closing old channel --- packages/amqp/lib/AbstractAmqpService.ts | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/packages/amqp/lib/AbstractAmqpService.ts b/packages/amqp/lib/AbstractAmqpService.ts index c3c6b18b..03f1b5bb 100644 --- a/packages/amqp/lib/AbstractAmqpService.ts +++ b/packages/amqp/lib/AbstractAmqpService.ts @@ -68,12 +68,8 @@ export abstract class AbstractAmqpService< this.connection = connection this.isShuttingDown = false - // If channel exists, recreate it - if (this.channel) { - this.isShuttingDown = true - await this.destroyChannel() - this.isShuttingDown = false - } + // If channel already exists, recreate it + const oldChannel = this.channel try { this.channel = await this.connection.createChannel() @@ -84,6 +80,16 @@ export abstract class AbstractAmqpService< return } + if (oldChannel) { + this.isShuttingDown = true + try { + await oldChannel.close() + } catch { + // errors are ok + } + this.isShuttingDown = false + } + if (this.deletionConfig && this.creationConfig) { await deleteAmqp(this.channel, this.deletionConfig, this.creationConfig) } From b84477d3556e5ed6fdce99d13e9cc5c76529068a Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Wed, 27 Sep 2023 22:53:38 +0300 Subject: [PATCH 09/16] Try to fix flaky test --- packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts index fcc86036..7e3a43f9 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts @@ -212,6 +212,9 @@ describe('PermissionPublisher', () => { permissions: ['perm1', 'perm2'], userIds: [100, 200, 300], }) + + await permissionPublisher.close() + await channel.close() }) }) }) From c3d19fc8b85f667288330b030c8f5706b173b66f Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Wed, 27 Sep 2023 23:02:15 +0300 Subject: [PATCH 10/16] Address code review comments --- .../amqp/lib/AbstractAmqpBasePublisher.ts | 32 +++++++++++++++++++ .../lib/AbstractAmqpPublisherMonoSchema.ts | 27 ++-------------- .../lib/AbstractAmqpPublisherMultiSchema.ts | 30 +++-------------- 3 files changed, 39 insertions(+), 50 deletions(-) create mode 100644 packages/amqp/lib/AbstractAmqpBasePublisher.ts diff --git a/packages/amqp/lib/AbstractAmqpBasePublisher.ts b/packages/amqp/lib/AbstractAmqpBasePublisher.ts new file mode 100644 index 00000000..63e81279 --- /dev/null +++ b/packages/amqp/lib/AbstractAmqpBasePublisher.ts @@ -0,0 +1,32 @@ +import type { Either } from '@lokalise/node-core' +import type { MessageInvalidFormatError, MessageValidationError } from '@message-queue-toolkit/core' +import { objectToBuffer } from '@message-queue-toolkit/core' + +import { AbstractAmqpService } from './AbstractAmqpService' + +export abstract class AbstractAmqpBasePublisher< + MessagePayloadType extends object, +> extends AbstractAmqpService { + protected sendToQueue(message: MessagePayloadType): void { + try { + this.channel.sendToQueue(this.queueName, objectToBuffer(message)) + } catch (err) { + // Unfortunately, reliable retry mechanism can't be implemented with try-catch block, + // as not all failures end up here. If connection is closed programmatically, it works fine, + // but if server closes connection unexpectedly (e. g. RabbitMQ is shut down), then we don't land here + // @ts-ignore + if (err.message === 'Channel closed') { + this.logger.error(`AMQP channel closed`) + void this.reconnect() + } else { + throw err + } + } + } + + /* c8 ignore start */ + protected resolveMessage(): Either { + throw new Error('Not implemented for publisher') + } + /* c8 ignore stop */ +} diff --git a/packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts b/packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts index ea7d66c2..05f2aea4 100644 --- a/packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts +++ b/packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts @@ -1,21 +1,18 @@ import type { Either } from '@lokalise/node-core' import type { ExistingQueueOptions, - MessageInvalidFormatError, - MessageValidationError, MonoSchemaQueueOptions, NewQueueOptions, SyncPublisher, } from '@message-queue-toolkit/core' -import { objectToBuffer } from '@message-queue-toolkit/core' import type { ZodSchema } from 'zod' import type { AMQPLocatorType } from './AbstractAmqpBaseConsumer' -import { AbstractAmqpService } from './AbstractAmqpService' +import { AbstractAmqpBasePublisher } from './AbstractAmqpBasePublisher' import type { AMQPDependencies, CreateAMQPQueueOptions } from './AbstractAmqpService' export abstract class AbstractAmqpPublisherMonoSchema - extends AbstractAmqpService + extends AbstractAmqpBasePublisher implements SyncPublisher { private readonly messageSchema: ZodSchema @@ -39,30 +36,12 @@ export abstract class AbstractAmqpPublisherMonoSchema { - throw new Error('Not implemented for publisher') - } - protected override resolveSchema(): Either> { throw new Error('Not implemented for publisher') } - /* c8 ignore stop */ } diff --git a/packages/amqp/lib/AbstractAmqpPublisherMultiSchema.ts b/packages/amqp/lib/AbstractAmqpPublisherMultiSchema.ts index 044130e2..e196d132 100644 --- a/packages/amqp/lib/AbstractAmqpPublisherMultiSchema.ts +++ b/packages/amqp/lib/AbstractAmqpPublisherMultiSchema.ts @@ -1,21 +1,19 @@ import type { Either } from '@lokalise/node-core' import type { ExistingQueueOptions, - MessageInvalidFormatError, - MessageValidationError, NewQueueOptions, SyncPublisher, MultiSchemaPublisherOptions, } from '@message-queue-toolkit/core' -import { MessageSchemaContainer, objectToBuffer } from '@message-queue-toolkit/core' +import { MessageSchemaContainer } from '@message-queue-toolkit/core' import type { ZodSchema } from 'zod' import type { AMQPLocatorType } from './AbstractAmqpBaseConsumer' -import { AbstractAmqpService } from './AbstractAmqpService' +import { AbstractAmqpBasePublisher } from './AbstractAmqpBasePublisher' import type { AMQPDependencies, CreateAMQPQueueOptions } from './AbstractAmqpService' export abstract class AbstractAmqpPublisherMultiSchema - extends AbstractAmqpService + extends AbstractAmqpBasePublisher implements SyncPublisher { private readonly messageSchemaContainer: MessageSchemaContainer @@ -47,29 +45,9 @@ export abstract class AbstractAmqpPublisherMultiSchema { - throw new Error('Not implemented for publisher') - } - - /* c8 ignore stop */ - protected override resolveSchema( message: MessagePayloadType, ): Either> { From ab9461ceb201fcaa9c0ac513f6b884b000f826e7 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Wed, 27 Sep 2023 23:24:34 +0300 Subject: [PATCH 11/16] Try to fix a test --- .../amqp/test/publishers/AmqpPermissionPublisher.spec.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts index 7e3a43f9..14493aab 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts @@ -214,7 +214,11 @@ describe('PermissionPublisher', () => { }) await permissionPublisher.close() - await channel.close() + try { + await channel.close() + } catch { + // it's ok + } }) }) }) From 0371f491023ec342140a17cf7e22884a43eaabe7 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Wed, 27 Sep 2023 23:33:18 +0300 Subject: [PATCH 12/16] Use silent channel --- .../test/publishers/AmqpPermissionPublisher.spec.ts | 9 ++++++--- packages/amqp/test/utils/channelUtils.ts | 13 +++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) create mode 100644 packages/amqp/test/utils/channelUtils.ts diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts index 14493aab..50bdf857 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts @@ -11,6 +11,7 @@ import { PERMISSIONS_MESSAGE_SCHEMA } from '../consumers/userConsumerSchemas' import { FakeConsumer } from '../fakes/FakeConsumer' import { FakeConsumerErrorResolver } from '../fakes/FakeConsumerErrorResolver' import { FakeLogger } from '../fakes/FakeLogger' +import { createSilentChannel } from '../utils/channelUtils' import { TEST_AMQP_CONFIG } from '../utils/testAmqpConfig' import type { Dependencies } from '../utils/testContext' import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext' @@ -188,8 +189,10 @@ describe('PermissionPublisher', () => { await diContainer.cradle.amqpConnectionManager.init() let receivedMessage: PERMISSIONS_MESSAGE_TYPE | null = null - channel = await diContainer.cradle.amqpConnectionManager.getConnectionSync()!.createChannel() - await channel.consume(AmqpPermissionPublisher.QUEUE_NAME, (message) => { + const consumerChannel = await createSilentChannel( + diContainer.cradle.amqpConnectionManager.getConnectionSync()!, + ) + await consumerChannel.consume(AmqpPermissionPublisher.QUEUE_NAME, (message) => { if (message === null) { return } @@ -215,7 +218,7 @@ describe('PermissionPublisher', () => { await permissionPublisher.close() try { - await channel.close() + await consumerChannel.close() } catch { // it's ok } diff --git a/packages/amqp/test/utils/channelUtils.ts b/packages/amqp/test/utils/channelUtils.ts new file mode 100644 index 00000000..14f93408 --- /dev/null +++ b/packages/amqp/test/utils/channelUtils.ts @@ -0,0 +1,13 @@ +import type { Connection } from 'amqplib' + +export async function createSilentChannel(connection: Connection) { + const channel = await connection.createChannel() + channel.on('close', () => { + // consume event + }) + channel.on('error', (err) => { + // consume event + }) + + return channel +} From 696f03c3ecc223c5221f929231bbbb81ae488db6 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Wed, 27 Sep 2023 23:40:49 +0300 Subject: [PATCH 13/16] Try to fix test --- .../amqp/test/publishers/AmqpPermissionPublisher.spec.ts | 5 +++++ packages/amqp/test/utils/channelUtils.ts | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts index 50bdf857..a282d30e 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts @@ -188,6 +188,11 @@ describe('PermissionPublisher', () => { await diContainer.cradle.amqpConnectionManager.close() await diContainer.cradle.amqpConnectionManager.init() + // wait till we are done reconnecting + await waitAndRetry(() => { + return diContainer.cradle.amqpConnectionManager.getConnectionSync() + }) + let receivedMessage: PERMISSIONS_MESSAGE_TYPE | null = null const consumerChannel = await createSilentChannel( diContainer.cradle.amqpConnectionManager.getConnectionSync()!, diff --git a/packages/amqp/test/utils/channelUtils.ts b/packages/amqp/test/utils/channelUtils.ts index 14f93408..df8b47de 100644 --- a/packages/amqp/test/utils/channelUtils.ts +++ b/packages/amqp/test/utils/channelUtils.ts @@ -5,7 +5,7 @@ export async function createSilentChannel(connection: Connection) { channel.on('close', () => { // consume event }) - channel.on('error', (err) => { + channel.on('error', () => { // consume event }) From 61ee5ce47d7470fcfa4881c9f68588eadeeed2c4 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Wed, 27 Sep 2023 23:45:40 +0300 Subject: [PATCH 14/16] One more catch --- packages/amqp/lib/AmqpConnectionManager.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/amqp/lib/AmqpConnectionManager.ts b/packages/amqp/lib/AmqpConnectionManager.ts index 82a6ecf7..6bd04012 100644 --- a/packages/amqp/lib/AmqpConnectionManager.ts +++ b/packages/amqp/lib/AmqpConnectionManager.ts @@ -96,7 +96,11 @@ export class AmqpConnectionManager { async close() { this.reconnectsActive = false - await this.connection?.close() + try { + await this.connection?.close() + } catch { + // it's OK + } this.connection = undefined } From 04c4fc5ae581872e71c2259b884f8eaf690017ba Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 28 Sep 2023 00:16:43 +0300 Subject: [PATCH 15/16] Use real'er consumer --- packages/amqp/lib/AmqpConnectionManager.ts | 2 +- .../AmqpPermissionPublisher.spec.ts | 89 ++++++++++--------- packages/amqp/test/utils/channelUtils.ts | 13 --- 3 files changed, 48 insertions(+), 56 deletions(-) delete mode 100644 packages/amqp/test/utils/channelUtils.ts diff --git a/packages/amqp/lib/AmqpConnectionManager.ts b/packages/amqp/lib/AmqpConnectionManager.ts index 6bd04012..d6a27252 100644 --- a/packages/amqp/lib/AmqpConnectionManager.ts +++ b/packages/amqp/lib/AmqpConnectionManager.ts @@ -86,7 +86,7 @@ export class AmqpConnectionManager { } this.isReconnecting = false - this.logger.info('AmqpConnectionManager: Finish reconnecting') + this.logger.info('AmqpConnectionManager: Reconnect complete') } async init() { diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts index a282d30e..aef4ab96 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts @@ -11,7 +11,7 @@ import { PERMISSIONS_MESSAGE_SCHEMA } from '../consumers/userConsumerSchemas' import { FakeConsumer } from '../fakes/FakeConsumer' import { FakeConsumerErrorResolver } from '../fakes/FakeConsumerErrorResolver' import { FakeLogger } from '../fakes/FakeLogger' -import { createSilentChannel } from '../utils/channelUtils' +import { userPermissionMap } from '../repositories/PermissionRepository' import { TEST_AMQP_CONFIG } from '../utils/testAmqpConfig' import type { Dependencies } from '../utils/testContext' import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext' @@ -22,6 +22,30 @@ import type { AmqpPermissionPublisherMultiSchema } from './AmqpPermissionPublish const perms: [string, ...string[]] = ['perm1', 'perm2'] const userIds = [100, 200, 300] +function checkPermissions(userIds: number[]) { + const usersPerms = userIds.reduce((acc, userId) => { + if (userPermissionMap[userId]) { + acc.push(userPermissionMap[userId]) + } + return acc + }, [] as string[][]) + + if (usersPerms.length > userIds.length) { + return usersPerms.slice(0, userIds.length - 1) + } + + if (usersPerms && usersPerms.length !== userIds.length) { + return null + } + + for (const userPerms of usersPerms) + if (userPerms.length !== perms.length) { + return null + } + + return usersPerms +} + describe('PermissionPublisher', () => { describe('logging', () => { let logger: FakeLogger @@ -112,12 +136,6 @@ describe('PermissionPublisher', () => { beforeAll(async () => { diContainer = await registerDependencies(TEST_AMQP_CONFIG, { consumerErrorResolver: asClass(FakeConsumerErrorResolver, SINGLETON_CONFIG), - permissionConsumer: asClass(FakeConsumer, { - lifetime: Lifetime.SINGLETON, - asyncInit: 'start', - asyncDispose: 'close', - asyncDisposePriority: 10, - }), }) }) @@ -175,7 +193,15 @@ describe('PermissionPublisher', () => { }) it('reconnects on lost connection', async () => { - const { permissionPublisher } = diContainer.cradle + const users = Object.values(userPermissionMap) + expect(users).toHaveLength(0) + + userPermissionMap[100] = [] + userPermissionMap[200] = [] + userPermissionMap[300] = [] + + const { permissionPublisher, permissionConsumer } = diContainer.cradle + await permissionConsumer.start() const message = { userIds, @@ -183,50 +209,29 @@ describe('PermissionPublisher', () => { permissions: perms, } satisfies PERMISSIONS_MESSAGE_TYPE - permissionPublisher.publish(message) - await diContainer.cradle.amqpConnectionManager.close() - await diContainer.cradle.amqpConnectionManager.init() // wait till we are done reconnecting await waitAndRetry(() => { return diContainer.cradle.amqpConnectionManager.getConnectionSync() }) - let receivedMessage: PERMISSIONS_MESSAGE_TYPE | null = null - const consumerChannel = await createSilentChannel( - diContainer.cradle.amqpConnectionManager.getConnectionSync()!, - ) - await consumerChannel.consume(AmqpPermissionPublisher.QUEUE_NAME, (message) => { - if (message === null) { - return - } - const decodedMessage = deserializeAmqpMessage( - message, - PERMISSIONS_MESSAGE_SCHEMA, - new FakeConsumerErrorResolver(), - ) - receivedMessage = decodedMessage.result! - }) - - permissionPublisher.publish(message) + const updatedUsersPermissions = await waitAndRetry( + () => { + permissionPublisher.publish(message) - await waitAndRetry(() => { - return receivedMessage !== null - }) - - expect(receivedMessage).toEqual({ - messageType: 'add', - permissions: ['perm1', 'perm2'], - userIds: [100, 200, 300], - }) + return checkPermissions(userIds) + }, + 100, + 20, + ) - await permissionPublisher.close() - try { - await consumerChannel.close() - } catch { - // it's ok + if (null === updatedUsersPermissions) { + throw new Error('Users permissions unexpectedly null') } + + expect(updatedUsersPermissions).toBeDefined() + expect(updatedUsersPermissions[0]).toHaveLength(2) }) }) }) diff --git a/packages/amqp/test/utils/channelUtils.ts b/packages/amqp/test/utils/channelUtils.ts deleted file mode 100644 index df8b47de..00000000 --- a/packages/amqp/test/utils/channelUtils.ts +++ /dev/null @@ -1,13 +0,0 @@ -import type { Connection } from 'amqplib' - -export async function createSilentChannel(connection: Connection) { - const channel = await connection.createChannel() - channel.on('close', () => { - // consume event - }) - channel.on('error', () => { - // consume event - }) - - return channel -} From 004ef94a286ac0d09676aade9dd1b16bab2e6d5f Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 28 Sep 2023 10:16:55 +0300 Subject: [PATCH 16/16] Tweaks --- packages/amqp/index.ts | 2 ++ packages/amqp/lib/AmqpConnectionManager.ts | 6 ++++++ .../amqp/test/publishers/AmqpPermissionPublisher.spec.ts | 7 +------ packages/amqp/test/utils/testContext.ts | 2 +- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/packages/amqp/index.ts b/packages/amqp/index.ts index 03fd714d..7b1078bf 100644 --- a/packages/amqp/index.ts +++ b/packages/amqp/index.ts @@ -12,4 +12,6 @@ export { AbstractAmqpPublisherMultiSchema } from './lib/AbstractAmqpPublisherMul export type { AmqpConfig } from './lib/amqpConnectionResolver' export { resolveAmqpConnection } from './lib/amqpConnectionResolver' +export { AmqpConnectionManager } from './lib/AmqpConnectionManager' +export type { ConnectionReceiver } from './lib/AmqpConnectionManager' export { deserializeAmqpMessage } from './lib/amqpMessageDeserializer' diff --git a/packages/amqp/lib/AmqpConnectionManager.ts b/packages/amqp/lib/AmqpConnectionManager.ts index d6a27252..31349bb8 100644 --- a/packages/amqp/lib/AmqpConnectionManager.ts +++ b/packages/amqp/lib/AmqpConnectionManager.ts @@ -6,6 +6,7 @@ import { resolveAmqpConnection } from './amqpConnectionResolver' export type ConnectionReceiver = { receiveNewConnection(connection: Connection): Promise + close(): Promise } export class AmqpConnectionManager { @@ -96,6 +97,11 @@ export class AmqpConnectionManager { async close() { this.reconnectsActive = false + + for (const receiver of this.connectionReceivers) { + await receiver.close() + } + try { await this.connection?.close() } catch { diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts index aef4ab96..f3b19941 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts @@ -209,12 +209,7 @@ describe('PermissionPublisher', () => { permissions: perms, } satisfies PERMISSIONS_MESSAGE_TYPE - await diContainer.cradle.amqpConnectionManager.close() - - // wait till we are done reconnecting - await waitAndRetry(() => { - return diContainer.cradle.amqpConnectionManager.getConnectionSync() - }) + await diContainer.cradle.amqpConnectionManager.getConnectionSync()!.close() const updatedUsersPermissions = await waitAndRetry( () => { diff --git a/packages/amqp/test/utils/testContext.ts b/packages/amqp/test/utils/testContext.ts index ae54a558..eb3b171c 100644 --- a/packages/amqp/test/utils/testContext.ts +++ b/packages/amqp/test/utils/testContext.ts @@ -48,7 +48,7 @@ export async function registerDependencies( lifetime: Lifetime.SINGLETON, asyncInit: 'init', asyncDispose: 'close', - asyncDisposePriority: 100, + asyncDisposePriority: 1, }, ), consumerErrorResolver: asFunction(() => {