From 1bd827b3868e2c9a25e4878bd3dab4611ca3f540 Mon Sep 17 00:00:00 2001 From: Aaron Boodman Date: Sun, 17 Dec 2023 03:57:04 -1000 Subject: [PATCH] implement client diffing via cvr too --- server/src/data.ts | 52 ++++---------- server/src/pull/cvr.ts | 52 ++++---------- server/src/pull/pull.ts | 155 ++++++++++++++++------------------------ server/src/push.ts | 3 - server/src/schema.ts | 18 ++--- todo.md | 2 +- 6 files changed, 92 insertions(+), 190 deletions(-) diff --git a/server/src/data.ts b/server/src/data.ts index 3228ea3..c901f46 100644 --- a/server/src/data.ts +++ b/server/src/data.ts @@ -10,14 +10,12 @@ export type SearchResult = { export type ClientGroupRecord = { id: string; cvrVersion: number | null; - clientVersion: number; }; export type ClientRecord = { id: string; clientGroupID: string; lastMutationID: number; - clientVersion: number; }; export type Affected = { @@ -224,15 +222,15 @@ export async function putClientGroup( executor: Executor, clientGroup: ClientGroupRecord, ) { - const {id, cvrVersion, clientVersion} = clientGroup; + const {id, cvrVersion} = clientGroup; await executor( /*sql*/ `insert into replicache_client_group - (id, cvrversion, clientversion, lastmodified) + (id, cvrversion, lastmodified) values - ($1, $2, $3, now()) + ($1, $2, now()) on conflict (id) do update set - cvrversion = $2, clientversion = $3, lastmodified = now()`, - [id, cvrVersion, clientVersion], + cvrversion = $2, lastmodified = now()`, + [id, cvrVersion], ); } @@ -247,7 +245,6 @@ export async function getClientGroupForUpdate( prevClientGroup ?? { id: clientGroupID, cvrVersion: null, - clientVersion: 0, } ); } @@ -258,7 +255,7 @@ export async function getClientGroup( {forUpdate}: {forUpdate?: boolean} = {}, ) { const {rows} = await executor( - /*sql*/ `select cvrversion, clientversion from replicache_client_group where id = $1 ${ + /*sql*/ `select cvrversion from replicache_client_group where id = $1 ${ forUpdate ? 
'for update' : '' }`, [clientGroupID], @@ -268,33 +265,10 @@ export async function getClientGroup( const res: ClientGroupRecord = { id: clientGroupID, cvrVersion: r.cvrversion, - clientVersion: r.clientversion, }; return res; } -export async function searchClients( - executor: Executor, - { - clientGroupID, - sinceClientVersion, - }: {clientGroupID: string; sinceClientVersion: number}, -) { - const {rows} = await executor( - /*sql*/ `select id, lastmutationid, clientversion from replicache_client where clientGroupID = $1 and clientversion > $2`, - [clientGroupID, sinceClientVersion], - ); - return rows.map(r => { - const client: ClientRecord = { - id: r.id, - clientGroupID, - lastMutationID: r.lastmutationid, - clientVersion: r.clientversion, - }; - return client; - }); -} - export async function getClientForUpdate(executor: Executor, clientID: string) { const prevClient = await getClient(executor, clientID, {forUpdate: true}); return ( @@ -302,7 +276,6 @@ export async function getClientForUpdate(executor: Executor, clientID: string) { id: clientID, clientGroupID: '', lastMutationID: 0, - clientVersion: 0, } ); } @@ -313,7 +286,7 @@ export async function getClient( {forUpdate}: {forUpdate?: boolean} = {}, ) { const {rows} = await executor( - /*sql*/ `select clientgroupid, lastmutationid, clientversion from replicache_client where id = $1 ${ + /*sql*/ `select clientgroupid, lastmutationid from replicache_client where id = $1 ${ forUpdate ? 
'for update' : '' }`, [clientID], @@ -324,23 +297,22 @@ export async function getClient( id: r.id, clientGroupID: r.clientgroupid, lastMutationID: r.lastmutationid, - clientVersion: r.lastclientversion, }; return res; } export async function putClient(executor: Executor, client: ClientRecord) { - const {id, clientGroupID, lastMutationID, clientVersion} = client; + const {id, clientGroupID, lastMutationID} = client; await executor( /*sql*/ ` insert into replicache_client - (id, clientgroupid, lastmutationid, clientversion, lastmodified) + (id, clientgroupid, lastmutationid, lastmodified) values - ($1, $2, $3, $4, now()) + ($1, $2, $3, now()) on conflict (id) do update set - lastmutationid = $3, clientversion = $4, lastmodified = now() + lastmutationid = $3, lastmodified = now() `, - [id, clientGroupID, lastMutationID, clientVersion], + [id, clientGroupID, lastMutationID], ); } diff --git a/server/src/pull/cvr.ts b/server/src/pull/cvr.ts index c09a06d..d11e8b9 100644 --- a/server/src/pull/cvr.ts +++ b/server/src/pull/cvr.ts @@ -1,10 +1,12 @@ import type {Comment, Description, Issue} from 'shared'; import type {Executor} from '../pg.js'; -export type CVR = { - clientGroupID: string; - clientVersion: number; - order: number; +// This represents the same row as ClientRecord in data.ts, but with the same +// casing as is used in the db rather than camelCase. This is needed because +// this code in pull and cvr.ts assumes they match. +export type Client = { + id: string; + lastmutationid: number; }; // These two consts drive the tables to sync. 
@@ -14,11 +16,15 @@ export const TableOrdinal = { issue: 1, description: 2, comment: 3, + // eslint-disable-next-line @typescript-eslint/naming-convention + replicache_client: 4, } as const; export type TableType = { - issue: Issue & {version: number}; - description: Description & {version: number}; - comment: Comment & {version: number}; + issue: Issue; + description: Description; + comment: Comment; + // eslint-disable-next-line @typescript-eslint/naming-convention + replicache_client: Client; }; export type SyncedTables = keyof typeof TableOrdinal; @@ -30,38 +36,6 @@ export type Deletes = { [P in keyof TableType]: string[]; }; -export async function getCVR( - executor: Executor, - clientGroupID: string, - order: number, -): Promise { - const result = await executor( - /*sql*/ `SELECT "client_version" FROM "client_view" WHERE "client_group_id" = $1 AND "version" = $2`, - [clientGroupID, order], - ); - if (result.rowCount === 0) { - return undefined; - } - return { - clientGroupID, - order, - clientVersion: result.rows[0].client_version, - }; -} - -export function putCVR(executor: Executor, cvr: CVR) { - return executor( - /*sql*/ `INSERT INTO client_view ( - "client_group_id", - "client_version", - "version" - ) VALUES ($1, $2, $3) ON CONFLICT ("client_group_id", "version") DO UPDATE SET - client_version = excluded.client_version - `, - [cvr.clientGroupID, cvr.clientVersion, cvr.order], - ); -} - export async function recordCreates( executor: Executor, table: T, diff --git a/server/src/pull/pull.ts b/server/src/pull/pull.ts index 1efa181..f887f7c 100644 --- a/server/src/pull/pull.ts +++ b/server/src/pull/pull.ts @@ -1,12 +1,9 @@ import {z} from 'zod'; import type {PatchOperation, PullResponse, PullResponseOKV1} from 'replicache'; import {transact, Executor} from '../pg'; -import {getClientGroupForUpdate, putClientGroup, searchClients} from '../data'; +import {getClientGroupForUpdate, putClientGroup} from '../data'; import type Express from 'express'; import { - 
CVR, - getCVR, - putCVR, syncedTables, Puts, Deletes, @@ -48,98 +45,66 @@ export async function pull( async function pullInner(pull: PullRequest) { const {clientGroupID} = pull; - const {clientChanges, response, prevCVR, nextCVR} = await transact( - async executor => { - // Get a write lock on the client group first to serialize with other - // requests from the CG and avoid deadlocks. - const baseClientGroupRecord = await getClientGroupForUpdate( - executor, - clientGroupID, - ); - const prevCVR = pull.cookie - ? await getCVR(executor, pull.cookie.clientGroupID, pull.cookie.order) - : undefined; - const baseCVR: CVR = prevCVR ?? { - clientGroupID, - clientVersion: 0, - order: 0, - }; - - // From: https://github.com/rocicorp/todo-row-versioning/blob/d8f351d040db9feae02b4847ea613fbc40aacd17/server/src/pull.ts#L103 - // If there is no clientGroupRecord this means one of two things: - // 1. The client group is actually new - // 2. The client group was forked from an existing client group - // - // In the latter case the cookie will be filled in. This cookie identifies to us - // that while the client group is new it actually has data from the client group it was forked from. - // - // That data it has is represented by the cvr version taken out of the cookie. - let prevCVRVersion = baseClientGroupRecord.cvrVersion; - if (prevCVRVersion === null) { - if (pull.cookie !== null) { - prevCVRVersion = pull.cookie.order; - } else { - prevCVRVersion = 0; - } - console.log( - `ClientGroup ${clientGroupID} is new, initializing to ${prevCVRVersion}`, - ); - } - - const nextClientGroupRecord = { - ...baseClientGroupRecord, - cvrVersion: prevCVRVersion + 1, - }; + const {nextClientGroupRecord, response} = await transact(async executor => { + // Get a write lock on the client group first to serialize with other + // requests from the CG and avoid deadlocks. 
+ const baseClientGroupRecord = await getClientGroupForUpdate( + executor, + clientGroupID, + ); - const nextCVR: CVR = { - clientGroupID, - clientVersion: baseClientGroupRecord.clientVersion, - order: nextClientGroupRecord.cvrVersion, - }; + // From: https://github.com/rocicorp/todo-row-versioning/blob/d8f351d040db9feae02b4847ea613fbc40aacd17/server/src/pull.ts#L103 + // If there is no clientGroupRecord this means one of two things: + // 1. The client group is actually new + // 2. The client group was forked from an existing client group + // + // In the latter case the cookie will be filled in. This cookie identifies to us + // that while the client group is new it actually has data from the client group it was forked from. + // + // That data it has is represented by the cvr version taken out of the cookie. + const prevCVRVersion = + baseClientGroupRecord.cvrVersion ?? pull.cookie?.order ?? 0; + console.log( + `ClientGroup ${clientGroupID} is new, initializing to ${prevCVRVersion}`, + ); - const clientChanges = await searchClients(executor, { - clientGroupID, - sinceClientVersion: baseCVR.clientVersion, - }); - console.log('Got client changes', clientChanges); + const nextClientGroupRecord = { + ...baseClientGroupRecord, + cvrVersion: prevCVRVersion + 1, + }; - await updateClientView(executor, clientGroupID, nextCVR.order); + await updateClientView( + executor, + clientGroupID, + nextClientGroupRecord.cvrVersion, + ); - // For everything the client already has, - // find all deletes and updates to those items. - const [puts, deletes] = await Promise.all([ - time( - () => getAllPuts(executor, clientGroupID, baseCVR.order), - 'getAllPuts', - ), - time( - () => getAllDels(executor, clientGroupID, baseCVR.order), - 'getAllDeletes', - ), - ]); + // For everything the client already has, + // find all deletes and updates to those items. 
+ const [puts, deletes] = await Promise.all([ + time( + () => getAllPuts(executor, clientGroupID, prevCVRVersion), + 'getAllPuts', + ), + time( + () => getAllDels(executor, clientGroupID, prevCVRVersion), + 'getAllDeletes', + ), + ]); - console.log('puts', [...Object.values(puts)].length); + console.log('puts', [...Object.values(puts)].length); - const response = { - puts, - deletes, - }; + const response = { + puts, + deletes, + }; - // TODO: Consider optimizing the case where there are no changes. + // TODO: Consider optimizing the case where there are no changes. - await Promise.all([ - putClientGroup(executor, nextClientGroupRecord), - putCVR(executor, nextCVR), - ]); + await putClientGroup(executor, nextClientGroupRecord); - return { - clientChanges, - response, - prevCVR, - nextCVR, - }; - }, - ); + return {nextClientGroupRecord, response}; + }); const patch: PatchOperation[] = []; @@ -154,11 +119,11 @@ async function pullInner(pull: PullRequest) { ); } - if (prevCVR === undefined) { - patch.push({op: 'clear'}); - } - for (const t of syncedTables) { + if (t === 'replicache_client') { + // Handled specially below. + continue; + } for (const put of response.puts[t]) { // Postgres rightly returns bigint as string given 64 bit ints are > js ints. // JS ints are 53 bits so we can safely convert to number for dates here. @@ -192,6 +157,8 @@ async function pullInner(pull: PullRequest) { } } + console.log('all puts', JSON.stringify(response.puts, null, ' ')); + // TODO: Reimplement partial sync. 
patch.push({ op: 'put', @@ -201,12 +168,14 @@ async function pullInner(pull: PullRequest) { const respCookie: Cookie = { clientGroupID, - order: nextCVR.order, + order: nextClientGroupRecord.cvrVersion, }; const resp: PullResponseOKV1 = { cookie: respCookie, lastMutationIDChanges: Object.fromEntries( - clientChanges.map(e => [e.id, e.lastMutationID] as const), + response.puts['replicache_client'].map( + ({id, lastmutationid}) => [id, lastmutationid] as const, + ), ), patch, }; diff --git a/server/src/push.ts b/server/src/push.ts index b39fff2..2b577a0 100644 --- a/server/src/push.ts +++ b/server/src/push.ts @@ -80,7 +80,6 @@ async function processMutation( console.log({baseClientGroup, baseClient}); - const nextClientVersion = baseClientGroup.clientVersion + 1; const nextMutationID = baseClient.lastMutationID + 1; if (mutation.id < nextMutationID) { @@ -109,14 +108,12 @@ async function processMutation( const nextClientGroup = { id: clientGroupID, cvrVersion: baseClientGroup.cvrVersion, - clientVersion: nextClientVersion, }; const nextClient = { id: mutation.clientID, clientGroupID, lastMutationID: nextMutationID, - clientVersion: nextClientVersion, }; await Promise.all([ diff --git a/server/src/schema.ts b/server/src/schema.ts index 506f87d..02a86b4 100644 --- a/server/src/schema.ts +++ b/server/src/schema.ts @@ -28,8 +28,6 @@ export async function createSchemaVersion1(executor: Executor) { -- TODO(aa): Need to standardize on either "cvr" or "cv". -- Both are used in this codebase. cvrversion INTEGER null, - -- Last client version used by this Client Group - clientversion INTEGER NOT NULL, lastmodified TIMESTAMP(6) NOT NULL )`); @@ -40,8 +38,10 @@ export async function createSchemaVersion1(executor: Executor) { clientgroupid VARCHAR(36) NOT NULL, -- Last mutation processed from this client. lastmutationid INTEGER NOT NULL, - -- Value of replicache_client_group.clientversion last time this client's last_mutation_id changed. 
- clientversion INTEGER NOT NULL, + -- This column is generated for compatibility with the row-versioning related code. + -- This way we can treat this table as just another row-versioned table. + -- The only thing that changes about a client is its lmid, so we can use that. + version INTEGER NOT NULL GENERATED ALWAYS AS (lastmutationid) STORED, lastmodified TIMESTAMP(6) NOT NULL )`); @@ -79,16 +79,6 @@ export async function createSchemaVersion1(executor: Executor) { "version" INTEGER NOT NULL )`); - await executor(/*sql*/ `CREATE TABLE "client_view" ( - -- Client Group this Client View was generated for - "client_group_id" VARCHAR(36) NOT NULL, - -- Version of this Client View - -- Note that CV's are recursive: CV_n = CV_n-1 + (changes since CV_n-1) - "version" INTEGER NOT NULL, - "client_version" INTEGER NOT NULL, - PRIMARY KEY ("client_group_id", "version") - )`); - await executor(/*sql*/ `CREATE TABLE "client_view_entry" ( -- Client Group the CV was generated for. "client_group_id" VARCHAR(36) NOT NULL, diff --git a/todo.md b/todo.md index 9484a57..78ccfb0 100644 --- a/todo.md +++ b/todo.md @@ -1,7 +1,7 @@ +- filter clients returned in pull to just those in the same client group - don't bump client version when no changes - decide on client_view_record vs client_view - make sure all files are snake_case - make sure all tables and columns are snake_case - add lease to pull - put paging back (by way of expanding a window incrementally that controls what we are syncing) -- move client diffing into cvr (get rid of client_view table entirely)