Skip to content

Commit

Permalink
Merge branch 'main' into grgbkr/complete-solid
Browse files Browse the repository at this point in the history
  • Loading branch information
arv authored Dec 29, 2024
2 parents c90b4f0 + cc7ac8d commit cd97b2f
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 323 deletions.
40 changes: 40 additions & 0 deletions packages/zero-cache/src/integration/integration.pg-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,5 +269,45 @@ describe('integration', () => {
'pokeEnd',
{pokeID: expect.any(String)},
]);

// Test TRUNCATE
await upDB`TRUNCATE TABLE foo RESTART IDENTITY`;

// One canceled poke
expect(await downstream.dequeue()).toMatchObject([
'pokeStart',
{pokeID: expect.any(String)},
]);
expect(await downstream.dequeue()).toMatchObject([
'pokeEnd',
{pokeID: expect.any(String), cancel: true},
]);

expect(await downstream.dequeue()).toMatchObject([
'pokeStart',
{pokeID: expect.any(String)},
]);
expect(await downstream.dequeue()).toMatchObject([
'pokePart',
{
pokeID: expect.any(String),
rowsPatch: [
{
op: 'del',
tableName: 'foo',
id: {id: 'bar'},
},
{
op: 'del',
tableName: 'foo',
id: {id: 'voo'},
},
],
},
]);
expect(await downstream.dequeue()).toMatchObject([
'pokeEnd',
{pokeID: expect.any(String)},
]);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
} from '../replicator/test-utils.js';
import {CREATE_STORAGE_TABLE, DatabaseStorage} from './database-storage.js';
import {PipelineDriver} from './pipeline-driver.js';
import {Snapshotter} from './snapshotter.js';
import {ResetPipelinesSignal, Snapshotter} from './snapshotter.js';

describe('view-syncer/pipeline-driver', () => {
let dbFile: DbFile;
Expand Down Expand Up @@ -492,6 +492,17 @@ describe('view-syncer/pipeline-driver', () => {
`);
});

test('truncate', () => {
pipelines.init();
[...pipelines.addQuery('hash1', ISSUES_AND_COMMENTS)];

replicator.processTransaction('134', messages.truncate('comments'));

expect(() => [...pipelines.advance().changes]).toThrowError(
ResetPipelinesSignal,
);
});

test('update', () => {
pipelines.init();
[...pipelines.addQuery('hash1', ISSUES_AND_COMMENTS)];
Expand Down
213 changes: 5 additions & 208 deletions packages/zero-cache/src/services/view-syncer/snapshotter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import {
ReplicationMessages,
type FakeReplicator,
} from '../replicator/test-utils.js';
import {setSpecs} from './pipeline-driver.js';
import {
InvalidDiffError,
SchemaChangeError,
ResetPipelinesSignal,
Snapshotter,
} from './snapshotter.js';
import {setSpecs} from './pipeline-driver.js';

describe('view-syncer/snapshotter', () => {
let lc: LogContext;
Expand Down Expand Up @@ -418,22 +418,7 @@ describe('view-syncer/snapshotter', () => {
s2.destroy();
});

test('noop-truncate diff', () => {
const {version} = s.current();

expect(version).toBe('00');

replicator.processTransaction('07', messages.truncate('comments'));

const diff = s.advance(tableSpecs);
expect(diff.prev.version).toBe('00');
expect(diff.curr.version).toBe('01');
expect(diff.changes).toBe(1);

expect([...diff]).toEqual([]);
});

test('truncate diff', () => {
test('truncate', () => {
const {version} = s.current();

expect(version).toBe('00');
Expand All @@ -445,157 +430,7 @@ describe('view-syncer/snapshotter', () => {
expect(diff.curr.version).toBe('01');
expect(diff.changes).toBe(1);

expect([...diff]).toMatchInlineSnapshot(`
[
{
"nextValue": null,
"prevValue": {
"_0_version": "00",
"handle": "alice",
"id": 10n,
},
"table": "users",
},
{
"nextValue": null,
"prevValue": {
"_0_version": "00",
"handle": "bob",
"id": 20n,
},
"table": "users",
},
]
`);
});

test('consecutive truncates', () => {
const {version} = s.current();

expect(version).toBe('00');

replicator.processTransaction(
'08',
messages.truncate('issues'),
messages.truncate('users'),
);

const diff = s.advance(tableSpecs);
expect(diff.prev.version).toBe('00');
expect(diff.curr.version).toBe('01');
expect(diff.changes).toBe(2);

expect([...diff]).toMatchInlineSnapshot(`
[
{
"nextValue": null,
"prevValue": {
"_0_version": "00",
"desc": "foo",
"id": 1n,
"owner": 10n,
},
"table": "issues",
},
{
"nextValue": null,
"prevValue": {
"_0_version": "00",
"desc": "bar",
"id": 2n,
"owner": 10n,
},
"table": "issues",
},
{
"nextValue": null,
"prevValue": {
"_0_version": "00",
"desc": "baz",
"id": 3n,
"owner": 20n,
},
"table": "issues",
},
{
"nextValue": null,
"prevValue": {
"_0_version": "00",
"handle": "alice",
"id": 10n,
},
"table": "users",
},
{
"nextValue": null,
"prevValue": {
"_0_version": "00",
"handle": "bob",
"id": 20n,
},
"table": "users",
},
]
`);
});

test('truncate followed by inserts into same table', () => {
const {version} = s.current();

expect(version).toBe('00');

replicator.processTransaction(
'09',
messages.truncate('users'),
messages.insert('users', {id: 20, handle: 'robert'}),
messages.insert('users', {id: 30, handle: 'candice'}),
);

const diff = s.advance(tableSpecs);
expect(diff.prev.version).toBe('00');
expect(diff.curr.version).toBe('01');
expect(diff.changes).toBe(3);

expect([...diff]).toMatchInlineSnapshot(`
[
{
"nextValue": null,
"prevValue": {
"_0_version": "00",
"handle": "alice",
"id": 10n,
},
"table": "users",
},
{
"nextValue": null,
"prevValue": {
"_0_version": "00",
"handle": "bob",
"id": 20n,
},
"table": "users",
},
{
"nextValue": {
"_0_version": "01",
"handle": "robert",
"id": 20,
},
"prevValue": null,
"table": "users",
},
{
"nextValue": {
"_0_version": "01",
"handle": "candice",
"id": 30,
},
"prevValue": null,
"table": "users",
},
]
`);
expect(() => [...diff]).toThrowError(ResetPipelinesSignal);
});

test('changelog iterator cleaned up on aborted iteration', () => {
Expand Down Expand Up @@ -631,44 +466,6 @@ describe('view-syncer/snapshotter', () => {
expect(diff.curr.db.statementCache.size).toBe(currStmts + 1);
});

test('truncate iterator cleaned up on aborted iteration', () => {
const s = new Snapshotter(lc, dbFile.path).init();
const {version} = s.current();

expect(version).toBe('00');

replicator.processTransaction('07', messages.truncate('users'));

const diff = s.advance(tableSpecs);
let currStmts = 0;
let prevStmts = 0;

const abortError = new Error('aborted iteration');
try {
for (const change of diff) {
expect(change).toEqual({
nextValue: null,
prevValue: {
['_0_version']: '00',
handle: 'alice',
id: 10n,
},
table: 'users',
});
currStmts = diff.curr.db.statementCache.size;
prevStmts = diff.prev.db.statementCache.size;
throw abortError;
}
} catch (e) {
expect(e).toBe(abortError);
}

// The Statements for both the ChangeLog (curr) and truncated-row (prev)
// iterations should have been returned to the cache.
expect(diff.curr.db.statementCache.size).toBe(currStmts + 1);
expect(diff.prev.db.statementCache.size).toBe(prevStmts + 1);
});

test('schema change diff iteration throws SchemaChangeError', () => {
const {version} = s.current();

Expand All @@ -684,6 +481,6 @@ describe('view-syncer/snapshotter', () => {
expect(diff.curr.version).toBe('01');
expect(diff.changes).toBe(1);

expect(() => [...diff]).toThrow(SchemaChangeError);
expect(() => [...diff]).toThrow(ResetPipelinesSignal);
});
});
Loading

0 comments on commit cd97b2f

Please sign in to comment.