Skip to content

Commit

Permalink
nits while stepping through tests
Browse files Browse the repository at this point in the history
  • Loading branch information
arv committed Mar 22, 2024
1 parent 9948e36 commit 7c429e5
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 40 deletions.
9 changes: 4 additions & 5 deletions src/zql/ivm/graph/difference-stream-reader.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {invariant, must} from '../../error/asserts.js';
import {Multiset} from '../multiset.js';
import {Version} from '../types.js';
import {Queue, QueueEntry} from './queue.js';
import {DifferenceStreamWriter} from './difference-stream-writer.js';
import {Operator} from './operators/operator.js';
import {Request} from './message.js';
import {Multiset} from '../multiset.js';
import {Operator} from './operators/operator.js';
import {Queue, QueueEntry} from './queue.js';

/**
* Represents the input to an operator.
Expand All @@ -21,13 +21,12 @@ import {Multiset} from '../multiset.js';
* o o o
*/
export class DifferenceStreamReader<T = unknown> {
protected readonly _queue;
protected readonly _queue = new Queue<T>();
readonly #upstreamWriter;
#downstreamOperator: Operator | null = null;
#lastSeenVersion: Version = -1;

constructor(upstream: DifferenceStreamWriter<T>) {
this._queue = new Queue<T>();
this.#upstreamWriter = upstream;
}

Expand Down
52 changes: 26 additions & 26 deletions src/zql/ivm/graph/operators/reduce-operator.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import {expect, test} from 'vitest';
import {DifferenceStreamWriter} from '../difference-stream-writer.js';
import {ReduceOperator} from './reduce-operator.js';
import {Multiset} from '../../multiset.js';
import {DifferenceStreamWriter} from '../difference-stream-writer.js';
import {NoOp} from './operator.js';
import {ReduceOperator} from './reduce-operator.js';

type Thing = {
id: string;
a: number;
b: string;
value: number;
groupKey: string;
};

type Reduction = {
Expand All @@ -20,8 +20,8 @@ test('collects all things with the same key', () => {
const inputReader = inputWriter.newReader();
const output = new DifferenceStreamWriter<Reduction>();

function getKey(t: Thing) {
return t.b;
function getGroupKey(t: Thing) {
return t.groupKey;
}
function getValueIdentity(t: Thing) {
return t.id;
Expand All @@ -31,13 +31,13 @@ test('collects all things with the same key', () => {
inputReader,
output,
getValueIdentity,
getKey,
getGroupKey,
(group: Iterable<Thing>) => {
let sum = 0;
let id = '';
for (const item of group) {
id = item.b;
sum += item.a;
id = item.groupKey;
sum += item.value;
}

return {
Expand All @@ -56,16 +56,16 @@ test('collects all things with the same key', () => {
[
{
id: 'a',
a: 1,
b: 'x',
value: 1,
groupKey: 'x',
},
1,
],
[
{
id: 'b',
a: 2,
b: 'x',
value: 2,
groupKey: 'x',
},
2,
],
Expand All @@ -80,8 +80,8 @@ test('collects all things with the same key', () => {
[
{
id: 'a',
a: 1,
b: 'x',
value: 1,
groupKey: 'x',
},
-1,
],
Expand All @@ -92,15 +92,15 @@ test('collects all things with the same key', () => {
[{id: 'x', sum: 4}, 1],
]);

// fully retract items that constitue a grouping
// fully retract items that constitute a grouping
inputWriter.queueData([
1,
new Multiset([
[
{
id: 'b',
a: 2,
b: 'x',
value: 2,
groupKey: 'x',
},
-2,
],
Expand All @@ -115,8 +115,8 @@ test('collects all things with the same key', () => {
[
{
id: 'a',
a: 1,
b: 'c',
value: 1,
groupKey: 'c',
},
1,
],
Expand All @@ -129,8 +129,8 @@ test('collects all things with the same key', () => {
[
{
id: 'b',
a: 2,
b: 'c',
value: 2,
groupKey: 'c',
},
1,
],
Expand All @@ -147,16 +147,16 @@ test('collects all things with the same key', () => {
[
{
id: 'a',
a: 1,
b: 'c',
value: 1,
groupKey: 'c',
},
-1,
],
[
{
id: 'a',
a: 2,
b: 'c',
value: 2,
groupKey: 'c',
},
1,
],
Expand Down
15 changes: 8 additions & 7 deletions src/zql/ivm/graph/operators/reduce-operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ export class ReduceOperator<
f: (input: Iterable<V>) => O,
) {
const inner = (version: Version) => {
const keysToDo = new Set<K>();
const keysToProcess = new Set<K>();
const ret: Entry<O>[] = [];
for (const entry of this.inputMessages(version)) {
for (const [value, mult] of entry[1].entries) {
const key = getGroupKey(value);
keysToDo.add(key);
keysToProcess.add(key);
this.#addToIndex(key, value, mult);
}
}

for (const k of keysToDo) {
for (const k of keysToProcess) {
const dataIn = this.#inIndex.get(k);
const existingOut = this.#outIndex.get(k);
if (dataIn === undefined) {
Expand Down Expand Up @@ -100,9 +100,10 @@ export class ReduceOperator<
existing = new Map<string, [V, number]>();
this.#inIndex.set(key, existing);
}
const prev = existing.get(this.#getValueIdentity(value));
const valueIdentity = this.#getValueIdentity(value);
const prev = existing.get(valueIdentity);
if (prev === undefined) {
existing.set(this.#getValueIdentity(value), [value, mult]);
existing.set(valueIdentity, [value, mult]);
} else {
const [v, m] = prev;
const newMult = m + mult;
Expand All @@ -111,13 +112,13 @@ export class ReduceOperator<
'Should not end up with a negative multiplicity when tracking all events for an item',
);
if (newMult === 0) {
existing.delete(this.#getValueIdentity(value));
existing.delete(valueIdentity);
if (existing.size === 0) {
this.#inIndex.delete(key);
return undefined;
}
} else {
existing.set(this.#getValueIdentity(value), [v, newMult]);
existing.set(valueIdentity, [v, newMult]);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/zql/ivm/graph/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import {Version} from '../types.js';
import {Reply} from './message.js';

export type QueueEntry<T> =
| readonly [Version, Multiset<T>, Reply]
| readonly [Version, Multiset<T>];
| readonly [version: Version, multiset: Multiset<T>, reply: Reply]
| readonly [version: Version, multiset: Multiset<T>];

type Node<T> = {
data: QueueEntry<T>;
Expand Down

0 comments on commit 7c429e5

Please sign in to comment.