Skip to content

Commit

Permalink
Flesh out what we'll hoist to sources
Browse files Browse the repository at this point in the history
PullMsg / PullHistoricalData asks a source to send old data.

Sources of history may not need to send _all_ history. And we wouldn't
want them to if they have say a million items.

To deal with that, the graph collects information
 about what could constrain the source data. It does this by augmenting the PullMsg.

 A view sets:
 - ordering
 - query type (optionally)

 Upstream operators set:
 - hoistedConditions

Why?

Querying historical data vs responding to changes in
data are slightly different problems.

E.g.,

 "Find me all items in the set greater than Y" /  SELECT * FROM set WHERE item > Y

vs

 "Find me all queries that do not care about the value of Y
 or where Y is less then item"

Pulling answers the former. The data flow graph
answers the latter as well as being able to (no optimally) answer the former.

The former doesn't have to be answered fully since the graph
will filter any over-fetched rows. This means that the source
only needs to select and apply the constraint that matches
its ordering.
  • Loading branch information
tantaman committed Mar 27, 2024
1 parent d6779ff commit 58c54f6
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 37 deletions.
16 changes: 15 additions & 1 deletion src/zql/ast/ast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
export type Ordering = readonly [readonly string[], 'asc' | 'desc'];
export type Primitive = string | number | boolean | null;
// type Ref = `${string}.${string}`;

/**
* Note: We'll eventually need to start ordering conditions
* in the dataflow graph so we get the maximum amount
* of sharing between queries.
*/
export type AST = {
readonly table?: string | undefined;
readonly alias?: number | undefined;
Expand All @@ -22,7 +28,7 @@ export type AST = {
// }[];
readonly limit?: number | undefined;
// readonly groupBy?: string[];
readonly orderBy?: Ordering | undefined;
readonly orderBy: Ordering;
// readonly after?: Primitive;
};

Expand Down Expand Up @@ -57,3 +63,11 @@ export type SimpleCondition =
// value: AST;
// };
};

// | {
// type: 'ref';
// value: Ref;
// } | {
// type: 'query';
// value: AST;
// };
2 changes: 1 addition & 1 deletion src/zql/ivm/graph/difference-stream-writer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ test('replying to a message only notifies along the requesting path', () => {
new DebugOperator(r, outputWriter, () => (notifications[i] = true));
});

const msg = createPullMessage();
const msg = createPullMessage([[], 'asc'], 'select');

outputs[1].messageUpstream(msg);

Expand Down
53 changes: 48 additions & 5 deletions src/zql/ivm/graph/message.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,52 @@
import {Ordering, SimpleCondition} from '../../ast/ast.js';

export type Request = PullMsg;

/**
* Used to pull historical data down the pipeline.
*
* Sources of history may not need to send _all_ history.
*
* To deal with that, the graph collects information
* about what could constrain the data to send.
*
* The view sets:
* - ordering
* - query type
*
* Upstream operators set:
* - hoistedConditions
*
* Querying historical data vs responding to changes in
* data are slightly different problems.
*
* E.g.,
*
* "Find me all items in the set greater than Y" ->
* SELECT * FROM set WHERE item > Y
*
*
* vs
*
* "Find me all queries that do not care about the value of Y
* or where Y is less then item"
*
* Pulling answers the former. The data flow graph
* answers the latter.
*/
export type PullMsg = {
id: number;
type: 'pull';
readonly id: number;
readonly type: 'pull';
readonly order?: Ordering | undefined;
readonly hoistedConditions: SimpleCondition[];
readonly queryType?: 'count' | 'select' | undefined;
};

export type Reply = PullReplyMsg;

export type PullReplyMsg = {
replyingTo: number;
type: 'pullResponse';
readonly replyingTo: number;
readonly type: 'pullResponse';
};

let messageID = 0;
Expand All @@ -28,10 +65,16 @@ export function nextMessageID() {
* E.g., if there is a filter against the primary key. The source
* can use that information to restrict the rows it returns.
*/
export function createPullMessage(): Request {
export function createPullMessage(
order: Ordering | undefined,
queryType?: 'count' | 'select' | undefined,
): Request {
return {
id: nextMessageID(),
type: 'pull',
order,
hoistedConditions: [],
queryType,
};
}

Expand Down
5 changes: 1 addition & 4 deletions src/zql/ivm/view/abstract-view.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {DifferenceStream} from '../graph/difference-stream.js';
import {Version} from '../types.js';
import {View} from './view.js';
import {assert} from '../../error/asserts.js';
import {createPullMessage} from '../graph/message.js';

export abstract class AbstractView<T, CT> implements View<CT> {
readonly #stream;
Expand Down Expand Up @@ -56,9 +55,7 @@ export abstract class AbstractView<T, CT> implements View<CT> {
return this.#hydrated;
}

pullHistoricalData() {
this._reader.messageUpstream(createPullMessage());
}
abstract pullHistoricalData(): void;

protected _notifyCommitted(d: CT, v: Version) {
if (this._notifiedListenersVersion === v) {
Expand Down
5 changes: 5 additions & 0 deletions src/zql/ivm/view/primitive-view.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {DifferenceStream} from '../graph/difference-stream.js';
import {Version} from '../types.js';
import {AbstractView} from './abstract-view.js';
import {must} from '../../error/asserts.js';
import {createPullMessage} from '../graph/message.js';

/**
* Represents the most recent value from a stream of primitives.
Expand All @@ -23,6 +24,10 @@ export class ValueView<T> extends AbstractView<T, T | null> {
return this.#data;
}

pullHistoricalData(): void {
this._reader.messageUpstream(createPullMessage(undefined));
}

protected _run(version: Version) {
const collections = this._reader.drain(version);
if (collections.length === 0) {
Expand Down
24 changes: 10 additions & 14 deletions src/zql/ivm/view/tree-view.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ test('asc and descComparator on Entities', () => {
[['n', 'id'], 'asc'],
) as DifferenceStream<Selected>;

const view = new MutableTreeView<Selected>(
m,
updatedStream,
ascComparator,
true,
);
const view = new MutableTreeView<Selected>(m, updatedStream, ascComparator, [
['n', 'id'],
'asc',
]);
const descView = new MutableTreeView<Selected>(
m,
applySelect(
Expand All @@ -38,7 +36,7 @@ test('asc and descComparator on Entities', () => {
[['n', 'id'], 'desc'],
) as DifferenceStream<Selected>,
descComparator,
true,
[['n', 'id'], 'desc'],
);

const items = [
Expand All @@ -64,7 +62,7 @@ test('add & remove', () => {
m,
source.stream,
numberComparator,
true,
undefined,
);

m.tx(() => {
Expand All @@ -85,12 +83,10 @@ test('replace', () => {
fc.property(fc.uniqueArray(fc.integer()), arr => {
const m = new Materialite();
const source = m.newStatelessSource<number>();
const view = new MutableTreeView(
m,
source.stream,
numberComparator,
true,
);
const view = new MutableTreeView(m, source.stream, numberComparator, [
['id'],
'asc',
]);

m.tx(() => {
arr.forEach(x => source.add(x));
Expand Down
28 changes: 18 additions & 10 deletions src/zql/ivm/view/tree-view.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {Materialite} from '../materialite.js';
import {Multiset} from '../multiset.js';
import {Version} from '../types.js';
import {AbstractView} from './abstract-view.js';
import {Ordering} from '../../ast/ast.js';
import {createPullMessage} from '../graph/message.js';

/**
* A sink that maintains the list of values in-order.
Expand All @@ -25,7 +27,7 @@ class AbstractTreeView<T> extends AbstractView<T, T[]> {
#limit?: number;
#min?: T;
#max?: T;
readonly #isInSourceOrder;
readonly #order;
readonly id = id++;
readonly #comparator;

Expand All @@ -34,15 +36,15 @@ class AbstractTreeView<T> extends AbstractView<T, T[]> {
stream: DifferenceStream<T>,
comparator: Comparator<T>,
tree: ITree<T>,
isInSourceOrder: boolean,
limit?: number,
order: Ordering | undefined,
limit?: number | undefined,
name: string = '',
) {
super(materialite, stream, name);
this.#limit = limit;
this.#data = tree;
this.#comparator = comparator;
this.#isInSourceOrder = isInSourceOrder;
this.#order = order;
if (limit !== undefined) {
this.#addAll = this.#limitedAddAll;
this.#removeAll = this.#limitedRemoveAll;
Expand Down Expand Up @@ -95,7 +97,7 @@ class AbstractTreeView<T> extends AbstractView<T, T[]> {
const fullRecompute = false;
while (!(next = iterator.next()).done) {
const [value, mult] = next.value;
if (this.#limit !== undefined && fullRecompute && this.#isInSourceOrder) {
if (this.#limit !== undefined && fullRecompute && this.#order) {
if (data.size >= this.#limit && mult > 0) {
// bail early. During a re-compute with a source in the same order
// as the view we can bail once we've consumed `LIMIT` items.
Expand Down Expand Up @@ -193,6 +195,10 @@ class AbstractTreeView<T> extends AbstractView<T, T[]> {
return data;
}

pullHistoricalData(): void {
this._reader.messageUpstream(createPullMessage(this.#order, 'select'));
}

#updateMinMax(value: T) {
if (this.#min === undefined || this.#max === undefined) {
this.#max = this.#min = value;
Expand Down Expand Up @@ -226,7 +232,7 @@ export class PersistentTreeView<T> extends AbstractTreeView<T> {
materialite: Materialite,
stream: DifferenceStream<T>,
comparator: Comparator<T>,
isInSourceOrder: boolean,
order: Ordering,
limit?: number,
name: string = '',
) {
Expand All @@ -235,7 +241,7 @@ export class PersistentTreeView<T> extends AbstractTreeView<T> {
stream,
comparator,
new PersistentTreap(comparator),
isInSourceOrder,
order,
limit,
name,
);
Expand All @@ -247,16 +253,18 @@ export class MutableTreeView<T> extends AbstractTreeView<T> {
materialite: Materialite,
stream: DifferenceStream<T>,
comparator: Comparator<T>,
isInSourceOrder: boolean,
limit?: number,
// TODO: type `ordering` so it has a relationship
// to `Commparator`
order: Ordering | undefined,
limit?: number | undefined,
name: string = '',
) {
super(
materialite,
stream,
comparator,
new Treap(comparator),
isInSourceOrder,
order,
limit,
name,
);
Expand Down
4 changes: 2 additions & 2 deletions src/zql/query/statement.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ export class Statement<Return> implements IStatement<Return> {
this.#pipeline as DifferenceStream<
Return extends [] ? Return[number] : never
>,
this.#ast.orderBy?.[1] === 'asc' ? ascComparator : descComparator,
true, // TODO: since we're going to control everything we can make this so.
this.#ast.orderBy[1] === 'asc' ? ascComparator : descComparator,
this.#ast.orderBy,
this.#ast.limit,
) as unknown as View<Return extends [] ? Return[number] : Return>;
}
Expand Down

0 comments on commit 58c54f6

Please sign in to comment.