diff --git a/src/zql/ast-to-ivm/pipeline-builder.test.ts b/src/zql/ast-to-ivm/pipeline-builder.test.ts index e3fadcb..e93812c 100644 --- a/src/zql/ast-to-ivm/pipeline-builder.test.ts +++ b/src/zql/ast-to-ivm/pipeline-builder.test.ts @@ -15,10 +15,11 @@ const e1 = z.object({ type E1 = z.infer; const context = makeTestContext(); +const comparator = (l: E1, r: E1) => l.id.localeCompare(r.id); test('A simple select', () => { const q = new EntityQuery<{fields: E1}>(context, 'e1'); const m = new Materialite(); - let s = m.newStatelessSource(); + let s = m.newSetSource(comparator); let pipeline = buildPipeline( () => s.stream, ast(q.select('id', 'a', 'b', 'c', 'd')), @@ -38,7 +39,7 @@ test('A simple select', () => { s.add(expected[1]); expect(effectRunCount).toBe(2); - s = m.newStatelessSource(); + s = m.newSetSource(comparator); pipeline = buildPipeline(() => s.stream, ast(q.select('a', 'd'))); const expected2 = [ {a: 1, d: true}, @@ -57,7 +58,7 @@ test('A simple select', () => { test('Count', () => { const q = new EntityQuery<{fields: E1}>(context, 'e1'); const m = new Materialite(); - const s = m.newStatelessSource(); + const s = m.newSetSource(comparator); const pipeline = buildPipeline(() => s.stream, ast(q.count())); let effectRunCount = 0; @@ -76,7 +77,7 @@ test('Count', () => { test('Where', () => { const q = new EntityQuery<{fields: E1}>(context, 'e1'); const m = new Materialite(); - const s = m.newStatelessSource(); + const s = m.newSetSource(comparator); const pipeline = buildPipeline( () => s.stream, ast(q.select('id').where('a', '>', 1).where('b', '<', 2)), diff --git a/src/zql/context/context.ts b/src/zql/context/context.ts index 1a69f7b..9126752 100644 --- a/src/zql/context/context.ts +++ b/src/zql/context/context.ts @@ -20,7 +20,10 @@ export function makeTestContext(): Context { const sources = new Map>(); const getSource = (name: string) => { if (!sources.has(name)) { - sources.set(name, materialite.newStatelessSource()); + sources.set( + name, + materialite.newSetSource((l, r) => l.id.localeCompare(r.id)), + ); } return sources.get(name)! as Source; }; diff --git a/src/zql/ivm/graph/difference-stream.test.ts b/src/zql/ivm/graph/difference-stream.test.ts index ecd980c..d13ba63 100644 --- a/src/zql/ivm/graph/difference-stream.test.ts +++ b/src/zql/ivm/graph/difference-stream.test.ts @@ -119,7 +119,7 @@ test('map, filter, linearCount', () => { test('cleaning up the only user of a stream cleans up the entire pipeline', () => { const materialite = new Materialite(); - const set = materialite.newStatelessSource(); + const set = materialite.newSetSource((l, r) => l - r); let notifyCount = 0; const final = set.stream @@ -138,7 +138,7 @@ test('cleaning up the only user of a stream cleans up the entire pipeline', () = test('cleaning up the only user of a stream cleans up the entire pipeline but stops at a used fork', () => { const materialite = new Materialite(); - const set = materialite.newStatelessSource(); + const set = materialite.newSetSource((l, r) => l - r); let notifyCount = 0; const stream1 = set.stream.effect(_ => notifyCount++); diff --git a/src/zql/ivm/materialite.ts b/src/zql/ivm/materialite.ts index e0c8b58..b5957cc 100644 --- a/src/zql/ivm/materialite.ts +++ b/src/zql/ivm/materialite.ts @@ -1,7 +1,6 @@ import {Comparator} from '@vlcn.io/ds-and-algos/types'; import {SourceInternal} from './source/source.js'; import {MutableSetSource} from './source/set-source.js'; -import {StatelessSource} from './source/stateless-source.js'; import {Version} from './types.js'; import {must} from '../error/asserts.js'; @@ -34,10 +33,6 @@ export class Materialite { }; } - newStatelessSource() { - return new StatelessSource(this.#internal); - } - newSetSource(comparator: Comparator) { return new MutableSetSource(this.#internal, comparator); } diff --git a/src/zql/ivm/source/stateless-source.test.ts b/src/zql/ivm/source/stateless-source.test.ts deleted file mode 100644 index 29523ed..0000000 --- a/src/zql/ivm/source/stateless-source.test.ts +++ /dev/null @@ -1,113 +0,0 @@ -import {test, expect} from 'vitest'; -import {Materialite} from '../materialite.js'; - -test('add', () => { - const m = new Materialite(); - const s = m.newStatelessSource(); - - let runs = 0; - s.stream.effect(() => { - runs++; - }); - s.add(1); - expect(runs).toBe(1); - s.add(1); - s.add(2); - expect(runs).toBe(3); -}); - -test('remove', () => { - const m = new Materialite(); - const s = m.newStatelessSource(); - - let runs = 0; - s.stream.effect(() => { - runs++; - }); - // A stateless source does not track what it actually contains - // so it is not an error to remove things that were never added. - s.delete(2); - expect(runs).toBe(1); - s.delete(2); - s.delete(2); - expect(runs).toBe(3); -}); - -test('rollback', () => { - const m = new Materialite(); - const s = m.newStatelessSource(); - - let runs = 0; - s.stream.effect(() => { - runs++; - }); - try { - m.tx(() => { - s.add(1); - s.add(2); - throw new Error('rollback'); - }); - } catch (_) { - // ignore - } - expect(runs).toBe(0); -}); - -test('effects are not notified until transaction commit', () => { - const m = new Materialite(); - const s = m.newStatelessSource(); - - let runs = 0; - s.stream.effect(() => { - runs++; - }); - m.tx(() => { - s.add(2); - expect(runs).toBe(0); - }); - expect(runs).toBe(1); - m.tx(() => { - s.delete(2); - expect(runs).toBe(1); - }); - expect(runs).toBe(2); -}); - -// We don't have a way to test this at the moment 🤔 -// test('reactive graph fully runs on notify', () => { -// const m = new Materialite(); -// const s = m.newStatelessSource(); - -// let mapRun = false; -// let filterRun = false; -// let map2Run = false; -// let filter2Run = false; -// const mapped = s.stream.map(x => { -// mapRun = true; -// return x * 2; -// }); -// mapped.filter(x => { -// filterRun = true; -// return x % 2 === 0; -// }); -// mapped -// .map(x => { -// map2Run = true; -// return x * 2; -// }) -// .filter(x => { -// filter2Run = true; -// return x % 2 === 0; -// }); - -// m.tx(() => { -// s.add(1); -// // hmm... this API shouldn't be exposed to clients. -// // it lets us advance a transaction without committing it 😅 -// s.stream.notify(1); -// expect(mapRun).toBe(true); -// expect(filterRun).toBe(true); -// expect(map2Run).toBe(true); -// expect(filter2Run).toBe(true); -// }); -// }); diff --git a/src/zql/ivm/source/stateless-source.ts b/src/zql/ivm/source/stateless-source.ts deleted file mode 100644 index 5c1fde2..0000000 --- a/src/zql/ivm/source/stateless-source.ts +++ /dev/null @@ -1,108 +0,0 @@ -import {MaterialiteForSourceInternal} from '../materialite.js'; -import {Entry, Multiset} from '../multiset.js'; -import {DifferenceStream} from '../graph/difference-stream.js'; -import {Source, SourceInternal} from './source.js'; -import {Version} from '../types.js'; -import {createPullResponseMessage} from '../graph/message.js'; -import {Request} from '../graph/message.js'; -import {DifferenceStreamReader} from '../graph/difference-stream-reader.js'; - -/** - * Is a source of values. - */ -export class StatelessSource implements Source { - #stream: DifferenceStream; - readonly #internal: SourceInternal; - readonly #materialite: MaterialiteForSourceInternal; - - #pending: Entry[] = []; - - constructor(materialite: MaterialiteForSourceInternal) { - this.#materialite = materialite; - this.#stream = new DifferenceStream(); - this.#internal = { - // add values to queues, add values to the set - onCommitEnqueue: (version: Version) => { - this.#stream.queueData([version, new Multiset(this.#pending)]); - this.#pending = []; - }, - // release queues by telling the stream to send data - onCommitRun: (version: Version) => { - this.#stream.notify(version); - }, - // notify effects / listeners - // this is done once the entire reactive graph has finished computing - // itself - onCommitted: (v: Version) => { - this.#stream.notifyCommitted(v); - }, - onRollback: () => { - this.#pending = []; - }, - }; - } - - get stream(): DifferenceStream { - return this.#stream; - } - - seed(_: Iterable): this { - return this; - } - - processMessage( - message: Request, - downstream: DifferenceStreamReader, - ): void { - switch (message.type) { - case 'pull': { - // This is problematic under the current model of how we run the graph. - // As in, I don't think this'll work for operators with many inputs. - // So this presents another reason to move to optimistically running the graph - // as soon as data is enqueued and making the operators - // able to handle partial inputs. Something I thought avoiding would be simpler but turns out the opposite. - // The other reason is interactive transactions as discussed with Erik - // For interactive transactions we also can't wait until all inputs have been updated - // before running the graph. - const response = createPullResponseMessage(message); - downstream.enqueue([ - this.#materialite.getVersion(), - new Multiset([]), - response, - ]); - downstream.notify(this.#materialite.getVersion()); - downstream.notifyCommitted(this.#materialite.getVersion()); - break; - } - } - } - - addAll(values: Iterable): this { - // TODO (mlaw): start a materialite transaction - for (const v of values) { - this.#pending.push([v, 1]); - } - this.#materialite.addDirtySource(this.#internal); - return this; - } - - add(value: T): this { - this.#pending.push([value, 1]); - this.#materialite.addDirtySource(this.#internal); - return this; - } - - delete(value: T): this { - this.#pending.push([value, -1]); - this.#materialite.addDirtySource(this.#internal); - return this; - } - - deleteAll(values: Iterable): this { - for (const v of values) { - this.#pending.push([v, -1]); - } - this.#materialite.addDirtySource(this.#internal); - return this; - } -} diff --git a/src/zql/ivm/view/tree-view.test.ts b/src/zql/ivm/view/tree-view.test.ts index ed6966a..fd5f2d3 100644 --- a/src/zql/ivm/view/tree-view.test.ts +++ b/src/zql/ivm/view/tree-view.test.ts @@ -16,7 +16,7 @@ type Entity = { type Selected = {id: string; [orderingProp]: Primitive[]}; test('asc and descComparator on Entities', () => { const m = new Materialite(); - const s = m.newStatelessSource(); + const s = m.newSetSource((l, r) => l.id.localeCompare(r.id)); const updatedStream = applySelect( s.stream, @@ -57,7 +57,7 @@ test('add & remove', () => { fc.assert( fc.property(fc.uniqueArray(fc.integer()), arr => { const m = new Materialite(); - const source = m.newStatelessSource(); + const source = m.newSetSource((l, r) => l - r); const view = new MutableTreeView( m, source.stream, @@ -82,7 +82,7 @@ test('replace', () => { fc.assert( fc.property(fc.uniqueArray(fc.integer()), arr => { const m = new Materialite(); - const source = m.newStatelessSource(); + const source = m.newSetSource((l, r) => l - r); const view = new MutableTreeView(m, source.stream, numberComparator, [ ['id'], 'asc',