From 4c5c8e46da27b6205d64ed65a83c4110130a119f Mon Sep 17 00:00:00 2001 From: Darick Tong <132324914+darkgnotic@users.noreply.github.com> Date: Sun, 29 Dec 2024 16:45:14 +0900 Subject: [PATCH 1/2] fix(zero-cache): fix replication of TRUNCATE statements (#3461) --- .../src/integration/integration.pg-test.ts | 40 ++++ .../view-syncer/pipeline-driver.test.ts | 13 +- .../services/view-syncer/snapshotter.test.ts | 213 +----------------- .../src/services/view-syncer/snapshotter.ts | 111 ++------- .../view-syncer/view-syncer.pg-test.ts | 165 +++++++++++++- .../src/services/view-syncer/view-syncer.ts | 8 +- 6 files changed, 241 insertions(+), 309 deletions(-) diff --git a/packages/zero-cache/src/integration/integration.pg-test.ts b/packages/zero-cache/src/integration/integration.pg-test.ts index 3ab649747..39da04db5 100644 --- a/packages/zero-cache/src/integration/integration.pg-test.ts +++ b/packages/zero-cache/src/integration/integration.pg-test.ts @@ -269,5 +269,45 @@ describe('integration', () => { 'pokeEnd', {pokeID: expect.any(String)}, ]); + + // Test TRUNCATE + await upDB`TRUNCATE TABLE foo RESTART IDENTITY`; + + // One canceled poke + expect(await downstream.dequeue()).toMatchObject([ + 'pokeStart', + {pokeID: expect.any(String)}, + ]); + expect(await downstream.dequeue()).toMatchObject([ + 'pokeEnd', + {pokeID: expect.any(String), cancel: true}, + ]); + + expect(await downstream.dequeue()).toMatchObject([ + 'pokeStart', + {pokeID: expect.any(String)}, + ]); + expect(await downstream.dequeue()).toMatchObject([ + 'pokePart', + { + pokeID: expect.any(String), + rowsPatch: [ + { + op: 'del', + tableName: 'foo', + id: {id: 'bar'}, + }, + { + op: 'del', + tableName: 'foo', + id: {id: 'voo'}, + }, + ], + }, + ]); + expect(await downstream.dequeue()).toMatchObject([ + 'pokeEnd', + {pokeID: expect.any(String)}, + ]); }); }); diff --git a/packages/zero-cache/src/services/view-syncer/pipeline-driver.test.ts b/packages/zero-cache/src/services/view-syncer/pipeline-driver.test.ts index a1abe4537..f7c934e9c 100644 --- a/packages/zero-cache/src/services/view-syncer/pipeline-driver.test.ts +++ b/packages/zero-cache/src/services/view-syncer/pipeline-driver.test.ts @@ -14,7 +14,7 @@ import { } from '../replicator/test-utils.js'; import {CREATE_STORAGE_TABLE, DatabaseStorage} from './database-storage.js'; import {PipelineDriver} from './pipeline-driver.js'; -import {Snapshotter} from './snapshotter.js'; +import {ResetPipelinesSignal, Snapshotter} from './snapshotter.js'; describe('view-syncer/pipeline-driver', () => { let dbFile: DbFile; @@ -492,6 +492,17 @@ describe('view-syncer/pipeline-driver', () => { `); }); + test('truncate', () => { + pipelines.init(); + [...pipelines.addQuery('hash1', ISSUES_AND_COMMENTS)]; + + replicator.processTransaction('134', messages.truncate('comments')); + + expect(() => [...pipelines.advance().changes]).toThrowError( + ResetPipelinesSignal, + ); + }); + test('update', () => { pipelines.init(); [...pipelines.addQuery('hash1', ISSUES_AND_COMMENTS)]; diff --git a/packages/zero-cache/src/services/view-syncer/snapshotter.test.ts b/packages/zero-cache/src/services/view-syncer/snapshotter.test.ts index 3768f0a8e..5ed64682a 100644 --- a/packages/zero-cache/src/services/view-syncer/snapshotter.test.ts +++ b/packages/zero-cache/src/services/view-syncer/snapshotter.test.ts @@ -11,12 +11,12 @@ import { ReplicationMessages, type FakeReplicator, } from '../replicator/test-utils.js'; +import {setSpecs} from './pipeline-driver.js'; import { InvalidDiffError, - SchemaChangeError, + ResetPipelinesSignal, Snapshotter, } from './snapshotter.js'; -import {setSpecs} from './pipeline-driver.js'; describe('view-syncer/snapshotter', () => { let lc: LogContext; @@ -418,22 +418,7 @@ describe('view-syncer/snapshotter', () => { s2.destroy(); }); - test('noop-truncate diff', () => { - const {version} = s.current(); - - expect(version).toBe('00'); - - replicator.processTransaction('07', messages.truncate('comments')); - - const diff = s.advance(tableSpecs); - expect(diff.prev.version).toBe('00'); - expect(diff.curr.version).toBe('01'); - expect(diff.changes).toBe(1); - - expect([...diff]).toEqual([]); - }); - - test('truncate diff', () => { + test('truncate', () => { const {version} = s.current(); expect(version).toBe('00'); @@ -445,157 +430,7 @@ describe('view-syncer/snapshotter', () => { expect(diff.curr.version).toBe('01'); expect(diff.changes).toBe(1); - expect([...diff]).toMatchInlineSnapshot(` - [ - { - "nextValue": null, - "prevValue": { - "_0_version": "00", - "handle": "alice", - "id": 10n, - }, - "table": "users", - }, - { - "nextValue": null, - "prevValue": { - "_0_version": "00", - "handle": "bob", - "id": 20n, - }, - "table": "users", - }, - ] - `); - }); - - test('consecutive truncates', () => { - const {version} = s.current(); - - expect(version).toBe('00'); - - replicator.processTransaction( - '08', - messages.truncate('issues'), - messages.truncate('users'), - ); - - const diff = s.advance(tableSpecs); - expect(diff.prev.version).toBe('00'); - expect(diff.curr.version).toBe('01'); - expect(diff.changes).toBe(2); - - expect([...diff]).toMatchInlineSnapshot(` - [ - { - "nextValue": null, - "prevValue": { - "_0_version": "00", - "desc": "foo", - "id": 1n, - "owner": 10n, - }, - "table": "issues", - }, - { - "nextValue": null, - "prevValue": { - "_0_version": "00", - "desc": "bar", - "id": 2n, - "owner": 10n, - }, - "table": "issues", - }, - { - "nextValue": null, - "prevValue": { - "_0_version": "00", - "desc": "baz", - "id": 3n, - "owner": 20n, - }, - "table": "issues", - }, - { - "nextValue": null, - "prevValue": { - "_0_version": "00", - "handle": "alice", - "id": 10n, - }, - "table": "users", - }, - { - "nextValue": null, - "prevValue": { - "_0_version": "00", - "handle": "bob", - "id": 20n, - }, - "table": "users", - }, - ] - `); - }); - - test('truncate followed by inserts into same table', () => { - const {version} = s.current(); - - expect(version).toBe('00'); - - replicator.processTransaction( - '09', - messages.truncate('users'), - messages.insert('users', {id: 20, handle: 'robert'}), - messages.insert('users', {id: 30, handle: 'candice'}), - ); - - const diff = s.advance(tableSpecs); - expect(diff.prev.version).toBe('00'); - expect(diff.curr.version).toBe('01'); - expect(diff.changes).toBe(3); - - expect([...diff]).toMatchInlineSnapshot(` - [ - { - "nextValue": null, - "prevValue": { - "_0_version": "00", - "handle": "alice", - "id": 10n, - }, - "table": "users", - }, - { - "nextValue": null, - "prevValue": { - "_0_version": "00", - "handle": "bob", - "id": 20n, - }, - "table": "users", - }, - { - "nextValue": { - "_0_version": "01", - "handle": "robert", - "id": 20, - }, - "prevValue": null, - "table": "users", - }, - { - "nextValue": { - "_0_version": "01", - "handle": "candice", - "id": 30, - }, - "prevValue": null, - "table": "users", - }, - ] - `); + expect(() => [...diff]).toThrowError(ResetPipelinesSignal); }); test('changelog iterator cleaned up on aborted iteration', () => { @@ -631,44 +466,6 @@ describe('view-syncer/snapshotter', () => { expect(diff.curr.db.statementCache.size).toBe(currStmts + 1); }); - test('truncate iterator cleaned up on aborted iteration', () => { - const s = new Snapshotter(lc, dbFile.path).init(); - const {version} = s.current(); - - expect(version).toBe('00'); - - replicator.processTransaction('07', messages.truncate('users')); - - const diff = s.advance(tableSpecs); - let currStmts = 0; - let prevStmts = 0; - - const abortError = new Error('aborted iteration'); - try { - for (const change of diff) { - expect(change).toEqual({ - nextValue: null, - prevValue: { - ['_0_version']: '00', - handle: 'alice', - id: 10n, - }, - table: 'users', - }); - currStmts = diff.curr.db.statementCache.size; - prevStmts = diff.prev.db.statementCache.size; - throw abortError; - } - } catch (e) { - expect(e).toBe(abortError); - } - - // The Statements for both the ChangeLog (curr) and truncated-row (prev) - // iterations should have been returned to the cache. - expect(diff.curr.db.statementCache.size).toBe(currStmts + 1); - expect(diff.prev.db.statementCache.size).toBe(prevStmts + 1); - }); - test('schema change diff iteration throws SchemaChangeError', () => { const {version} = s.current(); @@ -684,6 +481,6 @@ describe('view-syncer/snapshotter', () => { expect(diff.curr.version).toBe('01'); expect(diff.changes).toBe(1); - expect(() => [...diff]).toThrow(SchemaChangeError); + expect(() => [...diff]).toThrow(ResetPipelinesSignal); }); }); diff --git a/packages/zero-cache/src/services/view-syncer/snapshotter.ts b/packages/zero-cache/src/services/view-syncer/snapshotter.ts index cd425729c..183d3f618 100644 --- a/packages/zero-cache/src/services/view-syncer/snapshotter.ts +++ b/packages/zero-cache/src/services/view-syncer/snapshotter.ts @@ -6,7 +6,7 @@ import {Database} from '../../../../zqlite/src/db.js'; import {fromSQLiteTypes} from '../../../../zqlite/src/table-source.js'; import type {LiteAndZqlSpec, LiteTableSpec} from '../../db/specs.js'; import {StatementRunner} from '../../db/statements.js'; -import {jsonObjectSchema, type JSONValue} from '../../types/bigint-json.js'; +import {type JSONValue} from '../../types/bigint-json.js'; import { normalizedKeyOrder, type RowKey, @@ -219,13 +219,14 @@ export interface SnapshotDiff extends Iterable { /** * Thrown during an iteration of a {@link SnapshotDiff} when a schema - * change is encountered. + * change or truncate is encountered, which result in aborting the + * advancement and resetting / rehydrating the pipelines. */ -export class SchemaChangeError extends Error { - readonly name = 'SchemaChangeError'; +export class ResetPipelinesSignal extends Error { + readonly name = 'ResetPipelinesSignal'; - constructor(table: string) { - super(`schema for table ${table} has changed`); + constructor(msg: string) { + super(msg); } } @@ -351,16 +352,13 @@ class Diff implements SnapshotDiff { [Symbol.iterator](): Iterator { const {changes, cleanup: done} = this.curr.changesSince(this.prev.version); - const truncates = new TruncateTracker(this.prev); const cleanup = () => { try { // Allow open iterators to clean up their state. - truncates.iterReturn(undefined); changes.return?.(undefined); } finally { done(); - truncates.done(); } }; @@ -368,12 +366,6 @@ class Diff implements SnapshotDiff { next: () => { try { for (;;) { - // Exhaust the TRUNCATE iteration before continuing the Change sequence. - const truncatedRow = truncates.next(); - if (truncatedRow) { - return truncatedRow; - } - const {value, done} = changes.next(); if (done) { cleanup(); @@ -383,17 +375,20 @@ class Diff implements SnapshotDiff { const {table, rowKey, op, stateVersion} = v.parse(value, schema); if (op === RESET_OP) { // The current map of `TableSpec`s may not have the correct or complete information. - throw new SchemaChangeError(table); + throw new ResetPipelinesSignal( + `schema for table ${table} has changed`, + ); } - const {tableSpec, zqlSpec} = must(this.tables.get(table)); if (op === TRUNCATE_OP) { - truncates.startTruncate(tableSpec); - continue; // loop around to pull rows from the TruncateTracker. + // Truncates are also processed by rehydrating pipelines at current. + throw new ResetPipelinesSignal( + `table ${table} has been truncated`, + ); } + const {tableSpec, zqlSpec} = must(this.tables.get(table)); assert(rowKey !== null); - let prevValue = - truncates.getRowIfNotTruncated(tableSpec, rowKey) ?? null; + let prevValue = this.prev.getRow(tableSpec, rowKey) ?? null; let nextValue = op === SET_OP ? this.curr.getRow(tableSpec, rowKey) : null; @@ -459,80 +454,6 @@ class Diff implements SnapshotDiff { } } -/** - * `TRUNCATE` changes are handled by: - * 1. Iterating over all of the rows in the `prev` Snapshot and returning - * corresponding `DELETE` row operations for them (i.e. `nextValue: null`). - * 2. Tracking the fact that a table has been truncated (i.e. all row-deletes - * have been returned) so that subsequent lookups of prevValues (e.g. for - * inserts after the truncate) correctly return `null`. - */ -class TruncateTracker { - readonly #prev: Snapshot; - readonly #truncated = new Set(); - - #truncating: { - table: string; - rows: Iterator; - cleanup: () => void; - } | null = null; - - constructor(prev: Snapshot) { - this.#prev = prev; - } - - startTruncate(table: LiteTableSpec) { - assert(this.#truncating === null); - const {rows, cleanup} = this.#prev.getRows(table); - this.#truncating = {table: table.name, rows, cleanup}; - } - - next(): IteratorResult | null { - if (this.#truncating === null) { - return null; - } - const {table} = this.#truncating; - const {value, done} = this.#truncating.rows.next(); - if (done) { - this.#truncating.cleanup(); - this.#truncating = null; - this.#truncated.add(table); - return null; - } - const prevValue = v.parse(value, jsonObjectSchema); - - // Sanity check detects if the diff is being accessed after the Snapshots have advanced. - if ((prevValue[ROW_VERSION] ?? '~') > this.#prev.version) { - throw new InvalidDiffError( - `Diff is no longer valid. prev db has advanced past ${ - this.#prev.version - }.`, - ); - } - - return {value: {table, prevValue, nextValue: null} satisfies Change}; - } - - getRowIfNotTruncated(table: LiteTableSpec, rowKey: RowKey) { - // If the row has been returned in a TRUNCATE iteration, its prevValue is henceforth null. - return this.#truncated.has(table.name) - ? null - : this.#prev.getRow(table, rowKey); - } - - iterReturn(value: unknown) { - this.#truncating?.rows.return?.(value); - } - - iterThrow(err: unknown) { - this.#truncating?.rows.throw?.(err); - } - - done() { - this.#truncating?.cleanup(); - } -} - export class InvalidDiffError extends Error { constructor(msg: string) { super(msg); diff --git a/packages/zero-cache/src/services/view-syncer/view-syncer.pg-test.ts b/packages/zero-cache/src/services/view-syncer/view-syncer.pg-test.ts index 118e3af1c..e32c86ec3 100644 --- a/packages/zero-cache/src/services/view-syncer/view-syncer.pg-test.ts +++ b/packages/zero-cache/src/services/view-syncer/view-syncer.pg-test.ts @@ -1268,7 +1268,7 @@ describe('view-syncer/service', () => { }); }); - test('process advancement', async () => { + test('process successful advancement', async () => { const client = connect(SYNC_CONTEXT, [ {op: 'put', hash: 'query-hash1', ast: ISSUES_QUERY}, {op: 'put', hash: 'query-hash2', ast: ISSUES_QUERY2}, @@ -1514,6 +1514,169 @@ describe('view-syncer/service', () => { }, ] `); + + replicator.processTransaction('124', messages.truncate('issues')); + + stateChanges.push({state: 'version-ready'}); + + // One canceled poke. + expect(await nextPoke(client)).toMatchInlineSnapshot(` + [ + [ + "pokeStart", + { + "baseCookie": "01", + "cookie": "123", + "pokeID": "123", + "schemaVersions": { + "maxSupportedVersion": 3, + "minSupportedVersion": 2, + }, + }, + ], + [ + "pokeEnd", + { + "cancel": true, + "pokeID": "123", + }, + ], + ] + `); + + // Then a poke that deletes issues rows in the CVR. + expect(await nextPoke(client)).toMatchInlineSnapshot(` + [ + [ + "pokeStart", + { + "baseCookie": "01", + "cookie": "123", + "pokeID": "123", + "schemaVersions": { + "maxSupportedVersion": 3, + "minSupportedVersion": 2, + }, + }, + ], + [ + "pokePart", + { + "pokeID": "123", + "rowsPatch": [ + { + "id": { + "id": "1", + }, + "op": "del", + "tableName": "issues", + }, + { + "id": { + "id": "3", + }, + "op": "del", + "tableName": "issues", + }, + { + "id": { + "id": "4", + }, + "op": "del", + "tableName": "issues", + }, + { + "id": { + "id": "5", + }, + "op": "del", + "tableName": "issues", + }, + ], + }, + ], + [ + "pokeEnd", + { + "pokeID": "123", + }, + ], + ] + `); + + expect(await cvrDB`SELECT * from cvr.rows`).toMatchInlineSnapshot(` + Result [ + { + "clientGroupID": "9876", + "patchVersion": "00:02", + "refCounts": { + "lmids": 1, + }, + "rowKey": { + "clientGroupID": "9876", + "clientID": "foo", + }, + "rowVersion": "00", + "schema": "", + "table": "zero_ABC.clients", + }, + { + "clientGroupID": "9876", + "patchVersion": "01", + "refCounts": null, + "rowKey": { + "id": "2", + }, + "rowVersion": "00", + "schema": "", + "table": "issues", + }, + { + "clientGroupID": "9876", + "patchVersion": "123", + "refCounts": null, + "rowKey": { + "id": "1", + }, + "rowVersion": "01", + "schema": "", + "table": "issues", + }, + { + "clientGroupID": "9876", + "patchVersion": "123", + "refCounts": null, + "rowKey": { + "id": "3", + }, + "rowVersion": "00", + "schema": "", + "table": "issues", + }, + { + "clientGroupID": "9876", + "patchVersion": "123", + "refCounts": null, + "rowKey": { + "id": "4", + }, + "rowVersion": "00", + "schema": "", + "table": "issues", + }, + { + "clientGroupID": "9876", + "patchVersion": "123", + "refCounts": null, + "rowKey": { + "id": "5", + }, + "rowVersion": "00", + "schema": "", + "table": "issues", + }, + ] + `); }); test('process advancement that results in client having an unsupported schemaVersion', async () => { diff --git a/packages/zero-cache/src/services/view-syncer/view-syncer.ts b/packages/zero-cache/src/services/view-syncer/view-syncer.ts index 00882520c..71bfcd592 100644 --- a/packages/zero-cache/src/services/view-syncer/view-syncer.ts +++ b/packages/zero-cache/src/services/view-syncer/view-syncer.ts @@ -58,7 +58,7 @@ import { type NullableCVRVersion, type RowID, } from './schema/types.js'; -import {SchemaChangeError} from './snapshotter.js'; +import {ResetPipelinesSignal} from './snapshotter.js'; export type TokenData = { readonly raw: string; @@ -210,7 +210,7 @@ export class ViewSyncerService implements ViewSyncer, ActivityBasedService { if (result === 'success') { return; } - lc.info?.(`resetting for schema change: ${result.message}`); + lc.info?.(`resetting pipelines: ${result.message}`); this.#pipelines.reset(); } @@ -957,7 +957,7 @@ export class ViewSyncerService implements ViewSyncer, ActivityBasedService { #advancePipelines( lc: LogContext, cvr: CVRSnapshot, - ): Promise<'success' | SchemaChangeError> { + ): Promise<'success' | ResetPipelinesSignal> { return startAsyncSpan(tracer, 'vs.#advancePipelines', async () => { assert(this.#pipelines.initialized()); const start = Date.now(); @@ -1000,7 +1000,7 @@ export class ViewSyncerService implements ViewSyncer, ActivityBasedService { transformationHashToHash, ); } catch (e) { - if (e instanceof SchemaChangeError) { + if (e instanceof ResetPipelinesSignal) { pokers.forEach(poker => poker.cancel()); return e; } From cc7ac8d347812bb25905e78e3d0d4cc73fc5d759 Mon Sep 17 00:00:00 2001 From: Darick Tong <132324914+darkgnotic@users.noreply.github.com> Date: Sun, 29 Dec 2024 17:31:36 +0900 Subject: [PATCH 2/2] feat(zero-cache): improve the error message for unsupported protocol versions (#3462) --- packages/zero-cache/src/workers/connection.ts | 17 ++++++++----- .../zero-protocol/src/protocol-version.ts | 25 +++++++++++++------ 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/packages/zero-cache/src/workers/connection.ts b/packages/zero-cache/src/workers/connection.ts index 510e7f726..8f85bbfa7 100644 --- a/packages/zero-cache/src/workers/connection.ts +++ b/packages/zero-cache/src/workers/connection.ts @@ -16,7 +16,10 @@ import { type PongMessage, upstreamSchema, } from '../../../zero-protocol/src/mod.js'; -import {PROTOCOL_VERSION} from '../../../zero-protocol/src/protocol-version.js'; +import { + MIN_SERVER_SUPPORTED_PROTOCOL_VERSION, + PROTOCOL_VERSION, +} from '../../../zero-protocol/src/protocol-version.js'; import type {ConnectParams} from '../services/dispatcher/connect-params.js'; import type {Mutagen} from '../services/mutagen/mutagen.js'; import type { @@ -102,14 +105,16 @@ export class Connection { */ init() { if ( - this.#protocolVersion !== PROTOCOL_VERSION && - this.#protocolVersion !== PROTOCOL_VERSION - 1 + this.#protocolVersion > PROTOCOL_VERSION || + this.#protocolVersion < MIN_SERVER_SUPPORTED_PROTOCOL_VERSION ) { this.#closeWithError({ kind: ErrorKind.VersionNotSupported, - message: `server supports v${ - PROTOCOL_VERSION - 1 - } and v${PROTOCOL_VERSION} protocols`, + message: `server is at sync protocol v${PROTOCOL_VERSION} and does not support v${ + this.#protocolVersion + }. The ${ + this.#protocolVersion > PROTOCOL_VERSION ? 'server' : 'client' + } must be updated to a newer release.`, }); } else { const connectedMessage: ConnectedMessage = [ diff --git a/packages/zero-protocol/src/protocol-version.ts b/packages/zero-protocol/src/protocol-version.ts index 407219b6f..2af2e2369 100644 --- a/packages/zero-protocol/src/protocol-version.ts +++ b/packages/zero-protocol/src/protocol-version.ts @@ -1,4 +1,8 @@ +import {assert} from '../../shared/src/asserts.js'; + /** + * The current `PROTOCOL_VERSION` of the code. + * * The `PROTOCOL_VERSION` encompasses both the wire-protocol of the `/sync/...` * connection between the browser and `zero-cache`, as well as the format of * the `AST` objects stored in both components (i.e. IDB and CVR). @@ -7,13 +11,18 @@ * accompanied by an increment of the `PROTOCOL_VERSION` and a new major * release. The server (`zero-cache`) must be deployed before clients start * running the new code. - * - * The contract for backwards compatibility is that a `zero-cache` supports - * its current `PROTOCOL_VERSION` and the previous one (i.e. - * `PROTOCOL_VERSION - 1`, which is necessary to support old clients when - * the server is rolled out). This corresponds to supporting clients running - * the current release and the previous (major) release. Any client connections - * from earlier protocol versions are closed with a `VersionNotSupported` - * error. */ export const PROTOCOL_VERSION = 3; + +/** + * The minimum protocol version supported by the server. The contract for + * backwards compatibility is that a `zero-cache` supports the current + * `PROTOCOL_VERSION` and at least the previous one (i.e. `PROTOCOL_VERSION - 1`) + * if not earlier ones as well. This corresponds to supporting clients running + * the current release and the previous (major) release. Any client connections + * from protocol versions before `MIN_SERVER_SUPPORTED_PROTOCOL_VERSION` are + * closed with a `VersionNotSupported` error. + */ +export const MIN_SERVER_SUPPORTED_PROTOCOL_VERSION = 2; + +assert(MIN_SERVER_SUPPORTED_PROTOCOL_VERSION < PROTOCOL_VERSION);