diff --git a/src/zql/ast-to-ivm/pipeline-builder.test.ts b/src/zql/ast-to-ivm/pipeline-builder.test.ts index c3c1b36..e3fadcb 100644 --- a/src/zql/ast-to-ivm/pipeline-builder.test.ts +++ b/src/zql/ast-to-ivm/pipeline-builder.test.ts @@ -2,7 +2,7 @@ import {expect, test} from 'vitest'; import {z} from 'zod'; import {makeTestContext} from '../context/context.js'; import {Materialite} from '../ivm/materialite.js'; -import {EntityQueryImpl, astForTesting as ast} from '../query/entity-query.js'; +import {EntityQuery, astForTesting as ast} from '../query/entity-query.js'; import {buildPipeline} from './pipeline-builder.js'; const e1 = z.object({ @@ -16,7 +16,7 @@ type E1 = z.infer; const context = makeTestContext(); test('A simple select', () => { - const q = new EntityQueryImpl<{fields: E1}>(context, 'e1'); + const q = new EntityQuery<{fields: E1}>(context, 'e1'); const m = new Materialite(); let s = m.newStatelessSource(); let pipeline = buildPipeline( @@ -55,9 +55,9 @@ test('A simple select', () => { }); test('Count', () => { - const q = new EntityQueryImpl<{fields: E1}>(context, 'e1'); + const q = new EntityQuery<{fields: E1}>(context, 'e1'); const m = new Materialite(); - const s = m.newStatelessSource(); + const s = m.newStatelessSource(); const pipeline = buildPipeline(() => s.stream, ast(q.count())); let effectRunCount = 0; @@ -66,17 +66,17 @@ test('Count', () => { }); const expected = [1, 2, 1, 0]; - s.add({}); - s.add({}); - s.delete({}); - s.delete({}); + s.add({id: '1', a: 1, b: 1, d: false}); + s.add({id: '2', a: 1, b: 1, d: false}); + s.delete({id: '1', a: 1, b: 1, d: false}); + s.delete({id: '2', a: 1, b: 1, d: false}); expect(effectRunCount).toBe(4); }); test('Where', () => { - const q = new EntityQueryImpl<{fields: E1}>(context, 'e1'); + const q = new EntityQuery<{fields: E1}>(context, 'e1'); const m = new Materialite(); - const s = m.newStatelessSource(); + const s = m.newStatelessSource(); const pipeline = buildPipeline( () => s.stream, ast(q.select('id').where('a', '>', 1).where('b', '<', 2)), @@ -88,10 +88,10 @@ test('Where', () => { }); const expected = [{id: 'b'}]; - s.add({id: 'a', a: 1, b: 1n}); - s.add({id: 'b', a: 2, b: 1n}); - s.add({id: 'c', a: 1, b: 2n}); - s.add({id: 'd', a: 2, b: 2n}); + s.add({id: 'a', a: 1, b: 1, d: false}); + s.add({id: 'b', a: 2, b: 1, d: false}); + s.add({id: 'c', a: 1, b: 2, d: false}); + s.add({id: 'd', a: 2, b: 2, d: false}); expect(effectRunCount).toBe(1); }); diff --git a/src/zql/ast-to-ivm/pipeline-builder.ts b/src/zql/ast-to-ivm/pipeline-builder.ts index 6f01bf1..177830a 100644 --- a/src/zql/ast-to-ivm/pipeline-builder.ts +++ b/src/zql/ast-to-ivm/pipeline-builder.ts @@ -1,17 +1,19 @@ +import {Entity} from '../../generate.js'; import { AST, + Aggregation, Condition, Ordering, SimpleCondition, SimpleOperator, } from '../ast/ast.js'; -import {assert, must} from '../error/asserts.js'; +import {must} from '../error/asserts.js'; import {DifferenceStream} from '../ivm/graph/difference-stream.js'; export const orderingProp = Symbol(); export function buildPipeline( - sourceStreamProvider: (sourceName: string) => DifferenceStream, + sourceStreamProvider: (sourceName: string) => DifferenceStream, ast: AST, ) { // filters first @@ -26,12 +28,29 @@ export function buildPipeline( stream = applyWhere(stream, ast.where); } - let ret: DifferenceStream; - assert(ast.select, 'No select clause'); + let ret: DifferenceStream = stream; + if (ast.groupBy) { + ret = applyGroupBy( + ret as DifferenceStream, + ast.groupBy, + ast.aggregate ?? 
[], + Array.isArray(ast.select) ? ast.select : [], + ast.orderBy, + ); + } else if (ast.aggregate && ast.aggregate.length > 0) { + throw new Error( + 'Aggregates (other than count) without a group-by are not supported', + ); + } + if (ast.select === 'count') { - ret = stream.linearCount(); - } else { - ret = applySelect(stream, ast.select, ast.orderBy); + ret = ret.linearCount(); + } else if (ast.groupBy === undefined) { + ret = applySelect( + ret as DifferenceStream, + ast.select ?? [], + ast.orderBy, + ); } // Note: the stream is technically attached at this point. @@ -40,35 +59,48 @@ export function buildPipeline( } export function applySelect( - stream: DifferenceStream, + stream: DifferenceStream, select: string[], orderBy: Ordering | undefined, ) { return stream.map(x => { - const ret: Partial> = {}; - for (const field of select) { - ret[field] = (x as Record)[field]; - } - - const orderingValues: unknown[] = []; - if (orderBy !== undefined) { - for (const field of orderBy[0]) { - orderingValues.push((x as Record)[field]); + let ret: Record; + if (select.length === 0) { + ret = {...x}; + } else { + ret = {}; + for (const field of select) { + ret[field] = (x as Record)[field]; } } - Object.defineProperty(ret, orderingProp, { - enumerable: false, - writable: false, - configurable: false, - value: orderingValues, - }); + addOrdering(ret, x, orderBy); return ret; }); } -function applyWhere(stream: DifferenceStream, where: Condition) { +function addOrdering( + ret: Record, + row: Record, + orderBy: Ordering | undefined, +) { + const orderingValues: unknown[] = []; + if (orderBy !== undefined) { + for (const field of orderBy[0]) { + orderingValues.push(row[field]); + } + } + + Object.defineProperty(ret, orderingProp, { + enumerable: false, + writable: false, + configurable: false, + value: orderingValues, + }); +} + +function applyWhere(stream: DifferenceStream, where: Condition) { let ret = stream; // We'll handle `OR` and parentheticals like so: // OR: We'll create a new stream for the LHS and RHS of the OR then merge together. 
@@ -99,7 +131,7 @@ function applyWhere(stream: DifferenceStream, where: Condition) { } function applySimpleCondition( - stream: DifferenceStream, + stream: DifferenceStream, condition: SimpleCondition, ) { const operator = getOperator(condition.op); @@ -109,6 +141,102 @@ function applySimpleCondition( ); } +function applyGroupBy( + stream: DifferenceStream, + columns: string[], + aggregations: Aggregation[], + select: string[], + orderBy: Ordering | undefined, +) { + const keyFunction = makeKeyFunction(columns); + return stream.reduce( + keyFunction, + value => value.id as string, + values => { + const first = values[Symbol.iterator]().next().value; + const ret: Record = {}; + for (const column of select) { + ret[column] = first[column]; + } + addOrdering(ret, first, orderBy); + + for (const aggregation of aggregations) { + switch (aggregation.aggregate) { + case 'count': { + let count = 0; + for (const _ of values) { + count++; + } + ret[aggregation.alias] = count; + break; + } + case 'sum': { + let sum = 0; + for (const value of values) { + sum += value[aggregation.field as keyof T] as number; + } + ret[aggregation.alias] = sum; + break; + } + case 'avg': { + let sum = 0; + let count = 0; + for (const value of values) { + sum += value[aggregation.field as keyof T] as number; + count++; + } + ret[aggregation.alias] = sum / count; + break; + } + case 'min': { + let min = Infinity; + for (const value of values) { + min = Math.min( + min, + value[aggregation.field as keyof T] as number, + ); + } + ret[aggregation.alias] = min; + break; + } + case 'max': { + let max = -Infinity; + for (const value of values) { + max = Math.max( + max, + value[aggregation.field as keyof T] as number, + ); + } + ret[aggregation.alias] = max; + break; + } + case 'array': { + ret[aggregation.alias] = Array.from(values).map( + x => x[aggregation.field as keyof T], + ); + break; + } + default: + throw new Error(`Unknown aggregation ${aggregation.aggregate}`); + } + } + return ret; + }, + ); +} + +function makeKeyFunction(columns: string[]) { + return (x: Record) => { + const ret: unknown[] = []; + for (const column of columns) { + ret.push(x[column]); + } + // Would it be better to come up with some hash function + // which can handle complex types? + return JSON.stringify(ret); + }; +} + // We're well-typed in the query builder so once we're down here // we can assume that the operator is valid. // eslint-disable-next-line @typescript-eslint/no-explicit-any diff --git a/src/zql/ast/ast.ts b/src/zql/ast/ast.ts index f837ef2..05c6831 100644 --- a/src/zql/ast/ast.ts +++ b/src/zql/ast/ast.ts @@ -5,6 +5,18 @@ // input to the query builder. export type Ordering = readonly [readonly string[], 'asc' | 'desc']; export type Primitive = string | number | boolean | null; + +// I think letting users provide their own lambda functions +// to perform the aggregation would make the most sense. +// We should extend that to let users provide `filter`, `map`, and `reduce` lambdas +// to do things not available in the query language itself. 
+export type Aggregate = 'sum' | 'avg' | 'min' | 'max' | 'array' | 'count'; +export type Aggregation = { + readonly field: string; + readonly alias: string; + readonly aggregate: Aggregate; +}; + // type Ref = `${string}.${string}`; /** @@ -27,8 +39,9 @@ export type AST = { // readonly on: ConditionList; // }[]; readonly limit?: number | undefined; - // readonly groupBy?: string[]; + readonly groupBy?: string[]; readonly orderBy: Ordering; + readonly aggregate?: Aggregation[]; // readonly after?: Primitive; }; diff --git a/src/zql/context/replicache-context.test.ts b/src/zql/context/replicache-context.test.ts index b7983f6..5eebaf9 100644 --- a/src/zql/context/replicache-context.test.ts +++ b/src/zql/context/replicache-context.test.ts @@ -4,7 +4,7 @@ import {expect, test} from 'vitest'; import {z} from 'zod'; import {generate} from '../../generate.js'; import {SetSource} from '../ivm/source/set-source.js'; -import {EntityQueryImpl} from '../query/entity-query.js'; +import {EntityQuery} from '../query/entity-query.js'; import {makeReplicacheContext} from './replicache-context.js'; const e1 = z.object({ @@ -136,7 +136,7 @@ test('ZQL query with Replicache', async () => { const r = newRep(); const context = makeReplicacheContext(r); - const q = new EntityQueryImpl<{fields: E1}>(context, 'e1'); + const q = new EntityQuery<{fields: E1}>(context, 'e1'); const view = q.select('id').where('str', '>', 'm').prepare().view(); diff --git a/src/zql/integration.test.ts b/src/zql/integration.test.ts index 48fb27a..b28d7ae 100644 --- a/src/zql/integration.test.ts +++ b/src/zql/integration.test.ts @@ -1,11 +1,12 @@ -import fc from 'fast-check'; -import {nanoid} from 'nanoid'; -import {Replicache, TEST_LICENSE_KEY} from 'replicache'; import {expect, test} from 'vitest'; import {z} from 'zod'; import {generate} from '../generate.js'; import {makeReplicacheContext} from './context/replicache-context.js'; -import {EntityQueryImpl} from './query/entity-query.js'; +import {Replicache, TEST_LICENSE_KEY} from 'replicache'; +import {nanoid} from 'nanoid'; +import fc from 'fast-check'; +import {EntityQuery} from './query/entity-query.js'; +import * as agg from './query/agg.js'; export async function tickAFewTimes(n = 10, time = 0) { for (let i = 0; i < n; i++) { @@ -83,7 +84,7 @@ function sampleTenUniqueIssues() { function setup() { const r = newRep(); const c = makeReplicacheContext(r); - const q = new EntityQueryImpl<{fields: Issue}>(c, 'issue'); + const q = new EntityQuery<{fields: Issue}>(c, 'issue'); return {r, c, q}; } @@ -394,7 +395,112 @@ test('order by optional field', async () => { test('join', () => {}); test('having', () => {}); -test('group by', () => {}); + +test('group by', async () => { + const {q, r} = setup(); + const issues: Issue[] = [ + { + id: 'a', + title: 'foo', + status: 'open', + priority: 'high', + assignee: 'charles', + created: new Date('2024-01-01').getTime(), + updated: Date.now(), + }, + { + id: 'b', + title: 'bar', + status: 'open', + priority: 'medium', + assignee: 'bob', + created: new Date('2024-01-02').getTime(), + updated: Date.now(), + }, + { + id: 'c', + title: 'baz', + status: 'closed', + priority: 'low', + assignee: 'alice', + created: new Date('2024-01-03').getTime(), + updated: Date.now(), + }, + ] as const; + await Promise.all(issues.map(r.mutate.initIssue)); + const stmt = q + .select('status', agg.count('status', 'count')) + .groupBy('status') + .prepare(); + const rows = await stmt.exec(); + + expect(rows).toEqual([ + {status: 'open', count: 2}, + {status: 'closed', 
count: 1}, + ]); + + stmt.destroy(); + + const stmt2 = q + .select('status', agg.array('assignee')) + .groupBy('status') + .prepare(); + const rows2 = await stmt2.exec(); + + expect(rows2).toEqual([ + {status: 'open', assignee: ['charles', 'bob']}, + {status: 'closed', assignee: ['alice']}, + ]); + + const stmt3 = q + .select('status', agg.array('assignee'), agg.min('created')) + .groupBy('status') + .prepare(); + const rows3 = await stmt3.exec(); + + expect(rows3).toEqual([ + { + status: 'open', + assignee: ['charles', 'bob'], + created: issues[0].created, + }, + { + status: 'closed', + assignee: ['alice'], + created: issues[2].created, + }, + ]); + + const stmt4 = q + .select( + 'status', + agg.array('assignee'), + agg.min('created', 'minCreated'), + agg.max('created', 'maxCreated'), + ) + .groupBy('status') + .prepare(); + const rows4 = await stmt4.exec(); + + expect(rows4).toEqual([ + { + status: 'open', + assignee: ['charles', 'bob'], + minCreated: issues[0].created, + maxCreated: issues[1].created, + }, + { + status: 'closed', + assignee: ['alice'], + minCreated: issues[2].created, + maxCreated: issues[2].created, + }, + ]); + + await r.close(); +}); + +test('sorted groupings', () => {}); test('compound where', async () => { const {q, r} = setup(); @@ -448,8 +554,6 @@ test('limit', () => {}); // To be implemented here: `asEntries` in `set-source.ts` test('after', () => {}); -test('sorted groupings', () => {}); - test('adding items late to a source materializes them in the correct order', () => {}); test('disposing of a subscription causes us to no longer be called back', () => {}); diff --git a/src/zql/ivm/graph/difference-stream-reader.ts b/src/zql/ivm/graph/difference-stream-reader.ts index 42e3a49..5c00178 100644 --- a/src/zql/ivm/graph/difference-stream-reader.ts +++ b/src/zql/ivm/graph/difference-stream-reader.ts @@ -1,10 +1,10 @@ import {invariant, must} from '../../error/asserts.js'; +import {Multiset} from '../multiset.js'; import {Version} from '../types.js'; -import {Queue, QueueEntry} from './queue.js'; import {DifferenceStreamWriter} from './difference-stream-writer.js'; -import {Operator} from './operators/operator.js'; import {Request} from './message.js'; -import {Multiset} from '../multiset.js'; +import {Operator} from './operators/operator.js'; +import {Queue, QueueEntry} from './queue.js'; /** * Represents the input to an operator. 
@@ -21,13 +21,12 @@ import {Multiset} from '../multiset.js'; * o o o */ export class DifferenceStreamReader { - protected readonly _queue; + protected readonly _queue = new Queue(); readonly #upstreamWriter; #downstreamOperator: Operator | null = null; #lastSeenVersion: Version = -1; constructor(upstream: DifferenceStreamWriter) { - this._queue = new Queue(); this.#upstreamWriter = upstream; } diff --git a/src/zql/ivm/graph/difference-stream.ts b/src/zql/ivm/graph/difference-stream.ts index 3a43e01..b41edef 100644 --- a/src/zql/ivm/graph/difference-stream.ts +++ b/src/zql/ivm/graph/difference-stream.ts @@ -1,3 +1,4 @@ +import {Primitive} from '../../ast/ast.js'; import {Multiset} from '../multiset.js'; import {Source} from '../source/source.js'; import {Version} from '../types.js'; @@ -11,6 +12,7 @@ import {DebugOperator} from './operators/debug-operator.js'; import {DifferenceEffectOperator} from './operators/difference-effect-operator.js'; import {FilterOperator} from './operators/filter-operator.js'; import {MapOperator} from './operators/map-operator.js'; +import {ReduceOperator} from './operators/reduce-operator.js'; import {QueueEntry} from './queue.js'; /** @@ -48,6 +50,22 @@ export class DifferenceStream implements IDifferenceStream { return ret; } + reduce( + getKey: (value: T) => K, + getIdentity: (value: T) => string, + f: (input: Iterable) => O, + ): DifferenceStream { + const ret = new DifferenceStream(); + new ReduceOperator( + this.#upstreamWriter.newReader(), + ret.#upstreamWriter, + getIdentity, + getKey, + f, + ); + return ret; + } + /** * This differs from count in that `size` just counts the entire * stream whereas `count` counts the number of times each key appears. diff --git a/src/zql/ivm/graph/operators/reduce-operator.test.ts b/src/zql/ivm/graph/operators/reduce-operator.test.ts new file mode 100644 index 0000000..d399e08 --- /dev/null +++ b/src/zql/ivm/graph/operators/reduce-operator.test.ts @@ -0,0 +1,178 @@ +import {expect, test} from 'vitest'; +import {Multiset} from '../../multiset.js'; +import {DifferenceStreamWriter} from '../difference-stream-writer.js'; +import {NoOp} from './operator.js'; +import {ReduceOperator} from './reduce-operator.js'; + +type Thing = { + id: string; + value: number; + groupKey: string; +}; + +type Reduction = { + id: string; + sum: number; +}; + +test('collects all things with the same key', () => { + const inputWriter = new DifferenceStreamWriter(); + const inputReader = inputWriter.newReader(); + const output = new DifferenceStreamWriter(); + + function getGroupKey(t: Thing) { + return t.groupKey; + } + function getValueIdentity(t: Thing) { + return t.id; + } + + new ReduceOperator( + inputReader, + output, + getValueIdentity, + getGroupKey, + (group: Iterable) => { + let sum = 0; + let id = ''; + for (const item of group) { + id = item.groupKey; + sum += item.value; + } + + return { + id, + sum, + }; + }, + ); + + const outReader = output.newReader(); + outReader.setOperator(new NoOp()); + + inputWriter.queueData([ + 1, + new Multiset([ + [ + { + id: 'a', + value: 1, + groupKey: 'x', + }, + 1, + ], + [ + { + id: 'b', + value: 2, + groupKey: 'x', + }, + 2, + ], + ]), + ]); + check([[{id: 'x', sum: 5}, 1]]); + + // retract an item + inputWriter.queueData([ + 1, + new Multiset([ + [ + { + id: 'a', + value: 1, + groupKey: 'x', + }, + -1, + ], + ]), + ]); + check([ + [{id: 'x', sum: 5}, -1], + [{id: 'x', sum: 4}, 1], + ]); + + // fully retract items that constitute a grouping + inputWriter.queueData([ + 1, + new Multiset([ + [ + { + 
id: 'b', + value: 2, + groupKey: 'x', + }, + -2, + ], + ]), + ]); + check([[{id: 'x', sum: 4}, -1]]); + + // add more entries + inputWriter.queueData([ + 1, + new Multiset([ + [ + { + id: 'a', + value: 1, + groupKey: 'c', + }, + 1, + ], + ]), + ]); + check([[{id: 'c', sum: 1}, 1]]); + inputWriter.queueData([ + 1, + new Multiset([ + [ + { + id: 'b', + value: 2, + groupKey: 'c', + }, + 1, + ], + ]), + ]); + check([ + [{id: 'c', sum: 1}, -1], + [{id: 'c', sum: 3}, 1], + ]); + + inputWriter.queueData([ + 1, + new Multiset([ + [ + { + id: 'a', + value: 1, + groupKey: 'c', + }, + -1, + ], + [ + { + id: 'a', + value: 2, + groupKey: 'c', + }, + 1, + ], + ]), + ]); + check([ + [{id: 'c', sum: 3}, -1], + [{id: 'c', sum: 4}, 1], + ]); + + function check(expected: [Reduction, number][]) { + inputWriter.notify(1); + inputWriter.notifyCommitted(1); + const items = outReader.drain(1); + expect(items.length).toBe(1); + const entry = items[0]; + expect([...entry[1].entries]).toEqual(expected); + } +}); diff --git a/src/zql/ivm/graph/operators/reduce-operator.ts b/src/zql/ivm/graph/operators/reduce-operator.ts new file mode 100644 index 0000000..af3d3da --- /dev/null +++ b/src/zql/ivm/graph/operators/reduce-operator.ts @@ -0,0 +1,130 @@ +import {Primitive} from '../../../ast/ast.js'; +import {assert} from '../../../error/asserts.js'; +import {flatMapIter} from '../../../util/iterables.js'; +import {Entry, Multiset} from '../../multiset.js'; +import {Version} from '../../types.js'; +import {DifferenceStreamReader} from '../difference-stream-reader.js'; +import {DifferenceStreamWriter} from '../difference-stream-writer.js'; +import {UnaryOperator} from './unary-operator.js'; + +/** + * Applies a `reduce` function against a stream of values. + * + * Since `reduce` is a stateful operation, we need to keep track of all the + * values that have been seen for a given key. + * + * If a given key has a member added or removed, we + * re-run the reduction function against the entire set of + * values for that key. + * + * In future iterations the reduction could also be made incremental. + */ +export class ReduceOperator< + K extends Primitive, + V, + O = V, +> extends UnaryOperator { + /** + * The set of all values that have been seen for a given key. + * + * Only positive multiplicities are expected to exist in this map. + * If a negative multiplicity comes through the pipeline, + * it reduces the multiplicity of the existing value in the map. + */ + readonly #inIndex = new Map>(); + /** + * Our prior reduction for a given key. + * + * This is used to retract reductions that are no longer valid. + * E.g., if someone downstream of us is maintaining a count + * then they'd need to know when a given reduction is no longer valid + * so they can remove it from their count. 
+ */ + readonly #outIndex = new Map(); + readonly #getValueIdentity: (value: V) => string; + + constructor( + input: DifferenceStreamReader, + output: DifferenceStreamWriter, + getValueIdentity: (value: V) => string, + getGroupKey: (value: V) => K, + f: (input: Iterable) => O, + ) { + const inner = (version: Version) => { + const keysToProcess = new Set(); + const ret: Entry[] = []; + for (const entry of this.inputMessages(version)) { + for (const [value, mult] of entry[1].entries) { + const key = getGroupKey(value); + keysToProcess.add(key); + this.#addToIndex(key, value, mult); + } + } + + for (const k of keysToProcess) { + const dataIn = this.#inIndex.get(k); + const existingOut = this.#outIndex.get(k); + if (dataIn === undefined) { + if (existingOut !== undefined) { + // retract the reduction + this.#outIndex.delete(k); + ret.push([existingOut, -1]); + } + continue; + } + + const reduction = f( + flatMapIter( + () => dataIn.values(), + function* ([v, mult]) { + for (let i = 0; i < mult; i++) { + yield v; + } + }, + ), + ); + if (existingOut !== undefined) { + // modified reduction + ret.push([existingOut, -1]); + } + ret.push([reduction, 1]); + this.#outIndex.set(k, reduction); + } + + this._output.queueData([version, new Multiset(ret)]); + }; + super(input, output, inner); + this.#getValueIdentity = getValueIdentity; + } + + #addToIndex(key: K, value: V, mult: number) { + let existing = this.#inIndex.get(key); + if (existing === undefined) { + existing = new Map(); + this.#inIndex.set(key, existing); + } + const valueIdentity = this.#getValueIdentity(value); + const prev = existing.get(valueIdentity); + if (prev === undefined) { + existing.set(valueIdentity, [value, mult]); + } else { + const [v, m] = prev; + const newMult = m + mult; + assert( + newMult >= 0, + 'Should not end up with a negative multiplicity when tracking all events for an item', + ); + if (newMult === 0) { + existing.delete(valueIdentity); + if (existing.size === 0) { + this.#inIndex.delete(key); + return undefined; + } + } else { + existing.set(valueIdentity, [v, newMult]); + } + } + + return existing; + } +} diff --git a/src/zql/ivm/graph/queue.ts b/src/zql/ivm/graph/queue.ts index ff2fb4e..cafb829 100644 --- a/src/zql/ivm/graph/queue.ts +++ b/src/zql/ivm/graph/queue.ts @@ -4,8 +4,8 @@ import {Version} from '../types.js'; import {Reply} from './message.js'; export type QueueEntry = - | readonly [Version, Multiset, Reply] - | readonly [Version, Multiset]; + | readonly [version: Version, multiset: Multiset, reply: Reply] + | readonly [version: Version, multiset: Multiset]; type Node = { data: QueueEntry; diff --git a/src/zql/ivm/notes.md b/src/zql/ivm/notes.md index e69de29..7d4654a 100644 --- a/src/zql/ivm/notes.md +++ b/src/zql/ivm/notes.md @@ -0,0 +1,2 @@ +- aliasable select +- encode fully qualified field into ast for select, group-by, join, where, etc. 
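The ReduceOperator above keeps every value it has seen per grouping key and, whenever a key changes, re-runs the reduction and emits a retraction of the prior result followed by the new one. Below is a minimal sketch of driving it through the new DifferenceStream.reduce hook, using the Materialite stateless source from pipeline-builder.test.ts; the Row shape, the typed source, and the logging are illustrative assumptions, not part of this change.

```ts
import {Materialite} from '../ivm/materialite.js'; // path as in pipeline-builder.test.ts

// Illustrative row shape (assumed for this sketch).
type Row = {id: string; status: string; points: number};

const m = new Materialite();
const s = m.newStatelessSource<Row>();

// Group rows by status and sum their points.
const grouped = s.stream.reduce(
  (row: Row) => row.status, // grouping key (a Primitive)
  (row: Row) => row.id, // identity used to track per-value multiplicities
  (rows: Iterable<Row>) => {
    let status = '';
    let points = 0;
    for (const row of rows) {
      status = row.status;
      points += row.points;
    }
    return {status, points};
  },
);

grouped.effect(group => console.log(group));

s.add({id: 'a', status: 'open', points: 1});
// The 'open' group is recomputed on the next add: {status: 'open', points: 1}
// is retracted and {status: 'open', points: 3} is asserted.
s.add({id: 'b', status: 'open', points: 2});
```

This is the same shape applyGroupBy produces: its key function JSON-stringifies the group-by column values, and its reducer copies the selected columns from the first row of the group before computing each aggregate.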
diff --git a/src/zql/query/agg.ts b/src/zql/query/agg.ts new file mode 100644 index 0000000..6624de7 --- /dev/null +++ b/src/zql/query/agg.ts @@ -0,0 +1,144 @@ +type AggregateBase = { + field: Field; + alias: Alias; +}; + +export type Aggregate = + | Min + | Max + | Sum + | Count + | Avg + | AggArray; + +type Min = { + aggregate: 'min'; +} & AggregateBase; + +type Max = { + aggregate: 'max'; +} & AggregateBase; + +type Sum = { + aggregate: 'sum'; +} & AggregateBase; + +type Avg = { + aggregate: 'avg'; +} & AggregateBase; + +export type Count = { + aggregate: 'count'; +} & AggregateBase; + +export type AggArray = { + aggregate: 'array'; +} & AggregateBase; + +export function min(field: Field): Min; +export function min( + field: Field, + alias: Alias, +): Min; +export function min( + field: Field, + alias?: Alias | undefined, +): Min { + return { + aggregate: 'min', + field, + alias: alias ?? (field as unknown as Alias), + }; +} + +export function max(field: Field): Max; +export function max( + field: Field, + alias: Alias, +): Max; +export function max( + field: Field, + alias?: Alias | undefined, +): Max { + return { + aggregate: 'max', + field, + alias: alias ?? (field as unknown as Alias), + }; +} + +export function sum(field: Field): Sum; +export function sum( + field: Field, + alias: Alias, +): Sum; +export function sum( + field: Field, + alias?: Alias | undefined, +): Sum { + return { + aggregate: 'sum', + field, + alias: alias ?? (field as unknown as Alias), + }; +} + +export function count(field: Field): Count; +export function count( + field: Field, + alias: Alias, +): Count; +export function count( + field: Field, + alias?: Alias | undefined, +): Count { + return { + aggregate: 'count', + field, + alias: alias ?? (field as unknown as Alias), + }; +} + +export function avg(field: Field): Avg; +export function avg( + field: Field, + alias: Alias, +): Avg; +export function avg( + field: Field, + alias?: Alias | undefined, +): Avg { + return { + aggregate: 'avg', + field, + alias: alias ?? (field as unknown as Alias), + }; +} + +export function array( + field: Field, +): AggArray; +export function array( + field: Field, + alias: Alias, +): AggArray; +export function array( + field: Field, + alias?: Alias | undefined, +): AggArray { + return { + aggregate: 'array', + field, + alias: alias ?? 
(field as unknown as Alias), + }; +} + +export function isAggregate( + x: unknown, +): x is Aggregate { + return ( + x !== null && + typeof x === 'object' && + typeof (x as Record).aggregate === 'string' + ); +} diff --git a/src/zql/query/entity-query.test.ts b/src/zql/query/entity-query.test.ts index 09aa10e..822502e 100644 --- a/src/zql/query/entity-query.test.ts +++ b/src/zql/query/entity-query.test.ts @@ -2,7 +2,8 @@ import {expect, expectTypeOf, test} from 'vitest'; import {z} from 'zod'; import {makeTestContext} from '../context/context.js'; import {Misuse} from '../error/misuse.js'; -import {EntityQueryImpl, astForTesting as ast} from './entity-query.js'; +import {EntityQuery, astForTesting as ast} from './entity-query.js'; +import * as agg from './agg.js'; const context = makeTestContext(); test('query types', () => { @@ -14,7 +15,7 @@ test('query types', () => { [sym]: boolean; }; - const q = new EntityQueryImpl<{fields: E1}>(context, 'e1'); + const q = new EntityQuery<{fields: E1}>(context, 'e1'); // @ts-expect-error - selecting fields that do not exist in the schema is a type error q.select('does-not-exist'); @@ -33,16 +34,6 @@ test('query types', () => { >(); // where/order/limit do not change return type - expectTypeOf( - q - .select('id', 'str') - .where('id', '<', '123') - .limit(1) - .asc('id') - .prepare() - .exec(), - ).toMatchTypeOf>(); - expectTypeOf(q.where).toBeCallableWith('id', '=', 'foo'); expectTypeOf(q.where).toBeCallableWith('str', '<', 'foo'); expectTypeOf(q.where).toBeCallableWith('optStr', '>', 'foo'); @@ -60,6 +51,21 @@ test('query types', () => { // @ts-expect-error - Argument of type 'unique symbol' is not assignable to parameter of type 'FieldName<{ fields: E1; }>'.ts(2345) q.where(sym, '==', true); + + // @ts-expect-error - 'x' is not a field that we can aggregate on + q.select(agg.array('x')).groupBy('id'); + + expectTypeOf( + q.select('id', agg.array('str')).groupBy('optStr').prepare().exec(), + ).toMatchTypeOf>(); + + expectTypeOf( + q + .select('id', agg.array('str', 'alias')) + .groupBy('optStr') + .prepare() + .exec(), + ).toMatchTypeOf>(); }); const e1 = z.object({ @@ -80,7 +86,7 @@ const dummyObject: E1 = { }; test('ast: select', () => { - const q = new EntityQueryImpl<{fields: E1}>(context, 'e1'); + const q = new EntityQuery<{fields: E1}>(context, 'e1'); // each individual field is selectable on its own Object.keys(dummyObject).forEach(k => { @@ -114,19 +120,19 @@ test('ast: count', () => { // Cannot select fields in addition to a count. // A query is one or the other: count query or selection query. 
expect(() => - new EntityQueryImpl<{fields: E1}>(context, 'e1').select('id').count(), + new EntityQuery<{fields: E1}>(context, 'e1').select('id').count(), ).toThrow(Misuse); expect(() => - new EntityQueryImpl<{fields: E1}>(context, 'e1').count().select('id'), + new EntityQuery<{fields: E1}>(context, 'e1').count().select('id'), ).toThrow(Misuse); // selection set is the literal `count`, not an array of fields - const q = new EntityQueryImpl<{fields: E1}>(context, 'e1').count(); + const q = new EntityQuery<{fields: E1}>(context, 'e1').count(); expect(ast(q).select).toEqual('count'); }); test('ast: where', () => { - let q = new EntityQueryImpl<{fields: E1}>(context, 'e1'); + let q = new EntityQuery<{fields: E1}>(context, 'e1'); // where is applied q = q.where('id', '=', 'a'); @@ -214,7 +220,7 @@ test('ast: where', () => { }); test('ast: limit', () => { - const q = new EntityQueryImpl<{fields: E1}>(context, 'e1'); + const q = new EntityQuery<{fields: E1}>(context, 'e1'); expect({...ast(q.limit(10)), alias: 0}).toEqual({ orderBy: [['id'], 'asc'], alias: 0, @@ -224,7 +230,7 @@ test('ast: limit', () => { }); test('ast: asc/desc', () => { - const q = new EntityQueryImpl<{fields: E1}>(context, 'e1'); + const q = new EntityQuery<{fields: E1}>(context, 'e1'); // order methods update the ast expect({...ast(q.asc('id')), alias: 0}).toEqual({ @@ -245,7 +251,7 @@ test('ast: asc/desc', () => { }); test('ast: independent of method call order', () => { - const base = new EntityQueryImpl<{fields: E1}>(context, 'e1'); + const base = new EntityQuery<{fields: E1}>(context, 'e1'); const calls = { select(q: typeof base) { diff --git a/src/zql/query/entity-query.ts b/src/zql/query/entity-query.ts index a795329..de8348b 100644 --- a/src/zql/query/entity-query.ts +++ b/src/zql/query/entity-query.ts @@ -1,5 +1,6 @@ import { AST, + Aggregation, Condition, Primitive, SimpleCondition, @@ -9,6 +10,7 @@ import {Context} from '../context/context.js'; import {must} from '../error/asserts.js'; import {Misuse} from '../error/misuse.js'; import {EntitySchema} from '../schema/entity-schema.js'; +import {AggArray, Aggregate, Count, isAggregate} from './agg.js'; import {Statement} from './statement.js'; type FieldValue< @@ -16,6 +18,13 @@ type FieldValue< K extends Selectable, > = S['fields'][K] extends Primitive | undefined ? S['fields'][K] : never; +type AggregateValue> = + K extends Count + ? number + : K extends AggArray + ? S['fields'][K['field']][] + : S['fields'][K['field']]; + export type SelectedFields< S extends EntitySchema, Fields extends Selectable[], @@ -24,12 +33,39 @@ export type SelectedFields< Fields[number] extends keyof S['fields'] ? Fields[number] : never >; +type SelectedAggregates< + S extends EntitySchema, + Aggregates extends Aggregable[], +> = { + [K in Aggregates[number]['alias']]: AggregateValue< + S, + Extract + >; +}; + type AsString = T extends string ? T : never; export type Selectable = | AsString | 'id'; +type Aggregable = Aggregate< + AsString, + string +>; + +type ToSelectableOnly = T extends (infer U)[] + ? U extends Selectable + ? U[] + : never + : never; + +type ToAggregableOnly = T extends (infer U)[] + ? U extends Aggregable + ? U[] + : never + : never; + /** * Have you ever noticed that when you hover over Types in TypeScript, it shows * Pick, K>? Rather than the final object structure after picking and omitting? @@ -42,30 +78,9 @@ export type MakeHumanReadable = {} & { readonly [P in keyof T]: T[P] extends string ? 
T[P] : MakeHumanReadable; }; -export interface EntityQuery { - readonly select: []>( - ...x: Fields - ) => EntityQuery[]>; - readonly count: () => EntityQuery; - readonly where: >( - f: Key, - op: SimpleOperator, - value: FieldValue, - ) => EntityQuery; - readonly limit: (n: number) => EntityQuery; - readonly asc: (...x: Selectable[]) => EntityQuery; - readonly desc: (...x: Selectable[]) => EntityQuery; - - // TODO: we can probably skip the `prepare` step and just have `materialize` - // Although we'd need the prepare step in order to get a stmt to change bindings. - readonly prepare: () => Statement; -} - let aliasCount = 0; -export class EntityQueryImpl - implements EntityQuery -{ +export class EntityQuery { readonly #ast: AST; readonly #name: string; readonly #context: Context; @@ -83,25 +98,39 @@ export class EntityQueryImpl astWeakMap.set(this, this.#ast); } - select[]>(...x: Fields) { + select | Aggregable)[]>(...x: Fields) { + // TODO: we should drop the explicit `count` API and use `agg.count` instead if (this.#ast.select === 'count') { throw new Misuse( 'A query can either return fields or a count, not both.', ); } const select = new Set(this.#ast.select); + const aggregate: Aggregation[] = []; for (const more of x) { - select.add(more); + if (!isAggregate(more)) { + select.add(more); + continue; + } + aggregate.push(more); } - return new EntityQueryImpl[]>( - this.#context, - this.#name, - { - ...this.#ast, - select: [...select], - }, - ); + return new EntityQuery< + S, + (SelectedFields> & + SelectedAggregates>)[] + >(this.#context, this.#name, { + ...this.#ast, + select: [...select], + aggregate, + }); + } + + groupBy>(...x: K[]) { + return new EntityQuery(this.#context, this.#name, { + ...this.#ast, + groupBy: x as string[], + }); } where>( @@ -133,7 +162,7 @@ export class EntityQueryImpl }; } - return new EntityQueryImpl(this.#context, this.#name, { + return new EntityQuery(this.#context, this.#name, { ...this.#ast, where: cond, }); @@ -144,7 +173,7 @@ export class EntityQueryImpl throw new Misuse('Limit already set'); } - return new EntityQueryImpl(this.#context, this.#name, { + return new EntityQuery(this.#context, this.#name, { ...this.#ast, limit: n, }); @@ -155,7 +184,7 @@ export class EntityQueryImpl x.push('id'); } - return new EntityQueryImpl(this.#context, this.#name, { + return new EntityQuery(this.#context, this.#name, { ...this.#ast, orderBy: [x, 'asc'], }); @@ -166,7 +195,7 @@ export class EntityQueryImpl x.push('id'); } - return new EntityQueryImpl(this.#context, this.#name, { + return new EntityQuery(this.#context, this.#name, { ...this.#ast, orderBy: [x, 'desc'], }); @@ -178,7 +207,7 @@ export class EntityQueryImpl 'Selection set already set. 
Will not change to a count query.', ); } - return new EntityQueryImpl(this.#context, this.#name, { + return new EntityQuery(this.#context, this.#name, { ...this.#ast, select: 'count', }); @@ -189,8 +218,8 @@ export class EntityQueryImpl } } -const astWeakMap = new WeakMap, AST>(); +const astWeakMap = new WeakMap, AST>(); -export function astForTesting(q: EntityQueryImpl): AST { +export function astForTesting(q: EntityQuery): AST { return must(astWeakMap.get(q)); } diff --git a/src/zql/query/statement.test.ts b/src/zql/query/statement.test.ts index 231057a..b04a417 100644 --- a/src/zql/query/statement.test.ts +++ b/src/zql/query/statement.test.ts @@ -2,7 +2,7 @@ import {expect, test} from 'vitest'; import {z} from 'zod'; import {orderingProp} from '../ast-to-ivm/pipeline-builder.js'; import {makeTestContext} from '../context/context.js'; -import {EntityQueryImpl} from './entity-query.js'; +import {EntityQuery} from './entity-query.js'; import {ascComparator} from './statement.js'; const e1 = z.object({ @@ -13,7 +13,7 @@ const e1 = z.object({ type E1 = z.infer; test('basic materialization', () => { const context = makeTestContext(); - const q = new EntityQueryImpl<{fields: E1}>(context, 'e1'); + const q = new EntityQuery<{fields: E1}>(context, 'e1'); const stmt = q.select('id', 'n').where('n', '>', 100).prepare(); @@ -43,7 +43,7 @@ test('basic materialization', () => { test('sorted materialization', () => { const context = makeTestContext(); type E1 = z.infer; - const q = new EntityQueryImpl<{fields: E1}>(context, 'e1'); + const q = new EntityQuery<{fields: E1}>(context, 'e1'); const ascView = q.select('id').asc('n').prepare().view(); const descView = q.select('id').desc('n').prepare().view(); @@ -67,7 +67,7 @@ test('sorted materialization', () => { test('sorting is stable via suffixing the primary key to the order', () => { const context = makeTestContext(); type E1 = z.infer; - const q = new EntityQueryImpl<{fields: E1}>(context, 'e1'); + const q = new EntityQuery<{fields: E1}>(context, 'e1'); const ascView = q.select('id').asc('n').prepare().view(); const descView = q.select('id').desc('n').prepare().view(); @@ -115,7 +115,7 @@ test('ascComparator', () => { test('destroying the statement stops updating the view', async () => { const context = makeTestContext(); - const q = new EntityQueryImpl<{fields: E1}>(context, 'e1'); + const q = new EntityQuery<{fields: E1}>(context, 'e1'); const stmt = q.select('id', 'n').prepare(); diff --git a/src/zql/util/iterables.test.ts b/src/zql/util/iterables.test.ts new file mode 100644 index 0000000..04171e9 --- /dev/null +++ b/src/zql/util/iterables.test.ts @@ -0,0 +1,20 @@ +import {expect, test} from 'vitest'; +import {flatMapIter, mapIter} from './iterables.js'; + +test('mapIter', () => { + const iterable = [1, 2, 3]; + const result = mapIter(iterable, (x, i) => x + i); + expect([...result]).toEqual([1, 3, 5]); +}); + +test('flatMapIter', () => { + const iterable = [[1], [2, 3], [4, 5, 6]]; + const flatMapper = flatMapIter( + () => iterable, + x => x, + ); + + expect([...flatMapper]).toEqual([1, 2, 3, 4, 5, 6]); + // can iterate it a second time + expect([...flatMapper]).toEqual([1, 2, 3, 4, 5, 6]); +}); diff --git a/src/zql/util/iterables.ts b/src/zql/util/iterables.ts index dc21c20..75a5ab6 100644 --- a/src/zql/util/iterables.ts +++ b/src/zql/util/iterables.ts @@ -7,3 +7,23 @@ export function* mapIter( yield f(t, index++); } } + +/** + * Flat maps the items returned from the iterable. 
+ * + * `iter` is a lambda that returns an iterable, + * so the result can be iterated more than once: each pass calls `iter` again for a fresh iterator. + */ +export function flatMapIter<T, U>( + iter: () => Iterable<T>, + f: (t: T, index: number) => Iterable<U>, +) { + return { + *[Symbol.iterator]() { + let index = 0; + for (const t of iter()) { + yield* f(t, index++); + } + }, + }; +}
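Taken together, the pieces above compose as in the integration test: plain columns and agg helpers in select, followed by groupBy. A condensed sketch for reference; `r` (a Replicache instance with the issue mutators) and the Issue type are assumed to be set up as in setup() in integration.test.ts, and the import paths match that file.

```ts
import {makeReplicacheContext} from './context/replicache-context.js';
import {EntityQuery} from './query/entity-query.js';
import * as agg from './query/agg.js';

// `r` and `Issue` are assumed to exist as in integration.test.ts.
const context = makeReplicacheContext(r);
const q = new EntityQuery<{fields: Issue}>(context, 'issue');

// Count issues per status; the second argument names the output column.
const counts = q
  .select('status', agg.count('status', 'count'))
  .groupBy('status')
  .prepare();
console.log(await counts.exec());
// e.g. [{status: 'open', count: 2}, {status: 'closed', count: 1}] with the test data above
counts.destroy();

// When the alias is omitted it defaults to the field name, so each row here
// has `assignee` and `minCreated` keys.
const byStatus = q
  .select('status', agg.array('assignee'), agg.min('created', 'minCreated'))
  .groupBy('status')
  .prepare();
console.log(await byStatus.exec());
byStatus.destroy();
```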