diff --git a/src/zql/ivm/graph/difference-stream-reader.ts b/src/zql/ivm/graph/difference-stream-reader.ts index 42e3a49..5c00178 100644 --- a/src/zql/ivm/graph/difference-stream-reader.ts +++ b/src/zql/ivm/graph/difference-stream-reader.ts @@ -1,10 +1,10 @@ import {invariant, must} from '../../error/asserts.js'; +import {Multiset} from '../multiset.js'; import {Version} from '../types.js'; -import {Queue, QueueEntry} from './queue.js'; import {DifferenceStreamWriter} from './difference-stream-writer.js'; -import {Operator} from './operators/operator.js'; import {Request} from './message.js'; -import {Multiset} from '../multiset.js'; +import {Operator} from './operators/operator.js'; +import {Queue, QueueEntry} from './queue.js'; /** * Represents the input to an operator. @@ -21,13 +21,12 @@ import {Multiset} from '../multiset.js'; * o o o */ export class DifferenceStreamReader { - protected readonly _queue; + protected readonly _queue = new Queue(); readonly #upstreamWriter; #downstreamOperator: Operator | null = null; #lastSeenVersion: Version = -1; constructor(upstream: DifferenceStreamWriter) { - this._queue = new Queue(); this.#upstreamWriter = upstream; } diff --git a/src/zql/ivm/graph/operators/reduce-operator.test.ts b/src/zql/ivm/graph/operators/reduce-operator.test.ts index b706525..d399e08 100644 --- a/src/zql/ivm/graph/operators/reduce-operator.test.ts +++ b/src/zql/ivm/graph/operators/reduce-operator.test.ts @@ -1,13 +1,13 @@ import {expect, test} from 'vitest'; -import {DifferenceStreamWriter} from '../difference-stream-writer.js'; -import {ReduceOperator} from './reduce-operator.js'; import {Multiset} from '../../multiset.js'; +import {DifferenceStreamWriter} from '../difference-stream-writer.js'; import {NoOp} from './operator.js'; +import {ReduceOperator} from './reduce-operator.js'; type Thing = { id: string; - a: number; - b: string; + value: number; + groupKey: string; }; type Reduction = { @@ -20,8 +20,8 @@ test('collects all things with the same key', () => { const inputReader = inputWriter.newReader(); const output = new DifferenceStreamWriter(); - function getKey(t: Thing) { - return t.b; + function getGroupKey(t: Thing) { + return t.groupKey; } function getValueIdentity(t: Thing) { return t.id; @@ -31,13 +31,13 @@ test('collects all things with the same key', () => { inputReader, output, getValueIdentity, - getKey, + getGroupKey, (group: Iterable) => { let sum = 0; let id = ''; for (const item of group) { - id = item.b; - sum += item.a; + id = item.groupKey; + sum += item.value; } return { @@ -56,16 +56,16 @@ test('collects all things with the same key', () => { [ { id: 'a', - a: 1, - b: 'x', + value: 1, + groupKey: 'x', }, 1, ], [ { id: 'b', - a: 2, - b: 'x', + value: 2, + groupKey: 'x', }, 2, ], @@ -80,8 +80,8 @@ test('collects all things with the same key', () => { [ { id: 'a', - a: 1, - b: 'x', + value: 1, + groupKey: 'x', }, -1, ], @@ -92,15 +92,15 @@ test('collects all things with the same key', () => { [{id: 'x', sum: 4}, 1], ]); - // fully retract items that constitue a grouping + // fully retract items that constitute a grouping inputWriter.queueData([ 1, new Multiset([ [ { id: 'b', - a: 2, - b: 'x', + value: 2, + groupKey: 'x', }, -2, ], @@ -115,8 +115,8 @@ test('collects all things with the same key', () => { [ { id: 'a', - a: 1, - b: 'c', + value: 1, + groupKey: 'c', }, 1, ], @@ -129,8 +129,8 @@ test('collects all things with the same key', () => { [ { id: 'b', - a: 2, - b: 'c', + value: 2, + groupKey: 'c', }, 1, ], @@ -147,16 +147,16 @@ test('collects all things with the same key', () => { [ { id: 'a', - a: 1, - b: 'c', + value: 1, + groupKey: 'c', }, -1, ], [ { id: 'a', - a: 2, - b: 'c', + value: 2, + groupKey: 'c', }, 1, ], diff --git a/src/zql/ivm/graph/operators/reduce-operator.ts b/src/zql/ivm/graph/operators/reduce-operator.ts index 67d331a..3aefe87 100644 --- a/src/zql/ivm/graph/operators/reduce-operator.ts +++ b/src/zql/ivm/graph/operators/reduce-operator.ts @@ -51,17 +51,17 @@ export class ReduceOperator< f: (input: Iterable) => O, ) { const inner = (version: Version) => { - const keysToDo = new Set(); + const keysToProcess = new Set(); const ret: Entry[] = []; for (const entry of this.inputMessages(version)) { for (const [value, mult] of entry[1].entries) { const key = getGroupKey(value); - keysToDo.add(key); + keysToProcess.add(key); this.#addToIndex(key, value, mult); } } - for (const k of keysToDo) { + for (const k of keysToProcess) { const dataIn = this.#inIndex.get(k); const existingOut = this.#outIndex.get(k); if (dataIn === undefined) { @@ -100,9 +100,10 @@ export class ReduceOperator< existing = new Map(); this.#inIndex.set(key, existing); } - const prev = existing.get(this.#getValueIdentity(value)); + const valueIdentity = this.#getValueIdentity(value); + const prev = existing.get(valueIdentity); if (prev === undefined) { - existing.set(this.#getValueIdentity(value), [value, mult]); + existing.set(valueIdentity, [value, mult]); } else { const [v, m] = prev; const newMult = m + mult; @@ -111,13 +112,13 @@ export class ReduceOperator< 'Should not end up with a negative multiplicity when tracking all events for an item', ); if (newMult === 0) { - existing.delete(this.#getValueIdentity(value)); + existing.delete(valueIdentity); if (existing.size === 0) { this.#inIndex.delete(key); return undefined; } } else { - existing.set(this.#getValueIdentity(value), [v, newMult]); + existing.set(valueIdentity, [v, newMult]); } } diff --git a/src/zql/ivm/graph/queue.ts b/src/zql/ivm/graph/queue.ts index ff2fb4e..cafb829 100644 --- a/src/zql/ivm/graph/queue.ts +++ b/src/zql/ivm/graph/queue.ts @@ -4,8 +4,8 @@ import {Version} from '../types.js'; import {Reply} from './message.js'; export type QueueEntry = - | readonly [Version, Multiset, Reply] - | readonly [Version, Multiset]; + | readonly [version: Version, multiset: Multiset, reply: Reply] + | readonly [version: Version, multiset: Multiset]; type Node = { data: QueueEntry;