diff --git a/packages/deployment/src/queue/BullQueue.ts b/packages/deployment/src/queue/BullQueue.ts index c7fe8d5e..bd884670 100644 --- a/packages/deployment/src/queue/BullQueue.ts +++ b/packages/deployment/src/queue/BullQueue.ts @@ -14,6 +14,7 @@ export interface BullQueueConfig { port: number; username?: string; password?: string; + db?: number; }; retryAttempts?: number; } @@ -25,6 +26,8 @@ export class BullQueue extends SequencerModule implements TaskQueue { + private activePromise?: Promise; + public createWorker( name: string, executor: (data: TaskPayload) => Promise, @@ -32,10 +35,32 @@ export class BullQueue ): Closeable { const worker = new Worker( name, - async (job) => await executor(job.data), + async (job) => { + // This weird promise logic is needed to make sure the worker is not proving in parallel + // This is by far not optimal - since it still picks up 1 task per queue but waits until + // computing them, so that leads to bad performance over multiple workers. + // For that we need to restructure tasks to be flowing through a single queue however + while (this.activePromise !== undefined) { + // eslint-disable-next-line no-await-in-loop + await this.activePromise; + } + let resOutside: () => void = () => {}; + const promise = new Promise((res) => { + resOutside = res; + }); + this.activePromise = promise; + + const result = await executor(job.data); + this.activePromise = undefined; + void resOutside(); + + return result; + }, { concurrency: options?.concurrency ?? 1, connection: this.config.redis, + stalledInterval: 60000, // 1 minute + lockDuration: 60000, // 1 minute metrics: { maxDataPoints: MetricsTime.ONE_HOUR * 24 }, } @@ -68,6 +93,7 @@ export class BullQueue name: queueName, async addTask(payload: TaskPayload): Promise<{ taskId: string }> { + log.debug("Adding task: ", payload); const job = await queue.add(queueName, payload, { attempts: retryAttempts ?? 2, }); @@ -76,14 +102,25 @@ export class BullQueue async onCompleted(listener: (payload: TaskPayload) => Promise) { events.on("completed", async (result) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - await listener(JSON.parse(result.returnvalue) as TaskPayload); + log.debug("Completed task: ", result); + try { + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + await listener(result.returnvalue as unknown as TaskPayload); + } catch (e) { + // Catch error explicitly since this promise is dangling, + // therefore any error will be voided as well + log.error(e); + } + }); + events.on("error", async (error) => { + log.error("Error in worker", error); }); await events.waitUntilReady(); }, async close(): Promise { await events.close(); + await queue.drain(); await queue.close(); }, }; diff --git a/packages/protocol/src/prover/statetransition/StateTransitionProver.ts b/packages/protocol/src/prover/statetransition/StateTransitionProver.ts index 0618abec..2fcf143b 100644 --- a/packages/protocol/src/prover/statetransition/StateTransitionProver.ts +++ b/packages/protocol/src/prover/statetransition/StateTransitionProver.ts @@ -201,7 +201,7 @@ export class StateTransitionProverProgrammable extends ZkProgrammable< index = 0 ) { const witness = Provable.witness(RollupMerkleTreeWitness, () => - this.witnessProvider.getWitness(transition.path) + this.witnessProvider.getWitness(Field(transition.path.toString())) ); const membershipValid = witness.checkMembership( diff --git a/packages/sequencer/src/worker/worker/FlowTaskWorker.ts b/packages/sequencer/src/worker/worker/FlowTaskWorker.ts index b9015d0c..704bd5e4 100644 --- a/packages/sequencer/src/worker/worker/FlowTaskWorker.ts +++ b/packages/sequencer/src/worker/worker/FlowTaskWorker.ts @@ -31,7 +31,7 @@ export class FlowTaskWorker[]> log.debug(`Init task handler ${task.name}`); const queueName = task.name; return this.queue.createWorker(queueName, async (data) => { - log.debug(`Received task in queue ${queueName}`); + log.debug(`Received task ${data.taskId} in queue ${queueName}`); try { // Use first handler that returns a non-undefined result @@ -51,12 +51,16 @@ export class FlowTaskWorker[]> payload: await task.resultSerializer().toJSON(output), }; + log.debug( + `Responding to task ${data.taskId} with ${result.payload.slice(0, 100)}` + ); + return result; } catch (error: unknown) { const payload = error instanceof Error ? error.message : JSON.stringify(error); - log.debug("Error in worker (detailed trace): ", error); + log.info("Error in worker (detailed trace): ", error); return { status: "error", diff --git a/packages/sequencer/src/worker/worker/startup/WorkerRegistrationFlow.ts b/packages/sequencer/src/worker/worker/startup/WorkerRegistrationFlow.ts index d95f9c17..17108ff0 100644 --- a/packages/sequencer/src/worker/worker/startup/WorkerRegistrationFlow.ts +++ b/packages/sequencer/src/worker/worker/startup/WorkerRegistrationFlow.ts @@ -18,7 +18,9 @@ export class WorkerRegistrationFlow implements Closeable { flow?: Closeable; - public async start(payload: WorkerStartupPayload): Promise { + public async start( + payload: Omit + ): Promise { const flow = this.flowCreator.createFlow("register-worker-flow", {}); this.flow = flow; @@ -28,6 +30,7 @@ export class WorkerRegistrationFlow implements Closeable { // eslint-disable-next-line no-await-in-loop await flow.withFlow(async (res, rej) => { log.trace("Pushing registration task"); + await flow.pushTask(this.task, payload, async (result) => { // Here someone could inject things to happen when the worker registers res(result); diff --git a/packages/sequencer/test-integration/workers/ChildProcessWorker.ts b/packages/sequencer/test-integration/workers/ChildProcessWorker.ts new file mode 100644 index 00000000..1e261469 --- /dev/null +++ b/packages/sequencer/test-integration/workers/ChildProcessWorker.ts @@ -0,0 +1,31 @@ +import { spawn, ChildProcess } from "node:child_process"; + +export class ChildProcessWorker { + process?: ChildProcess; + + start(forwardLogs: boolean = true) { + const s = spawn("node", [ + "--experimental-vm-modules", + "--experimental-wasm-modules", + "../../node_modules/jest/bin/jest.js", + "./test-integration/workers/worker.test.ts", + ]); + s.on("error", (err) => { + console.error(err); + }); + if (forwardLogs) { + s.stdout.on("data", (data) => { + process.stdout.write(data); + }); + } + s.stderr.on("data", (data) => { + process.stderr.write(data); + }); + + this.process = s; + } + + kill() { + this?.process?.kill(); + } +} diff --git a/packages/sequencer/test-integration/workers/WorkerModules.ts b/packages/sequencer/test-integration/workers/WorkerModules.ts new file mode 100644 index 00000000..e1d640aa --- /dev/null +++ b/packages/sequencer/test-integration/workers/WorkerModules.ts @@ -0,0 +1,6 @@ +import { LocalTaskWorkerModule, TaskQueue, TypedClass } from "../../src"; + +export interface MinimumWorkerModules { + TaskQueue: TypedClass; + LocalTaskWorkerModule: TypedClass>; +} diff --git a/packages/sequencer/test-integration/workers/modules.ts b/packages/sequencer/test-integration/workers/modules.ts new file mode 100644 index 00000000..7f3481b5 --- /dev/null +++ b/packages/sequencer/test-integration/workers/modules.ts @@ -0,0 +1,50 @@ +import { Runtime } from "@proto-kit/module"; +import { Protocol } from "@proto-kit/protocol"; +import { VanillaProtocolModules } from "@proto-kit/library"; +import { ModulesConfig } from "@proto-kit/common"; +import { BullQueueConfig } from "@proto-kit/deployment"; + +import { ProvenBalance } from "../../test/integration/mocks/ProvenBalance"; +import { ProtocolStateTestHook } from "../../test/integration/mocks/ProtocolStateTestHook"; + +export const runtimeClass = Runtime.from({ + modules: { + Balance: ProvenBalance, + }, + + config: { + Balance: {}, + }, +}); + +export const protocolClass = Protocol.from({ + modules: VanillaProtocolModules.mandatoryModules({ + ProtocolStateTestHook, + }), +}); + +export const runtimeProtocolConfig: ModulesConfig<{ + Runtime: typeof runtimeClass; + Protocol: typeof protocolClass; +}> = { + Runtime: { + Balance: {}, + }, + Protocol: { + AccountState: {}, + BlockProver: {}, + StateTransitionProver: {}, + BlockHeight: {}, + LastStateRoot: {}, + ProtocolStateTestHook: {}, + }, +}; + +export const BullConfig: BullQueueConfig = { + redis: { + host: "localhost", + port: 6379, + password: "password", + db: 1, + }, +}; diff --git a/packages/sequencer/test-integration/workers/worker.test.ts b/packages/sequencer/test-integration/workers/worker.test.ts new file mode 100644 index 00000000..dfc049ac --- /dev/null +++ b/packages/sequencer/test-integration/workers/worker.test.ts @@ -0,0 +1,66 @@ +import "reflect-metadata"; +import { AppChain } from "@proto-kit/sdk"; +import { BullQueue } from "@proto-kit/deployment"; +import { container } from "tsyringe"; +import { log, sleep } from "@proto-kit/common"; + +import { + LocalTaskWorkerModule, + Sequencer, + VanillaTaskWorkerModules, +} from "../../src"; + +import { + BullConfig, + protocolClass, + runtimeClass, + runtimeProtocolConfig, +} from "./modules"; +import { MinimumWorkerModules } from "./WorkerModules"; + +describe("worker", () => { + it("spin up and wait", async () => { + const sequencerClass = Sequencer.from({ + modules: { + TaskQueue: BullQueue, + LocalTaskWorkerModule: LocalTaskWorkerModule.from( + VanillaTaskWorkerModules.withoutSettlement() + ), + } satisfies MinimumWorkerModules, + }); + + const app = AppChain.from({ + Runtime: runtimeClass, + Sequencer: sequencerClass, + Protocol: protocolClass, + modules: {}, + }); + + app.configure({ + ...runtimeProtocolConfig, + Sequencer: { + TaskQueue: BullConfig, + LocalTaskWorkerModule: VanillaTaskWorkerModules.defaultConfig(), + }, + }); + + console.log("Starting worker..."); + + log.setLevel("DEBUG"); + + await app.start(false, container.createChildContainer()); + + console.log("Worker started..."); + + const ready = await new Promise((res) => { + app + .resolve("Sequencer") + .resolve("LocalTaskWorkerModule") + .containerEvents.on("ready", res); + }); + + console.log("Ready received!"); + + await sleep(10000000); + }, 10000000); +}); diff --git a/packages/sequencer/test-integration/workers/workers-proven.test.ts b/packages/sequencer/test-integration/workers/workers-proven.test.ts new file mode 100644 index 00000000..e9368bc1 --- /dev/null +++ b/packages/sequencer/test-integration/workers/workers-proven.test.ts @@ -0,0 +1,133 @@ +import "reflect-metadata"; +import { expectDefined, log, sleep } from "@proto-kit/common"; +import { AppChain } from "@proto-kit/sdk"; +import { container } from "tsyringe"; +import { PrivateKey, UInt64 } from "o1js"; +import { BlockTestService } from "../../test/integration/services/BlockTestService"; +import { BullQueue } from "@proto-kit/deployment"; +import { + BullConfig, + protocolClass, + runtimeClass, + runtimeProtocolConfig, +} from "./modules"; +import { + BatchProducerModule, + BlockProducerModule, + InMemoryDatabase, + ManualBlockTrigger, + NoopBaseLayer, + PrivateMempool, + Sequencer, + SequencerStartupModule, +} from "../../src"; +import { ConstantFeeStrategy } from "../../src/protocol/baselayer/fees/ConstantFeeStrategy"; +import { ChildProcessWorker } from "./ChildProcessWorker"; + +const timeout = 300000; + +describe("worker-proven", () => { + describe("sequencer", () => { + let test: BlockTestService; + + let worker: ChildProcessWorker; + + let appChain: AppChain; + + beforeAll(async () => { + worker = new ChildProcessWorker(); + worker.start(true); + }); + + afterAll(() => { + worker.kill(); + }); + + it( + "should start up and compile", + async () => { + log.setLevel(log.levels.DEBUG); + + const sequencerClass = Sequencer.from({ + modules: { + Database: InMemoryDatabase, + Mempool: PrivateMempool, + BaseLayer: NoopBaseLayer, + BatchProducerModule, + BlockProducerModule, + BlockTrigger: ManualBlockTrigger, + TaskQueue: BullQueue, + FeeStrategy: ConstantFeeStrategy, + SequencerStartupModule, + }, + }); + + const app = AppChain.from({ + Runtime: runtimeClass, + Sequencer: sequencerClass, + Protocol: protocolClass, + modules: {}, + }); + + app.configure({ + Sequencer: { + Database: {}, + BlockTrigger: {}, + Mempool: {}, + BatchProducerModule: {}, + BlockProducerModule: {}, + BaseLayer: {}, + TaskQueue: BullConfig, + FeeStrategy: {}, + SequencerStartupModule: {}, + }, + ...runtimeProtocolConfig, + }); + + try { + // Start AppChain + const childContainer = container.createChildContainer(); + await app.start(false, childContainer); + + test = app.sequencer.dependencyContainer.resolve(BlockTestService); + + appChain = app; + } catch (e) { + console.error(e); + throw e; + } + }, + timeout + ); + + it( + "should produce simple block", + async () => { + expect.assertions(6); + + const privateKey = PrivateKey.random(); + + await test.addTransaction({ + method: ["Balance", "addBalance"], + privateKey, + args: [PrivateKey.random().toPublicKey(), UInt64.from(100)], + }); + + const [block, batch] = await test.produceBlockAndBatch(); + + expectDefined(block); + + expect(block.transactions).toHaveLength(1); + expect(block.transactions[0].status.toBoolean()).toBe(true); + + expectDefined(batch); + + console.log(batch.proof); + + expect(batch.proof.proof.length).toBeGreaterThan(50); + expect(batch.blockHashes).toHaveLength(1); + }, + timeout + ); + }); +}); diff --git a/packages/sequencer/test/integration/Proven.test.ts b/packages/sequencer/test/integration/Proven.test.ts index 9dea9b6f..08026a7e 100644 --- a/packages/sequencer/test/integration/Proven.test.ts +++ b/packages/sequencer/test/integration/Proven.test.ts @@ -13,7 +13,7 @@ import { ProtocolStateTestHook } from "./mocks/ProtocolStateTestHook"; import { BlockTestService } from "./services/BlockTestService"; import { ProvenBalance } from "./mocks/ProvenBalance"; -const timeout = 500000; +const timeout = 300000; describe("Proven", () => { let test: BlockTestService; @@ -100,31 +100,35 @@ describe("Proven", () => { timeout ); - it("should produce simple block", async () => { - expect.assertions(6); + it( + "should produce simple block", + async () => { + expect.assertions(6); - log.setLevel("INFO"); + log.setLevel("INFO"); - const privateKey = PrivateKey.random(); + const privateKey = PrivateKey.random(); - await test.addTransaction({ - method: ["Balance", "addBalance"], - privateKey, - args: [PrivateKey.random().toPublicKey(), UInt64.from(100)], - }); + await test.addTransaction({ + method: ["Balance", "addBalance"], + privateKey, + args: [PrivateKey.random().toPublicKey(), UInt64.from(100)], + }); - const [block, batch] = await test.produceBlockAndBatch(); + const [block, batch] = await test.produceBlockAndBatch(); - expectDefined(block); + expectDefined(block); - expect(block.transactions).toHaveLength(1); - expect(block.transactions[0].status.toBoolean()).toBe(true); + expect(block.transactions).toHaveLength(1); + expect(block.transactions[0].status.toBoolean()).toBe(true); - expectDefined(batch); + expectDefined(batch); - console.log(batch.proof); + console.log(batch.proof); - expect(batch.proof.proof.length).toBeGreaterThan(50); - expect(batch.blockHashes).toHaveLength(1); - }, 300_000); + expect(batch.proof.proof.length).toBeGreaterThan(50); + expect(batch.blockHashes).toHaveLength(1); + }, + timeout + ); });