diff --git a/servers/cu/README.md b/servers/cu/README.md index 3a9f9a771..b57bd6ece 100644 --- a/servers/cu/README.md +++ b/servers/cu/README.md @@ -100,7 +100,7 @@ There are a few environment variables that you can set. Besides - `PROCESS_MEMORY_CACHE_FILE_DIR`: The directory to store cached process memory (Defaults to the os temp directory) - `PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL`: The interval at which the CU should Checkpoint all processes stored in it's cache. Set to `0` to disabled - (defaults to `4h`) + (defaults to `0`) - `PROCESS_CHECKPOINT_CREATION_THROTTLE`: The amount of time, in milliseconds, that the CU should wait before creating a process `Checkpoint` IFF it has already created a Checkpoint for that process, since it last started. This is @@ -110,8 +110,8 @@ There are a few environment variables that you can set. Besides creation uploads to Arweave. Set to any value to disable `Checkpoint` creation. (You must explicitly enable `Checkpoint` creation by setting - `DISABLE_PROCESS_CHECKPOINT_CREATION` to `'false'`) -- `EAGER_CHECKPOINT_THRESHOLD`: If an evaluation stream evaluates this amount of - messages, then it will immediately create a Checkpoint at the end of the +- `EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD`: If a process uses this amount of + gas, then it will immediately create a Checkpoint at the end of the evaluation stream. - `MEM_MONITOR_INTERVAL`: The interval, in milliseconds, at which to log memory usage on this CU. diff --git a/servers/cu/src/config.js b/servers/cu/src/config.js index 7cecf4785..e5a42d0a2 100644 --- a/servers/cu/src/config.js +++ b/servers/cu/src/config.js @@ -106,6 +106,13 @@ const CONFIG_ENVS = { PROCESS_CHECKPOINT_CREATION_THROTTLE: process.env.PROCESS_CHECKPOINT_CREATION_THROTTLE || ms('30m'), DISABLE_PROCESS_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_CHECKPOINT_CREATION !== 'false', EAGER_CHECKPOINT_THRESHOLD: process.env.EAGER_CHECKPOINT_THRESHOLD || 100, + /** + * EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: Amount of gas for 2 hours of continuous compute (300_000_000_000_000) + * This was calculated by creating a process built to do continuous compute. After 2 hours, this process used + * 300 trillion gas. This is the baseline for checkpointing as no process should need to spend more than two hours + * catching up to a previous checkpoint. + */ + EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: process.env.EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD || 300_000_000_000_000, PROCESS_WASM_MEMORY_MAX_LIMIT: process.env.PROCESS_WASM_MEMORY_MAX_LIMIT || 1_000_000_000, // 1GB PROCESS_WASM_COMPUTE_MAX_LIMIT: process.env.PROCESS_WASM_COMPUTE_MAX_LIMIT || 9_000_000_000_000, // 9t PROCESS_WASM_SUPPORTED_FORMATS: process.env.PROCESS_WASM_SUPPORTED_FORMATS || DEFAULT_PROCESS_WASM_MODULE_FORMATS, @@ -121,7 +128,7 @@ const CONFIG_ENVS = { PROCESS_MEMORY_CACHE_TTL: process.env.PROCESS_MEMORY_CACHE_TTL || ms('24h'), PROCESS_MEMORY_CACHE_DRAIN_TO_FILE_THRESHOLD: process.env.PROCESS_MEMORY_CACHE_DRAIN_TO_FILE_THRESHOLD || ms('5m'), PROCESS_MEMORY_CACHE_FILE_DIR: process.env.PROCESS_MEMORY_CACHE_FILE_DIR || tmpdir(), - PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL: process.env.PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL || ms('8h'), + PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL: process.env.PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL || 0, BUSY_THRESHOLD: process.env.BUSY_THRESHOLD || 0, // disabled RESTRICT_PROCESSES: process.env.RESTRICT_PROCESSES || [], ALLOW_PROCESSES: process.env.ALLOW_PROCESSES || [], @@ -144,6 +151,13 @@ const CONFIG_ENVS = { PROCESS_CHECKPOINT_CREATION_THROTTLE: process.env.PROCESS_CHECKPOINT_CREATION_THROTTLE || ms('30m'), DISABLE_PROCESS_CHECKPOINT_CREATION: process.env.DISABLE_PROCESS_CHECKPOINT_CREATION !== 'false', // TODO: disabled by default for now. Enable by default later EAGER_CHECKPOINT_THRESHOLD: process.env.EAGER_CHECKPOINT_THRESHOLD || 100, + /** + * EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: Amount of gas for 2 hours of continuous compute (300_000_000_000_000) + * This was calculated by creating a process built to do continuous compute by adding and clearing a table. + * After 2 hours of nonstop compute, this process used 300 trillion gas. + * This is the baseline for checkpointing as no process should need to spend more than two hours catching up to a previous checkpoint. + */ + EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: process.env.EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD || 300_000_000_000_000, PROCESS_WASM_MEMORY_MAX_LIMIT: process.env.PROCESS_WASM_MEMORY_MAX_LIMIT || 1_000_000_000, // 1GB PROCESS_WASM_COMPUTE_MAX_LIMIT: process.env.PROCESS_WASM_COMPUTE_MAX_LIMIT || 9_000_000_000_000, // 9t PROCESS_WASM_SUPPORTED_FORMATS: process.env.PROCESS_WASM_SUPPORTED_FORMATS || DEFAULT_PROCESS_WASM_MODULE_FORMATS, @@ -159,7 +173,7 @@ const CONFIG_ENVS = { PROCESS_MEMORY_CACHE_TTL: process.env.PROCESS_MEMORY_CACHE_TTL || ms('24h'), PROCESS_MEMORY_CACHE_DRAIN_TO_FILE_THRESHOLD: process.env.PROCESS_MEMORY_CACHE_DRAIN_TO_FILE_THRESHOLD || ms('5m'), PROCESS_MEMORY_CACHE_FILE_DIR: process.env.PROCESS_MEMORY_CACHE_FILE_DIR || tmpdir(), - PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL: process.env.PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL || ms('8h'), + PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL: process.env.PROCESS_MEMORY_CACHE_CHECKPOINT_INTERVAL || 0, BUSY_THRESHOLD: process.env.BUSY_THRESHOLD || 0, // disabled RESTRICT_PROCESSES: process.env.RESTRICT_PROCESSES || [], ALLOW_PROCESSES: process.env.ALLOW_PROCESSES || [], diff --git a/servers/cu/src/domain/client/ao-process.js b/servers/cu/src/domain/client/ao-process.js index d8e194043..0224480fa 100644 --- a/servers/cu/src/domain/client/ao-process.js +++ b/servers/cu/src/domain/client/ao-process.js @@ -4,7 +4,7 @@ import { Readable } from 'node:stream' import { basename, join } from 'node:path' import { fromPromise, of, Rejected, Resolved } from 'hyper-async' -import { always, applySpec, compose, defaultTo, evolve, filter, head, identity, map, omit, path, prop, transduce } from 'ramda' +import { add, always, applySpec, compose, defaultTo, evolve, filter, head, identity, map, omit, path, pathOr, pipe, prop, transduce } from 'ramda' import { z } from 'zod' import { LRUCache } from 'lru-cache' import AsyncLock from 'async-lock' @@ -1243,8 +1243,8 @@ export function findProcessMemoryBeforeWith ({ .toPromise() } -export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EAGER_CHECKPOINT_THRESHOLD }) { - return async ({ processId, moduleId, messageId, timestamp, epoch, nonce, ordinate, cron, blockHeight, Memory, evalCount }) => { +export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD }) { + return async ({ processId, moduleId, messageId, timestamp, epoch, nonce, ordinate, cron, blockHeight, Memory, evalCount, gasUsed }) => { const cached = cache.get(processId) /** @@ -1279,6 +1279,11 @@ export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EA // const zipped = await gzipP(Memory, { level: zlibConstants.Z_BEST_SPEED }) // stopTimer() + const incrementedGasUsed = pipe( + pathOr(BigInt(0), ['evaluation', 'gasUsed']), + add(gasUsed || BigInt(0)) + )(cached) + const evaluation = { processId, moduleId, @@ -1295,26 +1300,31 @@ export function saveLatestProcessMemoryWith ({ cache, logger, saveCheckpoint, EA * NOTE: this consumes more memory in the LRU In-Memory Cache */ encoding: undefined, - cron + cron, + gasUsed: incrementedGasUsed < EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD ? incrementedGasUsed : 0 } // cache.set(processId, { Memory: zipped, evaluation }) cache.set(processId, { Memory, evaluation }) - if (!evalCount || !EAGER_CHECKPOINT_THRESHOLD || evalCount < EAGER_CHECKPOINT_THRESHOLD) return + /** + * @deprecated + * We are no longer creating checkpoints based on eval counts. Rather, we use a gas-based checkpoint system + **/ + // if (!evalCount || !EAGER_CHECKPOINT_THRESHOLD || evalCount < EAGER_CHECKPOINT_THRESHOLD) return + if (!incrementedGasUsed || !EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD || incrementedGasUsed < EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD) return /** * Eagerly create the Checkpoint on the next event queue drain */ setImmediate(() => { logger( - 'Eager Checkpoint Threshold of "%d" messages met when evaluating process "%s" up to "%j" -- "%d" evaluations peformed. Eagerly creating a Checkpoint...', - EAGER_CHECKPOINT_THRESHOLD, + 'Eager Checkpoint Accumulated Gas Threshold of "%d" gas used met when evaluating process "%s" up to "%j" -- "%d" gas used. Eagerly creating a Checkpoint...', + EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD, processId, { messageId, timestamp, ordinate, cron, blockHeight }, - evalCount + incrementedGasUsed ) - // return saveCheckpoint({ Memory: zipped, ...evaluation }) /** * Memory will always be defined at this point, so no reason * to pass File diff --git a/servers/cu/src/domain/client/ao-process.test.js b/servers/cu/src/domain/client/ao-process.test.js index 719563590..b91b528a0 100644 --- a/servers/cu/src/domain/client/ao-process.test.js +++ b/servers/cu/src/domain/client/ao-process.test.js @@ -1092,7 +1092,8 @@ describe('ao-process', () => { timestamp: tenSecondsAgo, blockHeight: 123, ordinate: '11', - encoding: 'gzip' + encoding: 'gzip', + gasUsed: BigInt(100) } const cachedEvalFuture = { processId: PROCESS, @@ -1102,30 +1103,33 @@ describe('ao-process', () => { timestamp: now + 1000, blockHeight: 123, ordinate: '11', - encoding: 'gzip' + encoding: 'gzip', + gasUsed: BigInt(100) } - const targetWithNoEvalCount = { + const targetWithNoGasUsed = { processId: PROCESS, Memory: Buffer.from('Hello World'), timestamp: now - 1000, ordinate: '13', cron: undefined } - const targetWithLessEvalCount = { + const targetWithLessGasUsed = { processId: PROCESS, Memory: Buffer.from('Hello World'), timestamp: now - 1000, ordinate: '13', cron: undefined, - evalCount: 5 + evalCount: 5, + gasUsed: BigInt(50) } - const targetWithEvalCount = { + const targetWithGasUsed = { processId: PROCESS, Memory: Buffer.from('Hello World'), timestamp: now - 1000, ordinate: '13', cron: undefined, - evalCount: 15 + evalCount: 15, + gasUsed: BigInt(50) } describe('updating the cache', () => { @@ -1139,12 +1143,12 @@ describe('ao-process', () => { }, logger, saveCheckpoint: () => assert.fail('should not call if found in cache'), - EAGER_CHECKPOINT_THRESHOLD: 100 + EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: BigInt(200) } test('should not update if the cache entry is ahead of provided save', async () => { const saveLatestProcessMemory = saveLatestProcessMemorySchema.implement(saveLatestProcessMemoryWith(deps)) - const res = await saveLatestProcessMemory(targetWithEvalCount) + const res = await saveLatestProcessMemory(targetWithGasUsed) assert.equal(res, undefined) }) @@ -1160,7 +1164,7 @@ describe('ao-process', () => { set: () => { cacheUpdated = true } } })) - await saveLatestProcessMemory(targetWithEvalCount) + await saveLatestProcessMemory(targetWithGasUsed) assert.ok(cacheUpdated) }) @@ -1173,9 +1177,27 @@ describe('ao-process', () => { set: () => { cacheUpdated = true } } })) - await saveLatestProcessMemory(targetWithEvalCount) + await saveLatestProcessMemory(targetWithGasUsed) assert.ok(cacheUpdated) }) + + test('should increment gasUsed when updating cache', async () => { + const saveLatestProcessMemory = saveLatestProcessMemorySchema.implement(saveLatestProcessMemoryWith({ + ...deps, + cache: { + get: () => ({ + Memory, + evaluation: cachedEval + }), + set: (processId, { evaluation }) => { + assert.equal(processId, 'process-123') + assert.equal(evaluation.gasUsed, BigInt(150)) + } + }, + EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: BigInt(200) + })) + await saveLatestProcessMemory(targetWithGasUsed) + }) }) describe('creating a checkpoint', () => { @@ -1189,37 +1211,47 @@ describe('ao-process', () => { }, logger, saveCheckpoint: () => assert.fail('should not call if found in cache'), - EAGER_CHECKPOINT_THRESHOLD: 10 + EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: BigInt(200) } - test('should not create checkpoint if eval count less than checkpoint threshold', async () => { - const saveLatestProcessMemory = saveLatestProcessMemorySchema.implement(saveLatestProcessMemoryWith(deps)) - const res = await saveLatestProcessMemory(targetWithLessEvalCount) + test('should not create checkpoint if gasUsed less than checkpoint threshold', async () => { + const saveLatestProcessMemory = saveLatestProcessMemorySchema.implement(saveLatestProcessMemoryWith({ ...deps })) + const res = await saveLatestProcessMemory(targetWithLessGasUsed) assert.equal(res, undefined) }) - test('should not create checkpoint if no eval count', async () => { + test('should not create checkpoint if no gasUsed', async () => { const saveLatestProcessMemory = saveLatestProcessMemorySchema.implement(saveLatestProcessMemoryWith(deps)) - const res = await saveLatestProcessMemory(targetWithNoEvalCount) + const res = await saveLatestProcessMemory(targetWithNoGasUsed) assert.equal(res, undefined) }) test('should not create checkpoint if no checkpoint threshold', async () => { - const saveLatestProcessMemoryWithNoThreshold = saveLatestProcessMemorySchema.implement(saveLatestProcessMemoryWith({ ...deps, EAGER_CHECKPOINT_THRESHOLD: 0 })) - const res = await saveLatestProcessMemoryWithNoThreshold(targetWithEvalCount) + const saveLatestProcessMemoryWithNoThreshold = saveLatestProcessMemorySchema.implement(saveLatestProcessMemoryWith({ ...deps, EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: 0 })) + const res = await saveLatestProcessMemoryWithNoThreshold(targetWithGasUsed) assert.equal(res, undefined) }) - test('should create checkpoint if eval count greater than threshold', async () => { + test('should create checkpoint if gas used greater than threshold, reset gas to 0', async () => { let checkpointSaved = false const saveLatestProcessMemoryWithNoThreshold = saveLatestProcessMemorySchema.implement(saveLatestProcessMemoryWith({ ...deps, + cache: { + get: () => ({ + Memory, + evaluation: cachedEval + }), + set: (_, { evaluation }) => { + assert.equal(evaluation.gasUsed, 0) + } + }, saveCheckpoint: async () => { checkpointSaved = true } })) - await saveLatestProcessMemoryWithNoThreshold(targetWithEvalCount) + await saveLatestProcessMemoryWithNoThreshold({ ...targetWithGasUsed, gasUsed: BigInt(150) }) await new Promise(resolve => setTimeout(resolve, 100)) + assert.ok(checkpointSaved) }) }) @@ -1740,6 +1772,5 @@ describe('ao-process', () => { }) }) - describe.todo('saveLatestProcessMemoryWith') describe.todo('saveCheckpointWith') }) diff --git a/servers/cu/src/domain/dal.js b/servers/cu/src/domain/dal.js index 138a7c6a6..c327bfba7 100644 --- a/servers/cu/src/domain/dal.js +++ b/servers/cu/src/domain/dal.js @@ -73,7 +73,8 @@ export const saveLatestProcessMemorySchema = z.function() cron: z.string().nullish(), blockHeight: z.coerce.number().nullish(), Memory: bufferSchema, - evalCount: z.number().nullish() + evalCount: z.number().nullish(), + gasUsed: z.bigint().nullish() })) .returns(z.promise(z.any())) diff --git a/servers/cu/src/domain/index.js b/servers/cu/src/domain/index.js index cd02b351e..b1c044eb0 100644 --- a/servers/cu/src/domain/index.js +++ b/servers/cu/src/domain/index.js @@ -191,7 +191,7 @@ export const createApis = async (ctx) => { }), saveLatestProcessMemory: AoProcessClient.saveLatestProcessMemoryWith({ cache: wasmMemoryCache, - EAGER_CHECKPOINT_THRESHOLD: ctx.EAGER_CHECKPOINT_THRESHOLD, + EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: ctx.EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD, saveCheckpoint, logger }), diff --git a/servers/cu/src/domain/lib/evaluate.js b/servers/cu/src/domain/lib/evaluate.js index 4bd6ddab3..d23a9d70f 100644 --- a/servers/cu/src/domain/lib/evaluate.js +++ b/servers/cu/src/domain/lib/evaluate.js @@ -87,6 +87,8 @@ export function evaluateWith (env) { of(ctx) .chain(loadEvaluator) .chain(fromPromise(async (ctx) => { + // A running tally of gas used in the eval stream + let totalGasUsed = BigInt(0) let prev = applySpec({ /** * Ensure all result fields are initialized @@ -214,6 +216,8 @@ export function evaluateWith (env) { */ .then(mergeLeft({ noSave, message, cron, ordinate })) .then(async (output) => { + if (output.GasUsed) totalGasUsed += BigInt(output.GasUsed ?? 0) + if (cron) ctx.stats.messages.cron++ else ctx.stats.messages.scheduled++ @@ -269,7 +273,8 @@ export function evaluateWith (env) { ordinate, cron, Memory: prev.Memory, - evalCount: ctx.stats.messages.scheduled + ctx.stats.messages.cron + evalCount: ctx.stats.messages.scheduled + ctx.stats.messages.cron, + gasUsed: totalGasUsed }) } diff --git a/servers/cu/src/domain/model.js b/servers/cu/src/domain/model.js index d280379a0..0eb1f3aed 100644 --- a/servers/cu/src/domain/model.js +++ b/servers/cu/src/domain/model.js @@ -85,11 +85,18 @@ export const domainConfigSchema = z.object({ */ DISABLE_PROCESS_CHECKPOINT_CREATION: z.preprocess((val) => !!val, z.boolean()), /** + * @deprecated * If an evaluation stream evaluates this amount of messages, * then it will immediately create a Checkpoint at the end of the * evaluation stream */ EAGER_CHECKPOINT_THRESHOLD: positiveIntSchema, + /** + * If a process uses this amount of + * gas, then it will immediately create a Checkpoint at the end of the + * evaluation stream. + */ + EAGER_CHECKPOINT_ACCUMULATED_GAS_THRESHOLD: positiveIntSchema, /** * The number of workers to use for evaluating messages */