Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tantaman committed Apr 8, 2024
1 parent 4f326ca commit d9faaaa
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 74 deletions.
9 changes: 7 additions & 2 deletions src/zql/ivm/graph/operators/difference-index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import {Primitive} from '../../../ast/ast.js';
import {Entry, Multiset} from '../../multiset.js';
import {JoinResult, StrOrNum, isJoinResult, joinSymbol} from '../../types.js';
import {
JoinResult,
StringOrNumber,
isJoinResult,
joinSymbol,
} from '../../types.js';

/**
* Indexes difference events by a key.
Expand Down Expand Up @@ -42,7 +47,7 @@ export class DifferenceIndex<Key extends Primitive, V> {
aAlias: AAlias | undefined,
other: DifferenceIndex<Key, VO>,
bAlias: BAlias | undefined,
getBValueIdentity: (v: VO) => StrOrNum,
getBValueIdentity: (v: VO) => StringOrNumber,
): Multiset<JoinResult<V, VO, AAlias, BAlias>> {
const ret: (readonly [JoinResult<V, VO, AAlias, BAlias>, number])[] = [];
for (const [key, entry] of this.#index) {
Expand Down
10 changes: 0 additions & 10 deletions src/zql/ivm/graph/operators/join-operator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,6 @@ type Artist = {
name: string;
};

// type Playlist = {
// id: number;
// title: string;
// };

// type PlaylistTrack = {
// playlistId: number;
// trackId: number;
// };

test('unbalanced input', () => {
const trackInput = new DifferenceStream<Track>();
const albumInput = new DifferenceStream<Album>();
Expand Down
102 changes: 47 additions & 55 deletions src/zql/ivm/graph/operators/join-operator.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {Primitive} from '../../../ast/ast.js';
import {Entry, Multiset} from '../../multiset.js';
import {JoinResult, StrOrNum, Version} from '../../types.js';
import {JoinResult, StringOrNumber} from '../../types.js';
import {DifferenceStream} from '../difference-stream.js';
import {BinaryOperator} from './binary-operator.js';
import {DifferenceIndex} from './difference-index.js';
Expand All @@ -15,11 +15,11 @@ export type JoinArgs<
a: DifferenceStream<AValue>;
aAs: AAlias | undefined;
getAJoinKey: (value: AValue) => Key;
getAPrimaryKey: (value: AValue) => StrOrNum;
getAPrimaryKey: (value: AValue) => StringOrNumber;
b: DifferenceStream<BValue>;
bAs: BAlias | undefined;
getBJoinKey: (value: BValue) => Key;
getBPrimaryKey: (value: BValue) => StrOrNum;
getBPrimaryKey: (value: BValue) => StringOrNumber;
output: DifferenceStream<JoinResult<AValue, BValue, AAlias, BAlias>>;
};

Expand Down Expand Up @@ -58,62 +58,54 @@ export class InnerJoinOperator<
// since they're already aliased
JoinResult<AValue, BValue, AAlias, BAlias>
> {
constructor({
a,
aAs,
getAJoinKey,
getAPrimaryKey,
b,
bAs,
getBJoinKey,
getBPrimaryKey,
output,
}: JoinArgs<K, AValue, BValue, AAlias, BAlias>) {
const indexA = new DifferenceIndex<K, AValue>(getAPrimaryKey);
const indexB = new DifferenceIndex<K, BValue>(getBPrimaryKey);
readonly #indexA: DifferenceIndex<K, AValue>;
readonly #indexB: DifferenceIndex<K, BValue>;
readonly #joinArgs;

const inner = (
_version: Version,
inputA: Multiset<AValue> | undefined,
inputB: Multiset<BValue> | undefined,
) => {
const aKeysForCompaction: K[] = [];
const bKeysForCompaction: K[] = [];
const deltaA = new DifferenceIndex<K, AValue>(getAPrimaryKey);
for (const entry of inputA || []) {
const aKey = getAJoinKey(entry[0]);
deltaA.add(aKey, entry);
aKeysForCompaction.push(aKey);
}
constructor(joinArgs: JoinArgs<K, AValue, BValue, AAlias, BAlias>) {
super(joinArgs.a, joinArgs.b, joinArgs.output, (_, inputA, inputB) =>
this.#join(inputA, inputB),
);
this.#indexA = new DifferenceIndex<K, AValue>(joinArgs.getAPrimaryKey);
this.#indexB = new DifferenceIndex<K, BValue>(joinArgs.getBPrimaryKey);
this.#joinArgs = joinArgs;
}

#join(
inputA: Multiset<AValue> | undefined,
inputB: Multiset<BValue> | undefined,
) {
const {aAs, getAJoinKey, getAPrimaryKey, bAs, getBJoinKey, getBPrimaryKey} =
this.#joinArgs;
const aKeysForCompaction: K[] = [];
const bKeysForCompaction: K[] = [];
const deltaA = new DifferenceIndex<K, AValue>(getAPrimaryKey);
for (const entry of inputA || []) {
const aKey = getAJoinKey(entry[0]);
deltaA.add(aKey, entry);
aKeysForCompaction.push(aKey);
}

const deltaB = new DifferenceIndex<K, BValue>(getBPrimaryKey);
for (const entry of inputB || []) {
const bKey = getBJoinKey(entry[0]);
deltaB.add(bKey, entry);
bKeysForCompaction.push(bKey);
}
const deltaB = new DifferenceIndex<K, BValue>(getBPrimaryKey);
for (const entry of inputB || []) {
const bKey = getBJoinKey(entry[0]);
deltaB.add(bKey, entry);
bKeysForCompaction.push(bKey);
}

const result: Entry<JoinResult<AValue, BValue, AAlias, BAlias>>[] = [];
if (deltaA !== undefined) {
for (const x of deltaA.join(aAs, indexB, bAs, getBPrimaryKey)) {
result.push(x);
}
indexA.extend(deltaA);
}
const result: Entry<JoinResult<AValue, BValue, AAlias, BAlias>>[] = [];
for (const x of deltaA.join(aAs, this.#indexB, bAs, getBPrimaryKey)) {
result.push(x);
}
this.#indexA.extend(deltaA);

if (deltaB !== undefined) {
for (const x of indexA.join(aAs, deltaB, bAs, getBPrimaryKey)) {
result.push(x);
}
indexB.extend(deltaB);
}
for (const x of this.#indexA.join(aAs, deltaB, bAs, getBPrimaryKey)) {
result.push(x);
}
this.#indexB.extend(deltaB);

indexA.compact(aKeysForCompaction);
indexB.compact(bKeysForCompaction);
return result;
};
super(a, b, output, inner);
this.#indexA.compact(aKeysForCompaction);
this.#indexB.compact(bKeysForCompaction);
return result;
}
}

// export
4 changes: 2 additions & 2 deletions src/zql/ivm/graph/operators/reduce-operator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Reduction = {

test('collects all things with the same key', () => {
const input = new DifferenceStream<Thing>();
let tx = 0;
let version = 0;
function getGroupKey(t: Thing) {
return t.groupKey;
}
Expand Down Expand Up @@ -144,7 +144,7 @@ test('collects all things with the same key', () => {
]);

function check(expected: [Reduction, number][]) {
input.commit(++tx);
input.commit(++version);
expect(items).toEqual(expected);
items.length = 0;
}
Expand Down
6 changes: 3 additions & 3 deletions src/zql/ivm/graph/operators/reduce-operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {Primitive} from '../../../ast/ast.js';
import {assert} from '../../../error/asserts.js';
import {flatMapIter} from '../../../util/iterables.js';
import {Entry, Multiset} from '../../multiset.js';
import {StrOrNum, Version} from '../../types.js';
import {StringOrNumber, Version} from '../../types.js';
import {DifferenceStream} from '../difference-stream.js';
import {UnaryOperator} from './unary-operator.js';

Expand Down Expand Up @@ -30,7 +30,7 @@ export class ReduceOperator<
* If a negative multiplicity comes through the pipeline,
* it reduces the multiplicity of the existing value in the map.
*/
readonly #inIndex = new Map<K, Map<StrOrNum, [V, number]>>();
readonly #inIndex = new Map<K, Map<StringOrNumber, [V, number]>>();
/**
* Our prior reduction for a given key.
*
Expand All @@ -45,7 +45,7 @@ export class ReduceOperator<
constructor(
input: DifferenceStream<V>,
output: DifferenceStream<O>,
getValueIdentity: (value: V) => StrOrNum,
getValueIdentity: (value: V) => StringOrNumber,
getGroupKey: (value: V) => K,
f: (input: Iterable<V>) => O,
) {
Expand Down
4 changes: 2 additions & 2 deletions src/zql/ivm/types.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
export type Version = number;
export type StrOrNum = string | number;
export type StringOrNumber = string | number;

export const joinSymbol = Symbol();

type JoinResultBase = {
id: StrOrNum;
id: StringOrNumber;
[joinSymbol]: true;
};

Expand Down

0 comments on commit d9faaaa

Please sign in to comment.