Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improved performance of deleteMany and deleteAll #95

Merged
merged 1 commit into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import type {
} from "./types.ts"
import {
allFulfilled,
atomicDelete,
createHandlerId,
createListSelector,
extendKey,
Expand Down Expand Up @@ -440,6 +441,28 @@ export class Collection<
* @returns A promise that resovles to an object containing the iterator cursor
*/
async deleteMany(options?: ListOptions<T1>) {
// Perform quick delete if all documents are to be deleted
if (
!options?.consistency &&
!options?.cursor &&
!options?.endId &&
!options?.startId &&
!options?.filter &&
!options?.limit
) {
// Create list iterator and empty keys list
const iter = this.kv.list({ prefix: this._keys.baseKey }, options)
const keys: Deno.KvKey[] = []

// Collect all collection entry keys
for await (const { key } of iter) {
keys.push(key)
}

// Delete all keys and return
return await atomicDelete(this.kv, keys, options?.batchSize)
}

// Execute delete operation for each document entry
const { cursor } = await this.handleMany(
this._keys.idKey,
Expand Down
4 changes: 3 additions & 1 deletion src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ export const SEGMENT_KEY_PREFIX = "__segment__"
export const UNDELIVERED_KEY_PREFIX = "__undelivered__"

// Fixed limits
export const ATOMIC_OPERATION_MUTATION_LIMIT = 20
export const ATOMIC_OPERATION_MUTATION_LIMIT = 1_000

export const ATOMIC_OPERATION_SAFE_MUTATION_LIMIT = 20

export const GET_MANY_KEY_LIMIT = 10

Expand Down
42 changes: 12 additions & 30 deletions src/kvdex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { Collection } from "./collection.ts"
import { Document } from "./document.ts"
import {
allFulfilled,
atomicDelete,
createHandlerId,
extendKey,
parseQueueMessage,
Expand Down Expand Up @@ -189,7 +190,17 @@ export class KvDex<const T extends Schema<SchemaDefinition>> {
* @returns Promise resolving to void.
*/
async deleteAll(options?: DeleteAllOptions) {
return await _deleteAll(this.kv, this.schema, options)
// Create iterator
const iter = this.kv.list({ prefix: [KVDEX_KEY_PREFIX] })

// Collect all kvdex keys
const keys: Deno.KvKey[] = []
for await (const { key } of iter) {
keys.push(key)
}

// Delete all entries
await atomicDelete(this.kv, keys, options?.atomicBatchSize)
}

/**
Expand Down Expand Up @@ -514,32 +525,3 @@ async function _countAll(
// Return the sum of collection counts
return counts.reduce((sum, c) => sum + c, 0)
}

/**
* Delete all documents in the KV store.
*
* @param kv - Deno KV instance.
* @param schemaOrCollection - Schema or Collection object.
* @param options - DeleteAll options.
* @returns Promise resolving to void.
*/
async function _deleteAll(
kv: Deno.Kv,
schemaOrCollection:
| Schema<SchemaDefinition>
| Collection<KvValue, CollectionOptions<KvValue>>,
options?: DeleteAllOptions,
) {
// If input is a collection, delete all documents in the collection
if (schemaOrCollection instanceof Collection) {
await schemaOrCollection.deleteMany(options)
return
}

// Recursively delete all documents from schema collections
await allFulfilled(
Object.values(schemaOrCollection).map((val) =>
_deleteAll(kv, val, options)
),
)
}
12 changes: 9 additions & 3 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,21 +208,27 @@ export type LargeDocumentEntry = {

// Method Option types
export type SetOptions = NonNullable<Parameters<Deno.Kv["set"]>["2"]> & {
/** Number of retry attempts before returning failed operation */
retry?: number
}

export type ListOptions<T extends KvValue> = Deno.KvListOptions & {
/**
* Filter documents based on predicate.
*
* @param doc - Document
* @returns true or false
* @param doc - Document.
* @returns true or false.
*/
filter?: (doc: Document<T>) => boolean

/** Id of document to start from. */
startId?: KvId

/** Id of document to end at. */
endId?: KvId

/** Batch size of atomic operations where applicable */
atomicBatchSize?: number
}

export type CountOptions<T extends KvValue> =
Expand All @@ -237,7 +243,7 @@ export type UpdateManyOptions<T extends KvValue> = ListOptions<T> & SetOptions

export type CountAllOptions = Pick<Deno.KvListOptions, "consistency">

export type DeleteAllOptions = Pick<Deno.KvListOptions, "consistency">
export type DeleteAllOptions = Pick<ListOptions<KvValue>, "atomicBatchSize">

export type EnqueueOptions =
& Omit<
Expand Down
48 changes: 46 additions & 2 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
ATOMIC_OPERATION_MUTATION_LIMIT,
ATOMIC_OPERATION_SAFE_MUTATION_LIMIT,
GET_MANY_KEY_LIMIT,
UNDELIVERED_KEY_PREFIX,
} from "./constants.ts"
Expand Down Expand Up @@ -321,8 +322,14 @@ export async function useAtomics<const T>(
const slicedElements: T[][] = []

// Slice elements based on atomic mutations limit
for (let i = 0; i < elements.length; i += ATOMIC_OPERATION_MUTATION_LIMIT) {
slicedElements.push(elements.slice(i, i + ATOMIC_OPERATION_MUTATION_LIMIT))
for (
let i = 0;
i < elements.length;
i += ATOMIC_OPERATION_SAFE_MUTATION_LIMIT
) {
slicedElements.push(
elements.slice(i, i + ATOMIC_OPERATION_SAFE_MUTATION_LIMIT),
)
}

// Invoke callback function for each element and execute atomic operation
Expand Down Expand Up @@ -479,3 +486,40 @@ export function createListSelector<const T extends KvValue>(
end,
}
}

/**
* Perform multiple delete operations with optimal efficiency using atomic operations.
*
* @param kv - Deno KV instance.
* @param keys - Keys of documents to be deleted.
* @param batchSize - Batch size of deletes in a single atomic operation.
*/
export async function atomicDelete(
kv: Deno.Kv,
keys: Deno.KvKey[],
batchSize = ATOMIC_OPERATION_MUTATION_LIMIT / 2,
) {
// Initiate atomic operation and check
let atomic = kv.atomic()
let check = 0

// Loop over and add delete operation for each key
for (const key of keys) {
atomic.delete(key)

// If check is at limit, commit atomic operation
if (check >= batchSize - 1) {
await atomic.commit()

// Reset atomic operation and check
atomic = kv.atomic()
check = 0
}

// Increment check
check++
}

// Commit final atomic operation
await atomic.commit()
}
13 changes: 7 additions & 6 deletions tests/db/deleteAll.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,22 @@ Deno.test("db - deleteAll", async (t) => {
"Should delete all documents from the database",
async () => {
await useDb(async (db) => {
const users = generateUsers(10)
const users = generateUsers(100)
const u64s = [
new Deno.KvU64(0n),
new Deno.KvU64(0n),
]

const crs1 = await db.i_users.addMany(users)
const crs2 = await db.l_users.addMany(users)
const crs3 = await db.u64s.addMany([
new Deno.KvU64(0n),
new Deno.KvU64(0n),
])
const crs3 = await db.u64s.addMany(u64s)

assert(crs1.every((cr) => cr.ok))
assert(crs2.every((cr) => cr.ok))
assert(crs3.every((cr) => cr.ok))

const count1 = await db.countAll()
assert(count1 === 22)
assert(count1 === users.length * 2 + u64s.length)

await db.deleteAll()

Expand Down