diff --git a/barrier.ts b/barrier.ts index 321c167..8ffbd53 100644 --- a/barrier.ts +++ b/barrier.ts @@ -33,11 +33,13 @@ export class Barrier { * Creates a new `Barrier` that blocks until `size` threads have called `wait`. * * @param size The number of threads that must reach the barrier before it unblocks. - * @throws Error if the size is negative. + * @throws {RangeError} if the size is not a positive safe integer. */ constructor(size: number) { - if (size < 0) { - throw new Error("The size must be greater than 0"); + if (size <= 0 || !Number.isSafeInteger(size)) { + throw new RangeError( + `size must be a positive safe integer, got ${size}`, + ); } this.#rest = size; } diff --git a/barrier_test.ts b/barrier_test.ts index acb3b0b..22d0275 100644 --- a/barrier_test.ts +++ b/barrier_test.ts @@ -1,4 +1,4 @@ -import { assertEquals, assertRejects } from "@std/assert"; +import { assertEquals, assertRejects, assertThrows } from "@std/assert"; import { deadline, delay } from "@std/async"; import { Barrier } from "./barrier.ts"; @@ -79,4 +79,16 @@ Deno.test("Barrier", async (t) => { ); }, ); + + await t.step( + "throws RangeError if size is not a positive safe integer", + () => { + assertThrows(() => new Barrier(NaN), RangeError); + assertThrows(() => new Barrier(Infinity), RangeError); + assertThrows(() => new Barrier(-Infinity), RangeError); + assertThrows(() => new Barrier(-1), RangeError); + assertThrows(() => new Barrier(1.1), RangeError); + assertThrows(() => new Barrier(0), RangeError); + }, + ); }); diff --git a/deno.jsonc b/deno.jsonc index bd7becc..801b3d2 100644 --- a/deno.jsonc +++ b/deno.jsonc @@ -42,6 +42,7 @@ "@core/asyncutil/semaphore": "./semaphore.ts", "@core/asyncutil/stack": "./stack.ts", "@core/asyncutil/wait-group": "./wait_group.ts", + "@core/iterutil": "jsr:@core/iterutil@^0.6.0", "@std/assert": "jsr:@std/assert@^1.0.2", "@std/async": "jsr:@std/async@^1.0.2" }, diff --git a/deno.lock b/deno.lock index 5b0da6c..b2a1a57 100644 --- a/deno.lock +++ b/deno.lock @@ -2,11 +2,15 @@ "version": "3", "packages": { "specifiers": { + "jsr:@core/iterutil@^0.6.0": "jsr:@core/iterutil@0.6.0", "jsr:@std/assert@^1.0.2": "jsr:@std/assert@1.0.2", "jsr:@std/async@^1.0.2": "jsr:@std/async@1.0.2", "jsr:@std/internal@^1.0.1": "jsr:@std/internal@1.0.1" }, "jsr": { + "@core/iterutil@0.6.0": { + "integrity": "8de0d0062a515496ae744983941d7e379668c2ee2edf43f63423e8da753828b1" + }, "@std/assert@1.0.2": { "integrity": "ccacec332958126deaceb5c63ff8b4eaf9f5ed0eac9feccf124110435e59e49c", "dependencies": [ @@ -24,6 +28,7 @@ "remote": {}, "workspace": { "dependencies": [ + "jsr:@core/iterutil@^0.6.0", "jsr:@std/assert@^1.0.2", "jsr:@std/async@^1.0.2" ] diff --git a/notify.ts b/notify.ts index a44b8d9..355c633 100644 --- a/notify.ts +++ b/notify.ts @@ -1,3 +1,6 @@ +import { iter } from "@core/iterutil/iter"; +import { take } from "@core/iterutil/take"; + /** * Async notifier that allows one or more "waiters" to wait for a notification. * @@ -20,30 +23,31 @@ * ``` */ export class Notify { - #waiters: { - promise: Promise; - resolve: () => void; - reject: (reason?: unknown) => void; - }[] = []; + #waiters: Set> = new Set(); /** * Returns the number of waiters that are waiting for notification. */ get waiterCount(): number { - return this.#waiters.length; + return this.#waiters.size; } /** * Notifies `n` waiters that are waiting for notification. Resolves each of the notified waiters. * If there are fewer than `n` waiters, all waiters are notified. + * + * @param n The number of waiters to notify. + * @throws {RangeError} if `n` is not a positive safe integer. */ notify(n = 1): void { - const head = this.#waiters.slice(0, n); - const tail = this.#waiters.slice(n); - for (const waiter of head) { + if (n <= 0 || !Number.isSafeInteger(n)) { + throw new RangeError(`n must be a positive safe integer, got ${n}`); + } + const it = iter(this.#waiters); + for (const waiter of take(it, n)) { waiter.resolve(); } - this.#waiters = tail; + this.#waiters = new Set(it); } /** @@ -53,7 +57,7 @@ export class Notify { for (const waiter of this.#waiters) { waiter.resolve(); } - this.#waiters = []; + this.#waiters = new Set(); } /** @@ -67,17 +71,12 @@ export class Notify { } const waiter = Promise.withResolvers(); const abort = () => { - removeItem(this.#waiters, waiter); + this.#waiters.delete(waiter); waiter.reject(signal!.reason); }; signal?.addEventListener("abort", abort, { once: true }); - this.#waiters.push(waiter); + this.#waiters.add(waiter); await waiter.promise; signal?.removeEventListener("abort", abort); } } - -function removeItem(array: T[], item: T): void { - const index = array.indexOf(item); - array.splice(index, 1); -} diff --git a/notify_test.ts b/notify_test.ts index f20a539..6e7bd86 100644 --- a/notify_test.ts +++ b/notify_test.ts @@ -1,5 +1,5 @@ import { delay } from "@std/async/delay"; -import { assertEquals, assertRejects } from "@std/assert"; +import { assertEquals, assertRejects, assertThrows } from "@std/assert"; import { promiseState } from "./promise_state.ts"; import { Notify } from "./notify.ts"; @@ -8,21 +8,61 @@ Deno.test("Notify", async (t) => { const notify = new Notify(); const waiter1 = notify.notified(); const waiter2 = notify.notified(); + assertEquals(notify.waiterCount, 2); notify.notify(); + assertEquals(notify.waiterCount, 1); assertEquals(await promiseState(waiter1), "fulfilled"); assertEquals(await promiseState(waiter2), "pending"); notify.notify(); + assertEquals(notify.waiterCount, 0); assertEquals(await promiseState(waiter1), "fulfilled"); assertEquals(await promiseState(waiter2), "fulfilled"); }); + await t.step("'notify' wakes up a multiple waiters", async () => { + const notify = new Notify(); + const waiter1 = notify.notified(); + const waiter2 = notify.notified(); + const waiter3 = notify.notified(); + const waiter4 = notify.notified(); + const waiter5 = notify.notified(); + assertEquals(notify.waiterCount, 5); + + notify.notify(2); + assertEquals(notify.waiterCount, 3); + assertEquals(await promiseState(waiter1), "fulfilled"); + assertEquals(await promiseState(waiter2), "fulfilled"); + assertEquals(await promiseState(waiter3), "pending"); + assertEquals(await promiseState(waiter4), "pending"); + assertEquals(await promiseState(waiter5), "pending"); + + notify.notify(2); + assertEquals(notify.waiterCount, 1); + assertEquals(await promiseState(waiter1), "fulfilled"); + assertEquals(await promiseState(waiter2), "fulfilled"); + assertEquals(await promiseState(waiter3), "fulfilled"); + assertEquals(await promiseState(waiter4), "fulfilled"); + assertEquals(await promiseState(waiter5), "pending"); + + notify.notify(2); + assertEquals(notify.waiterCount, 0); + assertEquals(await promiseState(waiter1), "fulfilled"); + assertEquals(await promiseState(waiter2), "fulfilled"); + assertEquals(await promiseState(waiter3), "fulfilled"); + assertEquals(await promiseState(waiter4), "fulfilled"); + assertEquals(await promiseState(waiter5), "fulfilled"); + }); + await t.step("'notifyAll' wakes up all waiters", async () => { const notify = new Notify(); const waiter1 = notify.notified(); const waiter2 = notify.notified(); + assertEquals(notify.waiterCount, 2); + notify.notifyAll(); + assertEquals(notify.waiterCount, 0); assertEquals(await promiseState(waiter1), "fulfilled"); assertEquals(await promiseState(waiter2), "fulfilled"); }); @@ -69,4 +109,17 @@ Deno.test("Notify", async (t) => { ); }, ); + + await t.step( + "'notify' throws RangeError if size is not a positive safe integer", + () => { + const notify = new Notify(); + assertThrows(() => notify.notify(NaN), RangeError); + assertThrows(() => notify.notify(Infinity), RangeError); + assertThrows(() => notify.notify(-Infinity), RangeError); + assertThrows(() => notify.notify(-1), RangeError); + assertThrows(() => notify.notify(1.1), RangeError); + assertThrows(() => notify.notify(0), RangeError); + }, + ); }); diff --git a/semaphore.ts b/semaphore.ts index 0c90f93..3aba954 100644 --- a/semaphore.ts +++ b/semaphore.ts @@ -23,11 +23,13 @@ export class Semaphore { * Creates a new semaphore with the specified limit. * * @param size The maximum number of times the semaphore can be acquired before blocking. - * @throws Error if the size is less than 1. + * @throws {RangeError} if the size is not a positive safe integer. */ constructor(size: number) { - if (size < 0) { - throw new Error("The size must be greater than 0"); + if (size <= 0 || !Number.isSafeInteger(size)) { + throw new RangeError( + `size must be a positive safe integer, got ${size}`, + ); } this.#rest = size + 1; } diff --git a/semaphore_test.ts b/semaphore_test.ts index 9b21457..85be7ce 100644 --- a/semaphore_test.ts +++ b/semaphore_test.ts @@ -1,9 +1,9 @@ -import { assertEquals } from "@std/assert"; +import { assertEquals, assertThrows } from "@std/assert"; import { Semaphore } from "./semaphore.ts"; Deno.test("Semaphore", async (t) => { await t.step( - "regulates the number of workers concurrently running", + "regulates the number of workers concurrently running (n=5)", async () => { let nworkers = 0; const results: number[] = []; @@ -32,4 +32,78 @@ Deno.test("Semaphore", async (t) => { ]); }, ); + + await t.step( + "regulates the number of workers concurrently running (n=1)", + async () => { + let nworkers = 0; + const results: number[] = []; + const sem = new Semaphore(1); + const worker = () => { + return sem.lock(async () => { + nworkers++; + results.push(nworkers); + await new Promise((resolve) => setTimeout(resolve, 10)); + nworkers--; + }); + }; + await Promise.all([...Array(10)].map(() => worker())); + assertEquals(nworkers, 0); + assertEquals(results, [ + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + ]); + }, + ); + + await t.step( + "regulates the number of workers concurrently running (n=10)", + async () => { + let nworkers = 0; + const results: number[] = []; + const sem = new Semaphore(10); + const worker = () => { + return sem.lock(async () => { + nworkers++; + results.push(nworkers); + await new Promise((resolve) => setTimeout(resolve, 10)); + nworkers--; + }); + }; + await Promise.all([...Array(10)].map(() => worker())); + assertEquals(nworkers, 0); + assertEquals(results, [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + ]); + }, + ); + + await t.step( + "throws RangeError if size is not a positive safe integer", + () => { + assertThrows(() => new Semaphore(NaN), RangeError); + assertThrows(() => new Semaphore(Infinity), RangeError); + assertThrows(() => new Semaphore(-Infinity), RangeError); + assertThrows(() => new Semaphore(-1), RangeError); + assertThrows(() => new Semaphore(1.1), RangeError); + assertThrows(() => new Semaphore(0), RangeError); + }, + ); }); diff --git a/wait_group.ts b/wait_group.ts index b98d165..2f9bb46 100644 --- a/wait_group.ts +++ b/wait_group.ts @@ -36,6 +36,9 @@ export class WaitGroup { * @param delta The number to add to the counter. It can be positive or negative. */ add(delta: number): void { + if (!Number.isSafeInteger(delta)) { + throw new RangeError(`delta must be a safe integer, got ${delta}`); + } this.#count += delta; if (this.#count === 0) { this.#notify.notifyAll(); diff --git a/wait_group_test.ts b/wait_group_test.ts index 786cb20..bcbbca2 100644 --- a/wait_group_test.ts +++ b/wait_group_test.ts @@ -1,4 +1,4 @@ -import { assertEquals, assertRejects } from "@std/assert"; +import { assertEquals, assertRejects, assertThrows } from "@std/assert"; import { deadline, delay } from "@std/async"; import { WaitGroup } from "./wait_group.ts"; @@ -83,4 +83,15 @@ Deno.test("WaitGroup", async (t) => { ); }, ); + + await t.step( + "'add' throws RangeError if delta is not a safe integer", + () => { + const wg = new WaitGroup(); + assertThrows(() => wg.add(NaN), RangeError); + assertThrows(() => wg.add(Infinity), RangeError); + assertThrows(() => wg.add(-Infinity), RangeError); + assertThrows(() => wg.add(1.1), RangeError); + }, + ); });