Skip to content

Commit

Permalink
don't destructure in hot path, check multiset contents, remove extran…
Browse files Browse the repository at this point in the history
…eous code
  • Loading branch information
tantaman committed Apr 2, 2024
1 parent 62c0c08 commit 768ac01
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 7 deletions.
1 change: 0 additions & 1 deletion src/zql/ivm/graph/difference-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ test('replying to a message only notifies along the requesting path', () => {
const s3 = stream.debug(() => notified.push(3));

s1.debug(() => notified.push(4));
// s2.debug
const x = new DifferenceStream();
const s2Dbg = new DebugOperator(s2, x, () => notified.push(5));
x.setUpstream(s2Dbg);
Expand Down
13 changes: 12 additions & 1 deletion src/zql/ivm/graph/difference-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,17 @@ export type Listener<T> = {
*/
// T extends object: I believe in the context of ZQL we only deal with object.
export class DifferenceStream<T extends object> {
/**
* Operators that are listening to this stream.
*/
readonly #downstreams: Set<Listener<T>> = new Set();
/**
* The operator that is sending data to this stream.
*/
#upstream: Operator | undefined;
/**
* Downstreams that requested historical data.
*/
readonly #requestors = new Set<Listener<T>>();

addDownstream(listener: Listener<T>) {
Expand Down Expand Up @@ -82,11 +91,13 @@ export class DifferenceStream<T extends object> {

commit(version: Version) {
if (this.#requestors.size > 0) {
this.#requestors;
for (const requestor of this.#requestors) {
try {
requestor.commit(version);
} catch (e) {
// `commit` notifies client code
// If client code throws we'll put IVM back into a consistent state
// by clearing the requestors.
this.#requestors.clear();
throw e;
}
Expand Down
8 changes: 7 additions & 1 deletion src/zql/ivm/graph/operators/filter-operator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,13 @@ test('test that filter is lazy / the filter is not actually run until we pull on

// consume all the rows
for (const m of msgs) {
[...m];
expect([...m]).toEqual([
[{id: 1}, 1],
[{id: 2}, 2],
[{id: 1}, -1],
[{id: 2}, -2],
]);
}

expect(called).toBe(true);
});
4 changes: 1 addition & 3 deletions src/zql/ivm/graph/operators/filter-operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ export class FilterOperator<I extends object> extends LinearUnaryOperator<
output: DifferenceStream<I>,
f: (input: I) => boolean,
) {
super(input, output, (data: Multiset<I>) =>
genFilter(data, ([value, _]) => f(value)),
);
super(input, output, (data: Multiset<I>) => genFilter(data, e => f(e[0])));
}
}
2 changes: 1 addition & 1 deletion src/zql/ivm/graph/operators/operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export class NoOp implements Operator {
constructor() {}
commit(_v: Version): void {}
messageUpstream(): void {}
destroy() {}
destroy(): void {}
}

/**
Expand Down

0 comments on commit 768ac01

Please sign in to comment.