Skip to content

Commit

Permalink
Merge pull request #85 from oliver-oloughlin/patch/delete-undelivered
Browse files Browse the repository at this point in the history
added missing deleteUndelivered method + fixed cron job timing
  • Loading branch information
oliver-oloughlin authored Oct 5, 2023
2 parents b514b23 + 3d9e2ee commit df25ee6
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 52 deletions.
29 changes: 22 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ like atomic operations and queue listeners.
- [enqueue()](#enqueue-1)
- [listenQueue()](#listenqueue-1)
- [findUndelivered()](#findundelivered-1)
- [deleteUndelivered()](#deleteundelivered)
- [cron()](#cron)
- [atomic()](#atomic)
- [Atomic Operations](#atomic-operations)
Expand Down Expand Up @@ -653,23 +654,37 @@ const doc2 = await db.findUndelivered("undelivered_id", {
})
```

### deleteUndelivered()

Delete an undelivered document entry by id.

```ts
await db.deleteUndelivered("id")
```

### cron()

Create a cron job that will run on interval, either indefinitely or until an
exit condition is met. If no interval is set, the next job will run immediately
after the previous has finished. Like with queue listeners, there can be
multiple cron jobs defined.
exit condition is met. Interval defaults to 1 second if not set. Like with queue
listeners, there can be multiple cron jobs defined.

```ts
// Will repeat indeefinitely without delay
// Will repeat indefinitely with 1 second interval
db.cron(() => console.log("Hello World!"))

// First job starts with a 10 second delay, after that there is a 5 second delay between jobs
// Will terminate after the 10th run (count starts at 0), or if the job returns n < 0.25
db.cron(() => Math.random(), {
db.cron(() => console.log("I terminate after running 10 times"), {
// Delay before the first job is invoked
startDelay: 10_000,

// Fixed interval
interval: 5_000,
exit: ({ count, result }) => count >= 9 || result < 0.25,

// If this is set it will override the fixed interval
setInterval: ({ count }) => count * 500

// Count starts at 0 and is given before the current job is run
exit: ({ count }) => count === 10,
})
```

Expand Down
15 changes: 15 additions & 0 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,21 @@ export class Collection<
})
}

/**
* Delete an undelivered document entry by id from the collection queue.
*
* @example
* ```ts
* db.users.deleteUndelivered("id")
* ```
*
* @param id - Id of undelivered document.
*/
async deleteUndelivered(id: KvId) {
const key = extendKey(this._keys.baseKey, UNDELIVERED_KEY_PREFIX, id)
await this.kv.delete(key)
}

/** PROTECTED METHODS */

/**
Expand Down
3 changes: 3 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ export const ATOMIC_OPERATION_MUTATION_LIMIT = 20
export const GET_MANY_KEY_LIMIT = 10

export const LARGE_COLLECTION_STRING_LIMIT = 25_000

// Time constants
export const DEFAULT_CRON_INTERVAL = 1_000
91 changes: 66 additions & 25 deletions src/kvdex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ import {
prepareEnqueue,
} from "./utils.ts"
import { AtomicBuilder } from "./atomic_builder.ts"
import { KVDEX_KEY_PREFIX, UNDELIVERED_KEY_PREFIX } from "./constants.ts"
import {
DEFAULT_CRON_INTERVAL,
KVDEX_KEY_PREFIX,
UNDELIVERED_KEY_PREFIX,
} from "./constants.ts"

/**
* Create a new database instance.
Expand Down Expand Up @@ -284,45 +288,67 @@ export class KvDex<const T extends Schema<SchemaDefinition>> {
})
}

/**
* Delete an undelivered document entry by id from the database queue.
*
* @example
* ```ts
* db.deleteUndelivered("id")
* ```
*
* @param id - Id of undelivered document.
*/
async deleteUndelivered(id: KvId) {
const key = extendKey([KVDEX_KEY_PREFIX], UNDELIVERED_KEY_PREFIX, id)
await this.kv.delete(key)
}

/**
* Create a cron job that will repeat at a given interval.
*
* If no interval is set the next job will start immediately after the previous has finished.
* Interval defaults to 1 second if not set.
*
* Will repeat indefinitely if no exit predicate is set.
* Will repeat indefinitely if no exit condition is set.
*
* @example
* ```ts
* // Will repeat indeefinitely without delay
* db.cron(() => console.log("Hello World!"))
*
* // First job starts with a 10 second delay, after that there is a 5 second delay between jobs
* // Will terminate after the 10th run (count starts at 0), or if the job returns n < 0.25
* db.cron(() => Math.random(), {
* db.cron(() => console.log("I terminate after the 10th run"), {
* // Delay before the first job is invoked
* startDelay: 10_000,
*
* // Fixed interval
* interval: 5_000,
* exit: ({ count, result }) => count >= 9 || result < 0.25,
*
* // If this is set it will override the fixed interval
* setInterval: ({ count }) => count * 500
*
* // Count starts at 0 and is given before the current job is run
* exit: ({ count }) => count === 10,
* })
* ```
*
* @param job - Work that will be run for each job interval.
* @param options - Cron options.
*/
async cron<T1>(
job: () => T1,
options?: CronOptions<Awaited<T1>>,
async cron(
job: (msg: CronMessage) => unknown,
options?: CronOptions,
) {
// Create cron handler id
const id = crypto.randomUUID()

// Create cron job enqueuer
const enqueue = async (
cronMsg: CronMessage,
msg: CronMessage,
delay: number | undefined,
) => {
// Enqueue cron job until delivered
for (let i = 0; i < (options?.retries ?? 10); i++) {
await this.enqueue(cronMsg, {
for (let i = 0; i <= (options?.retries ?? 10); i++) {
await this.enqueue(msg, {
idsIfUndelivered: [id],
delay,
topic: id,
Expand All @@ -334,41 +360,56 @@ export class KvDex<const T extends Schema<SchemaDefinition>> {
if (doc === null) {
break
}

// Delete undelivered entry before retrying
await this.deleteUndelivered(id)
}
}

// Add cron job listener
this.listenQueue<CronMessage>(async (msg) => {
// Run cron job
const result = await job()

// Check if exit criteria is met, end repeating cron job if true
const exit = await options?.exitOn?.(
{ count: msg.count, result },
) ?? false
const exit = await options?.exitOn?.(msg) ?? false

if (exit) {
return
}

// Set the next interval
const interval = options?.setInterval
? await options?.setInterval!({ count: msg.count, result })
: options?.interval

// Enqueue next cron job
await enqueue({
count: msg.count + 1,
}, interval)
? await options?.setInterval!(msg)
: options?.interval ?? DEFAULT_CRON_INTERVAL

await allFulfilled([
// Enqueue next job
enqueue({
count: msg.count + 1,
previousInterval: interval,
isFirstJob: false,
enqueueTimestamp: new Date(),
}, interval),

// Run cron job
job(msg),
])
}, { topic: id })

// Enqueue first cron job
await enqueue({
count: 0,
previousInterval: options?.startDelay ?? 0,
isFirstJob: true,
enqueueTimestamp: new Date(),
}, options?.startDelay)
}
}

/*************************/
/* */
/* UTILITY FUNCTIONS */
/* */
/**************************/

/**
* Create a database schema from schema definition.
*
Expand Down
31 changes: 17 additions & 14 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,19 @@ export type CommitResult<T1 extends KvValue> = {
export type IdGenerator<T extends KvValue> = (data: T) => KvId

// Cron types
export type CronOptions<T> = {
export type CronOptions = {
/**
* Interval in milliseconds for cron job.
*
* If not set, job will repeat without delay.
* @default 1000 // Defaults to 1 second
*/
interval?: number

/** Conditionally set the next interval in milliseconds. */
setInterval?: (
{ count, result }: { count: number; result: T },
) => number | Promise<number>
setInterval?: (msg: CronMessage) => number | Promise<number>

/** Exit predicate used to end cron job. */
exitOn?: (
{ count, result }: { count: number; result: T },
) => boolean | Promise<boolean>
exitOn?: (msg: CronMessage) => boolean | Promise<boolean>

/**
* Delay before running the first job.
Expand All @@ -65,14 +61,21 @@ export type CronOptions<T> = {
}

export type CronMessage = {
/** Job number, starts at 0. */
count: number
}

export type ParsedCronMessage = {
ok: true
msg: CronMessage
} | {
ok: false
/** Previously set interval. */
previousInterval: number

/** Indicates whether the current job is the first to be run. */
isFirstJob: boolean

/**
* Timestamp of enqueue.
*
* Equal to the start time of the previous job.
*/
enqueueTimestamp: Date
}

// Atomic Builder Types
Expand Down
23 changes: 17 additions & 6 deletions tests/db/cron.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,28 @@ Deno.test("db - cron", async (t) => {

let count1 = 0
let count2 = 0
let count3 = 0

db.cron(() => count1++, { exitOn: ({ count }) => count === 2 })
db.cron(() => count1++, {
interval: 10,
exitOn: ({ count }) => count === 2,
})

db.cron(() => count2++, { exitOn: ({ count }) => count === 5 })
db.cron(() => count2++, {
interval: 10,
exitOn: ({ isFirstJob }) => isFirstJob,
})

await sleep(1_000)
db.cron(() => count3++, {
interval: 10,
exitOn: ({ previousInterval }) => previousInterval > 0,
})

console.log(count1, count2)
await sleep(1_000)

assert(count1 === 3)
assert(count2 === 6)
assert(count1 === 2)
assert(count2 === 0)
assert(count3 === 1)

kv.close()
})
Expand Down

0 comments on commit df25ee6

Please sign in to comment.