Skip to content

Commit

Permalink
feat(nbstore): add idb implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Nov 18, 2024
1 parent 22e6c6b commit 375f706
Show file tree
Hide file tree
Showing 13 changed files with 721 additions and 4 deletions.
10 changes: 9 additions & 1 deletion packages/common/nbstore/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,21 @@
"sideEffects": false,
"exports": {
".": "./src/index.ts",
"./op": "./src/op/index.ts"
"./op": "./src/op/index.ts",
"./idb": "./src/impls/idb/index.ts",
"./idb/v1": "./src/impls/idb/v1/index.ts"
},
"dependencies": {
"@toeverything/infra": "workspace:*",
"eventemitter2": "^6.4.9",
"lodash-es": "^4.17.21",
"rxjs": "^7.8.1",
"yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch"
},
"devDependencies": {
"idb": "^8.0.0"
},
"peerDependencies": {
"idb": "^8.0.0"
}
}
89 changes: 89 additions & 0 deletions packages/common/nbstore/src/impls/idb/blob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { share } from '../../connection';
import {
type BlobRecord,
BlobStorage,
type ListedBlobRecord,
} from '../../storage';
import { IDBConnection } from './db';

export class IndexedDBBlobStorage extends BlobStorage {
readonly connection = share(new IDBConnection(this.options));

get db() {
return this.connection.inner;
}

override async get(key: string) {
const trx = this.db.transaction(['blobs', 'blobData'], 'readonly');
const blob = await trx.objectStore('blobs').get(key);
const data = await trx.objectStore('blobData').get(key);

if (!blob || blob.deletedAt || !data) {
return null;
}

return {
...blob,
data: data.data,
};
}

override async set(blob: BlobRecord) {
const trx = this.db.transaction(['blobs', 'blobData'], 'readwrite');
await trx.objectStore('blobs').put({
key: blob.key,
mime: blob.mime,
size: blob.data.byteLength,
createdAt: new Date(),
deletedAt: null,
});
await trx.objectStore('blobData').put({
key: blob.key,
data: blob.data,
});
}

override async delete(key: string, permanently: boolean) {
if (permanently) {
const trx = this.db.transaction(['blobs', 'blobData'], 'readwrite');
await trx.objectStore('blobs').delete(key);
await trx.objectStore('blobData').delete(key);
} else {
const trx = this.db.transaction('blobs', 'readwrite');
const blob = await trx.store.get(key);
if (blob) {
await trx.store.put({
...blob,
deletedAt: new Date(),
});
}
}
}

override async release() {
const trx = this.db.transaction(['blobs', 'blobData'], 'readwrite');

const it = trx.objectStore('blobs').iterate();

for await (const item of it) {
if (item.value.deletedAt) {
await item.delete();
await trx.objectStore('blobData').delete(item.value.key);
}
}
}

override async list() {
const trx = this.db.transaction('blobs', 'readonly');
const it = trx.store.iterate();

const blobs: ListedBlobRecord[] = [];
for await (const item of it) {
if (!item.value.deletedAt) {
blobs.push(item.value);
}
}

return blobs;
}
}
43 changes: 43 additions & 0 deletions packages/common/nbstore/src/impls/idb/db.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { type IDBPDatabase, openDB } from 'idb';

import { Connection } from '../../connection';
import type { StorageOptions } from '../../storage';
import { type DocStorageSchema, migrator } from './schema';

export class IDBConnection extends Connection<IDBPDatabase<DocStorageSchema>> {
private readonly dbName = `${this.opts.peer}:${this.opts.type}:${this.opts.id}`;

override get shareId() {
return `idb(${migrator.version}):${this.dbName}`;
}

constructor(private readonly opts: StorageOptions) {
super();
}

override async doConnect() {
return openDB<DocStorageSchema>(this.dbName, migrator.version, {
upgrade: migrator.migrate,
blocking: () => {
// if, for example, an tab with newer version is opened, this function will be called.
// we should close current connection to allow the new version to upgrade the db.
this.close(
new Error('Blocking a new version. Closing the connection.')
);
},
blocked: () => {
// fallback to retry auto retry
this.setStatus('error', new Error('Blocked by other tabs.'));
},
});
}

override async doDisconnect() {
this.close();
}

private close(error?: Error) {
this.maybeConnection?.close();
this.setStatus('closed', error);
}
}
118 changes: 118 additions & 0 deletions packages/common/nbstore/src/impls/idb/doc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import { share } from '../../connection';
import {
type DocClocks,
type DocRecord,
DocStorage,
type DocUpdate,
} from '../../storage';
import { IDBConnection } from './db';

export class IndexedDBDocStorage extends DocStorage {
readonly connection = share(new IDBConnection(this.options));

get db() {
return this.connection.inner;
}

override async pushDocUpdate(update: DocUpdate) {
const trx = this.db.transaction(['updates', 'clocks'], 'readwrite');
const timestamp = new Date();
await trx.objectStore('updates').add({
...update,
createdAt: timestamp,
});

await trx.objectStore('clocks').put({ docId: update.docId, timestamp });

return { docId: update.docId, timestamp };
}

protected override async getDocSnapshot(docId: string) {
const trx = this.db.transaction('snapshots', 'readonly');
const record = await trx.store.get(docId);

if (!record) {
return null;
}

return {
docId,
bin: record.bin,
timestamp: record.updatedAt,
};
}

override async deleteDoc(docId: string) {
const trx = this.db.transaction(
['snapshots', 'updates', 'clocks'],
'readwrite'
);

const idx = trx.objectStore('updates').index('docId');
const iter = idx.iterate(IDBKeyRange.only(docId));

for await (const { value } of iter) {
await trx.objectStore('updates').delete([value.docId, value.createdAt]);
}

await trx.objectStore('snapshots').delete(docId);
await trx.objectStore('clocks').delete(docId);
}

override async getDocTimestamps(after: Date = new Date(0)) {
const trx = this.db.transaction('clocks', 'readonly');

const clocks = await trx.store.getAll();

return clocks.reduce((ret, cur) => {
if (cur.timestamp > after) {
ret[cur.docId] = cur.timestamp;
}
return ret;
}, {} as DocClocks);
}

protected override async setDocSnapshot(
snapshot: DocRecord
): Promise<boolean> {
const trx = this.db.transaction('snapshots', 'readwrite');
const record = await trx.store.get(snapshot.docId);

if (!record || record.updatedAt < snapshot.timestamp) {
await trx.store.put({
docId: snapshot.docId,
bin: snapshot.bin,
createdAt: record?.createdAt ?? snapshot.timestamp,
updatedAt: snapshot.timestamp,
});
}

trx.commit();
return true;
}

protected override async getDocUpdates(docId: string): Promise<DocRecord[]> {
const trx = this.db.transaction('updates', 'readonly');
const updates = await trx.store.index('docId').getAll(docId);

return updates.map(update => ({
docId,
bin: update.bin,
timestamp: update.createdAt,
}));
}

protected override async markUpdatesMerged(
docId: string,
updates: DocRecord[]
): Promise<number> {
const trx = this.db.transaction('updates', 'readwrite');

await Promise.all(
updates.map(update => trx.store.delete([docId, update.timestamp]))
);

trx.commit();
return updates.length;
}
}
3 changes: 3 additions & 0 deletions packages/common/nbstore/src/impls/idb/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from './blob';
export * from './doc';
export * from './sync';
Loading

0 comments on commit 375f706

Please sign in to comment.