Skip to content

Commit

Permalink
test group-by
Browse files Browse the repository at this point in the history
  • Loading branch information
tantaman committed Mar 25, 2024
1 parent c936b08 commit ca63558
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 70 deletions.
66 changes: 44 additions & 22 deletions src/zql/ast-to-ivm/pipeline-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
Operator,
Ordering,
} from '../ast/ast.js';
import {assert, must} from '../error/asserts.js';
import {must} from '../error/asserts.js';
import {DifferenceStream} from '../ivm/graph/difference-stream.js';

export const orderingProp = Symbol();
Expand All @@ -31,14 +31,23 @@ export function buildPipeline(
let ret: DifferenceStream<unknown> = stream;
if (ast.groupBy) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
ret = applyGroupBy(ret as DifferenceStream<Entity>, ast.groupBy);
ret = applyGroupBy(
ret as DifferenceStream<Entity>,
ast.groupBy,
ast.aggregate ?? [],
Array.isArray(ast.select) ? ast.select : [],
ast.orderBy,
);
}

assert(ast.select, 'No select clause');
if (ast.select === 'count') {
ret = ret.linearCount();
} else {
ret = applySelect(ret as DifferenceStream<Entity>, ast.select, ast.orderBy);
} else if (ast.groupBy === undefined) {
ret = applySelect(
ret as DifferenceStream<Entity>,
ast.select ?? [],
ast.orderBy,
);
}

// Note: the stream is technically attached at this point.
Expand All @@ -62,24 +71,32 @@ export function applySelect(
}
}

const orderingValues: unknown[] = [];
if (orderBy !== undefined) {
for (const field of orderBy[0]) {
orderingValues.push((x as Record<string, unknown>)[field]);
}
}

Object.defineProperty(ret, orderingProp, {
enumerable: false,
writable: false,
configurable: false,
value: orderingValues,
});
addOrdering(ret, x, orderBy);

return ret;
});
}

function addOrdering(
ret: Record<string, unknown>,
row: Record<string, unknown>,
orderBy: Ordering | undefined,
) {
const orderingValues: unknown[] = [];
if (orderBy !== undefined) {
for (const field of orderBy[0]) {
orderingValues.push(row[field]);
}
}

Object.defineProperty(ret, orderingProp, {
enumerable: false,
writable: false,
configurable: false,
value: orderingValues,
});
}

function applyWhere(stream: DifferenceStream<Entity>, where: ConditionList) {
let ret = stream;
// We'll handle `OR` and parentheticals like so:
Expand Down Expand Up @@ -124,16 +141,21 @@ function applyCondition(
function applyGroupBy<T extends Entity>(
stream: DifferenceStream<T>,
columns: string[],
aggregations: Aggregation[] = [],
aggregations: Aggregation[],
select: string[],
orderBy: Ordering | undefined,
) {
const keyFunction = makeKeyFunction(columns);
return stream.reduce(
keyFunction,
value => value.id as string,
values => {
const ret: Entity & Record<string, unknown> = {
id: keyFunction(values[Symbol.iterator]().next().value),
};
const first = values[Symbol.iterator]().next().value;
const ret: Record<string, unknown> = {};
for (const column of select) {
ret[column] = first[column];
}
addOrdering(ret, first, orderBy);

for (const aggregation of aggregations) {
switch (aggregation.aggregate) {
Expand Down
104 changes: 72 additions & 32 deletions src/zql/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,38 +396,78 @@ test('join', () => {});
test('having', () => {});

test('group by', async () => {
// const {q, r} = setup();
// const issues: Issue[] = [
// {
// id: 'a',
// title: 'foo',
// status: 'open',
// priority: 'high',
// assignee: 'charles',
// created: new Date(),
// updated: new Date(),
// },
// {
// id: 'b',
// title: 'bar',
// status: 'open',
// priority: 'medium',
// assignee: 'bob',
// created: new Date(),
// updated: new Date(),
// },
// {
// id: 'c',
// title: 'baz',
// status: 'closed',
// priority: 'low',
// assignee: 'alice',
// created: new Date(),
// updated: new Date(),
// },
// ] as const;
// await Promise.all(issues.map(r.mutate.initIssue));
// const stmt = q.groupBy('status').count('id').prepare();
const {q, r} = setup();
const issues: Issue[] = [
{
id: 'a',
title: 'foo',
status: 'open',
priority: 'high',
assignee: 'charles',
created: new Date('2024-01-01'),
updated: new Date(),
},
{
id: 'b',
title: 'bar',
status: 'open',
priority: 'medium',
assignee: 'bob',
created: new Date('2024-01-02'),
updated: new Date(),
},
{
id: 'c',
title: 'baz',
status: 'closed',
priority: 'low',
assignee: 'alice',
created: new Date('2024-01-03'),
updated: new Date(),
},
] as const;
await Promise.all(issues.map(r.mutate.initIssue));
await new Promise(resolve => setTimeout(resolve, 0));
let stmt = q.select('status').groupBy('status').count().prepare();
let rows = await stmt.exec();

expect(rows).toEqual([
{status: 'open', count: 2},
{status: 'closed', count: 1},
]);

stmt.destroy();

stmt = q.select('status').groupBy('status').array('assignee').prepare();
rows = await stmt.exec();

expect(rows).toEqual([
{status: 'open', assignee: ['charles', 'bob']},
{status: 'closed', assignee: ['alice']},
]);

stmt = q
.select('status')
.groupBy('status')
.array('assignee')
.min('created')
.prepare();
rows = await stmt.exec();

expect(rows).toEqual([
{
status: 'open',
assignee: ['charles', 'bob'],
created: issues[0].created.getTime(),
},
{
status: 'closed',
assignee: ['alice'],
created: issues[2].created.getTime(),
},
]);

await r.close();
});

test('compound where', async () => {
Expand Down
3 changes: 1 addition & 2 deletions src/zql/ivm/graph/difference-stream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import {Entity} from '../../../generate.js';
import {Primitive} from '../../ast/ast.js';
import {Multiset} from '../multiset.js';
import {Source} from '../source/source.js';
Expand Down Expand Up @@ -51,7 +50,7 @@ export class DifferenceStream<T> implements IDifferenceStream<T> {
return ret;
}

reduce<K extends Primitive, O extends Entity>(
reduce<K extends Primitive, O>(
getKey: (value: T) => K,
getIdentity: (value: T) => string,
f: (input: Iterable<T>) => O,
Expand Down
13 changes: 8 additions & 5 deletions src/zql/ivm/graph/operators/reduce-operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,14 @@ export class ReduceOperator<
}

const reduction = f(
flatMapIter(dataIn.values(), function* ([v, mult]) {
for (let i = 0; i < mult; i++) {
yield v;
}
}),
flatMapIter(
() => dataIn.values(),
function* ([v, mult]) {
for (let i = 0; i < mult; i++) {
yield v;
}
},
),
);
if (existingOut !== undefined) {
// modified reduction
Expand Down
7 changes: 5 additions & 2 deletions src/zql/query/entity-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,17 @@ export class EntityGroupQuery<S extends EntitySchema, Return = []> {
return this.#ast;
}

count<K extends keyof S['fields']>(field: K, alias?: string | undefined) {
count<K extends keyof S['fields']>(
field?: K | undefined,
alias?: string | undefined,
) {
return new EntityGroupQuery<S, number>(this.#context, this.#name, {
...this.#ast,
aggregate: [
...(this.#ast.aggregate || []),
{
field: field as string,
alias: alias ?? (field as string),
alias: alias ?? 'count' + (field ? `_${field as string}` : ''),
aggregate: 'count',
},
],
Expand Down
18 changes: 11 additions & 7 deletions src/zql/util/iterables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ export function* mapIter<T, U>(
}
}

export function* flatMapIter<T, U>(
iter: Iterable<T>,
export function flatMapIter<T, U>(
iter: () => Iterable<T>,
f: (t: T, index: number) => Iterable<U>,
): Iterable<U> {
let index = 0;
for (const t of iter) {
yield* f(t, index++);
}
) {
return {
*[Symbol.iterator]() {
let index = 0;
for (const t of iter()) {
yield* f(t, index++);
}
},
};
}

0 comments on commit ca63558

Please sign in to comment.