From 242a65c4b3f00b2c0a3125a301afd131bcf9ed7a Mon Sep 17 00:00:00 2001 From: oliver-oloughlin Date: Wed, 1 Nov 2023 23:46:49 +0100 Subject: [PATCH] feat: improved performance of deleteMany and deleteAll --- src/collection.ts | 23 ++++++++++++++++++ src/constants.ts | 4 +++- src/kvdex.ts | 42 ++++++++++----------------------- src/types.ts | 12 +++++++--- src/utils.ts | 48 ++++++++++++++++++++++++++++++++++++-- tests/db/deleteAll.test.ts | 13 ++++++----- 6 files changed, 100 insertions(+), 42 deletions(-) diff --git a/src/collection.ts b/src/collection.ts index 0c5ae00..efbcfd7 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -27,6 +27,7 @@ import type { } from "./types.ts" import { allFulfilled, + atomicDelete, createHandlerId, createListSelector, extendKey, @@ -440,6 +441,28 @@ export class Collection< * @returns A promise that resovles to an object containing the iterator cursor */ async deleteMany(options?: ListOptions) { + // Perform quick delete if all documents are to be deleted + if ( + !options?.consistency && + !options?.cursor && + !options?.endId && + !options?.startId && + !options?.filter && + !options?.limit + ) { + // Create list iterator and empty keys list + const iter = this.kv.list({ prefix: this._keys.baseKey }, options) + const keys: Deno.KvKey[] = [] + + // Collect all collection entry keys + for await (const { key } of iter) { + keys.push(key) + } + + // Delete all keys and return + return await atomicDelete(this.kv, keys, options?.batchSize) + } + // Execute delete operation for each document entry const { cursor } = await this.handleMany( this._keys.idKey, diff --git a/src/constants.ts b/src/constants.ts index 71ad083..6cb2a83 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -12,7 +12,9 @@ export const SEGMENT_KEY_PREFIX = "__segment__" export const UNDELIVERED_KEY_PREFIX = "__undelivered__" // Fixed limits -export const ATOMIC_OPERATION_MUTATION_LIMIT = 20 +export const ATOMIC_OPERATION_MUTATION_LIMIT = 1_000 + +export const ATOMIC_OPERATION_SAFE_MUTATION_LIMIT = 20 export const GET_MANY_KEY_LIMIT = 10 diff --git a/src/kvdex.ts b/src/kvdex.ts index 33fcf38..6d317ba 100644 --- a/src/kvdex.ts +++ b/src/kvdex.ts @@ -20,6 +20,7 @@ import { Collection } from "./collection.ts" import { Document } from "./document.ts" import { allFulfilled, + atomicDelete, createHandlerId, extendKey, parseQueueMessage, @@ -189,7 +190,17 @@ export class KvDex> { * @returns Promise resolving to void. */ async deleteAll(options?: DeleteAllOptions) { - return await _deleteAll(this.kv, this.schema, options) + // Create iterator + const iter = this.kv.list({ prefix: [KVDEX_KEY_PREFIX] }) + + // Collect all kvdex keys + const keys: Deno.KvKey[] = [] + for await (const { key } of iter) { + keys.push(key) + } + + // Delete all entries + await atomicDelete(this.kv, keys, options?.atomicBatchSize) } /** @@ -514,32 +525,3 @@ async function _countAll( // Return the sum of collection counts return counts.reduce((sum, c) => sum + c, 0) } - -/** - * Delete all documents in the KV store. - * - * @param kv - Deno KV instance. - * @param schemaOrCollection - Schema or Collection object. - * @param options - DeleteAll options. - * @returns Promise resolving to void. - */ -async function _deleteAll( - kv: Deno.Kv, - schemaOrCollection: - | Schema - | Collection>, - options?: DeleteAllOptions, -) { - // If input is a collection, delete all documents in the collection - if (schemaOrCollection instanceof Collection) { - await schemaOrCollection.deleteMany(options) - return - } - - // Recursively delete all documents from schema collections - await allFulfilled( - Object.values(schemaOrCollection).map((val) => - _deleteAll(kv, val, options) - ), - ) -} diff --git a/src/types.ts b/src/types.ts index 9eb73ee..bca14f3 100644 --- a/src/types.ts +++ b/src/types.ts @@ -208,6 +208,7 @@ export type LargeDocumentEntry = { // Method Option types export type SetOptions = NonNullable["2"]> & { + /** Number of retry attempts before returning failed operation */ retry?: number } @@ -215,14 +216,19 @@ export type ListOptions = Deno.KvListOptions & { /** * Filter documents based on predicate. * - * @param doc - Document - * @returns true or false + * @param doc - Document. + * @returns true or false. */ filter?: (doc: Document) => boolean + /** Id of document to start from. */ startId?: KvId + /** Id of document to end at. */ endId?: KvId + + /** Batch size of atomic operations where applicable */ + atomicBatchSize?: number } export type CountOptions = @@ -237,7 +243,7 @@ export type UpdateManyOptions = ListOptions & SetOptions export type CountAllOptions = Pick -export type DeleteAllOptions = Pick +export type DeleteAllOptions = Pick, "atomicBatchSize"> export type EnqueueOptions = & Omit< diff --git a/src/utils.ts b/src/utils.ts index e954283..00e9d60 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,5 +1,6 @@ import { ATOMIC_OPERATION_MUTATION_LIMIT, + ATOMIC_OPERATION_SAFE_MUTATION_LIMIT, GET_MANY_KEY_LIMIT, UNDELIVERED_KEY_PREFIX, } from "./constants.ts" @@ -321,8 +322,14 @@ export async function useAtomics( const slicedElements: T[][] = [] // Slice elements based on atomic mutations limit - for (let i = 0; i < elements.length; i += ATOMIC_OPERATION_MUTATION_LIMIT) { - slicedElements.push(elements.slice(i, i + ATOMIC_OPERATION_MUTATION_LIMIT)) + for ( + let i = 0; + i < elements.length; + i += ATOMIC_OPERATION_SAFE_MUTATION_LIMIT + ) { + slicedElements.push( + elements.slice(i, i + ATOMIC_OPERATION_SAFE_MUTATION_LIMIT), + ) } // Invoke callback function for each element and execute atomic operation @@ -479,3 +486,40 @@ export function createListSelector( end, } } + +/** + * Perform multiple delete operations with optimal efficiency using atomic operations. + * + * @param kv - Deno KV instance. + * @param keys - Keys of documents to be deleted. + * @param batchSize - Batch size of deletes in a single atomic operation. + */ +export async function atomicDelete( + kv: Deno.Kv, + keys: Deno.KvKey[], + batchSize = ATOMIC_OPERATION_MUTATION_LIMIT / 2, +) { + // Initiate atomic operation and check + let atomic = kv.atomic() + let check = 0 + + // Loop over and add delete operation for each key + for (const key of keys) { + atomic.delete(key) + + // If check is at limit, commit atomic operation + if (check >= batchSize - 1) { + await atomic.commit() + + // Reset atomic operation and check + atomic = kv.atomic() + check = 0 + } + + // Increment check + check++ + } + + // Commit final atomic operation + await atomic.commit() +} diff --git a/tests/db/deleteAll.test.ts b/tests/db/deleteAll.test.ts index 06bfb59..0b3e447 100644 --- a/tests/db/deleteAll.test.ts +++ b/tests/db/deleteAll.test.ts @@ -6,21 +6,22 @@ Deno.test("db - deleteAll", async (t) => { "Should delete all documents from the database", async () => { await useDb(async (db) => { - const users = generateUsers(10) + const users = generateUsers(100) + const u64s = [ + new Deno.KvU64(0n), + new Deno.KvU64(0n), + ] const crs1 = await db.i_users.addMany(users) const crs2 = await db.l_users.addMany(users) - const crs3 = await db.u64s.addMany([ - new Deno.KvU64(0n), - new Deno.KvU64(0n), - ]) + const crs3 = await db.u64s.addMany(u64s) assert(crs1.every((cr) => cr.ok)) assert(crs2.every((cr) => cr.ok)) assert(crs3.every((cr) => cr.ok)) const count1 = await db.countAll() - assert(count1 === 22) + assert(count1 === users.length * 2 + u64s.length) await db.deleteAll()