Skip to content

Commit

Permalink
Operator for join (#57)
Browse files Browse the repository at this point in the history
* add an inner join operator

* flatten results of previous joins

* named arguments for `JoinOperator`

* remove goofy string 'undefined'

* fix type errors

* fixup binary operator to account for new interface without reader and writers

* add `join` to `DifferenceStream`

* only compact keys seen in the difference

* remove `pending` from `join`

* fix tests to correctly reset expected items

* difference index tests

* remove unused test case

* fix type errors from incorrect git merge

* address review comments

* send many rows at the same version

* stable ids, do smallest collection in the outer loop
  • Loading branch information
tantaman authored Apr 9, 2024
1 parent 46d3d6b commit 12607f1
Show file tree
Hide file tree
Showing 21 changed files with 1,310 additions and 75 deletions.
18 changes: 9 additions & 9 deletions src/zql/ivm/graph/difference-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ test('map', () => {
expect(x).toEqual({x: 4});
});

s.newData(1, [
s.newDifference(1, [
[{x: 2}, 1],
[{x: 2}, 1],
[{x: 2}, 1],
Expand All @@ -33,7 +33,7 @@ test('filter', () => {
expect(x).toEqual({x: 2});
});

s.newData(1, [
s.newDifference(1, [
[{x: 1}, 1],
[{x: 2}, 1],
[{x: 3}, 1],
Expand All @@ -54,7 +54,7 @@ test('count', () => {
}
});

s.newData(1, [
s.newDifference(1, [
[{x: 1}, 1],
[{x: 2}, 1],
[{x: 3}, 1],
Expand All @@ -63,7 +63,7 @@ test('count', () => {

expect(expectRan).toBe(1);

s.newData(2, [
s.newDifference(2, [
[{x: 1}, 1],
[{x: 2}, 1],
[{x: 3}, 1],
Expand Down Expand Up @@ -91,12 +91,12 @@ test('map, filter, linearCount', () => {
}
});

s.newData(1, [[{x: 1}, 1]]);
s.newDifference(1, [[{x: 1}, 1]]);
s.commit(1);

expect(expectRan).toBe(1);

s.newData(2, [
s.newDifference(2, [
[{x: 1}, 1],
[{x: 2}, 1],
]);
Expand Down Expand Up @@ -160,7 +160,7 @@ test('adding data runs the operator', () => {
ran = true;
});
expect(ran).toBe(false);
stream.newData(1, []);
stream.newDifference(1, []);
expect(ran).toBe(true);
});

Expand All @@ -170,7 +170,7 @@ test('commit notifies the operator', () => {
stream.effect(() => {
ran = true;
});
stream.newData(1, [[{}, 1]]);
stream.newDifference(1, [[{}, 1]]);
expect(ran).toBe(false);
stream.commit(1);
expect(ran).toBe(true);
Expand Down Expand Up @@ -205,7 +205,7 @@ test('replying to a message only notifies along the requesting path', () => {

expect(notified).toEqual([]);

stream.newData(1, [], createPullResponseMessage(msg));
stream.newDifference(1, [], createPullResponseMessage(msg));

expect(notified).toEqual([2, 5]);
});
33 changes: 31 additions & 2 deletions src/zql/ivm/graph/difference-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ import {Entity} from '../../../generate.js';
import {Primitive} from '../../ast/ast.js';
import {invariant} from '../../error/asserts.js';
import {Multiset} from '../multiset.js';
import {Version} from '../types.js';
import {Reply, Request} from './message.js';
import {ConcatOperator} from './operators/concat-operator.js';
import {DebugOperator} from './operators/debug-operator.js';
import {DifferenceEffectOperator} from './operators/difference-effect-operator.js';
import {DistinctOperator} from './operators/distinct-operator.js';
import {FilterOperator} from './operators/filter-operator.js';
import {JoinResult, Version} from '../types.js';
import {
AggregateOut,
FullAvgOperator,
Expand All @@ -18,6 +18,7 @@ import {
import {MapOperator} from './operators/map-operator.js';
import {Operator} from './operators/operator.js';
import {ReduceOperator} from './operators/reduce-operator.js';
import {InnerJoinOperator, JoinArgs} from './operators/join-operator.js';

export type Listener<T> = {
newDifference: (
Expand Down Expand Up @@ -75,7 +76,11 @@ export class DifferenceStream<T extends object> {
return this;
}

newData(version: Version, data: Multiset<T>, reply?: Reply | undefined) {
newDifference(
version: Version,
data: Multiset<T>,
reply?: Reply | undefined,
) {
if (reply) {
for (const requestor of this.#requestors) {
requestor.newDifference(version, data, reply);
Expand Down Expand Up @@ -148,6 +153,26 @@ export class DifferenceStream<T extends object> {
);
}

join<
Key extends Primitive,
BValue extends object,
AAlias extends string | undefined,
BAlias extends string | undefined,
>(
args: Omit<JoinArgs<Key, T, BValue, AAlias, BAlias>, 'a' | 'output'>,
): DifferenceStream<JoinResult<T, BValue, AAlias, BAlias>> {
const stream = new DifferenceStream<
JoinResult<T, BValue, AAlias, BAlias>
>();
return stream.setUpstream(
new InnerJoinOperator({
a: this,
output: stream,
...args,
}),
);
}

/**
* This differs from count in that `size` just counts the entire
* stream whereas `count` counts the number of times each key appears.
Expand Down Expand Up @@ -203,6 +228,10 @@ export class DifferenceStream<T extends object> {
this.destroy();
}
}

toString() {
return this.#upstream?.toString() ?? 'DifferenceStream';
}
}

export function concat<T extends object>(
Expand Down
59 changes: 59 additions & 0 deletions src/zql/ivm/graph/operators/binary-operator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import {Multiset} from '../../multiset.js';
import {Version} from '../../types.js';
import {DifferenceStream, Listener} from '../difference-stream.js';
import {Request} from '../message.js';
import {OperatorBase} from './operator.js';

export class BinaryOperator<
I1 extends object,
I2 extends object,
O extends object,
> extends OperatorBase<O> {
readonly #listener1: Listener<I1>;
readonly #input1: DifferenceStream<I1>;
readonly #listener2: Listener<I2>;
readonly #input2: DifferenceStream<I2>;

constructor(
input1: DifferenceStream<I1>,
input2: DifferenceStream<I2>,
output: DifferenceStream<O>,
fn: (
v: Version,
inputA: Multiset<I1> | undefined,
inputB: Multiset<I2> | undefined,
) => Multiset<O>,
) {
super(output);
this.#listener1 = {
newDifference: (version, data) => {
output.newDifference(version, fn(version, data, undefined));
},
commit: version => {
this.commit(version);
},
};
this.#listener2 = {
newDifference: (version, data) => {
output.newDifference(version, fn(version, undefined, data));
},
commit: version => {
this.commit(version);
},
};
input1.addDownstream(this.#listener1);
input2.addDownstream(this.#listener2);
this.#input1 = input1;
this.#input2 = input2;
}

messageUpstream(message: Request): void {
this.#input1.messageUpstream(message, this.#listener1);
this.#input2.messageUpstream(message, this.#listener2);
}

destroy() {
this.#input1.removeDownstream(this.#listener1);
this.#input2.removeDownstream(this.#listener2);
}
}
10 changes: 5 additions & 5 deletions src/zql/ivm/graph/operators/concat-operator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ test('All branches notify', () => {
items.push(d);
});

inputs[0].newData(version, [
inputs[0].newDifference(version, [
[{x: 1}, 1],
[{x: 2}, 2],
]);
Expand All @@ -35,9 +35,9 @@ test('All branches notify', () => {
items.length = 0;
version++;

inputs[0].newData(version, [[{x: 0}, 1]]);
inputs[1].newData(version, [[{x: 1}, 1]]);
inputs[2].newData(version, [[{x: 2}, 2]]);
inputs[0].newDifference(version, [[{x: 0}, 1]]);
inputs[1].newDifference(version, [[{x: 1}, 1]]);
inputs[2].newDifference(version, [[{x: 2}, 2]]);
inputs[0].commit(version);
inputs[1].commit(version);
inputs[2].commit(version);
Expand All @@ -58,7 +58,7 @@ test('Test with single input', () => {
items.push(d);
});

input.newData(version, [
input.newDifference(version, [
[{x: 1}, 1],
[{x: 2}, 2],
]);
Expand Down
2 changes: 1 addition & 1 deletion src/zql/ivm/graph/operators/concat-operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export class ConcatOperator<T extends object> implements Operator {
this.#output = output;
this.#listener = {
newDifference: (version, data) => {
output.newData(version, data);
output.newDifference(version, data);
},
commit: version => {
this.commit(version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ test('calls effect with raw difference events', () => {
mult = m;
});

input.newData(1, [[{x: 1}, 1]]);
input.newDifference(1, [[{x: 1}, 1]]);

// effect not run until commit
expect(called).toBe(false);
Expand All @@ -29,7 +29,7 @@ test('calls effect with raw difference events', () => {
called = false;
value = 0;
mult = 0;
input.newData(2, [[{x: 1}, -1]]);
input.newDifference(2, [[{x: 1}, -1]]);

// effect not run until commit
expect(called).toBe(false);
Expand Down
Loading

0 comments on commit 12607f1

Please sign in to comment.