Skip to content

Commit

Permalink
Merge pull request permaweb#750 from permaweb/jfrain99/add-gas-used-c…
Browse files Browse the repository at this point in the history
…heckpoint-cu#744

feat(cu): add gas used as checkpoint condition permaweb#744
  • Loading branch information
jfrain99 authored May 30, 2024
2 parents 1484205 + ac28328 commit 0a0a573
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 39 deletions.
6 changes: 3 additions & 3 deletions servers/cu/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ There are a few environment variables that you can set. Besides
- `PROCESS_MEMORY_CACHE_FILE_DIR`: The directory to store cached process memory (Defaults to the os temp directory)
- `PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL`: The interval at which the CU
should Checkpoint all processes stored in its cache. Set to `0` to disable
(defaults to `4h`)
(defaults to `0`)
- `PROCESS_CHECKPOINT_CREATION_THROTTLE`: The amount of time, in milliseconds,
that the CU should wait before creating a process `Checkpoint` IFF it has
already created a Checkpoint for that process, since it last started. This is
Expand All @@ -110,8 +110,8 @@ There are a few environment variables that you can set. Besides
creation uploads to Arweave. Set to any value to disable `Checkpoint`
creation. (You must explicitly enable `Checkpoint` creation by setting
`DISABLE_PROCESS_CHECKPOINT_CREATION` to `'false'`)
- `EAGER_CHECKPOINT_THRESHOLD`: If an evaluation stream evaluates this amount of
messages, then it will immediately create a Checkpoint at the end of the
- `EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD`: If a process accumulates this amount of
gas used, then the CU will immediately create a Checkpoint at the end of the
evaluation stream.
- `MEM_MONITOR_INTERVAL`: The interval, in milliseconds, at which to log memory
usage on this CU.
Expand Down
18 changes: 16 additions & 2 deletions servers/cu/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ const CONFIG_ENVS = {
PROCESS_CHECKPOINT_CREATION_THROTTLE: process.env.PROCESS_CHECKPOINT_CREATION_THROTTLE || ms('30m'),
DISABLE_PROCESS_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_CHECKPOINT_CREATION !== 'false',
EAGER_CHECKPOINT_THRESHOLD: process.env.EAGER_CHECKPOINT_THRESHOLD || 100,
/**
* EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: Amount of gas for 2 hours of continuous compute (300_000_000_000_000)
* This was calculated by creating a process built to do continuous compute. After 2 hours, this process used
* 300 trillion gas. This is the baseline for checkpointing as no process should need to spend more than two hours
* catching up to a previous checkpoint.
*/
EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: process.env.EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD || 300_000_000_000_000,
PROCESS_WASM_MEMORY_MAX_LIMIT: process.env.PROCESS_WASM_MEMORY_MAX_LIMIT || 1_000_000_000, // 1GB
PROCESS_WASM_COMPUTE_MAX_LIMIT: process.env.PROCESS_WASM_COMPUTE_MAX_LIMIT || 9_000_000_000_000, // 9t
PROCESS_WASM_SUPPORTED_FORMATS: process.env.PROCESS_WASM_SUPPORTED_FORMATS || DEFAULT_PROCESS_WASM_MODULE_FORMATS,
Expand All @@ -121,7 +128,7 @@ const CONFIG_ENVS = {
PROCESS_MEMORY_CACHE_TTL: process.env.PROCESS_MEMORY_CACHE_TTL || ms('24h'),
PROCESS_MEMORY_CACHE_DRAIN_TO_FILE_THRESHOLD: process.env.PROCESS_MEMORY_CACHE_DRAIN_TO_FILE_THRESHOLD || ms('5m'),
PROCESS_MEMORY_CACHE_FILE_DIR: process.env.PROCESS_MEMORY_CACHE_FILE_DIR || tmpdir(),
PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL: process.env.PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL || ms('8h'),
PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL: process.env.PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL || 0,
BUSY_THRESHOLD: process.env.BUSY_THRESHOLD || 0, // disabled
RESTRICT_PROCESSES: process.env.RESTRICT_PROCESSES || [],
ALLOW_PROCESSES: process.env.ALLOW_PROCESSES || [],
Expand All @@ -144,6 +151,13 @@ const CONFIG_ENVS = {
PROCESS_CHECKPOINT_CREATION_THROTTLE: process.env.PROCESS_CHECKPOINT_CREATION_THROTTLE || ms('30m'),
DISABLE_PROCESS_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_CHECKPOINT_CREATION !== 'false', // TODO: disabled by default for now. Enable by default later
EAGER_CHECKPOINT_THRESHOLD: process.env.EAGER_CHECKPOINT_THRESHOLD || 100,
/**
* EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: Amount of gas for 2 hours of continuous compute (300_000_000_000_000)
* This was calculated by creating a process built to do continuous compute by adding and clearing a table.
* After 2 hours of nonstop compute, this process used 300 trillion gas.
* This is the baseline for checkpointing as no process should need to spend more than two hours catching up to a previous checkpoint.
*/
EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: process.env.EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD || 300_000_000_000_000,
PROCESS_WASM_MEMORY_MAX_LIMIT: process.env.PROCESS_WASM_MEMORY_MAX_LIMIT || 1_000_000_000, // 1GB
PROCESS_WASM_COMPUTE_MAX_LIMIT: process.env.PROCESS_WASM_COMPUTE_MAX_LIMIT || 9_000_000_000_000, // 9t
PROCESS_WASM_SUPPORTED_FORMATS: process.env.PROCESS_WASM_SUPPORTED_FORMATS || DEFAULT_PROCESS_WASM_MODULE_FORMATS,
Expand All @@ -159,7 +173,7 @@ const CONFIG_ENVS = {
PROCESS_MEMORY_CACHE_TTL: process.env.PROCESS_MEMORY_CACHE_TTL || ms('24h'),
PROCESS_MEMORY_CACHE_DRAIN_TO_FILE_THRESHOLD: process.env.PROCESS_MEMORY_CACHE_DRAIN_TO_FILE_THRESHOLD || ms('5m'),
PROCESS_MEMORY_CACHE_FILE_DIR: process.env.PROCESS_MEMORY_CACHE_FILE_DIR || tmpdir(),
PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL: process.env.PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL || ms('8h'),
PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL: process.env.PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL || 0,
BUSY_THRESHOLD: process.env.BUSY_THRESHOLD || 0, // disabled
RESTRICT_PROCESSES: process.env.RESTRICT_PROCESSES || [],
ALLOW_PROCESSES: process.env.ALLOW_PROCESSES || [],
Expand Down
28 changes: 19 additions & 9 deletions servers/cu/src/domain/client/ao-process.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Readable } from 'node:stream'
import { basename, join } from 'node:path'

import { fromPromise, of, Rejected, Resolved } from 'hyper-async'
import { always, applySpec, compose, defaultTo, evolve, filter, head, identity, map, omit, path, prop, transduce } from 'ramda'
import { add, always, applySpec, compose, defaultTo, evolve, filter, head, identity, map, omit, path, pathOr, pipe, prop, transduce } from 'ramda'
import { z } from 'zod'
import { LRUCache } from 'lru-cache'
import AsyncLock from 'async-lock'
Expand Down Expand Up @@ -1243,8 +1243,8 @@ export function findProcessMemoryBeforeWith ({
.toPromise()
}

export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EAGER_CHECKPOINT_THRESHOLD }) {
return async ({ processId, moduleId, messageId, timestamp, epoch, nonce, ordinate, cron, blockHeight, Memory, evalCount }) => {
export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD }) {
return async ({ processId, moduleId, messageId, timestamp, epoch, nonce, ordinate, cron, blockHeight, Memory, evalCount, gasUsed }) => {
const cached = cache.get(processId)

/**
Expand Down Expand Up @@ -1279,6 +1279,11 @@ export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EA
// const zipped = await gzipP(Memory, { level: zlibConstants.Z_BEST_SPEED })
// stopTimer()

const incrementedGasUsed = pipe(
pathOr(BigInt(0), ['evaluation', 'gasUsed']),
add(gasUsed || BigInt(0))
)(cached)

const evaluation = {
processId,
moduleId,
Expand All @@ -1295,26 +1300,31 @@ export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EA
* NOTE: this consumes more memory in the LRU In-Memory Cache
*/
encoding: undefined,
cron
cron,
gasUsed: incrementedGasUsed < EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD ? incrementedGasUsed : 0
}
// cache.set(processId, { Memory: zipped, evaluation })
cache.set(processId, { Memory, evaluation })

if (!evalCount || !EAGER_CHECKPOINT_THRESHOLD || evalCount < EAGER_CHECKPOINT_THRESHOLD) return
/**
* @deprecated
* We are no longer creating checkpoints based on eval counts. Rather, we use a gas-based checkpoint system
**/
// if (!evalCount || !EAGER_CHECKPOINT_THRESHOLD || evalCount < EAGER_CHECKPOINT_THRESHOLD) return

if (!incrementedGasUsed || !EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD || incrementedGasUsed < EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD) return
/**
* Eagerly create the Checkpoint on the next event queue drain
*/
setImmediate(() => {
logger(
'Eager Checkpoint Threshold of "%d" messages met when evaluating process "%s" up to "%j" -- "%d" evaluations peformed. Eagerly creating a Checkpoint...',
EAGER_CHECKPOINT_THRESHOLD,
'Eager Checkpoint Accumulated Gas Threshold of "%d" gas used met when evaluating process "%s" up to "%j" -- "%d" gas used. Eagerly creating a Checkpoint...',
EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD,
processId,
{ messageId, timestamp, ordinate, cron, blockHeight },
evalCount
incrementedGasUsed
)

// return saveCheckpoint({ Memory: zipped, ...evaluation })
/**
* Memory will always be defined at this point, so no reason
* to pass File
Expand Down
75 changes: 53 additions & 22 deletions servers/cu/src/domain/client/ao-process.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,8 @@ describe('ao-process', () => {
timestamp: tenSecondsAgo,
blockHeight: 123,
ordinate: '11',
encoding: 'gzip'
encoding: 'gzip',
gasUsed: BigInt(100)
}
const cachedEvalFuture = {
processId: PROCESS,
Expand All @@ -1102,30 +1103,33 @@ describe('ao-process', () => {
timestamp: now + 1000,
blockHeight: 123,
ordinate: '11',
encoding: 'gzip'
encoding: 'gzip',
gasUsed: BigInt(100)
}
const targetWithNoEvalCount = {
const targetWithNoGasUsed = {
processId: PROCESS,
Memory: Buffer.from('Hello World'),
timestamp: now - 1000,
ordinate: '13',
cron: undefined
}
const targetWithLessEvalCount = {
const targetWithLessGasUsed = {
processId: PROCESS,
Memory: Buffer.from('Hello World'),
timestamp: now - 1000,
ordinate: '13',
cron: undefined,
evalCount: 5
evalCount: 5,
gasUsed: BigInt(50)
}
const targetWithEvalCount = {
const targetWithGasUsed = {
processId: PROCESS,
Memory: Buffer.from('Hello World'),
timestamp: now - 1000,
ordinate: '13',
cron: undefined,
evalCount: 15
evalCount: 15,
gasUsed: BigInt(50)
}

describe('updating the cache', () => {
Expand All @@ -1139,12 +1143,12 @@ describe('ao-process', () => {
},
logger,
saveCheckpoint: () => assert.fail('should not call if found in cache'),
EAGER_CHECKPOINT_THRESHOLD: 100
EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: BigInt(200)
}

test('should not update if the cache entry is ahead of provided save', async () => {
const saveLatestProcessMemory = saveLatestProcessMemorySchema.implement(saveLatestProcessMemoryWith(deps))
const res = await saveLatestProcessMemory(targetWithEvalCount)
const res = await saveLatestProcessMemory(targetWithGasUsed)
assert.equal(res, undefined)
})

Expand All @@ -1160,7 +1164,7 @@ describe('ao-process', () => {
set: () => { cacheUpdated = true }
}
}))
await saveLatestProcessMemory(targetWithEvalCount)
await saveLatestProcessMemory(targetWithGasUsed)
assert.ok(cacheUpdated)
})

Expand All @@ -1173,9 +1177,27 @@ describe('ao-process', () => {
set: () => { cacheUpdated = true }
}
}))
await saveLatestProcessMemory(targetWithEvalCount)
await saveLatestProcessMemory(targetWithGasUsed)
assert.ok(cacheUpdated)
})

test('should increment gasUsed when updating cache', async () => {
const saveLatestProcessMemory = saveLatestProcessMemorySchema.implement(saveLatestProcessMemoryWith({
...deps,
cache: {
get: () => ({
Memory,
evaluation: cachedEval
}),
set: (processId, { evaluation }) => {
assert.equal(processId, 'process-123')
assert.equal(evaluation.gasUsed, BigInt(150))
}
},
EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: BigInt(200)
}))
await saveLatestProcessMemory(targetWithGasUsed)
})
})

describe('creating a checkpoint', () => {
Expand All @@ -1189,37 +1211,47 @@ describe('ao-process', () => {
},
logger,
saveCheckpoint: () => assert.fail('should not call if found in cache'),
EAGER_CHECKPOINT_THRESHOLD: 10
EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: BigInt(200)
}

test('should not create checkpoint if eval count less than checkpoint threshold', async () => {
const saveLatestProcessMemory = saveLatestProcessMemorySchema.implement(saveLatestProcessMemoryWith(deps))
const res = await saveLatestProcessMemory(targetWithLessEvalCount)
test('should not create checkpoint if gasUsed less than checkpoint threshold', async () => {
const saveLatestProcessMemory = saveLatestProcessMemorySchema.implement(saveLatestProcessMemoryWith({ ...deps }))
const res = await saveLatestProcessMemory(targetWithLessGasUsed)
assert.equal(res, undefined)
})

test('should not create checkpoint if no eval count', async () => {
test('should not create checkpoint if no gasUsed', async () => {
const saveLatestProcessMemory = saveLatestProcessMemorySchema.implement(saveLatestProcessMemoryWith(deps))
const res = await saveLatestProcessMemory(targetWithNoEvalCount)
const res = await saveLatestProcessMemory(targetWithNoGasUsed)
assert.equal(res, undefined)
})

test('should not create checkpoint if no checkpoint threshold', async () => {
const saveLatestProcessMemoryWithNoThreshold = saveLatestProcessMemorySchema.implement(saveLatestProcessMemoryWith({ ...deps, EAGER_CHECKPOINT_THRESHOLD: 0 }))
const res = await saveLatestProcessMemoryWithNoThreshold(targetWithEvalCount)
const saveLatestProcessMemoryWithNoThreshold = saveLatestProcessMemorySchema.implement(saveLatestProcessMemoryWith({ ...deps, EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: 0 }))
const res = await saveLatestProcessMemoryWithNoThreshold(targetWithGasUsed)
assert.equal(res, undefined)
})

test('should create checkpoint if eval count greater than threshold', async () => {
test('should create checkpoint if gas used greater than threshold, reset gas to 0', async () => {
let checkpointSaved = false
const saveLatestProcessMemoryWithNoThreshold = saveLatestProcessMemorySchema.implement(saveLatestProcessMemoryWith({
...deps,
cache: {
get: () => ({
Memory,
evaluation: cachedEval
}),
set: (_, { evaluation }) => {
assert.equal(evaluation.gasUsed, 0)
}
},
saveCheckpoint: async () => {
checkpointSaved = true
}
}))
await saveLatestProcessMemoryWithNoThreshold(targetWithEvalCount)
await saveLatestProcessMemoryWithNoThreshold({ ...targetWithGasUsed, gasUsed: BigInt(150) })
await new Promise(resolve => setTimeout(resolve, 100))

assert.ok(checkpointSaved)
})
})
Expand Down Expand Up @@ -1740,6 +1772,5 @@ describe('ao-process', () => {
})
})

describe.todo('saveLatestProcessMemoryWith')
describe.todo('saveCheckpointWith')
})
3 changes: 2 additions & 1 deletion servers/cu/src/domain/dal.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ export const saveLatestProcessMemorySchema = z.function()
cron: z.string().nullish(),
blockHeight: z.coerce.number().nullish(),
Memory: bufferSchema,
evalCount: z.number().nullish()
evalCount: z.number().nullish(),
gasUsed: z.bigint().nullish()
}))
.returns(z.promise(z.any()))

Expand Down
2 changes: 1 addition & 1 deletion servers/cu/src/domain/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ export const createApis = async (ctx) => {
}),
saveLatestProcessMemory: AoProcessClient.saveLatestProcessMemoryWith({
cache: wasmMemoryCache,
EAGER_CHECKPOINT_THRESHOLD: ctx.EAGER_CHECKPOINT_THRESHOLD,
EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: ctx.EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD,
saveCheckpoint,
logger
}),
Expand Down
7 changes: 6 additions & 1 deletion servers/cu/src/domain/lib/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ export function evaluateWith (env) {
of(ctx)
.chain(loadEvaluator)
.chain(fromPromise(async (ctx) => {
// A running tally of gas used in the eval stream
let totalGasUsed = BigInt(0)
let prev = applySpec({
/**
* Ensure all result fields are initialized
Expand Down Expand Up @@ -214,6 +216,8 @@ export function evaluateWith (env) {
*/
.then(mergeLeft({ noSave, message, cron, ordinate }))
.then(async (output) => {
if (output.GasUsed) totalGasUsed += BigInt(output.GasUsed ?? 0)

if (cron) ctx.stats.messages.cron++
else ctx.stats.messages.scheduled++

Expand Down Expand Up @@ -269,7 +273,8 @@ export function evaluateWith (env) {
ordinate,
cron,
Memory: prev.Memory,
evalCount: ctx.stats.messages.scheduled + ctx.stats.messages.cron
evalCount: ctx.stats.messages.scheduled + ctx.stats.messages.cron,
gasUsed: totalGasUsed
})
}

Expand Down
7 changes: 7 additions & 0 deletions servers/cu/src/domain/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,18 @@ export const domainConfigSchema = z.object({
*/
DISABLE_PROCESS_CHECKPOINT_CREATION: z.preprocess((val) => !!val, z.boolean()),
/**
* @deprecated
* If an evaluation stream evaluates this amount of messages,
* then it will immediately create a Checkpoint at the end of the
* evaluation stream
*/
EAGER_CHECKPOINT_THRESHOLD: positiveIntSchema,
/**
* If a process accumulates this amount of
* gas used, then the CU will immediately create a Checkpoint at the end of the
* evaluation stream.
*/
EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: positiveIntSchema,
/**
* The number of workers to use for evaluating messages
*/
Expand Down

0 comments on commit 0a0a573

Please sign in to comment.