From 8c036f5d572f02dc7b05986455cb778d16722d32 Mon Sep 17 00:00:00 2001 From: Tal Derei <70081547+TalDerei@users.noreply.github.com> Date: Sun, 20 Oct 2024 04:05:28 -0700 Subject: [PATCH] database batching (#1858) * concurrent promises * changeset * move subscriber notification after tx * remove await inside indexdb transaction * satisy linter by ignoring promise with void --- .changeset/spicy-terms-reflect.md | 6 ++++ packages/query/src/block-processor.ts | 39 +++++++++++++++++----- packages/storage/src/indexed-db/updater.ts | 10 ++++-- 3 files changed, 45 insertions(+), 10 deletions(-) create mode 100644 .changeset/spicy-terms-reflect.md diff --git a/.changeset/spicy-terms-reflect.md b/.changeset/spicy-terms-reflect.md new file mode 100644 index 0000000000..525420f339 --- /dev/null +++ b/.changeset/spicy-terms-reflect.md @@ -0,0 +1,6 @@ +--- +'@penumbra-zone/storage': minor +'@penumbra-zone/query': minor +--- + +batching storage operations with promises diff --git a/packages/query/src/block-processor.ts b/packages/query/src/block-processor.ts index 6c422d4868..34def28c0c 100644 --- a/packages/query/src/block-processor.ts +++ b/packages/query/src/block-processor.ts @@ -263,11 +263,11 @@ export class BlockProcessor implements BlockProcessorInterface { // flushing is slow, avoid it until // - wasm says - // - every 1000th block + // - every 5000th block // - every block at tip const flushReasons = { scannerWantsFlush, - interval: compactBlock.height % 1000n === 0n, + interval: compactBlock.height % 5000n === 0n, new: compactBlock.height > latestKnownBlockHeight, }; @@ -431,14 +431,18 @@ export class BlockProcessor implements BlockProcessorInterface { } private async identifyNewAssets(notes: SpendableNoteRecord[]) { + const saveOperations = []; + for (const note of notes) { const assetId = note.note?.value?.assetId; if (!assetId) { continue; } - await this.saveAndReturnMetadata(assetId); + saveOperations.push(this.saveAndReturnMetadata(assetId)); } + + await Promise.all(saveOperations); } // TODO: refactor. there is definitely a better way to do this. batch @@ -491,11 +495,25 @@ export class BlockProcessor implements BlockProcessorInterface { // Nullifier is published in network when a note is spent or swap is claimed. private async resolveNullifiers(nullifiers: Nullifier[], height: bigint) { const spentNullifiers = new Set(); + const readOperations = []; + const writeOperations = []; for (const nullifier of nullifiers) { - const record = - (await this.indexedDb.getSpendableNoteByNullifier(nullifier)) ?? - (await this.indexedDb.getSwapByNullifier(nullifier)); + const readPromise = (async () => { + const record = + (await this.indexedDb.getSpendableNoteByNullifier(nullifier)) ?? + (await this.indexedDb.getSwapByNullifier(nullifier)); + return { nullifier, record }; + })(); + + readOperations.push(readPromise); + } + + // Await all reads in parallel + const readResults = await Promise.all(readOperations); + + // Process the read results and queue up write operations + for (const { nullifier, record } of readResults) { if (!record) { continue; } @@ -504,19 +522,24 @@ export class BlockProcessor implements BlockProcessorInterface { if (record instanceof SpendableNoteRecord) { record.heightSpent = height; - await this.indexedDb.saveSpendableNote({ + const writePromise = this.indexedDb.saveSpendableNote({ ...toPlainMessage(record), noteCommitment: toPlainMessage(getSpendableNoteRecordCommitment(record)), }); + writeOperations.push(writePromise); } else if (record instanceof SwapRecord) { record.heightClaimed = height; - await this.indexedDb.saveSwap({ + const writePromise = this.indexedDb.saveSwap({ ...toPlainMessage(record), swapCommitment: toPlainMessage(getSwapRecordCommitment(record)), }); + writeOperations.push(writePromise); } } + // Await all writes in parallel + await Promise.all(writeOperations); + return spentNullifiers; } diff --git a/packages/storage/src/indexed-db/updater.ts b/packages/storage/src/indexed-db/updater.ts index adcdebd7da..148d0ecc22 100644 --- a/packages/storage/src/indexed-db/updater.ts +++ b/packages/storage/src/indexed-db/updater.ts @@ -41,12 +41,18 @@ export class IbdUpdater { const tables = updates.all.map(u => u.table); const tx = this.db.transaction(tables, 'readwrite'); + // Batch all the updates into promises for (const update of updates.all) { - await tx.objectStore(update.table).put(update.value, update.key); - this.notifySubscribers(update); + void tx.objectStore(update.table).put(update.value, update.key); } + // Await the atomic transaction to complete await tx.done; + + // Notify subscribers after the transaction has successfully committed + for (const update of updates.all) { + this.notifySubscribers(update); + } } async update>(