Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix (#214) : added typing for consistent return type for add() method #218

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 114 additions & 57 deletions source/index.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,38 @@
import {EventEmitter} from 'eventemitter3';
import pTimeout, {TimeoutError} from 'p-timeout';
import {type Queue, type RunFunction} from './queue.js';
import PriorityQueue from './priority-queue.js';
import {type QueueAddOptions, type Options, type TaskOptions} from './options.js';
import { EventEmitter } from "eventemitter3";

Check failure on line 1 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 20

There should be no space after '{'.

Check failure on line 1 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 20

There should be no space before '}'.

Check failure on line 1 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 20

Strings must use singlequote.

Check failure on line 1 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 18

There should be no space after '{'.

Check failure on line 1 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 18

There should be no space before '}'.

Check failure on line 1 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 18

Strings must use singlequote.
import pTimeout, { TimeoutError } from "p-timeout";

Check failure on line 2 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 20

There should be no space after '{'.

Check failure on line 2 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 20

There should be no space before '}'.

Check failure on line 2 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 20

Strings must use singlequote.

Check failure on line 2 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 18

There should be no space after '{'.

Check failure on line 2 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 18

There should be no space before '}'.

Check failure on line 2 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 18

Strings must use singlequote.
import { type Queue, type RunFunction } from "./queue.js";

Check failure on line 3 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 20

There should be no space after '{'.

Check failure on line 3 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 20

There should be no space before '}'.

Check failure on line 3 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 20

Strings must use singlequote.

Check failure on line 3 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 18

There should be no space after '{'.

Check failure on line 3 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 18

There should be no space before '}'.

Check failure on line 3 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 18

Strings must use singlequote.
import PriorityQueue from "./priority-queue.js";

Check failure on line 4 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 20

Strings must use singlequote.

Check failure on line 4 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 18

Strings must use singlequote.
import {
type QueueAddOptions,
type Options,
type TaskOptions,
Truthy,
Falsy,
BooleanTypeReturn,
} from "./options.js";

type Task<TaskResultType> =
| ((options: TaskOptions) => PromiseLike<TaskResultType>)
| ((options: TaskOptions) => TaskResultType);

type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error';
type EventName =
| "active"
| "idle"
| "empty"
| "add"
| "next"
| "completed"
| "error";

/**
Promise queue with concurrency control.
*/
export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue, EnqueueOptionsType extends QueueAddOptions = QueueAddOptions> extends EventEmitter<EventName> { // eslint-disable-line @typescript-eslint/naming-convention, unicorn/prefer-event-target
export default class PQueue<
T extends Truthy | Falsy,
QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue,
EnqueueOptionsType extends QueueAddOptions<T> = QueueAddOptions<T>,
> extends EventEmitter<EventName> {
// eslint-disable-line @typescript-eslint/naming-convention, unicorn/prefer-event-target
readonly #carryoverConcurrencyCount: boolean;

readonly #isIntervalIgnored: boolean;
Expand Down Expand Up @@ -54,7 +73,7 @@
timeout?: number;

// TODO: The `throwOnTimeout` option should affect the return types of `add()` and `addAll()`
constructor(options?: Options<QueueType, EnqueueOptionsType>) {
constructor(options?: Options<QueueType, EnqueueOptionsType, T>) {
super();

// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
Expand All @@ -66,18 +85,33 @@
autoStart: true,
queueClass: PriorityQueue,
...options,
} as Options<QueueType, EnqueueOptionsType>;

if (!(typeof options.intervalCap === 'number' && options.intervalCap >= 1)) {
throw new TypeError(`Expected \`intervalCap\` to be a number from 1 and up, got \`${options.intervalCap?.toString() ?? ''}\` (${typeof options.intervalCap})`);
} as Options<QueueType, EnqueueOptionsType, T>;

if (
!(typeof options.intervalCap === "number" && options.intervalCap >= 1)
) {
throw new TypeError(
`Expected \`intervalCap\` to be a number from 1 and up, got \`${
options.intervalCap?.toString() ?? ""
}\` (${typeof options.intervalCap})`,
);
}

if (options.interval === undefined || !(Number.isFinite(options.interval) && options.interval >= 0)) {
throw new TypeError(`Expected \`interval\` to be a finite number >= 0, got \`${options.interval?.toString() ?? ''}\` (${typeof options.interval})`);
if (
options.interval === undefined ||
!(Number.isFinite(options.interval) && options.interval >= 0)
) {
throw new TypeError(
`Expected \`interval\` to be a finite number >= 0, got \`${
options.interval?.toString() ?? ""
}\` (${typeof options.interval})`,
);
}

this.#carryoverConcurrencyCount = options.carryoverConcurrencyCount!;
this.#isIntervalIgnored = options.intervalCap === Number.POSITIVE_INFINITY || options.interval === 0;
this.#isIntervalIgnored =
options.intervalCap === Number.POSITIVE_INFINITY ||
options.interval === 0;
this.#intervalCap = options.intervalCap;
this.#interval = options.interval;
this.#queue = new options.queueClass!();
Expand All @@ -99,7 +133,7 @@
#next(): void {
this.#pending--;
this.#tryToStartAnother();
this.emit('next');
this.emit("next");
}

#onResumeInterval(): void {
Expand All @@ -116,16 +150,15 @@
if (delay < 0) {
// Act as the interval was done
// We don't need to resume it here because it will be resumed on line 160
this.#intervalCount = (this.#carryoverConcurrencyCount) ? this.#pending : 0;
this.#intervalCount = this.#carryoverConcurrencyCount
? this.#pending
: 0;
} else {
// Act as the interval is pending
if (this.#timeoutId === undefined) {
this.#timeoutId = setTimeout(
() => {
this.#onResumeInterval();
},
delay,
);
this.#timeoutId = setTimeout(() => {
this.#onResumeInterval();
}, delay);
}

return true;
Expand All @@ -145,10 +178,10 @@

this.#intervalId = undefined;

this.emit('empty');
this.emit("empty");

if (this.#pending === 0) {
this.emit('idle');
this.emit("idle");
}

return false;
Expand All @@ -162,7 +195,7 @@
return false;
}

this.emit('active');
this.emit("active");
job();

if (canInitializeInterval) {
Expand All @@ -181,12 +214,9 @@
return;
}

this.#intervalId = setInterval(
() => {
this.#onInterval();
},
this.#interval,
);
this.#intervalId = setInterval(() => {
this.#onInterval();
}, this.#interval);

this.#intervalEnd = Date.now() + this.#interval;
}
Expand Down Expand Up @@ -214,8 +244,10 @@
}

set concurrency(newConcurrency: number) {
if (!(typeof newConcurrency === 'number' && newConcurrency >= 1)) {
throw new TypeError(`Expected \`concurrency\` to be a number from 1 and up, got \`${newConcurrency}\` (${typeof newConcurrency})`);
if (!(typeof newConcurrency === "number" && newConcurrency >= 1)) {
throw new TypeError(
`Expected \`concurrency\` to be a number from 1 and up, got \`${newConcurrency}\` (${typeof newConcurrency})`,
);
}

this.#concurrency = newConcurrency;
Expand All @@ -225,9 +257,13 @@

async #throwOnAbort(signal: AbortSignal): Promise<never> {
return new Promise((_resolve, reject) => {
signal.addEventListener('abort', () => {
reject(signal.reason);
}, {once: true});
signal.addEventListener(
"abort",
() => {
reject(signal.reason);
},
{ once: true },
);
});
}

Expand Down Expand Up @@ -274,9 +310,21 @@
/**
Adds a sync or async task to the queue. Always returns a promise.
*/
async add<TaskResultType>(function_: Task<TaskResultType>, options: {throwOnTimeout: true} & Exclude<EnqueueOptionsType, 'throwOnTimeout'>): Promise<TaskResultType>;
async add<TaskResultType>(function_: Task<TaskResultType>, options?: Partial<EnqueueOptionsType>): Promise<TaskResultType | void>;
async add<TaskResultType>(function_: Task<TaskResultType>, options: Partial<EnqueueOptionsType> = {}): Promise<TaskResultType | void> {
async add<TaskResultType>(
function_: Task<TaskResultType>,
options: { throwOnTimeout: true } & Exclude<
EnqueueOptionsType,
"throwOnTimeout"
>,
): Promise<TaskResultType>;
async add<TaskResultType>(
function_: Task<TaskResultType>,
options?: Partial<EnqueueOptionsType>,
): Promise<BooleanTypeReturn<T, TaskResultType, void>>;
async add<TaskResultType>(
function_: Task<TaskResultType>,
options: Partial<EnqueueOptionsType> = {},
): Promise<BooleanTypeReturn<T, TaskResultType, void>> {
// In case `id` is not defined.
options.id ??= (this.#idAssigner++).toString();

Expand All @@ -294,33 +342,38 @@
try {
options.signal?.throwIfAborted();

let operation = function_({signal: options.signal});
let operation = function_({ signal: options.signal });

if (options.timeout) {
operation = pTimeout(Promise.resolve(operation), {milliseconds: options.timeout});
operation = pTimeout(Promise.resolve(operation), {
milliseconds: options.timeout,
});
}

if (options.signal) {
operation = Promise.race([operation, this.#throwOnAbort(options.signal)]);
operation = Promise.race([
operation,
this.#throwOnAbort(options.signal),
]);
}

const result = await operation;
resolve(result);
this.emit('completed', result);
this.emit("completed", result);
} catch (error: unknown) {
if (error instanceof TimeoutError && !options.throwOnTimeout) {
resolve();
return;
}

reject(error);
this.emit('error', error);
this.emit("error", error);
} finally {
this.#next();
}
}, options);

this.emit('add');
this.emit("add");

this.#tryToStartAnother();
});
Expand All @@ -333,17 +386,21 @@
*/
async addAll<TaskResultsType>(
functions: ReadonlyArray<Task<TaskResultsType>>,
options?: {throwOnTimeout: true} & Partial<Exclude<EnqueueOptionsType, 'throwOnTimeout'>>,
): Promise<TaskResultsType[]>;
options?: { throwOnTimeout: true } & Partial<
Exclude<EnqueueOptionsType, "throwOnTimeout">
>,
): Promise<BooleanTypeReturn<T, TaskResultsType, void>[]>;
async addAll<TaskResultsType>(
functions: ReadonlyArray<Task<TaskResultsType>>,
options?: Partial<EnqueueOptionsType>,
): Promise<Array<TaskResultsType | void>>;
): Promise<BooleanTypeReturn<T, TaskResultsType, void>[]>;
async addAll<TaskResultsType>(
functions: ReadonlyArray<Task<TaskResultsType>>,
options?: Partial<EnqueueOptionsType>,
): Promise<Array<TaskResultsType | void>> {
return Promise.all(functions.map(async function_ => this.add(function_, options)));
): Promise<BooleanTypeReturn<T, TaskResultsType, void>[]> {
return Promise.all(
functions.map(async (function_) => this.add(function_, options)),
);
}

/**
Expand Down Expand Up @@ -385,7 +442,7 @@
return;
}

await this.#onEvent('empty');
await this.#onEvent("empty");
}

/**
Expand All @@ -401,7 +458,7 @@
return;
}

await this.#onEvent('next', () => this.#queue.size < limit);
await this.#onEvent("next", () => this.#queue.size < limit);
}

/**
Expand All @@ -415,11 +472,11 @@
return;
}

await this.#onEvent('idle');
await this.#onEvent("idle");
}

async #onEvent(event: EventName, filter?: () => boolean): Promise<void> {
return new Promise(resolve => {
return new Promise((resolve) => {
const listener = () => {
if (filter && !filter()) {
return;
Expand Down Expand Up @@ -465,5 +522,5 @@
}
}

export type {Queue} from './queue.js';
export {type QueueAddOptions, type Options} from './options.js';
export type { Queue } from "./queue.js";
export { type QueueAddOptions, type Options } from "./options.js";
28 changes: 21 additions & 7 deletions source/options.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import {type Queue, type RunFunction} from './queue.js';
import { type Queue, type RunFunction } from "./queue.js";

type TimeoutOptions = {
export type Falsy = false;
export type Truthy = true;

export type BooleanTypeReturn<
T extends Falsy | Truthy,
TruthyReturn,
FalsyReturn
> = T extends Truthy ? TruthyReturn : FalsyReturn;

type TimeoutOptions<T extends Falsy | Truthy> = {
/**
Per-operation timeout in milliseconds. Operations fulfill once `timeout` elapses if they haven't already.
*/
Expand All @@ -11,10 +20,14 @@ type TimeoutOptions = {

@default false
*/
throwOnTimeout?: boolean;
throwOnTimeout?: T;
};

export type Options<QueueType extends Queue<RunFunction, QueueOptions>, QueueOptions extends QueueAddOptions> = {
export type Options<
QueueType extends Queue<RunFunction, QueueOptions>,
QueueOptions extends QueueAddOptions<Throw>,
Throw extends Falsy | Truthy
> = {
/**
Concurrency limit.

Expand Down Expand Up @@ -60,9 +73,9 @@ export type Options<QueueType extends Queue<RunFunction, QueueOptions>, QueueOpt
@default false
*/
readonly carryoverConcurrencyCount?: boolean;
} & TimeoutOptions;
} & TimeoutOptions<Throw>;

export type QueueAddOptions = {
export type QueueAddOptions<Throw extends Falsy | Truthy> = {
/**
Priority of operation. Operations with greater priority will be scheduled first.

Expand All @@ -74,7 +87,8 @@ export type QueueAddOptions = {
Unique identifier for the promise function, used to update its priority before execution. If not specified, it is auto-assigned an incrementing BigInt starting from `1n`.
*/
id?: string;
} & TaskOptions & TimeoutOptions;
} & TaskOptions &
TimeoutOptions<Throw>;

export type TaskOptions = {
/**
Expand Down
Loading
Loading