Skip to content

Commit

Permalink
test: AbstractSetMemoryQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed Nov 21, 2023
1 parent 2824569 commit a3fde6f
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 63 deletions.
3 changes: 2 additions & 1 deletion packages/cubejs-backend-shared/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ export * from './time';
export * from './process';
export * from './platform';
export * from './FileRepository';
export * from './Semaphore';
export * from './semaphore';
export * from './queue';
64 changes: 64 additions & 0 deletions packages/cubejs-backend-shared/src/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { Semaphore } from './semaphore';

/**
* Special in-memory queue which handles execution in background with specify concurrency limit
* It has a capacity restriction, which block adding new items to queue when the queue is full
*/
export abstract class AbstractSetMemoryQueue {
protected readonly queue: Set<string> = new Set();

protected readonly executionSem: Semaphore;

protected readonly addSem: Semaphore;

public constructor(
protected readonly capacity: number,
concurrency: number,
) {
this.executionSem = new Semaphore(concurrency);
this.addSem = new Semaphore(capacity);
}

protected execution: boolean = false;

public async addToQueue(item: string) {
const next = this.addSem.acquire();
this.queue.add(item);

if (this.queue.size > this.capacity) {
this.onCapacity();
}

this.run().catch(e => console.error(e));
await next;
}

public async run(): Promise<void> {
if (this.execution) {
return;
}

this.execution = true;

try {
while (this.queue.size) {
const toExecute = this.queue[Symbol.iterator]().next().value;
if (toExecute) {
this.queue.delete(toExecute);
await this.executionSem.acquire();

this.execute(toExecute).finally(() => {
this.executionSem.release();
this.addSem.release();
});
}
}
} finally {
this.execution = false;
}
}

protected abstract onCapacity(): void;

protected abstract execute(item: string): Promise<void>;
}
File renamed without changes.
67 changes: 67 additions & 0 deletions packages/cubejs-backend-shared/test/queue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { AbstractSetMemoryQueue, pausePromise } from '../src';

describe('AbstractSetMemoryQueue', () => {
it('concurrency(1)', async () => {
jest.setTimeout(10 * 1000);

let capacityFullWarnings = 0;
const executed: string[] = [];

class TestQueue extends AbstractSetMemoryQueue {
protected async execute(item: string): Promise<void> {
executed.push(item);

await pausePromise(15);
}

protected onCapacity(): void {
capacityFullWarnings++;
}
}

const all = [];

const queue = new TestQueue(
5,
1
);

all.push(queue.addToQueue('a'));
all.push(queue.addToQueue('ab')); all.push(queue.addToQueue('ab'));
all.push(queue.addToQueue('abc'));
all.push(queue.addToQueue('abcd'));
all.push(queue.addToQueue('abcde')); all.push(queue.addToQueue('abcde'));
all.push(queue.addToQueue('abcdef'));
all.push(queue.addToQueue('abcdefg'));
all.push(queue.addToQueue('abcdefgh'));
all.push(queue.addToQueue('abcdefghk')); all.push(queue.addToQueue('abcdefghk'));
all.push(queue.addToQueue('abcdefghkl'));
all.push(queue.addToQueue('abcdefghklm'));
all.push(queue.addToQueue('abcdefghklmn')); all.push(queue.addToQueue('abcdefghklmn'));
all.push(queue.addToQueue('abcdefghklmno'));
all.push(queue.addToQueue('abcdefghklmnop'));
all.push(queue.addToQueue('abcdefghklmnopw'));

await Promise.all(all);

expect(executed.length).toEqual(15);
expect(capacityFullWarnings).toBeGreaterThan(10);
expect(executed).toEqual([
'a',
'ab',
'abc',
'abcd',
'abcde',
'abcdef',
'abcdefg',
'abcdefgh',
'abcdefghk',
'abcdefghkl',
'abcdefghklm',
'abcdefghklmn',
'abcdefghklmno',
'abcdefghklmnop',
'abcdefghklmnopw',
]);
});
});
65 changes: 3 additions & 62 deletions packages/cubejs-query-orchestrator/src/orchestrator/MemoryQueue.ts
Original file line number Diff line number Diff line change
@@ -1,65 +1,6 @@
import { Semaphore } from '@cubejs-backend/shared';
import { AbstractSetMemoryQueue } from '@cubejs-backend/shared';
import { QueryCache } from './QueryCache';

export abstract class AbstractSetMemoryQueue {
protected readonly queue: Set<string> = new Set();

protected readonly executionSem: Semaphore;

protected readonly addSem: Semaphore;

public constructor(
protected readonly capacity: number,
concurrency: number,
) {
this.executionSem = new Semaphore(concurrency);
this.addSem = new Semaphore(capacity);
}

protected execution: boolean = false;

public async addToQueue(item: string) {
const next = this.addSem.acquire();
this.queue.add(item);

if (this.queue.size > this.capacity) {
await this.onCapacity();
}

this.run().catch(e => console.log(e));
await next;
}

public async run(): Promise<void> {
if (this.execution) {
return;
}

this.execution = true;

try {
while (this.queue.size) {
const toExecute = this.queue[Symbol.iterator]().next().value;
if (toExecute) {
this.queue.delete(toExecute);
await this.executionSem.acquire();

this.execute(toExecute).finally(() => {
this.executionSem.release();
this.addSem.release();
});
}
}
} finally {
this.execution = false;
}
}

protected abstract onCapacity(): Promise<void>;

protected abstract execute(item: string): Promise<void>;
}

export class TableTouchMemoryQueue extends AbstractSetMemoryQueue {
public constructor(
capacity: number,
Expand All @@ -73,7 +14,7 @@ export class TableTouchMemoryQueue extends AbstractSetMemoryQueue {

protected lastWarningDate: Date | null = null;

protected async onCapacity(): Promise<void> {
protected onCapacity(): void {
let showWarning = false;

Check warning on line 18 in packages/cubejs-query-orchestrator/src/orchestrator/MemoryQueue.ts

View check run for this annotation

Codecov / codecov/patch

packages/cubejs-query-orchestrator/src/orchestrator/MemoryQueue.ts#L17-L18

Added lines #L17 - L18 were not covered by tests

if (this.lastWarningDate) {
Expand Down Expand Up @@ -122,7 +63,7 @@ export class TableUsedMemoryQueue extends AbstractSetMemoryQueue {

protected lastWarningDate: Date | null = null;

protected async onCapacity(): Promise<void> {
protected onCapacity(): void {
let showWarning = false;

Check warning on line 67 in packages/cubejs-query-orchestrator/src/orchestrator/MemoryQueue.ts

View check run for this annotation

Codecov / codecov/patch

packages/cubejs-query-orchestrator/src/orchestrator/MemoryQueue.ts#L66-L67

Added lines #L66 - L67 were not covered by tests

if (this.lastWarningDate) {
Expand Down

0 comments on commit a3fde6f

Please sign in to comment.