Skip to content

Commit

Permalink
feat: improved performance of deleteMany and deleteAll
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver-oloughlin committed Nov 1, 2023
1 parent 6caaee8 commit 242a65c
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 42 deletions.
23 changes: 23 additions & 0 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import type {
} from "./types.ts"
import {
allFulfilled,
atomicDelete,
createHandlerId,
createListSelector,
extendKey,
Expand Down Expand Up @@ -440,6 +441,28 @@ export class Collection<
* @returns A promise that resovles to an object containing the iterator cursor
*/
async deleteMany(options?: ListOptions<T1>) {
// Perform quick delete if all documents are to be deleted
if (
!options?.consistency &&
!options?.cursor &&
!options?.endId &&
!options?.startId &&
!options?.filter &&
!options?.limit
) {
// Create list iterator and empty keys list
const iter = this.kv.list({ prefix: this._keys.baseKey }, options)
const keys: Deno.KvKey[] = []

// Collect all collection entry keys
for await (const { key } of iter) {
keys.push(key)
}

// Delete all keys and return
return await atomicDelete(this.kv, keys, options?.batchSize)
}

// Execute delete operation for each document entry
const { cursor } = await this.handleMany(
this._keys.idKey,
Expand Down
4 changes: 3 additions & 1 deletion src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ export const SEGMENT_KEY_PREFIX = "__segment__"
export const UNDELIVERED_KEY_PREFIX = "__undelivered__"

// Fixed limits
export const ATOMIC_OPERATION_MUTATION_LIMIT = 20
export const ATOMIC_OPERATION_MUTATION_LIMIT = 1_000

export const ATOMIC_OPERATION_SAFE_MUTATION_LIMIT = 20

export const GET_MANY_KEY_LIMIT = 10

Expand Down
42 changes: 12 additions & 30 deletions src/kvdex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { Collection } from "./collection.ts"
import { Document } from "./document.ts"
import {
allFulfilled,
atomicDelete,
createHandlerId,
extendKey,
parseQueueMessage,
Expand Down Expand Up @@ -189,7 +190,17 @@ export class KvDex<const T extends Schema<SchemaDefinition>> {
* @returns Promise resolving to void.
*/
async deleteAll(options?: DeleteAllOptions) {
return await _deleteAll(this.kv, this.schema, options)
// Create iterator
const iter = this.kv.list({ prefix: [KVDEX_KEY_PREFIX] })

// Collect all kvdex keys
const keys: Deno.KvKey[] = []
for await (const { key } of iter) {
keys.push(key)
}

// Delete all entries
await atomicDelete(this.kv, keys, options?.atomicBatchSize)
}

/**
Expand Down Expand Up @@ -514,32 +525,3 @@ async function _countAll(
// Return the sum of collection counts
return counts.reduce((sum, c) => sum + c, 0)
}

/**
* Delete all documents in the KV store.
*
* @param kv - Deno KV instance.
* @param schemaOrCollection - Schema or Collection object.
* @param options - DeleteAll options.
* @returns Promise resolving to void.
*/
async function _deleteAll(
kv: Deno.Kv,
schemaOrCollection:
| Schema<SchemaDefinition>
| Collection<KvValue, CollectionOptions<KvValue>>,
options?: DeleteAllOptions,
) {
// If input is a collection, delete all documents in the collection
if (schemaOrCollection instanceof Collection) {
await schemaOrCollection.deleteMany(options)
return
}

// Recursively delete all documents from schema collections
await allFulfilled(
Object.values(schemaOrCollection).map((val) =>
_deleteAll(kv, val, options)
),
)
}
12 changes: 9 additions & 3 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,21 +208,27 @@ export type LargeDocumentEntry = {

// Method Option types
export type SetOptions = NonNullable<Parameters<Deno.Kv["set"]>["2"]> & {
/** Number of retry attempts before returning failed operation */
retry?: number
}

export type ListOptions<T extends KvValue> = Deno.KvListOptions & {
/**
* Filter documents based on predicate.
*
* @param doc - Document
* @returns true or false
* @param doc - Document.
* @returns true or false.
*/
filter?: (doc: Document<T>) => boolean

/** Id of document to start from. */
startId?: KvId

/** Id of document to end at. */
endId?: KvId

/** Batch size of atomic operations where applicable */
atomicBatchSize?: number
}

export type CountOptions<T extends KvValue> =
Expand All @@ -237,7 +243,7 @@ export type UpdateManyOptions<T extends KvValue> = ListOptions<T> & SetOptions

export type CountAllOptions = Pick<Deno.KvListOptions, "consistency">

export type DeleteAllOptions = Pick<Deno.KvListOptions, "consistency">
export type DeleteAllOptions = Pick<ListOptions<KvValue>, "atomicBatchSize">

export type EnqueueOptions =
& Omit<
Expand Down
48 changes: 46 additions & 2 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
ATOMIC_OPERATION_MUTATION_LIMIT,
ATOMIC_OPERATION_SAFE_MUTATION_LIMIT,
GET_MANY_KEY_LIMIT,
UNDELIVERED_KEY_PREFIX,
} from "./constants.ts"
Expand Down Expand Up @@ -321,8 +322,14 @@ export async function useAtomics<const T>(
const slicedElements: T[][] = []

// Slice elements based on atomic mutations limit
for (let i = 0; i < elements.length; i += ATOMIC_OPERATION_MUTATION_LIMIT) {
slicedElements.push(elements.slice(i, i + ATOMIC_OPERATION_MUTATION_LIMIT))
for (
let i = 0;
i < elements.length;
i += ATOMIC_OPERATION_SAFE_MUTATION_LIMIT
) {
slicedElements.push(
elements.slice(i, i + ATOMIC_OPERATION_SAFE_MUTATION_LIMIT),
)
}

// Invoke callback function for each element and execute atomic operation
Expand Down Expand Up @@ -479,3 +486,40 @@ export function createListSelector<const T extends KvValue>(
end,
}
}

/**
* Perform multiple delete operations with optimal efficiency using atomic operations.
*
* @param kv - Deno KV instance.
* @param keys - Keys of documents to be deleted.
* @param batchSize - Batch size of deletes in a single atomic operation.
*/
export async function atomicDelete(
kv: Deno.Kv,
keys: Deno.KvKey[],
batchSize = ATOMIC_OPERATION_MUTATION_LIMIT / 2,
) {
// Initiate atomic operation and check
let atomic = kv.atomic()
let check = 0

// Loop over and add delete operation for each key
for (const key of keys) {
atomic.delete(key)

// If check is at limit, commit atomic operation
if (check >= batchSize - 1) {
await atomic.commit()

// Reset atomic operation and check
atomic = kv.atomic()
check = 0
}

// Increment check
check++
}

// Commit final atomic operation
await atomic.commit()
}
13 changes: 7 additions & 6 deletions tests/db/deleteAll.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,22 @@ Deno.test("db - deleteAll", async (t) => {
"Should delete all documents from the database",
async () => {
await useDb(async (db) => {
const users = generateUsers(10)
const users = generateUsers(100)
const u64s = [
new Deno.KvU64(0n),
new Deno.KvU64(0n),
]

const crs1 = await db.i_users.addMany(users)
const crs2 = await db.l_users.addMany(users)
const crs3 = await db.u64s.addMany([
new Deno.KvU64(0n),
new Deno.KvU64(0n),
])
const crs3 = await db.u64s.addMany(u64s)

assert(crs1.every((cr) => cr.ok))
assert(crs2.every((cr) => cr.ok))
assert(crs3.every((cr) => cr.ok))

const count1 = await db.countAll()
assert(count1 === 22)
assert(count1 === users.length * 2 + u64s.length)

await db.deleteAll()

Expand Down

0 comments on commit 242a65c

Please sign in to comment.