Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove code not strictly required for ZQL, update comments [1] #53

Merged
merged 4 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
},
"prettier": "@rocicorp/prettier-config",
"dependencies": {
"@vlcn.io/ds-and-algos": "^3.0.2"
"@vlcn.io/ds-and-algos": "^3.0.2",
"compare-utf8": "^0.1.1"
}
}
10 changes: 6 additions & 4 deletions src/zql/ast-to-ivm/pipeline-builder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {makeTestContext} from '../context/context.js';
import {Materialite} from '../ivm/materialite.js';
import {EntityQuery, astForTesting as ast} from '../query/entity-query.js';
import {buildPipeline} from './pipeline-builder.js';
import {compareUTF8} from 'compare-utf8';

const e1 = z.object({
id: z.string(),
Expand All @@ -15,10 +16,11 @@ const e1 = z.object({
type E1 = z.infer<typeof e1>;

const context = makeTestContext();
const comparator = (l: E1, r: E1) => compareUTF8(l.id, r.id);
test('A simple select', () => {
const q = new EntityQuery<{fields: E1}>(context, 'e1');
const m = new Materialite();
let s = m.newStatelessSource<E1>();
let s = m.newSetSource<E1>(comparator);
let pipeline = buildPipeline(
() => s.stream,
ast(q.select('id', 'a', 'b', 'c', 'd')),
Expand All @@ -38,7 +40,7 @@ test('A simple select', () => {
s.add(expected[1]);
expect(effectRunCount).toBe(2);

s = m.newStatelessSource();
s = m.newSetSource(comparator);
pipeline = buildPipeline(() => s.stream, ast(q.select('a', 'd')));
const expected2 = [
{a: 1, d: true},
Expand All @@ -57,7 +59,7 @@ test('A simple select', () => {
test('Count', () => {
const q = new EntityQuery<{fields: E1}>(context, 'e1');
const m = new Materialite();
const s = m.newStatelessSource<E1>();
const s = m.newSetSource<E1>(comparator);
const pipeline = buildPipeline(() => s.stream, ast(q.count()));

let effectRunCount = 0;
Expand All @@ -76,7 +78,7 @@ test('Count', () => {
test('Where', () => {
const q = new EntityQuery<{fields: E1}>(context, 'e1');
const m = new Materialite();
const s = m.newStatelessSource<E1>();
const s = m.newSetSource<E1>(comparator);
const pipeline = buildPipeline(
() => s.stream,
ast(q.select('id').where('a', '>', 1).where('b', '<', 2)),
Expand Down
7 changes: 4 additions & 3 deletions src/zql/ast-to-ivm/pipeline-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ export function buildPipeline(
ast: AST,
) {
// filters first
// maps second
// order is a param to materialization
// select last
// order is a param to the source or view
// as well as limit? How does limit work in materialite again?
let stream = sourceStreamProvider(
must(ast.table, 'Table not specified in the AST'),
Expand Down Expand Up @@ -119,6 +119,7 @@ function applyWhere(stream: DifferenceStream<Entity>, where: Condition) {
// |
//
// So `ORs` cause a fork (two branches that need to be evaluated) and then that fork is combined.

if (where.op === 'AND') {
for (const condition of where.conditions) {
ret = applyWhere(ret, condition);
Expand Down Expand Up @@ -257,7 +258,7 @@ function getOperator(op: SimpleOperator): (l: any, r: any) => boolean {
case 'LIKE':
return (l, r) => l.includes(r);
case 'ILIKE':
return (l, r) => l.toLowerCase().includes(r.toLocaleLowerCase());
return (l, r) => l.toLowerCase().includes(r.toLowerCase());
default:
throw new Error(`Operator ${op} not supported`);
}
Expand Down
8 changes: 0 additions & 8 deletions src/zql/ast/ast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,3 @@ export type SimpleCondition =
// value: AST;
// };
};

// | {
// type: 'ref';
// value: Ref;
// } | {
// type: 'query';
// value: AST;
// };
13 changes: 12 additions & 1 deletion src/zql/context/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@ import {Materialite} from '../ivm/materialite.js';
import {Source} from '../ivm/source/source.js';
import {Entity} from '../../generate.js';
import {Ordering} from '../ast/ast.js';
import {compareUTF8} from 'compare-utf8';

/**
* Used to integrate with the host environment.
*
* A source is a table or collection which ZQL can query.
* The name of a source represents the name of the table
* ZQL is querying.
*/
export type Context = {
materialite: Materialite;
getSource: <T extends Entity>(name: string, ordering?: Ordering) => Source<T>;
Expand All @@ -13,7 +21,10 @@ export function makeTestContext(): Context {
const sources = new Map<string, Source<unknown>>();
const getSource = <T extends Entity>(name: string) => {
if (!sources.has(name)) {
sources.set(name, materialite.newStatelessSource<T>());
sources.set(
name,
materialite.newSetSource<T>((l, r) => compareUTF8(l.id, r.id)),
);
}
return sources.get(name)! as Source<T>;
};
Expand Down
29 changes: 11 additions & 18 deletions src/zql/context/replicache-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export function makeReplicacheContext(rep: ReplicacheLike): Context {
}

/**
* Forwards Replicache changes to Materialite sources so they
* Forwards Replicache changes to ZQL sources so they
* can be fed into any queries that may exist.
*
* Maintains derived orderings of sources as well.
Expand Down Expand Up @@ -61,7 +61,7 @@ class ReplicacheSourceStore {

class ReplicacheSource {
readonly #materialite;
readonly #sources: Map<string, Source<Entity>> = new Map();
readonly #sorts: Map<string, Source<Entity>> = new Map();
readonly #canonicalSource: MutableSetSource<Entity>;
#receivedFirstDiff = false;

Expand All @@ -77,17 +77,18 @@ class ReplicacheSource {

#onReplicacheDiff = (changes: ExperimentalNoIndexDiff) => {
// The first diff is the set of initial values
// to seed the source. We don't process these
// through the dataflow graph.
// Views will explicitly request historical data as needed.
// to seed the source. We call `seed`, rather than add,
// to process these. `seed` will only send to changes
// to views that have explicitly requested history whereas `add` will
// send them to everyone as if they were changes happening _now_.
if (this.#receivedFirstDiff === false) {
this.#canonicalSource.seed(
mapIter(changes, diff => {
assert(diff.op === 'add');
return diff.newValue as Entity;
}),
);
for (const derived of this.#sources.values()) {
for (const derived of this.#sorts.values()) {
derived.seed(
mapIter(changes, diff => {
assert(diff.op === 'add');
Expand All @@ -101,24 +102,16 @@ class ReplicacheSource {
this.#materialite.tx(() => {
for (const diff of changes) {
if (diff.op === 'del' || diff.op === 'change') {
// This lookup of old was due to seeing `oldValue` values that did not match
// what was stored on the client when working on Repliear.
// `oldValue` not matching what is on the client is problematic
// if there are derived sources with orderings that depend on fields outside the id.
// E.g.,
// If a derived source is sorted by `modified_time`
// and the `oldValue` provided has a different `modified_time` we'll fail to remove
// the correct value from the derived source.
const old = this.#canonicalSource.get(diff.oldValue as Entity);
assert(old, 'oldValue not found in canonical source');
this.#canonicalSource.delete(old);
for (const derived of this.#sources.values()) {
for (const derived of this.#sorts.values()) {
derived.delete(old);
}
}
if (diff.op === 'add' || diff.op === 'change') {
this.#canonicalSource.add(diff.newValue as Entity);
for (const derived of this.#sources.values()) {
for (const derived of this.#sorts.values()) {
derived.add(diff.newValue as Entity);
}
}
Expand All @@ -138,11 +131,11 @@ class ReplicacheSource {
const [keys] = ordering;
// We do _not_ use the direction to derive a soure. We can iterate backwards for DESC.
const key = keys.join(',');
let derivation = this.#sources.get(key);
let derivation = this.#sorts.get(key);
if (derivation === undefined) {
const comparator = makeComparator(keys);
derivation = this.#canonicalSource.withNewOrdering(comparator);
this.#sources.set(key, derivation);
this.#sorts.set(key, derivation);
}

return derivation;
Expand Down
74 changes: 0 additions & 74 deletions src/zql/ivm/README.md

This file was deleted.

13 changes: 3 additions & 10 deletions src/zql/ivm/graph/difference-stream-reader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,21 +103,14 @@ test('drain', () => {
const r = new DifferenceStreamReader(new DifferenceStreamWriter());

// draining empty is not an error
expect(r.drain(1)).toEqual([]);
expect(r.drain(1)).toEqual(undefined);

// only drains up to version
const s1 = new Multiset([[1, 1]]);
const s2 = new Multiset([[2, 1]]);
const s3 = new Multiset([[3, 1]]);
r.enqueue([1, s1]);
r.enqueue([2, s2]);
r.enqueue([3, s3]);
expect(r.drain(2)).toEqual([
[1, s1],
[2, s2],
]);
expect(r.drain(2)).toEqual(undefined);

// drain leaves the queue empty if we're draining all versions in it
expect(r.drain(3)).toEqual([[3, s3]]);
expect(r.drain(3)).toEqual([3, s3]);
expect(r.isEmpty()).toBe(true);
});
Loading
Loading