Skip to content

Commit

Permalink
make upstream/downstream names explicit, remove awkward queue handling
Browse files Browse the repository at this point in the history
  • Loading branch information
tantaman committed Mar 15, 2024
1 parent 22a0af1 commit 2a7cf20
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 89 deletions.
51 changes: 12 additions & 39 deletions src/zql/ivm/graph/difference-stream-reader.test.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,24 @@
import {expect, test} from 'vitest';
import {DifferenceStreamReader} from './difference-stream-reader.js';
import {Queue} from './queue.js';
import {NoOp} from './operators/operator.js';
import {InvariantViolation} from '../../error/asserts.js';
import {Multiset} from '../multiset.js';
import {DifferenceStreamWriter} from './difference-stream-writer.js';

test('cannot set two operators', () => {
const r = new DifferenceStreamReader(
new DifferenceStreamWriter(),
new Queue(),
);
const r = new DifferenceStreamReader(new DifferenceStreamWriter());
r.setOperator(new NoOp());
expect(() => r.setOperator(new NoOp())).toThrow(InvariantViolation);
});

test('calling notify without calling run throws', () => {
const r = new DifferenceStreamReader(
new DifferenceStreamWriter(),
new Queue(),
);
const r = new DifferenceStreamReader(new DifferenceStreamWriter());
r.setOperator(new NoOp());
expect(() => r.notify(1)).toThrow(InvariantViolation);
});

test('calling notify with a mismatched version throws', () => {
const r = new DifferenceStreamReader(
new DifferenceStreamWriter(),
new Queue(),
);
const r = new DifferenceStreamReader(new DifferenceStreamWriter());
r.setOperator(new NoOp());
r.run(1);
expect(() => r.notify(2)).toThrow(InvariantViolation);
Expand All @@ -44,10 +34,7 @@ test('run runs the operator', () => {
notifyCommitted() {},
destroy() {},
};
const r = new DifferenceStreamReader(
new DifferenceStreamWriter(),
new Queue(),
);
const r = new DifferenceStreamReader(new DifferenceStreamWriter());
r.setOperator(op);
expect(ran).toBe(false);
r.run(1);
Expand All @@ -64,10 +51,7 @@ test('notifyCommitted passes along to the operator', () => {
},
destroy() {},
};
const r = new DifferenceStreamReader(
new DifferenceStreamWriter(),
new Queue(),
);
const r = new DifferenceStreamReader(new DifferenceStreamWriter());
r.setOperator(op);
r.run(1);
r.notify(1);
Expand All @@ -77,18 +61,12 @@ test('notifyCommitted passes along to the operator', () => {
});

test('notify throws if the operator is missing', () => {
const r = new DifferenceStreamReader(
new DifferenceStreamWriter(),
new Queue(),
);
const r = new DifferenceStreamReader(new DifferenceStreamWriter());
expect(() => r.notify(1)).toThrow(InvariantViolation);
});

test('notifyCommited throws if the operator is missing', () => {
const r = new DifferenceStreamReader(
new DifferenceStreamWriter(),
new Queue(),
);
const r = new DifferenceStreamReader(new DifferenceStreamWriter());
try {
r.run(1);
r.notify(1);
Expand All @@ -109,10 +87,7 @@ test('notifyCommitted does not notify on version mismatch', () => {
},
destroy() {},
};
const r = new DifferenceStreamReader(
new DifferenceStreamWriter(),
new Queue(),
);
const r = new DifferenceStreamReader(new DifferenceStreamWriter());
r.setOperator(op);
r.run(1);
r.notify(1);
Expand All @@ -122,8 +97,7 @@ test('notifyCommitted does not notify on version mismatch', () => {
});

test('drain', () => {
const q = new Queue();
const r = new DifferenceStreamReader(new DifferenceStreamWriter(), q);
const r = new DifferenceStreamReader(new DifferenceStreamWriter());

// draining empty is not an error
expect(r.drain(1)).toEqual([]);
Expand All @@ -132,13 +106,12 @@ test('drain', () => {
const s1 = new Multiset([[1, 1]]);
const s2 = new Multiset([[2, 1]]);
const s3 = new Multiset([[3, 1]]);
q.enqueue([1, s1]);
q.enqueue([2, s2]);
q.enqueue([3, s3]);
r.enqueue([1, s1]);
r.enqueue([2, s2]);
r.enqueue([3, s3]);
expect(r.drain(2)).toEqual([s1, s2]);

// drain leaves the queue empty if we're draining all versions in it
expect(r.drain(3)).toEqual([s3]);
expect(q.isEmpty()).toBe(true);
expect(r.isEmpty()).toBe(true);
});
31 changes: 18 additions & 13 deletions src/zql/ivm/graph/difference-stream-reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,32 @@ import {DifferenceStreamWriter} from './difference-stream-writer.js';
*/
export class DifferenceStreamReader<T = unknown> {
readonly #queue;
// upstream writer
readonly #upstream;
// downstream operator
#operator: IOperator | null = null;
readonly #upstreamWriter;
#downstreamOperator: IOperator | null = null;
#lastSeenVersion: Version = -1;

constructor(upstream: DifferenceStreamWriter<T>, queue: Queue<T>) {
this.#queue = queue;
this.#upstream = upstream;
constructor(upstream: DifferenceStreamWriter<T>) {
this.#queue = new Queue<T>();
this.#upstreamWriter = upstream;
}

setOperator(operator: IOperator) {
invariant(this.#operator === null, 'Operator already set!');
this.#operator = operator;
invariant(this.#downstreamOperator === null, 'Operator already set!');
this.#downstreamOperator = operator;
}

enqueue(data: [Version, Multiset<T>]) {
this.#queue.enqueue(data);
}

run(v: Version) {
this.#lastSeenVersion = v;
must(this.#operator, 'reader is missing operator').run(v);
must(this.#downstreamOperator, 'reader is missing operator').run(v);
}

notify(v: Version) {
invariant(v === this.#lastSeenVersion, 'notify called out of order');
must(this.#operator, 'reader is missing operator').notify(v);
must(this.#downstreamOperator, 'reader is missing operator').notify(v);
}

notifyCommitted(v: Version) {
Expand All @@ -53,7 +55,10 @@ export class DifferenceStreamReader<T = unknown> {
if (v !== this.#lastSeenVersion) {
return;
}
must(this.#operator, 'reader is missing operator').notifyCommitted(v);
must(
this.#downstreamOperator,
'reader is missing operator',
).notifyCommitted(v);
}

drain(version: Version) {
Expand All @@ -77,7 +82,7 @@ export class DifferenceStreamReader<T = unknown> {
}

destroy() {
this.#upstream.removeReaderAndMaybeDestroy(this);
this.#upstreamWriter.removeReaderAndMaybeDestroy(this);
this.#queue.clear();
}
}
49 changes: 23 additions & 26 deletions src/zql/ivm/graph/difference-stream-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {Multiset} from '../multiset.js';
import {Version} from '../types.js';
import {DifferenceStreamReader} from './difference-stream-reader.js';
import {IOperator} from './operators/operator.js';
import {Queue} from './queue.js';

/**
* Represents the output of an Operator.
Expand All @@ -24,15 +23,12 @@ import {Queue} from './queue.js';
* r = reader
*/
export class DifferenceStreamWriter<T> {
readonly queues: Queue<T>[] = [];
// downstream readers
readonly readers: DifferenceStreamReader<T>[] = [];
// upstream operator
#operator: IOperator | null = null;
#upstreamOperator: IOperator | null = null;
readonly downstreamReaders: DifferenceStreamReader<T>[] = [];

setOperator(operator: IOperator) {
invariant(this.#operator === null, 'Operator already set!');
this.#operator = operator;
invariant(this.#upstreamOperator === null, 'Operator already set!');
this.#upstreamOperator = operator;
}

/**
Expand All @@ -41,19 +37,23 @@ export class DifferenceStreamWriter<T> {
* Used so we can batch a set of mutations together before running a pipeline.
*/
queueData(data: [Version, Multiset<T>]) {
for (const q of this.queues) {
q.enqueue(data);
for (const r of this.downstreamReaders) {
r.enqueue(data);
}
}

/**
* Notifies readers. Called during transaction commit.
*/
notify(version: Version) {
for (const r of this.readers) {
// Tell downstreams to run their operators
for (const r of this.downstreamReaders) {
r.run(version);
}
for (const r of this.readers) {
// After all operators have been run we can tell them
// to notify along their output edges which will
// cause the next level of writers & operators to run and notify.
for (const r of this.downstreamReaders) {
r.notify(version);
}
}
Expand All @@ -63,28 +63,26 @@ export class DifferenceStreamWriter<T> {
* has completed. Called immediately after transaction commit.
*/
notifyCommitted(v: Version) {
for (const r of this.readers) {
for (const r of this.downstreamReaders) {
r.notifyCommitted(v);
}
}

/**
* Forks a new reader off of this writer.
* Values sent to the writer will be copied off to this new reader.
* Values sent to the writer will be fanned out
* to this new reader.
*/
newReader(): DifferenceStreamReader<T> {
const queue = new Queue<T>();
this.queues.push(queue);
const reader = new DifferenceStreamReader(this, queue);
this.readers.push(reader);
const reader = new DifferenceStreamReader(this);
this.downstreamReaders.push(reader);
return reader;
}

removeReader(reader: DifferenceStreamReader<T>) {
const idx = this.readers.indexOf(reader);
const idx = this.downstreamReaders.indexOf(reader);
assert(idx !== -1, 'Reader not found');
this.readers.splice(idx, 1);
this.queues.splice(idx, 1);
this.downstreamReaders.splice(idx, 1);
}

/**
Expand All @@ -97,15 +95,14 @@ export class DifferenceStreamWriter<T> {
*/
removeReaderAndMaybeDestroy(reader: DifferenceStreamReader<T>) {
this.removeReader(reader);
if (this.readers.length === 0) {
if (this.downstreamReaders.length === 0) {
this.destroy();
}
}

destroy() {
this.readers.length = 0;
// writers will not have a downstream operator
// if they are the leaf node in the graph
this.#operator?.destroy();
this.downstreamReaders.length = 0;
// The root differnce stream will not have an upstream operator
this.#upstreamOperator?.destroy();
}
}
41 changes: 30 additions & 11 deletions src/zql/ivm/graph/difference-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,31 @@ import {MapOperator} from './operators/map-operator.js';
* Encapsulates all the details of wiring together operators, readers, and writers.
*/
export class DifferenceStream<T> implements IDifferenceStream<T> {
readonly #writer = new DifferenceStreamWriter<T>();
readonly #upstreamWriter = new DifferenceStreamWriter<T>();

newStream<X>(): DifferenceStream<X> {
return new DifferenceStream();
}

map<O>(f: (value: T) => O): DifferenceStream<O> {
const ret = this.newStream<O>();
new MapOperator<T, O>(this.#writer.newReader(), ret.#writer, f);
new MapOperator<T, O>(
this.#upstreamWriter.newReader(),
ret.#upstreamWriter,
f,
);
return ret;
}

filter<S extends T>(f: (x: T) => x is S): DifferenceStream<S>;
filter(f: (x: T) => boolean): DifferenceStream<T>;
filter<S extends T>(f: (x: T) => boolean): DifferenceStream<S> {
const ret = this.newStream<S>();
new FilterOperator<T>(this.#writer.newReader(), ret.#writer, f);
new FilterOperator<T>(
this.#upstreamWriter.newReader(),
ret.#upstreamWriter,
f,
);
return ret;
}

Expand All @@ -42,7 +50,10 @@ export class DifferenceStream<T> implements IDifferenceStream<T> {
*/
linearCount() {
const ret = this.newStream<number>();
new LinearCountOperator(this.#writer.newReader(), ret.#writer);
new LinearCountOperator(
this.#upstreamWriter.newReader(),
ret.#upstreamWriter,
);
return ret;
}

Expand All @@ -54,33 +65,41 @@ export class DifferenceStream<T> implements IDifferenceStream<T> {
*/
effect(f: (i: T, mult: number) => void) {
const ret = this.newStream<T>();
new DifferenceEffectOperator(this.#writer.newReader(), ret.#writer, f);
new DifferenceEffectOperator(
this.#upstreamWriter.newReader(),
ret.#upstreamWriter,
f,
);
return ret;
}

debug(onMessage: (c: Multiset<T>) => void) {
const ret = this.newStream<T>();
new DebugOperator(this.#writer.newReader(), ret.#writer, onMessage);
new DebugOperator(
this.#upstreamWriter.newReader(),
ret.#upstreamWriter,
onMessage,
);
return ret;
}

queueData(data: [Version, Multiset<T>]) {
this.#writer.queueData(data);
this.#upstreamWriter.queueData(data);
}

notify(v: Version) {
this.#writer.notify(v);
this.#upstreamWriter.notify(v);
}

notifyCommitted(v: Version) {
this.#writer.notifyCommitted(v);
this.#upstreamWriter.notifyCommitted(v);
}

newReader() {
return this.#writer.newReader();
return this.#upstreamWriter.newReader();
}

destroy() {
this.#writer.destroy();
this.#upstreamWriter.destroy();
}
}

0 comments on commit 2a7cf20

Please sign in to comment.