From a3fde6feb7c26ed21e569f0e8b024583951c403b Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Tue, 21 Nov 2023 15:18:57 +0100 Subject: [PATCH] test: AbstractSetMemoryQueue --- packages/cubejs-backend-shared/src/index.ts | 3 +- packages/cubejs-backend-shared/src/queue.ts | 64 ++++++++++++++++++ .../src/{Semaphore.ts => semaphore.ts} | 0 .../cubejs-backend-shared/test/queue.test.ts | 67 +++++++++++++++++++ .../src/orchestrator/MemoryQueue.ts | 65 +----------------- 5 files changed, 136 insertions(+), 63 deletions(-) create mode 100644 packages/cubejs-backend-shared/src/queue.ts rename packages/cubejs-backend-shared/src/{Semaphore.ts => semaphore.ts} (100%) create mode 100644 packages/cubejs-backend-shared/test/queue.test.ts diff --git a/packages/cubejs-backend-shared/src/index.ts b/packages/cubejs-backend-shared/src/index.ts index 1fea52502c7a6..191063d1be77f 100644 --- a/packages/cubejs-backend-shared/src/index.ts +++ b/packages/cubejs-backend-shared/src/index.ts @@ -20,4 +20,5 @@ export * from './time'; export * from './process'; export * from './platform'; export * from './FileRepository'; -export * from './Semaphore'; +export * from './semaphore'; +export * from './queue'; diff --git a/packages/cubejs-backend-shared/src/queue.ts b/packages/cubejs-backend-shared/src/queue.ts new file mode 100644 index 0000000000000..8bd9e9ed7602a --- /dev/null +++ b/packages/cubejs-backend-shared/src/queue.ts @@ -0,0 +1,64 @@ +import { Semaphore } from './semaphore'; + +/** + * Special in-memory queue which handles execution in background with specify concurrency limit + * It has a capacity restriction, which block adding new items to queue when the queue is full + */ +export abstract class AbstractSetMemoryQueue { + protected readonly queue: Set = new Set(); + + protected readonly executionSem: Semaphore; + + protected readonly addSem: Semaphore; + + public constructor( + protected readonly capacity: number, + concurrency: number, + ) { + this.executionSem = new Semaphore(concurrency); + this.addSem = new Semaphore(capacity); + } + + protected execution: boolean = false; + + public async addToQueue(item: string) { + const next = this.addSem.acquire(); + this.queue.add(item); + + if (this.queue.size > this.capacity) { + this.onCapacity(); + } + + this.run().catch(e => console.error(e)); + await next; + } + + public async run(): Promise { + if (this.execution) { + return; + } + + this.execution = true; + + try { + while (this.queue.size) { + const toExecute = this.queue[Symbol.iterator]().next().value; + if (toExecute) { + this.queue.delete(toExecute); + await this.executionSem.acquire(); + + this.execute(toExecute).finally(() => { + this.executionSem.release(); + this.addSem.release(); + }); + } + } + } finally { + this.execution = false; + } + } + + protected abstract onCapacity(): void; + + protected abstract execute(item: string): Promise; +} diff --git a/packages/cubejs-backend-shared/src/Semaphore.ts b/packages/cubejs-backend-shared/src/semaphore.ts similarity index 100% rename from packages/cubejs-backend-shared/src/Semaphore.ts rename to packages/cubejs-backend-shared/src/semaphore.ts diff --git a/packages/cubejs-backend-shared/test/queue.test.ts b/packages/cubejs-backend-shared/test/queue.test.ts new file mode 100644 index 0000000000000..5e0e12dc692b8 --- /dev/null +++ b/packages/cubejs-backend-shared/test/queue.test.ts @@ -0,0 +1,67 @@ +import { AbstractSetMemoryQueue, pausePromise } from '../src'; + +describe('AbstractSetMemoryQueue', () => { + it('concurrency(1)', async () => { + jest.setTimeout(10 * 1000); + + let capacityFullWarnings = 0; + const executed: string[] = []; + + class TestQueue extends AbstractSetMemoryQueue { + protected async execute(item: string): Promise { + executed.push(item); + + await pausePromise(15); + } + + protected onCapacity(): void { + capacityFullWarnings++; + } + } + + const all = []; + + const queue = new TestQueue( + 5, + 1 + ); + + all.push(queue.addToQueue('a')); + all.push(queue.addToQueue('ab')); all.push(queue.addToQueue('ab')); + all.push(queue.addToQueue('abc')); + all.push(queue.addToQueue('abcd')); + all.push(queue.addToQueue('abcde')); all.push(queue.addToQueue('abcde')); + all.push(queue.addToQueue('abcdef')); + all.push(queue.addToQueue('abcdefg')); + all.push(queue.addToQueue('abcdefgh')); + all.push(queue.addToQueue('abcdefghk')); all.push(queue.addToQueue('abcdefghk')); + all.push(queue.addToQueue('abcdefghkl')); + all.push(queue.addToQueue('abcdefghklm')); + all.push(queue.addToQueue('abcdefghklmn')); all.push(queue.addToQueue('abcdefghklmn')); + all.push(queue.addToQueue('abcdefghklmno')); + all.push(queue.addToQueue('abcdefghklmnop')); + all.push(queue.addToQueue('abcdefghklmnopw')); + + await Promise.all(all); + + expect(executed.length).toEqual(15); + expect(capacityFullWarnings).toBeGreaterThan(10); + expect(executed).toEqual([ + 'a', + 'ab', + 'abc', + 'abcd', + 'abcde', + 'abcdef', + 'abcdefg', + 'abcdefgh', + 'abcdefghk', + 'abcdefghkl', + 'abcdefghklm', + 'abcdefghklmn', + 'abcdefghklmno', + 'abcdefghklmnop', + 'abcdefghklmnopw', + ]); + }); +}); diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/MemoryQueue.ts b/packages/cubejs-query-orchestrator/src/orchestrator/MemoryQueue.ts index 3a555ebe9898b..7ea88dbe1e0a1 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/MemoryQueue.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/MemoryQueue.ts @@ -1,65 +1,6 @@ -import { Semaphore } from '@cubejs-backend/shared'; +import { AbstractSetMemoryQueue } from '@cubejs-backend/shared'; import { QueryCache } from './QueryCache'; -export abstract class AbstractSetMemoryQueue { - protected readonly queue: Set = new Set(); - - protected readonly executionSem: Semaphore; - - protected readonly addSem: Semaphore; - - public constructor( - protected readonly capacity: number, - concurrency: number, - ) { - this.executionSem = new Semaphore(concurrency); - this.addSem = new Semaphore(capacity); - } - - protected execution: boolean = false; - - public async addToQueue(item: string) { - const next = this.addSem.acquire(); - this.queue.add(item); - - if (this.queue.size > this.capacity) { - await this.onCapacity(); - } - - this.run().catch(e => console.log(e)); - await next; - } - - public async run(): Promise { - if (this.execution) { - return; - } - - this.execution = true; - - try { - while (this.queue.size) { - const toExecute = this.queue[Symbol.iterator]().next().value; - if (toExecute) { - this.queue.delete(toExecute); - await this.executionSem.acquire(); - - this.execute(toExecute).finally(() => { - this.executionSem.release(); - this.addSem.release(); - }); - } - } - } finally { - this.execution = false; - } - } - - protected abstract onCapacity(): Promise; - - protected abstract execute(item: string): Promise; -} - export class TableTouchMemoryQueue extends AbstractSetMemoryQueue { public constructor( capacity: number, @@ -73,7 +14,7 @@ export class TableTouchMemoryQueue extends AbstractSetMemoryQueue { protected lastWarningDate: Date | null = null; - protected async onCapacity(): Promise { + protected onCapacity(): void { let showWarning = false; if (this.lastWarningDate) { @@ -122,7 +63,7 @@ export class TableUsedMemoryQueue extends AbstractSetMemoryQueue { protected lastWarningDate: Date | null = null; - protected async onCapacity(): Promise { + protected onCapacity(): void { let showWarning = false; if (this.lastWarningDate) {