From 58699cf56b633bc2cdf313ce3c88ac5aa6c08969 Mon Sep 17 00:00:00 2001 From: Matt <1009003+tantaman@users.noreply.github.com> Date: Fri, 15 Mar 2024 09:42:53 -0400 Subject: [PATCH] implement a basic `reduce` operator to support group-by --- src/zql/ast-to-ivm/pipeline-builder.ts | 46 ++++- src/zql/ast/ast.ts | 2 +- src/zql/ivm/graph/difference-stream.ts | 18 ++ src/zql/ivm/graph/operators/operator-index.ts | 69 +++++++ .../graph/operators/reduce-operator.test.ts | 178 ++++++++++++++++++ .../ivm/graph/operators/reduce-operator.ts | 126 +++++++++++++ src/zql/util/iterables.ts | 10 + 7 files changed, 444 insertions(+), 5 deletions(-) create mode 100644 src/zql/ivm/graph/operators/operator-index.ts create mode 100644 src/zql/ivm/graph/operators/reduce-operator.test.ts create mode 100644 src/zql/ivm/graph/operators/reduce-operator.ts diff --git a/src/zql/ast-to-ivm/pipeline-builder.ts b/src/zql/ast-to-ivm/pipeline-builder.ts index acd6d8b..b2615a7 100644 --- a/src/zql/ast-to-ivm/pipeline-builder.ts +++ b/src/zql/ast-to-ivm/pipeline-builder.ts @@ -1,3 +1,4 @@ +import {Entity} from '../../generate.js'; import {AST, Condition, ConditionList, Operator, Ordering} from '../ast/ast.js'; import {assert, must} from '../error/asserts.js'; import {DifferenceStream} from '../ivm/graph/difference-stream.js'; @@ -5,7 +6,7 @@ import {DifferenceStream} from '../ivm/graph/difference-stream.js'; export const orderingProp = Symbol(); export function buildPipeline( - sourceStreamProvider: (sourceName: string) => DifferenceStream, + sourceStreamProvider: (sourceName: string) => DifferenceStream, ast: AST, ) { // filters first @@ -28,13 +29,18 @@ export function buildPipeline( ret = applySelect(stream, ast.select, ast.orderBy); } + if (ast.groupBy) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ret = applyGroupBy(ret as any, ast.orderBy, ast.groupBy); + } + // Note: the stream is technically attached at this point. // We could detach it until the user actually runs (or subscribes to) the statement as a tiny optimization. return ret; } export function applySelect( - stream: DifferenceStream, + stream: DifferenceStream, select: string[], orderBy: Ordering | undefined, ) { @@ -62,7 +68,7 @@ export function applySelect( }); } -function applyWhere(stream: DifferenceStream, where: ConditionList) { +function applyWhere(stream: DifferenceStream, where: ConditionList) { let ret = stream; // We'll handle `OR` and parentheticals like so: // OR: We'll create a new stream for the LHS and RHS of the OR then merge together. @@ -93,7 +99,7 @@ function applyWhere(stream: DifferenceStream, where: ConditionList) { } function applyCondition( - stream: DifferenceStream, + stream: DifferenceStream, condition: Condition, ) { const operator = getOperator(condition.op); @@ -103,6 +109,38 @@ function applyCondition( ); } +function applyGroupBy( + stream: DifferenceStream< + Record & { + [orderingProp]: unknown[]; + } + >, + orderBy: Ordering, + columns: string[], +) { + const keyFunction = makeKeyFunction(columns); + const idIdx = orderBy[0].indexOf('id'); + return stream.reduce( + keyFunction, + value => value[orderingProp][idIdx] as string, + // TODO: apply aggregate functions against the group if specified + // e.g., count(x), sum(x), avg(x), array(x) etc. + values => values[Symbol.iterator]().next().value, + ); +} + +function makeKeyFunction(columns: string[]) { + return (x: Record) => { + const ret: unknown[] = []; + for (const column of columns) { + ret.push(x[column]); + } + // Would it be better to come up with someh hash function + // which can handle complex types? + return JSON.stringify(ret); + }; +} + // We're well-typed in the query builder so once we're down here // we can assume that the operator is valid. // eslint-disable-next-line @typescript-eslint/no-explicit-any diff --git a/src/zql/ast/ast.ts b/src/zql/ast/ast.ts index b988f1b..efcc287 100644 --- a/src/zql/ast/ast.ts +++ b/src/zql/ast/ast.ts @@ -29,7 +29,7 @@ export type AST = { // readonly on: ConditionList; // }[]; readonly limit?: number | undefined; - // readonly groupBy?: string[]; + readonly groupBy?: string[]; readonly orderBy: Ordering; // readonly after?: Primitive; }; diff --git a/src/zql/ivm/graph/difference-stream.ts b/src/zql/ivm/graph/difference-stream.ts index 3a43e01..f243df2 100644 --- a/src/zql/ivm/graph/difference-stream.ts +++ b/src/zql/ivm/graph/difference-stream.ts @@ -1,3 +1,5 @@ +import {Entity} from '../../../generate.js'; +import {Primitive} from '../../ast/ast.js'; import {Multiset} from '../multiset.js'; import {Source} from '../source/source.js'; import {Version} from '../types.js'; @@ -11,6 +13,7 @@ import {DebugOperator} from './operators/debug-operator.js'; import {DifferenceEffectOperator} from './operators/difference-effect-operator.js'; import {FilterOperator} from './operators/filter-operator.js'; import {MapOperator} from './operators/map-operator.js'; +import {ReduceOperator} from './operators/reduce-operator.js'; import {QueueEntry} from './queue.js'; /** @@ -48,6 +51,21 @@ export class DifferenceStream implements IDifferenceStream { return ret; } + reduce( + getKey: (value: T) => K, + getIdentity: (value: T) => string, + f: (input: Iterable) => O, + ): DifferenceStream { + const ret = this.newStream(); + new ReduceOperator( + this.#upstreamWriter.newReader(), + ret.#upstreamWriter, + getKey, + f, + ); + return ret; + } + /** * This differs from count in that `size` just counts the entire * stream whereas `count` counts the number of times each key appears. diff --git a/src/zql/ivm/graph/operators/operator-index.ts b/src/zql/ivm/graph/operators/operator-index.ts new file mode 100644 index 0000000..5602c56 --- /dev/null +++ b/src/zql/ivm/graph/operators/operator-index.ts @@ -0,0 +1,69 @@ +import {Entity} from '../../../../generate.js'; +import {Primitive} from '../../../ast/ast.js'; +import {Entry} from '../../multiset.js'; + +// TODO: see comment on join. Maybe we can do better here with somem sort of sorted map. +export class Index { + readonly #index = new Map[]>(); + + constructor() {} + + add(key: K, value: Entry) { + let existing = this.#index.get(key); + if (existing === undefined) { + existing = []; + this.#index.set(key, existing); + } + existing.push(value); + } + + extend(index: Index) { + for (const [key, value] of index.#index) { + for (const entry of value) { + this.add(key, entry); + } + } + } + + get(key: K): Entry[] { + return this.#index.get(key) ?? []; + } + + compact(keys: K[] = []) { + function consolidateValues(values: Entry[]): Entry[] { + const consolidated = new Map>(); + for (const [value, multiplicity] of values) { + if (multiplicity === 0) { + continue; + } + const existing = consolidated.get(value.id); + if (existing === undefined) { + consolidated.set(value.id, [value, multiplicity]); + } else { + const sum = existing[1] + multiplicity; + if (sum === 0) { + consolidated.delete(value.id); + } else { + consolidated.set(value.id, [value, sum]); + } + } + } + + return [...consolidated.values()]; + } + + // spread `keys` b/c if we do not then when we add below the iterator will continue. + const iterableKeys = keys.length !== 0 ? keys : [...this.#index.keys()]; + for (const key of iterableKeys) { + const entries = this.#index.get(key); + if (entries === undefined) { + continue; + } + this.#index.delete(key); + const consolidated = consolidateValues(entries); + if (consolidated.length !== 0) { + this.#index.set(key, consolidated); + } + } + } +} diff --git a/src/zql/ivm/graph/operators/reduce-operator.test.ts b/src/zql/ivm/graph/operators/reduce-operator.test.ts new file mode 100644 index 0000000..b706525 --- /dev/null +++ b/src/zql/ivm/graph/operators/reduce-operator.test.ts @@ -0,0 +1,178 @@ +import {expect, test} from 'vitest'; +import {DifferenceStreamWriter} from '../difference-stream-writer.js'; +import {ReduceOperator} from './reduce-operator.js'; +import {Multiset} from '../../multiset.js'; +import {NoOp} from './operator.js'; + +type Thing = { + id: string; + a: number; + b: string; +}; + +type Reduction = { + id: string; + sum: number; +}; + +test('collects all things with the same key', () => { + const inputWriter = new DifferenceStreamWriter(); + const inputReader = inputWriter.newReader(); + const output = new DifferenceStreamWriter(); + + function getKey(t: Thing) { + return t.b; + } + function getValueIdentity(t: Thing) { + return t.id; + } + + new ReduceOperator( + inputReader, + output, + getValueIdentity, + getKey, + (group: Iterable) => { + let sum = 0; + let id = ''; + for (const item of group) { + id = item.b; + sum += item.a; + } + + return { + id, + sum, + }; + }, + ); + + const outReader = output.newReader(); + outReader.setOperator(new NoOp()); + + inputWriter.queueData([ + 1, + new Multiset([ + [ + { + id: 'a', + a: 1, + b: 'x', + }, + 1, + ], + [ + { + id: 'b', + a: 2, + b: 'x', + }, + 2, + ], + ]), + ]); + check([[{id: 'x', sum: 5}, 1]]); + + // retract an item + inputWriter.queueData([ + 1, + new Multiset([ + [ + { + id: 'a', + a: 1, + b: 'x', + }, + -1, + ], + ]), + ]); + check([ + [{id: 'x', sum: 5}, -1], + [{id: 'x', sum: 4}, 1], + ]); + + // fully retract items that constitue a grouping + inputWriter.queueData([ + 1, + new Multiset([ + [ + { + id: 'b', + a: 2, + b: 'x', + }, + -2, + ], + ]), + ]); + check([[{id: 'x', sum: 4}, -1]]); + + // add more entries + inputWriter.queueData([ + 1, + new Multiset([ + [ + { + id: 'a', + a: 1, + b: 'c', + }, + 1, + ], + ]), + ]); + check([[{id: 'c', sum: 1}, 1]]); + inputWriter.queueData([ + 1, + new Multiset([ + [ + { + id: 'b', + a: 2, + b: 'c', + }, + 1, + ], + ]), + ]); + check([ + [{id: 'c', sum: 1}, -1], + [{id: 'c', sum: 3}, 1], + ]); + + inputWriter.queueData([ + 1, + new Multiset([ + [ + { + id: 'a', + a: 1, + b: 'c', + }, + -1, + ], + [ + { + id: 'a', + a: 2, + b: 'c', + }, + 1, + ], + ]), + ]); + check([ + [{id: 'c', sum: 3}, -1], + [{id: 'c', sum: 4}, 1], + ]); + + function check(expected: [Reduction, number][]) { + inputWriter.notify(1); + inputWriter.notifyCommitted(1); + const items = outReader.drain(1); + expect(items.length).toBe(1); + const entry = items[0]; + expect([...entry[1].entries]).toEqual(expected); + } +}); diff --git a/src/zql/ivm/graph/operators/reduce-operator.ts b/src/zql/ivm/graph/operators/reduce-operator.ts new file mode 100644 index 0000000..67d331a --- /dev/null +++ b/src/zql/ivm/graph/operators/reduce-operator.ts @@ -0,0 +1,126 @@ +import {Primitive} from '../../../ast/ast.js'; +import {assert} from '../../../error/asserts.js'; +import {flatMapIter} from '../../../util/iterables.js'; +import {Entry, Multiset} from '../../multiset.js'; +import {Version} from '../../types.js'; +import {DifferenceStreamReader} from '../difference-stream-reader.js'; +import {DifferenceStreamWriter} from '../difference-stream-writer.js'; +import {UnaryOperator} from './unary-operator.js'; + +/** + * Applies a `reduce` function against a stream of values. + * + * Since `reduce` is a stateful operation, we need to keep track of all the + * values that have been seen for a given key. + * + * If a given key has a member added or removed, we + * re-run the reduction function against the entire set of + * values for that key. + * + * In future iterations the reduction could also be made incremental. + */ +export class ReduceOperator< + K extends Primitive, + V, + O = V, +> extends UnaryOperator { + /** + * The set of all values that have been seen for a given key. + * + * Only positive multiplicities are expected to exist in this map. + * If a negative multiplicity comes through the pipeline, + * it reduces the multiplicity of the existing value in the map. + */ + readonly #inIndex = new Map>(); + /** + * Our prior reduction for a given key. + * + * This is used to retract reductions that are no longer valid. + * E.g., if someone downstream of us is maintaining a count + * then they'd need to know when a given reduction is no longer valid + * so they can remove it from their count. + */ + readonly #outIndex = new Map(); + readonly #getValueIdentity: (value: V) => string; + + constructor( + input: DifferenceStreamReader, + output: DifferenceStreamWriter, + getValueIdentity: (value: V) => string, + getGroupKey: (value: V) => K, + f: (input: Iterable) => O, + ) { + const inner = (version: Version) => { + const keysToDo = new Set(); + const ret: Entry[] = []; + for (const entry of this.inputMessages(version)) { + for (const [value, mult] of entry[1].entries) { + const key = getGroupKey(value); + keysToDo.add(key); + this.#addToIndex(key, value, mult); + } + } + + for (const k of keysToDo) { + const dataIn = this.#inIndex.get(k); + const existingOut = this.#outIndex.get(k); + if (dataIn === undefined) { + if (existingOut !== undefined) { + // retract the reduction + this.#outIndex.delete(k); + ret.push([existingOut, -1]); + } + continue; + } + + const reduction = f( + flatMapIter(dataIn.values(), function* ([v, mult]) { + for (let i = 0; i < mult; i++) { + yield v; + } + }), + ); + if (existingOut !== undefined) { + // modified reduction + ret.push([existingOut, -1]); + } + ret.push([reduction, 1]); + this.#outIndex.set(k, reduction); + } + + this._output.queueData([version, new Multiset(ret)]); + }; + super(input, output, inner); + this.#getValueIdentity = getValueIdentity; + } + + #addToIndex(key: K, value: V, mult: number) { + let existing = this.#inIndex.get(key); + if (existing === undefined) { + existing = new Map(); + this.#inIndex.set(key, existing); + } + const prev = existing.get(this.#getValueIdentity(value)); + if (prev === undefined) { + existing.set(this.#getValueIdentity(value), [value, mult]); + } else { + const [v, m] = prev; + const newMult = m + mult; + assert( + newMult >= 0, + 'Should not end up with a negative multiplicity when tracking all events for an item', + ); + if (newMult === 0) { + existing.delete(this.#getValueIdentity(value)); + if (existing.size === 0) { + this.#inIndex.delete(key); + return undefined; + } + } else { + existing.set(this.#getValueIdentity(value), [v, newMult]); + } + } + + return existing; + } +} diff --git a/src/zql/util/iterables.ts b/src/zql/util/iterables.ts index dc21c20..a2ed0be 100644 --- a/src/zql/util/iterables.ts +++ b/src/zql/util/iterables.ts @@ -7,3 +7,13 @@ export function* mapIter( yield f(t, index++); } } + +export function* flatMapIter( + iter: Iterable, + f: (t: T, index: number) => Iterable, +): Iterable { + let index = 0; + for (const t of iter) { + yield* f(t, index++); + } +}