From 1d4dcbfbe9496f7bb81f0b8112eba8cce2ce3c41 Mon Sep 17 00:00:00 2001 From: kisshan13 Date: Thu, 23 Jan 2025 20:38:24 +0530 Subject: [PATCH 1/2] Fix (#214) : added typing for consistent return type for add() method --- source/index.ts | 157 +++++++++++++++++++++++++++++----------------- source/options.ts | 28 ++++++--- 2 files changed, 122 insertions(+), 63 deletions(-) diff --git a/source/index.ts b/source/index.ts index 9a16cdb..44e2ace 100644 --- a/source/index.ts +++ b/source/index.ts @@ -1,19 +1,38 @@ -import {EventEmitter} from 'eventemitter3'; -import pTimeout, {TimeoutError} from 'p-timeout'; -import {type Queue, type RunFunction} from './queue.js'; -import PriorityQueue from './priority-queue.js'; -import {type QueueAddOptions, type Options, type TaskOptions} from './options.js'; +import { EventEmitter } from "eventemitter3"; +import pTimeout, { TimeoutError } from "p-timeout"; +import { type Queue, type RunFunction } from "./queue.js"; +import PriorityQueue from "./priority-queue.js"; +import { + type QueueAddOptions, + type Options, + type TaskOptions, + Truthy, + Falsy, + BooleanTypeReturn, +} from "./options.js"; type Task = | ((options: TaskOptions) => PromiseLike) | ((options: TaskOptions) => TaskResultType); -type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error'; +type EventName = + | "active" + | "idle" + | "empty" + | "add" + | "next" + | "completed" + | "error"; /** Promise queue with concurrency control. */ -export default class PQueue = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = QueueAddOptions> extends EventEmitter { // eslint-disable-line @typescript-eslint/naming-convention, unicorn/prefer-event-target +export default class PQueue< + T extends Truthy | Falsy, + QueueType extends Queue = PriorityQueue, + EnqueueOptionsType extends QueueAddOptions = QueueAddOptions +> extends EventEmitter { + // eslint-disable-line @typescript-eslint/naming-convention, unicorn/prefer-event-target readonly #carryoverConcurrencyCount: boolean; readonly #isIntervalIgnored: boolean; @@ -54,7 +73,7 @@ export default class PQueue) { + constructor(options?: Options) { super(); // eslint-disable-next-line @typescript-eslint/consistent-type-assertions @@ -66,18 +85,33 @@ export default class PQueue; - - if (!(typeof options.intervalCap === 'number' && options.intervalCap >= 1)) { - throw new TypeError(`Expected \`intervalCap\` to be a number from 1 and up, got \`${options.intervalCap?.toString() ?? ''}\` (${typeof options.intervalCap})`); + } as Options; + + if ( + !(typeof options.intervalCap === "number" && options.intervalCap >= 1) + ) { + throw new TypeError( + `Expected \`intervalCap\` to be a number from 1 and up, got \`${ + options.intervalCap?.toString() ?? "" + }\` (${typeof options.intervalCap})` + ); } - if (options.interval === undefined || !(Number.isFinite(options.interval) && options.interval >= 0)) { - throw new TypeError(`Expected \`interval\` to be a finite number >= 0, got \`${options.interval?.toString() ?? ''}\` (${typeof options.interval})`); + if ( + options.interval === undefined || + !(Number.isFinite(options.interval) && options.interval >= 0) + ) { + throw new TypeError( + `Expected \`interval\` to be a finite number >= 0, got \`${ + options.interval?.toString() ?? "" + }\` (${typeof options.interval})` + ); } this.#carryoverConcurrencyCount = options.carryoverConcurrencyCount!; - this.#isIntervalIgnored = options.intervalCap === Number.POSITIVE_INFINITY || options.interval === 0; + this.#isIntervalIgnored = + options.intervalCap === Number.POSITIVE_INFINITY || + options.interval === 0; this.#intervalCap = options.intervalCap; this.#interval = options.interval; this.#queue = new options.queueClass!(); @@ -99,7 +133,7 @@ export default class PQueue { - this.#onResumeInterval(); - }, - delay, - ); + this.#timeoutId = setTimeout(() => { + this.#onResumeInterval(); + }, delay); } return true; @@ -145,10 +178,10 @@ export default class PQueue { - this.#onInterval(); - }, - this.#interval, - ); + this.#intervalId = setInterval(() => { + this.#onInterval(); + }, this.#interval); this.#intervalEnd = Date.now() + this.#interval; } @@ -214,8 +244,10 @@ export default class PQueue= 1)) { - throw new TypeError(`Expected \`concurrency\` to be a number from 1 and up, got \`${newConcurrency}\` (${typeof newConcurrency})`); + if (!(typeof newConcurrency === "number" && newConcurrency >= 1)) { + throw new TypeError( + `Expected \`concurrency\` to be a number from 1 and up, got \`${newConcurrency}\` (${typeof newConcurrency})` + ); } this.#concurrency = newConcurrency; @@ -225,9 +257,13 @@ export default class PQueue { return new Promise((_resolve, reject) => { - signal.addEventListener('abort', () => { - reject(signal.reason); - }, {once: true}); + signal.addEventListener( + "abort", + () => { + reject(signal.reason); + }, + { once: true } + ); }); } @@ -274,9 +310,9 @@ export default class PQueue(function_: Task, options: {throwOnTimeout: true} & Exclude): Promise; - async add(function_: Task, options?: Partial): Promise; - async add(function_: Task, options: Partial = {}): Promise { + async add(function_: Task, options: { throwOnTimeout: true } & Exclude): Promise; + async add(function_: Task, options?: Partial): Promise>; + async add(function_: Task, options: Partial = {}): Promise> { // In case `id` is not defined. options.id ??= (this.#idAssigner++).toString(); @@ -294,19 +330,24 @@ export default class PQueue( functions: ReadonlyArray>, - options?: {throwOnTimeout: true} & Partial>, + options?: { throwOnTimeout: true } & Partial< + Exclude + > ): Promise; async addAll( functions: ReadonlyArray>, - options?: Partial, + options?: Partial ): Promise>; async addAll( functions: ReadonlyArray>, - options?: Partial, + options?: Partial ): Promise> { - return Promise.all(functions.map(async function_ => this.add(function_, options))); + return Promise.all( + functions.map(async (function_) => this.add(function_, options)) + ); } /** @@ -385,7 +430,7 @@ export default class PQueue this.#queue.size < limit); + await this.#onEvent("next", () => this.#queue.size < limit); } /** @@ -415,11 +460,11 @@ export default class PQueue boolean): Promise { - return new Promise(resolve => { + return new Promise((resolve) => { const listener = () => { if (filter && !filter()) { return; @@ -465,5 +510,5 @@ export default class PQueue = T extends Truthy ? TruthyReturn : FalsyReturn; + +type TimeoutOptions = { /** Per-operation timeout in milliseconds. Operations fulfill once `timeout` elapses if they haven't already. */ @@ -11,10 +20,14 @@ type TimeoutOptions = { @default false */ - throwOnTimeout?: boolean; + throwOnTimeout?: T; }; -export type Options, QueueOptions extends QueueAddOptions> = { +export type Options< + QueueType extends Queue, + QueueOptions extends QueueAddOptions, + Throw extends Falsy | Truthy +> = { /** Concurrency limit. @@ -60,9 +73,9 @@ export type Options, QueueOpt @default false */ readonly carryoverConcurrencyCount?: boolean; -} & TimeoutOptions; +} & TimeoutOptions; -export type QueueAddOptions = { +export type QueueAddOptions = { /** Priority of operation. Operations with greater priority will be scheduled first. @@ -74,7 +87,8 @@ export type QueueAddOptions = { Unique identifier for the promise function, used to update its priority before execution. If not specified, it is auto-assigned an incrementing BigInt starting from `1n`. */ id?: string; -} & TaskOptions & TimeoutOptions; +} & TaskOptions & + TimeoutOptions; export type TaskOptions = { /** From f45a83573d056d10259393284ffad67f623125fd Mon Sep 17 00:00:00 2001 From: kisshan13 Date: Thu, 23 Jan 2025 22:20:11 +0530 Subject: [PATCH 2/2] Fix added return types of .addAll() --- source/index.ts | 42 ++- test/test.ts | 820 +++++++++++++++++++++++++++--------------------- 2 files changed, 490 insertions(+), 372 deletions(-) diff --git a/source/index.ts b/source/index.ts index 44e2ace..3381969 100644 --- a/source/index.ts +++ b/source/index.ts @@ -30,7 +30,7 @@ Promise queue with concurrency control. export default class PQueue< T extends Truthy | Falsy, QueueType extends Queue = PriorityQueue, - EnqueueOptionsType extends QueueAddOptions = QueueAddOptions + EnqueueOptionsType extends QueueAddOptions = QueueAddOptions, > extends EventEmitter { // eslint-disable-line @typescript-eslint/naming-convention, unicorn/prefer-event-target readonly #carryoverConcurrencyCount: boolean; @@ -93,7 +93,7 @@ export default class PQueue< throw new TypeError( `Expected \`intervalCap\` to be a number from 1 and up, got \`${ options.intervalCap?.toString() ?? "" - }\` (${typeof options.intervalCap})` + }\` (${typeof options.intervalCap})`, ); } @@ -104,7 +104,7 @@ export default class PQueue< throw new TypeError( `Expected \`interval\` to be a finite number >= 0, got \`${ options.interval?.toString() ?? "" - }\` (${typeof options.interval})` + }\` (${typeof options.interval})`, ); } @@ -246,7 +246,7 @@ export default class PQueue< set concurrency(newConcurrency: number) { if (!(typeof newConcurrency === "number" && newConcurrency >= 1)) { throw new TypeError( - `Expected \`concurrency\` to be a number from 1 and up, got \`${newConcurrency}\` (${typeof newConcurrency})` + `Expected \`concurrency\` to be a number from 1 and up, got \`${newConcurrency}\` (${typeof newConcurrency})`, ); } @@ -262,7 +262,7 @@ export default class PQueue< () => { reject(signal.reason); }, - { once: true } + { once: true }, ); }); } @@ -310,9 +310,21 @@ export default class PQueue< /** Adds a sync or async task to the queue. Always returns a promise. */ - async add(function_: Task, options: { throwOnTimeout: true } & Exclude): Promise; - async add(function_: Task, options?: Partial): Promise>; - async add(function_: Task, options: Partial = {}): Promise> { + async add( + function_: Task, + options: { throwOnTimeout: true } & Exclude< + EnqueueOptionsType, + "throwOnTimeout" + >, + ): Promise; + async add( + function_: Task, + options?: Partial, + ): Promise>; + async add( + function_: Task, + options: Partial = {}, + ): Promise> { // In case `id` is not defined. options.id ??= (this.#idAssigner++).toString(); @@ -376,18 +388,18 @@ export default class PQueue< functions: ReadonlyArray>, options?: { throwOnTimeout: true } & Partial< Exclude - > - ): Promise; + >, + ): Promise[]>; async addAll( functions: ReadonlyArray>, - options?: Partial - ): Promise>; + options?: Partial, + ): Promise[]>; async addAll( functions: ReadonlyArray>, - options?: Partial - ): Promise> { + options?: Partial, + ): Promise[]> { return Promise.all( - functions.map(async (function_) => this.add(function_, options)) + functions.map(async (function_) => this.add(function_, options)), ); } diff --git a/test/test.ts b/test/test.ts index 748ee29..e43ed1b 100644 --- a/test/test.ts +++ b/test/test.ts @@ -1,16 +1,16 @@ /* eslint-disable no-new */ -import EventEmitter from 'eventemitter3'; -import test from 'ava'; -import delay from 'delay'; -import inRange from 'in-range'; -import timeSpan from 'time-span'; -import randomInt from 'random-int'; -import pDefer from 'p-defer'; -import PQueue from '../source/index.js'; - -const fixture = Symbol('fixture'); - -test('.add()', async t => { +import EventEmitter from "eventemitter3"; +import test from "ava"; +import delay from "delay"; +import inRange from "in-range"; +import timeSpan from "time-span"; +import randomInt from "random-int"; +import pDefer from "p-defer"; +import PQueue from "../source/index.js"; + +const fixture = Symbol("fixture"); + +test(".add()", async (t) => { const queue = new PQueue(); const promise = queue.add(async () => fixture); t.is(queue.size, 0); @@ -18,8 +18,8 @@ test('.add()', async t => { t.is(await promise, fixture); }); -test('.add() - limited concurrency', async t => { - const queue = new PQueue({concurrency: 2}); +test(".add() - limited concurrency", async (t) => { + const queue = new PQueue({ concurrency: 2 }); const promise = queue.add(async () => fixture); const promise2 = queue.add(async () => { await delay(100); @@ -33,7 +33,7 @@ test('.add() - limited concurrency', async t => { t.is(await promise3, fixture); }); -test('.add() - concurrency: 1', async t => { +test(".add() - concurrency: 1", async (t) => { const input = [ [10, 300], [20, 200], @@ -41,149 +41,164 @@ test('.add() - concurrency: 1', async t => { ]; const end = timeSpan(); - const queue = new PQueue({concurrency: 1}); + const queue = new PQueue({ concurrency: 1 }); - const mapper = async ([value, ms]: readonly number[]) => queue.add(async () => { - await delay(ms!); - return value!; - }); + const mapper = async ([value, ms]: readonly number[]) => + queue.add(async () => { + await delay(ms!); + return value!; + }); // eslint-disable-next-line unicorn/no-array-callback-reference t.deepEqual(await Promise.all(input.map(mapper)), [10, 20, 30]); - t.true(inRange(end(), {start: 590, end: 650})); + t.true(inRange(end(), { start: 590, end: 650 })); }); -test('.add() - concurrency: 5', async t => { +test(".add() - concurrency: 5", async (t) => { const concurrency = 5; - const queue = new PQueue({concurrency}); + const queue = new PQueue({ concurrency }); let running = 0; - const input = Array.from({length: 100}).fill(0).map(async () => queue.add(async () => { - running++; - t.true(running <= concurrency); - t.true(queue.pending <= concurrency); - await delay(randomInt(30, 200)); - running--; - })); + const input = Array.from({ length: 100 }) + .fill(0) + .map(async () => + queue.add(async () => { + running++; + t.true(running <= concurrency); + t.true(queue.pending <= concurrency); + await delay(randomInt(30, 200)); + running--; + }), + ); await Promise.all(input); }); -test('.add() - update concurrency', async t => { +test(".add() - update concurrency", async (t) => { let concurrency = 5; - const queue = new PQueue({concurrency}); + const queue = new PQueue({ concurrency }); let running = 0; - const input = Array.from({length: 100}).fill(0).map(async (_value, index) => queue.add(async () => { - running++; + const input = Array.from({ length: 100 }) + .fill(0) + .map(async (_value, index) => + queue.add(async () => { + running++; - t.true(running <= concurrency); - t.true(queue.pending <= concurrency); + t.true(running <= concurrency); + t.true(queue.pending <= concurrency); - await delay(randomInt(30, 200)); - running--; + await delay(randomInt(30, 200)); + running--; - if (index % 30 === 0) { - queue.concurrency = --concurrency; - t.is(queue.concurrency, concurrency); - } - })); + if (index % 30 === 0) { + queue.concurrency = --concurrency; + t.is(queue.concurrency, concurrency); + } + }), + ); await Promise.all(input); }); -test('.add() - priority', async t => { +test(".add() - priority", async (t) => { const result: number[] = []; - const queue = new PQueue({concurrency: 1}); - queue.add(async () => result.push(1), {priority: 1}); - queue.add(async () => result.push(0), {priority: 0}); - queue.add(async () => result.push(1), {priority: 1}); - queue.add(async () => result.push(2), {priority: 1}); - queue.add(async () => result.push(3), {priority: 2}); - queue.add(async () => result.push(0), {priority: -1}); + const queue = new PQueue({ concurrency: 1 }); + queue.add(async () => result.push(1), { priority: 1 }); + queue.add(async () => result.push(0), { priority: 0 }); + queue.add(async () => result.push(1), { priority: 1 }); + queue.add(async () => result.push(2), { priority: 1 }); + queue.add(async () => result.push(3), { priority: 2 }); + queue.add(async () => result.push(0), { priority: -1 }); await queue.onEmpty(); t.deepEqual(result, [1, 3, 1, 2, 0, 0]); }); -test('.sizeBy() - priority', async t => { +test(".sizeBy() - priority", async (t) => { const queue = new PQueue(); queue.pause(); - queue.add(async () => 0, {priority: 1}); - queue.add(async () => 0, {priority: 0}); - queue.add(async () => 0, {priority: 1}); - t.is(queue.sizeBy({priority: 1}), 2); - t.is(queue.sizeBy({priority: 0}), 1); + queue.add(async () => 0, { priority: 1 }); + queue.add(async () => 0, { priority: 0 }); + queue.add(async () => 0, { priority: 1 }); + t.is(queue.sizeBy({ priority: 1 }), 2); + t.is(queue.sizeBy({ priority: 0 }), 1); queue.clear(); await queue.onEmpty(); - t.is(queue.sizeBy({priority: 1}), 0); - t.is(queue.sizeBy({priority: 0}), 0); + t.is(queue.sizeBy({ priority: 1 }), 0); + t.is(queue.sizeBy({ priority: 0 }), 0); }); -test('.add() - timeout without throwing', async t => { +test(".add() - timeout without throwing", async (t) => { const result: string[] = []; - const queue = new PQueue({timeout: 300, throwOnTimeout: false}); + const queue = new PQueue({ timeout: 300, throwOnTimeout: false }); queue.add(async () => { await delay(400); - result.push('🐌'); + result.push("🐌"); }); queue.add(async () => { await delay(250); - result.push('🦆'); + result.push("🦆"); }); queue.add(async () => { await delay(310); - result.push('🐢'); + result.push("🐢"); }); queue.add(async () => { await delay(100); - result.push('🐅'); + result.push("🐅"); }); queue.add(async () => { - result.push('⚡️'); + result.push("⚡️"); }); await queue.onIdle(); - t.deepEqual(result, ['⚡️', '🐅', '🦆']); + t.deepEqual(result, ["⚡️", "🐅", "🦆"]); }); -test.failing('.add() - timeout with throwing', async t => { +test.failing(".add() - timeout with throwing", async (t) => { const result: string[] = []; - const queue = new PQueue({timeout: 300, throwOnTimeout: true}); - await t.throwsAsync(queue.add(async () => { - await delay(400); - result.push('🐌'); - })); + const queue = new PQueue({ timeout: 300, throwOnTimeout: true }); + await t.throwsAsync( + queue.add(async () => { + await delay(400); + result.push("🐌"); + }), + ); queue.add(async () => { await delay(200); - result.push('🦆'); + result.push("🦆"); }); await queue.onIdle(); - t.deepEqual(result, ['🦆']); + t.deepEqual(result, ["🦆"]); }); -test('.add() - change timeout in between', async t => { +test(".add() - change timeout in between", async (t) => { const result: string[] = []; const initialTimeout = 50; const newTimeout = 200; - const queue = new PQueue({timeout: initialTimeout, throwOnTimeout: false, concurrency: 2}); + const queue = new PQueue({ + timeout: initialTimeout, + throwOnTimeout: false, + concurrency: 2, + }); queue.add(async () => { - const {timeout} = queue; + const { timeout } = queue; t.deepEqual(timeout, initialTimeout); await delay(300); - result.push('🐌'); + result.push("🐌"); }); queue.timeout = newTimeout; queue.add(async () => { - const {timeout} = queue; + const { timeout } = queue; t.deepEqual(timeout, newTimeout); await delay(100); - result.push('🐅'); + result.push("🐅"); }); await queue.onIdle(); - t.deepEqual(result, ['🐅']); + t.deepEqual(result, ["🐅"]); }); -test('.onEmpty()', async t => { - const queue = new PQueue({concurrency: 1}); +test(".onEmpty()", async (t) => { + const queue = new PQueue({ concurrency: 1 }); queue.add(async () => 0); queue.add(async () => 0); @@ -204,8 +219,8 @@ test('.onEmpty()', async t => { t.is(queue.size, 0); }); -test('.onIdle()', async t => { - const queue = new PQueue({concurrency: 2}); +test(".onIdle()", async (t) => { + const queue = new PQueue({ concurrency: 2 }); queue.add(async () => delay(100)); queue.add(async () => delay(100)); @@ -226,8 +241,8 @@ test('.onIdle()', async t => { t.is(queue.pending, 0); }); -test('.onSizeLessThan()', async t => { - const queue = new PQueue({concurrency: 1}); +test(".onSizeLessThan()", async (t) => { + const queue = new PQueue({ concurrency: 1 }); queue.add(async () => delay(100)); queue.add(async () => delay(100)); @@ -252,7 +267,7 @@ test('.onSizeLessThan()', async t => { t.is(queue.pending, 1); }); -test('.onIdle() - no pending', async t => { +test(".onIdle() - no pending", async (t) => { const queue = new PQueue(); t.is(queue.size, 0); t.is(queue.pending, 0); @@ -261,8 +276,8 @@ test('.onIdle() - no pending', async t => { t.is(await queue.onIdle(), undefined); }); -test('.clear()', t => { - const queue = new PQueue({concurrency: 2}); +test(".clear()", (t) => { + const queue = new PQueue({ concurrency: 2 }); queue.add(async () => delay(20_000)); queue.add(async () => delay(20_000)); queue.add(async () => delay(20_000)); @@ -275,7 +290,7 @@ test('.clear()', t => { t.is(queue.size, 0); }); -test('.addAll()', async t => { +test(".addAll()", async (t) => { const queue = new PQueue(); const fn = async (): Promise => fixture; const functions = [fn, fn]; @@ -285,125 +300,125 @@ test('.addAll()', async t => { t.deepEqual(await promise, [fixture, fixture]); }); -test('enforce number in options.concurrency', t => { +test("enforce number in options.concurrency", (t) => { t.throws( () => { - new PQueue({concurrency: 0}); + new PQueue({ concurrency: 0 }); }, - {instanceOf: TypeError}, + { instanceOf: TypeError }, ); t.throws( () => { - new PQueue({concurrency: undefined}); + new PQueue({ concurrency: undefined }); }, - {instanceOf: TypeError}, + { instanceOf: TypeError }, ); t.notThrows(() => { - new PQueue({concurrency: 1}); + new PQueue({ concurrency: 1 }); }); t.notThrows(() => { - new PQueue({concurrency: 10}); + new PQueue({ concurrency: 10 }); }); t.notThrows(() => { - new PQueue({concurrency: Number.POSITIVE_INFINITY}); + new PQueue({ concurrency: Number.POSITIVE_INFINITY }); }); }); -test('enforce number in queue.concurrency', t => { +test("enforce number in queue.concurrency", (t) => { t.throws( () => { - (new PQueue()).concurrency = 0; + new PQueue().concurrency = 0; }, - {instanceOf: TypeError}, + { instanceOf: TypeError }, ); t.throws( () => { // @ts-expect-error Testing - (new PQueue()).concurrency = undefined; + new PQueue().concurrency = undefined; }, - {instanceOf: TypeError}, + { instanceOf: TypeError }, ); t.notThrows(() => { - (new PQueue()).concurrency = 1; + new PQueue().concurrency = 1; }); t.notThrows(() => { - (new PQueue()).concurrency = 10; + new PQueue().concurrency = 10; }); t.notThrows(() => { - (new PQueue()).concurrency = Number.POSITIVE_INFINITY; + new PQueue().concurrency = Number.POSITIVE_INFINITY; }); }); -test('enforce number in options.intervalCap', t => { +test("enforce number in options.intervalCap", (t) => { t.throws( () => { - new PQueue({intervalCap: 0}); + new PQueue({ intervalCap: 0 }); }, - {instanceOf: TypeError}, + { instanceOf: TypeError }, ); t.throws( () => { - new PQueue({intervalCap: undefined}); + new PQueue({ intervalCap: undefined }); }, - {instanceOf: TypeError}, + { instanceOf: TypeError }, ); t.notThrows(() => { - new PQueue({intervalCap: 1}); + new PQueue({ intervalCap: 1 }); }); t.notThrows(() => { - new PQueue({intervalCap: 10}); + new PQueue({ intervalCap: 10 }); }); t.notThrows(() => { - new PQueue({intervalCap: Number.POSITIVE_INFINITY}); + new PQueue({ intervalCap: Number.POSITIVE_INFINITY }); }); }); -test('enforce finite in options.interval', t => { +test("enforce finite in options.interval", (t) => { t.throws( () => { - new PQueue({interval: -1}); + new PQueue({ interval: -1 }); }, - {instanceOf: TypeError}, + { instanceOf: TypeError }, ); t.throws( () => { - new PQueue({interval: undefined}); + new PQueue({ interval: undefined }); }, - {instanceOf: TypeError}, + { instanceOf: TypeError }, ); t.throws(() => { - new PQueue({interval: Number.POSITIVE_INFINITY}); + new PQueue({ interval: Number.POSITIVE_INFINITY }); }); t.notThrows(() => { - new PQueue({interval: 0}); + new PQueue({ interval: 0 }); }); t.notThrows(() => { - new PQueue({interval: 10}); + new PQueue({ interval: 10 }); }); t.throws(() => { - new PQueue({interval: Number.POSITIVE_INFINITY}); + new PQueue({ interval: Number.POSITIVE_INFINITY }); }); }); -test('autoStart: false', t => { - const queue = new PQueue({concurrency: 2, autoStart: false}); +test("autoStart: false", (t) => { + const queue = new PQueue({ concurrency: 2, autoStart: false }); queue.add(async () => delay(20_000)); queue.add(async () => delay(20_000)); @@ -422,8 +437,8 @@ test('autoStart: false', t => { t.is(queue.size, 0); }); -test('.start() - return this', async t => { - const queue = new PQueue({concurrency: 2, autoStart: false}); +test(".start() - return this", async (t) => { + const queue = new PQueue({ concurrency: 2, autoStart: false }); queue.add(async () => delay(100)); queue.add(async () => delay(100)); @@ -435,7 +450,7 @@ test('.start() - return this', async t => { t.is(queue.pending, 0); }); -test('.start() - not paused', t => { +test(".start() - not paused", (t) => { const queue = new PQueue(); t.falsy(queue.isPaused); @@ -445,8 +460,8 @@ test('.start() - not paused', t => { t.falsy(queue.isPaused); }); -test('.pause()', t => { - const queue = new PQueue({concurrency: 2}); +test(".pause()", (t) => { + const queue = new PQueue({ concurrency: 2 }); queue.pause(); queue.add(async () => delay(20_000)); @@ -478,11 +493,11 @@ test('.pause()', t => { t.is(queue.size, 0); }); -test('.add() sync/async mixed tasks', async t => { - const queue = new PQueue({concurrency: 1}); - queue.add(() => 'sync 1'); +test(".add() sync/async mixed tasks", async (t) => { + const queue = new PQueue({ concurrency: 1 }); + queue.add(() => "sync 1"); queue.add(async () => delay(1000)); - queue.add(() => 'sync 2'); + queue.add(() => "sync 2"); queue.add(() => fixture); t.is(queue.size, 3); t.is(queue.pending, 1); @@ -491,38 +506,34 @@ test('.add() sync/async mixed tasks', async t => { t.is(queue.pending, 0); }); -test.failing('.add() - handle task throwing error', async t => { - const queue = new PQueue({concurrency: 1}); +test.failing(".add() - handle task throwing error", async (t) => { + const queue = new PQueue({ concurrency: 1 }); - queue.add(() => 'sync 1'); + queue.add(() => "sync 1"); await t.throwsAsync( - queue.add( - () => { - throw new Error('broken'); - }, - ), - {message: 'broken'}, + queue.add(() => { + throw new Error("broken"); + }), + { message: "broken" }, ); - queue.add(() => 'sync 2'); + queue.add(() => "sync 2"); t.is(queue.size, 2); await queue.onIdle(); }); -test('.add() - handle task promise failure', async t => { - const queue = new PQueue({concurrency: 1}); +test(".add() - handle task promise failure", async (t) => { + const queue = new PQueue({ concurrency: 1 }); await t.throwsAsync( - queue.add( - async () => { - throw new Error('broken'); - }, - ), - {message: 'broken'}, + queue.add(async () => { + throw new Error("broken"); + }), + { message: "broken" }, ); - queue.add(() => 'task #1'); + queue.add(() => "task #1"); t.is(queue.pending, 1); @@ -531,13 +542,13 @@ test('.add() - handle task promise failure', async t => { t.is(queue.pending, 0); }); -test('.addAll() sync/async mixed tasks', async t => { +test(".addAll() sync/async mixed tasks", async (t) => { const queue = new PQueue(); - const functions: Array<() => (string | Promise | Promise)> = [ - () => 'sync 1', + const functions: Array<() => string | Promise | Promise> = [ + () => "sync 1", async () => delay(2000), - () => 'sync 2', + () => "sync 2", async () => fixture, ]; @@ -545,11 +556,11 @@ test('.addAll() sync/async mixed tasks', async t => { t.is(queue.size, 0); t.is(queue.pending, 4); - t.deepEqual(await promise, ['sync 1', undefined, 'sync 2', fixture]); + t.deepEqual(await promise, ["sync 1", undefined, "sync 2", fixture]); }); -test('should resolve empty when size is zero', async t => { - const queue = new PQueue({concurrency: 1, autoStart: false}); +test("should resolve empty when size is zero", async (t) => { + const queue = new PQueue({ concurrency: 1, autoStart: false }); // It should take 1 seconds to resolve all tasks for (let index = 0; index < 100; index++) { @@ -564,19 +575,16 @@ test('should resolve empty when size is zero', async t => { queue.start(); // Pause at 0.5 second - setTimeout( - async () => { - queue.pause(); - await delay(10); - queue.start(); - }, - 500, - ); + setTimeout(async () => { + queue.pause(); + await delay(10); + queue.start(); + }, 500); await queue.onIdle(); }); -test('.add() - throttled', async t => { +test(".add() - throttled", async (t) => { const result: number[] = []; const queue = new PQueue({ intervalCap: 1, @@ -592,7 +600,7 @@ test('.add() - throttled', async t => { t.deepEqual(result, [1, 2]); }); -test('.add() - throttled, carryoverConcurrencyCount false', async t => { +test(".add() - throttled, carryoverConcurrencyCount false", async (t) => { const result: number[] = []; const queue = new PQueue({ @@ -628,7 +636,7 @@ test('.add() - throttled, carryoverConcurrencyCount false', async t => { t.deepEqual(result, values); }); -test('.add() - throttled, carryoverConcurrencyCount true', async t => { +test(".add() - throttled, carryoverConcurrencyCount true", async (t) => { const result: number[] = []; const queue = new PQueue({ @@ -675,7 +683,7 @@ test('.add() - throttled, carryoverConcurrencyCount true', async t => { t.deepEqual(result, values); }); -test('.add() - throttled 10, concurrency 5', async t => { +test(".add() - throttled 10, concurrency 5", async (t) => { const result: number[] = []; const queue = new PQueue({ @@ -685,9 +693,9 @@ test('.add() - throttled 10, concurrency 5', async t => { autoStart: false, }); - const firstValue = [...Array.from({length: 5}).keys()]; - const secondValue = [...Array.from({length: 10}).keys()]; - const thirdValue = [...Array.from({length: 13}).keys()]; + const firstValue = [...Array.from({ length: 5 }).keys()]; + const secondValue = [...Array.from({ length: 10 }).keys()]; + const thirdValue = [...Array.from({ length: 13 }).keys()]; for (const value of thirdValue) { queue.add(async () => { @@ -721,7 +729,7 @@ test('.add() - throttled 10, concurrency 5', async t => { t.deepEqual(result, thirdValue); }); -test('.add() - throttled finish and resume', async t => { +test(".add() - throttled finish and resume", async (t) => { const result: number[] = []; const queue = new PQueue({ @@ -763,7 +771,7 @@ test('.add() - throttled finish and resume', async t => { t.deepEqual(result, secondValue); }); -test('pause should work when throttled', async t => { +test("pause should work when throttled", async (t) => { const result: number[] = []; const queue = new PQueue({ @@ -814,7 +822,7 @@ test('pause should work when throttled', async t => { await delay(2500); }); -test('clear interval on pause', async t => { +test("clear interval on pause", async (t) => { const queue = new PQueue({ interval: 100, intervalCap: 1, @@ -824,24 +832,24 @@ test('clear interval on pause', async t => { queue.pause(); }); - queue.add(() => 'task #1'); + queue.add(() => "task #1"); await delay(300); t.is(queue.size, 1); }); -test('should be an event emitter', t => { +test("should be an event emitter", (t) => { const queue = new PQueue(); t.true(queue instanceof EventEmitter); }); -test('should emit active event per item', async t => { +test("should emit active event per item", async (t) => { const items = [0, 1, 2, 3, 4]; const queue = new PQueue(); let eventCount = 0; - queue.on('active', () => { + queue.on("active", () => { eventCount++; }); @@ -854,11 +862,11 @@ test('should emit active event per item', async t => { t.is(eventCount, items.length); }); -test('should emit idle event when idle', async t => { - const queue = new PQueue({concurrency: 1}); +test("should emit idle event when idle", async (t) => { + const queue = new PQueue({ concurrency: 1 }); let timesCalled = 0; - queue.on('idle', () => { + queue.on("idle", () => { timesCalled++; }); @@ -893,16 +901,16 @@ test('should emit idle event when idle', async t => { t.is(timesCalled, 2); }); -test('should emit empty event when empty', async t => { - const queue = new PQueue({concurrency: 1}); +test("should emit empty event when empty", async (t) => { + const queue = new PQueue({ concurrency: 1 }); let timesCalled = 0; - queue.on('empty', () => { + queue.on("empty", () => { timesCalled++; }); - const {resolve: resolveJob1, promise: job1Promise} = pDefer(); - const {resolve: resolveJob2, promise: job2Promise} = pDefer(); + const { resolve: resolveJob1, promise: job1Promise } = pDefer(); + const { resolve: resolveJob2, promise: job2Promise } = pDefer(); const job1 = queue.add(async () => job1Promise); const job2 = queue.add(async () => job2Promise); @@ -925,11 +933,11 @@ test('should emit empty event when empty', async t => { t.is(timesCalled, 1); }); -test('should emit add event when adding task', async t => { - const queue = new PQueue({concurrency: 1}); +test("should emit add event when adding task", async (t) => { + const queue = new PQueue({ concurrency: 1 }); let timesCalled = 0; - queue.on('add', () => { + queue.on("add", () => { timesCalled++; }); @@ -969,11 +977,11 @@ test('should emit add event when adding task', async t => { t.is(timesCalled, 3); }); -test('should emit next event when completing task', async t => { - const queue = new PQueue({concurrency: 1}); +test("should emit next event when completing task", async (t) => { + const queue = new PQueue({ concurrency: 1 }); let timesCalled = 0; - queue.on('next', () => { + queue.on("next", () => { timesCalled++; }); @@ -1013,15 +1021,15 @@ test('should emit next event when completing task', async t => { t.is(timesCalled, 3); }); -test('should emit completed / error events', async t => { - const queue = new PQueue({concurrency: 1}); +test("should emit completed / error events", async (t) => { + const queue = new PQueue({ concurrency: 1 }); let errorEvents = 0; let completedEvents = 0; - queue.on('error', () => { + queue.on("error", () => { errorEvents++; }); - queue.on('completed', () => { + queue.on("completed", () => { completedEvents++; }); @@ -1034,7 +1042,7 @@ test('should emit completed / error events', async t => { const job2 = queue.add(async () => { await delay(1); - throw new Error('failure'); + throw new Error("failure"); }); t.is(queue.pending, 1); @@ -1070,228 +1078,326 @@ test('should emit completed / error events', async t => { t.is(completedEvents, 2); }); -test('should verify timeout overrides passed to add', async t => { - const queue = new PQueue({timeout: 200, throwOnTimeout: true}); +test("should verify timeout overrides passed to add", async (t) => { + const queue = new PQueue({ timeout: 200, throwOnTimeout: true }); - await t.throwsAsync(queue.add(async () => { - await delay(400); - })); + await t.throwsAsync( + queue.add(async () => { + await delay(400); + }), + ); - await t.notThrowsAsync(queue.add(async () => { - await delay(400); - }, {throwOnTimeout: false})); + await t.notThrowsAsync( + queue.add( + async () => { + await delay(400); + }, + { throwOnTimeout: false }, + ), + ); - await t.notThrowsAsync(queue.add(async () => { - await delay(400); - }, {timeout: 600})); + await t.notThrowsAsync( + queue.add( + async () => { + await delay(400); + }, + { timeout: 600 }, + ), + ); - await t.notThrowsAsync(queue.add(async () => { - await delay(100); - })); + await t.notThrowsAsync( + queue.add(async () => { + await delay(100); + }), + ); - await t.throwsAsync(queue.add(async () => { - await delay(100); - }, {timeout: 50})); + await t.throwsAsync( + queue.add( + async () => { + await delay(100); + }, + { timeout: 50 }, + ), + ); await queue.onIdle(); }); -test('should skip an aborted job', async t => { +test("should skip an aborted job", async (t) => { const queue = new PQueue(); const controller = new AbortController(); controller.abort(); // eslint-disable-next-line @typescript-eslint/no-empty-function - await t.throwsAsync(queue.add(() => {}, {signal: controller.signal}), { - instanceOf: DOMException, - }); + await t.throwsAsync( + queue.add(() => {}, { signal: controller.signal }), + { + instanceOf: DOMException, + }, + ); }); -test('should pass AbortSignal instance to job', async t => { +test("should pass AbortSignal instance to job", async (t) => { const queue = new PQueue(); const controller = new AbortController(); - await queue.add(async ({signal}) => { - t.is(controller.signal, signal!); - }, {signal: controller.signal}); + await queue.add( + async ({ signal }) => { + t.is(controller.signal, signal!); + }, + { signal: controller.signal }, + ); }); -test('aborting multiple jobs at the same time', async t => { - const queue = new PQueue({concurrency: 1}); +test("aborting multiple jobs at the same time", async (t) => { + const queue = new PQueue({ concurrency: 1 }); const controller1 = new AbortController(); const controller2 = new AbortController(); - const task1 = queue.add(async () => new Promise(() => {}), {signal: controller1.signal}); // eslint-disable-line @typescript-eslint/no-empty-function - const task2 = queue.add(async () => new Promise(() => {}), {signal: controller2.signal}); // eslint-disable-line @typescript-eslint/no-empty-function + const task1 = queue.add(async () => new Promise(() => {}), { + signal: controller1.signal, + }); // eslint-disable-line @typescript-eslint/no-empty-function + const task2 = queue.add(async () => new Promise(() => {}), { + signal: controller2.signal, + }); // eslint-disable-line @typescript-eslint/no-empty-function setTimeout(() => { controller1.abort(); controller2.abort(); }, 0); - await t.throwsAsync(task1, {instanceOf: DOMException}); - await t.throwsAsync(task2, {instanceOf: DOMException}); - t.like(queue, {size: 0, pending: 0}); + await t.throwsAsync(task1, { instanceOf: DOMException }); + await t.throwsAsync(task2, { instanceOf: DOMException }); + t.like(queue, { size: 0, pending: 0 }); }); -test('.setPriority() - execute a promise before planned', async t => { +test(".setPriority() - execute a promise before planned", async (t) => { const result: string[] = []; - const queue = new PQueue({concurrency: 1}); - queue.add(async () => { - await delay(400); - result.push('🐌'); - }, {id: '🐌'}); - queue.add(async () => { - await delay(400); - result.push('🦆'); - }, {id: '🦆'}); - queue.add(async () => { - await delay(400); - result.push('🐢'); - }, {id: '🐢'}); - queue.setPriority('🐢', 1); + const queue = new PQueue({ concurrency: 1 }); + queue.add( + async () => { + await delay(400); + result.push("🐌"); + }, + { id: "🐌" }, + ); + queue.add( + async () => { + await delay(400); + result.push("🦆"); + }, + { id: "🦆" }, + ); + queue.add( + async () => { + await delay(400); + result.push("🐢"); + }, + { id: "🐢" }, + ); + queue.setPriority("🐢", 1); await queue.onIdle(); - t.deepEqual(result, ['🐌', '🐢', '🦆']); + t.deepEqual(result, ["🐌", "🐢", "🦆"]); }); -test('.setPriority() - execute a promise after planned', async t => { +test(".setPriority() - execute a promise after planned", async (t) => { const result: string[] = []; - const queue = new PQueue({concurrency: 1}); - queue.add(async () => { - await delay(400); - result.push('🐌'); - }, {id: '🐌'}); - queue.add(async () => { - await delay(400); - result.push('🦆'); - }, {id: '🦆'}); - queue.add(async () => { - await delay(400); - result.push('🦆'); - }, {id: '🦆'}); - queue.add(async () => { - await delay(400); - result.push('🐢'); - }, {id: '🐢'}); - queue.add(async () => { - await delay(400); - result.push('🦆'); - }, {id: '🦆'}); - queue.add(async () => { - await delay(400); - result.push('🦆'); - }, {id: '🦆'}); - queue.setPriority('🐢', -1); + const queue = new PQueue({ concurrency: 1 }); + queue.add( + async () => { + await delay(400); + result.push("🐌"); + }, + { id: "🐌" }, + ); + queue.add( + async () => { + await delay(400); + result.push("🦆"); + }, + { id: "🦆" }, + ); + queue.add( + async () => { + await delay(400); + result.push("🦆"); + }, + { id: "🦆" }, + ); + queue.add( + async () => { + await delay(400); + result.push("🐢"); + }, + { id: "🐢" }, + ); + queue.add( + async () => { + await delay(400); + result.push("🦆"); + }, + { id: "🦆" }, + ); + queue.add( + async () => { + await delay(400); + result.push("🦆"); + }, + { id: "🦆" }, + ); + queue.setPriority("🐢", -1); await queue.onIdle(); - t.deepEqual(result, ['🐌', '🦆', '🦆', '🦆', '🦆', '🐢']); + t.deepEqual(result, ["🐌", "🦆", "🦆", "🦆", "🦆", "🐢"]); }); -test('.setPriority() - execute a promise before planned - concurrency 2', async t => { +test(".setPriority() - execute a promise before planned - concurrency 2", async (t) => { const result: string[] = []; - const queue = new PQueue({concurrency: 2}); - queue.add(async () => { - await delay(400); - result.push('🐌'); - }, {id: '🐌'}); - queue.add(async () => { - await delay(400); - result.push('🦆'); - }, {id: '🦆'}); - queue.add(async () => { - await delay(400); - result.push('🐢'); - }, {id: '🐢'}); - queue.add(async () => { - await delay(400); - result.push('⚡️'); - }, {id: '⚡️'}); - queue.setPriority('⚡️', 1); + const queue = new PQueue({ concurrency: 2 }); + queue.add( + async () => { + await delay(400); + result.push("🐌"); + }, + { id: "🐌" }, + ); + queue.add( + async () => { + await delay(400); + result.push("🦆"); + }, + { id: "🦆" }, + ); + queue.add( + async () => { + await delay(400); + result.push("🐢"); + }, + { id: "🐢" }, + ); + queue.add( + async () => { + await delay(400); + result.push("⚡️"); + }, + { id: "⚡️" }, + ); + queue.setPriority("⚡️", 1); await queue.onIdle(); - t.deepEqual(result, ['🐌', '🦆', '⚡️', '🐢']); + t.deepEqual(result, ["🐌", "🦆", "⚡️", "🐢"]); }); -test('.setPriority() - execute a promise before planned - concurrency 3', async t => { +test(".setPriority() - execute a promise before planned - concurrency 3", async (t) => { const result: string[] = []; - const queue = new PQueue({concurrency: 3}); - queue.add(async () => { - await delay(400); - result.push('🐌'); - }, {id: '🐌'}); - queue.add(async () => { - await delay(400); - result.push('🦆'); - }, {id: '🦆'}); - queue.add(async () => { - await delay(400); - result.push('🐢'); - }, {id: '🐢'}); - queue.add(async () => { - await delay(400); - result.push('⚡️'); - }, {id: '⚡️'}); - queue.add(async () => { - await delay(400); - result.push('🦀'); - }, {id: '🦀'}); - queue.setPriority('🦀', 1); + const queue = new PQueue({ concurrency: 3 }); + queue.add( + async () => { + await delay(400); + result.push("🐌"); + }, + { id: "🐌" }, + ); + queue.add( + async () => { + await delay(400); + result.push("🦆"); + }, + { id: "🦆" }, + ); + queue.add( + async () => { + await delay(400); + result.push("🐢"); + }, + { id: "🐢" }, + ); + queue.add( + async () => { + await delay(400); + result.push("⚡️"); + }, + { id: "⚡️" }, + ); + queue.add( + async () => { + await delay(400); + result.push("🦀"); + }, + { id: "🦀" }, + ); + queue.setPriority("🦀", 1); await queue.onIdle(); - t.deepEqual(result, ['🐌', '🦆', '🐢', '🦀', '⚡️']); + t.deepEqual(result, ["🐌", "🦆", "🐢", "🦀", "⚡️"]); }); -test('.setPriority() - execute a multiple promise before planned, with variable priority', async t => { +test(".setPriority() - execute a multiple promise before planned, with variable priority", async (t) => { const result: string[] = []; - const queue = new PQueue({concurrency: 2}); - queue.add(async () => { - await delay(400); - result.push('🐌'); - }, {id: '🐌'}); - queue.add(async () => { - await delay(400); - result.push('🦆'); - }, {id: '🦆'}); - queue.add(async () => { - await delay(400); - result.push('🐢'); - }, {id: '🐢'}); - queue.add(async () => { - await delay(400); - result.push('⚡️'); - }, {id: '⚡️'}); - queue.add(async () => { - await delay(400); - result.push('🦀'); - }, {id: '🦀'}); - queue.setPriority('⚡️', 1); - queue.setPriority('🦀', 2); + const queue = new PQueue({ concurrency: 2 }); + queue.add( + async () => { + await delay(400); + result.push("🐌"); + }, + { id: "🐌" }, + ); + queue.add( + async () => { + await delay(400); + result.push("🦆"); + }, + { id: "🦆" }, + ); + queue.add( + async () => { + await delay(400); + result.push("🐢"); + }, + { id: "🐢" }, + ); + queue.add( + async () => { + await delay(400); + result.push("⚡️"); + }, + { id: "⚡️" }, + ); + queue.add( + async () => { + await delay(400); + result.push("🦀"); + }, + { id: "🦀" }, + ); + queue.setPriority("⚡️", 1); + queue.setPriority("🦀", 2); await queue.onIdle(); - t.deepEqual(result, ['🐌', '🦆', '🦀', '⚡️', '🐢']); + t.deepEqual(result, ["🐌", "🦆", "🦀", "⚡️", "🐢"]); }); -test('.setPriority() - execute a promise before planned - concurrency 3 and unspecified `id`', async t => { +test(".setPriority() - execute a promise before planned - concurrency 3 and unspecified `id`", async (t) => { const result: string[] = []; - const queue = new PQueue({concurrency: 3}); + const queue = new PQueue({ concurrency: 3 }); queue.add(async () => { await delay(400); - result.push('🐌'); + result.push("🐌"); }); queue.add(async () => { await delay(400); - result.push('🦆'); + result.push("🦆"); }); queue.add(async () => { await delay(400); - result.push('🐢'); + result.push("🐢"); }); queue.add(async () => { await delay(400); - result.push('⚡️'); + result.push("⚡️"); }); queue.add(async () => { await delay(400); - result.push('🦀'); + result.push("🦀"); }); - queue.setPriority('5', 1); + queue.setPriority("5", 1); await queue.onIdle(); - t.deepEqual(result, ['🐌', '🦆', '🐢', '🦀', '⚡️']); + t.deepEqual(result, ["🐌", "🦆", "🐢", "🦀", "⚡️"]); });