Skip to content

Commit

Permalink
fix signal check
Browse files Browse the repository at this point in the history
  • Loading branch information
samthor committed Dec 28, 2024
1 parent 6814761 commit a2ab649
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 11 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
},
"name": "thorish",
"description": "This is a library of useful JS concepts and data structures for Node and the browser. It it, unashamedly, a dumping ground for code needed by [@samthor](https://twitter.com/samthor)'s projects.",
"version": "1.1.27",
"version": "1.1.28",
"directories": {
"test": "test"
},
Expand Down
30 changes: 20 additions & 10 deletions src/queue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { promiseWithResolvers } from './promise.js';
import { promiseForSignal } from './signal.js';
import { promiseVoidForSignal } from './signal.js';

export class WorkQueue<T> {
private pending: T[] = [];
Expand Down Expand Up @@ -272,7 +272,7 @@ class LinkQueueImpl<X> implements LinkQueue<X> {
return true;
}

join(signal) {
join(signal?: AbortSignal) {
let waitNext: () => Promise<void>;
let ref: QueueRef<X> = this.head;

Expand All @@ -285,7 +285,7 @@ class LinkQueueImpl<X> implements LinkQueue<X> {
ref = emptyQueueRef as QueueRef<X>;
} else {
// normal signal
const signalPromise = promiseForSignal(signal, undefined);
const signalPromise = promiseVoidForSignal(signal);
waitNext = () => Promise.race([signalPromise, this.p!.promise]);
}

Expand All @@ -296,16 +296,18 @@ class LinkQueueImpl<X> implements LinkQueue<X> {
return ref.value;
}
async next() {
if (signal.aborted) {
if (signal?.aborted) {
return undefined;
}

let { value } = ref;
if (value !== undefined) {
// we're behind head: move it further, return value
ref = ref.next!;
return value;
}

// otherwise, we're at head: wait
outer.p ??= promiseWithResolvers();
await waitNext();
return this.next();
Expand Down Expand Up @@ -339,7 +341,10 @@ class ArrayQueueImpl<X> implements Queue<X> {

this.head += all.length;
if (this.subs.size === 0) {
this.data.splice(0, this.data.length);
// no subs, just nuke data anyway
if (this.data.length) {
this.data = [];
}
return false;
}

Expand All @@ -357,7 +362,7 @@ class ArrayQueueImpl<X> implements Queue<X> {

join(signal: AbortSignal): Listener<X> {
const outer = this;
const signalPromise = promiseForSignal(signal, undefined);
const signalPromise = promiseVoidForSignal(signal);

const waitFor = async (cb: (avail: number) => number): Promise<X[]> => {
for (;;) {
Expand Down Expand Up @@ -416,26 +421,31 @@ class ArrayQueueImpl<X> implements Queue<X> {
return l;
}

queueTrimEvents() {
private queueTrimEvents() {
if (this.trimTask) {
return;
}
this.trimTask = true;

// TODO: should this be configurable? this is really "admin cleanup"
const timeoutMs = 250;

setTimeout(() => {
this.trimTask = false;

const min = Math.min(...this.subs.values());
const min = Math.min(this.head, ...this.subs.values());
if (min === this.head) {
// subs all at head or no subs, nuke all data
if (this.data.length) {
this.data.splice(0, this.data.length);
this.data = [];
}
return;
}

const start = this.head - this.data.length;
const strip = min - start;
this.data.splice(0, strip);
}, 250);
}, timeoutMs);
}
}

Expand Down
9 changes: 9 additions & 0 deletions test/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,12 @@ test('queue abort', async () => {

assert.strictEqual(await lq.next(), undefined);
});

test('queue no signal', async () => {
const q = buildLinkQueue<number>();
const lq = q.join();

const p = lq.next();
q.push(123);
assert.strictEqual(await p, 123);
});

0 comments on commit a2ab649

Please sign in to comment.