Skip to content

Commit

Permalink
implement a basic reduce operator to support group-by
Browse files Browse the repository at this point in the history
  • Loading branch information
tantaman committed Mar 20, 2024
1 parent 4ad041e commit 58699cf
Show file tree
Hide file tree
Showing 7 changed files with 444 additions and 5 deletions.
46 changes: 42 additions & 4 deletions src/zql/ast-to-ivm/pipeline-builder.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import {Entity} from '../../generate.js';
import {AST, Condition, ConditionList, Operator, Ordering} from '../ast/ast.js';
import {assert, must} from '../error/asserts.js';
import {DifferenceStream} from '../ivm/graph/difference-stream.js';

export const orderingProp = Symbol();

export function buildPipeline(
sourceStreamProvider: (sourceName: string) => DifferenceStream<unknown>,
sourceStreamProvider: (sourceName: string) => DifferenceStream<Entity>,
ast: AST,
) {
// filters first
Expand All @@ -28,13 +29,18 @@ export function buildPipeline(
ret = applySelect(stream, ast.select, ast.orderBy);
}

if (ast.groupBy) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
ret = applyGroupBy(ret as any, ast.orderBy, ast.groupBy);
}

// Note: the stream is technically attached at this point.
// We could detach it until the user actually runs (or subscribes to) the statement as a tiny optimization.
return ret;
}

export function applySelect(
stream: DifferenceStream<unknown>,
stream: DifferenceStream<Entity>,
select: string[],
orderBy: Ordering | undefined,
) {
Expand Down Expand Up @@ -62,7 +68,7 @@ export function applySelect(
});
}

function applyWhere(stream: DifferenceStream<unknown>, where: ConditionList) {
function applyWhere(stream: DifferenceStream<Entity>, where: ConditionList) {
let ret = stream;
// We'll handle `OR` and parentheticals like so:
// OR: We'll create a new stream for the LHS and RHS of the OR then merge together.
Expand Down Expand Up @@ -93,7 +99,7 @@ function applyWhere(stream: DifferenceStream<unknown>, where: ConditionList) {
}

function applyCondition(
stream: DifferenceStream<unknown>,
stream: DifferenceStream<Entity>,
condition: Condition,
) {
const operator = getOperator(condition.op);
Expand All @@ -103,6 +109,38 @@ function applyCondition(
);
}

function applyGroupBy(
stream: DifferenceStream<
Record<string, unknown> & {
[orderingProp]: unknown[];
}
>,
orderBy: Ordering,
columns: string[],
) {
const keyFunction = makeKeyFunction(columns);
const idIdx = orderBy[0].indexOf('id');
return stream.reduce(
keyFunction,
value => value[orderingProp][idIdx] as string,
// TODO: apply aggregate functions against the group if specified
// e.g., count(x), sum(x), avg(x), array(x) etc.
values => values[Symbol.iterator]().next().value,
);
}

function makeKeyFunction(columns: string[]) {
return (x: Record<string, unknown>) => {
const ret: unknown[] = [];
for (const column of columns) {
ret.push(x[column]);
}
// Would it be better to come up with someh hash function
// which can handle complex types?
return JSON.stringify(ret);
};
}

// We're well-typed in the query builder so once we're down here
// we can assume that the operator is valid.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down
2 changes: 1 addition & 1 deletion src/zql/ast/ast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export type AST = {
// readonly on: ConditionList;
// }[];
readonly limit?: number | undefined;
// readonly groupBy?: string[];
readonly groupBy?: string[];
readonly orderBy: Ordering;
// readonly after?: Primitive;
};
Expand Down
18 changes: 18 additions & 0 deletions src/zql/ivm/graph/difference-stream.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import {Entity} from '../../../generate.js';
import {Primitive} from '../../ast/ast.js';
import {Multiset} from '../multiset.js';
import {Source} from '../source/source.js';
import {Version} from '../types.js';
Expand All @@ -11,6 +13,7 @@ import {DebugOperator} from './operators/debug-operator.js';
import {DifferenceEffectOperator} from './operators/difference-effect-operator.js';
import {FilterOperator} from './operators/filter-operator.js';
import {MapOperator} from './operators/map-operator.js';
import {ReduceOperator} from './operators/reduce-operator.js';
import {QueueEntry} from './queue.js';

/**
Expand Down Expand Up @@ -48,6 +51,21 @@ export class DifferenceStream<T> implements IDifferenceStream<T> {
return ret;
}

reduce<K extends Primitive, O extends Entity>(
getKey: (value: T) => K,
getIdentity: (value: T) => string,
f: (input: Iterable<T>) => O,
): DifferenceStream<O> {
const ret = this.newStream<O>();
new ReduceOperator<K, T, O>(
this.#upstreamWriter.newReader(),
ret.#upstreamWriter,
getKey,
f,
);
return ret;
}

/**
* This differs from count in that `size` just counts the entire
* stream whereas `count` counts the number of times each key appears.
Expand Down
69 changes: 69 additions & 0 deletions src/zql/ivm/graph/operators/operator-index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import {Entity} from '../../../../generate.js';
import {Primitive} from '../../../ast/ast.js';
import {Entry} from '../../multiset.js';

// TODO: see comment on join. Maybe we can do better here with somem sort of sorted map.
export class Index<K extends Primitive, V extends Entity> {
readonly #index = new Map<K, Entry<V>[]>();

constructor() {}

add(key: K, value: Entry<V>) {
let existing = this.#index.get(key);
if (existing === undefined) {
existing = [];
this.#index.set(key, existing);
}
existing.push(value);
}

extend(index: Index<K, V>) {
for (const [key, value] of index.#index) {
for (const entry of value) {
this.add(key, entry);
}
}
}

get(key: K): Entry<V>[] {
return this.#index.get(key) ?? [];
}

compact(keys: K[] = []) {
function consolidateValues(values: Entry<V>[]): Entry<V>[] {
const consolidated = new Map<string, Entry<V>>();
for (const [value, multiplicity] of values) {
if (multiplicity === 0) {
continue;
}
const existing = consolidated.get(value.id);
if (existing === undefined) {
consolidated.set(value.id, [value, multiplicity]);
} else {
const sum = existing[1] + multiplicity;
if (sum === 0) {
consolidated.delete(value.id);
} else {
consolidated.set(value.id, [value, sum]);
}
}
}

return [...consolidated.values()];
}

// spread `keys` b/c if we do not then when we add below the iterator will continue.
const iterableKeys = keys.length !== 0 ? keys : [...this.#index.keys()];
for (const key of iterableKeys) {
const entries = this.#index.get(key);
if (entries === undefined) {
continue;
}
this.#index.delete(key);
const consolidated = consolidateValues(entries);
if (consolidated.length !== 0) {
this.#index.set(key, consolidated);
}
}
}
}
178 changes: 178 additions & 0 deletions src/zql/ivm/graph/operators/reduce-operator.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import {expect, test} from 'vitest';
import {DifferenceStreamWriter} from '../difference-stream-writer.js';
import {ReduceOperator} from './reduce-operator.js';
import {Multiset} from '../../multiset.js';
import {NoOp} from './operator.js';

type Thing = {
id: string;
a: number;
b: string;
};

type Reduction = {
id: string;
sum: number;
};

test('collects all things with the same key', () => {
const inputWriter = new DifferenceStreamWriter<Thing>();
const inputReader = inputWriter.newReader();
const output = new DifferenceStreamWriter<Reduction>();

function getKey(t: Thing) {
return t.b;
}
function getValueIdentity(t: Thing) {
return t.id;
}

new ReduceOperator(
inputReader,
output,
getValueIdentity,
getKey,
(group: Iterable<Thing>) => {
let sum = 0;
let id = '';
for (const item of group) {
id = item.b;
sum += item.a;
}

return {
id,
sum,
};
},
);

const outReader = output.newReader();
outReader.setOperator(new NoOp());

inputWriter.queueData([
1,
new Multiset([
[
{
id: 'a',
a: 1,
b: 'x',
},
1,
],
[
{
id: 'b',
a: 2,
b: 'x',
},
2,
],
]),
]);
check([[{id: 'x', sum: 5}, 1]]);

// retract an item
inputWriter.queueData([
1,
new Multiset([
[
{
id: 'a',
a: 1,
b: 'x',
},
-1,
],
]),
]);
check([
[{id: 'x', sum: 5}, -1],
[{id: 'x', sum: 4}, 1],
]);

// fully retract items that constitue a grouping
inputWriter.queueData([
1,
new Multiset([
[
{
id: 'b',
a: 2,
b: 'x',
},
-2,
],
]),
]);
check([[{id: 'x', sum: 4}, -1]]);

// add more entries
inputWriter.queueData([
1,
new Multiset([
[
{
id: 'a',
a: 1,
b: 'c',
},
1,
],
]),
]);
check([[{id: 'c', sum: 1}, 1]]);
inputWriter.queueData([
1,
new Multiset([
[
{
id: 'b',
a: 2,
b: 'c',
},
1,
],
]),
]);
check([
[{id: 'c', sum: 1}, -1],
[{id: 'c', sum: 3}, 1],
]);

inputWriter.queueData([
1,
new Multiset([
[
{
id: 'a',
a: 1,
b: 'c',
},
-1,
],
[
{
id: 'a',
a: 2,
b: 'c',
},
1,
],
]),
]);
check([
[{id: 'c', sum: 3}, -1],
[{id: 'c', sum: 4}, 1],
]);

function check(expected: [Reduction, number][]) {
inputWriter.notify(1);
inputWriter.notifyCommitted(1);
const items = outReader.drain(1);
expect(items.length).toBe(1);
const entry = items[0];
expect([...entry[1].entries]).toEqual(expected);
}
});
Loading

0 comments on commit 58699cf

Please sign in to comment.