From 19849360d88f2b8f19676f876c86151432211fb3 Mon Sep 17 00:00:00 2001 From: Matej Sima Date: Mon, 9 Dec 2024 10:44:07 +0100 Subject: [PATCH] starter kit integration WIP --- packages/deployment/src/queue/BullQueue.ts | 85 +++++++- packages/indexer/test/IndexerNotifier.test.ts | 2 - .../prisma/PrismaSettlementStorage.ts | 3 + .../prisma/mappers/SettlementMapper.ts | 4 +- .../contracts/SettlementSmartContract.ts | 1 + .../production/BatchProducerModule.ts | 10 +- .../production/BlockTaskFlowService.ts | 2 + .../production/flow/ReductionTaskFlow.ts | 9 + .../sequencing/BlockProducerModule.ts | 2 +- .../production/trigger/BlockTrigger.ts | 13 +- .../production/trigger/TimedBlockTrigger.ts | 23 +++ .../src/settlement/SettlementModule.ts | 192 +++++++++++------- .../settlement/messages/WithdrawalQueue.ts | 2 + .../transactions/MinaTransactionSimulator.ts | 3 +- .../sequencer/src/storage/model/Settlement.ts | 2 +- 15 files changed, 265 insertions(+), 88 deletions(-) diff --git a/packages/deployment/src/queue/BullQueue.ts b/packages/deployment/src/queue/BullQueue.ts index 36e646d56..19f253d68 100644 --- a/packages/deployment/src/queue/BullQueue.ts +++ b/packages/deployment/src/queue/BullQueue.ts @@ -1,5 +1,5 @@ import { MetricsTime, Queue, QueueEvents, Worker } from "bullmq"; -import { log, noop } from "@proto-kit/common"; +import { log, mapSequential, noop } from "@proto-kit/common"; import { TaskPayload, Closeable, @@ -14,6 +14,7 @@ export interface BullQueueConfig { port: number; username?: string; password?: string; + db?: number; }; retryAttempts?: number; } @@ -25,6 +26,8 @@ export class BullQueue extends SequencerModule implements TaskQueue { + private activePromise?: Promise; + public createWorker( name: string, executor: (data: TaskPayload) => Promise, @@ -32,10 +35,32 @@ export class BullQueue ): Closeable { const worker = new Worker( name, - async (job) => await executor(job.data), + async (job) => { + // This weird promise logic is needed to make sure the worker is not proving in parallel + // This is by far not optimal - since it still picks up 1 task per queue but waits until + // computing them, so that leads to bad performance over multiple workers. + // For that we need to restructure tasks to be flowing through a single queue however + while (this.activePromise !== undefined) { + // eslint-disable-next-line no-await-in-loop + await this.activePromise; + } + let resOutside: () => void = () => {}; + const promise = new Promise((res) => { + resOutside = res; + }); + this.activePromise = promise; + + const result = await executor(job.data); + this.activePromise = undefined; + void resOutside(); + + return result; + }, { concurrency: options?.concurrency ?? 1, connection: this.config.redis, + stalledInterval: 60000, // 1 minute + lockDuration: 60000, // 1 minute metrics: { maxDataPoints: MetricsTime.ONE_HOUR * 24 }, } @@ -54,7 +79,22 @@ export class BullQueue }; } + /** + * 14 blocks: + * + * - 14 newBlocks + * @param queueName + */ + + private initializedQueue: Record = {}; + public async getQueue(queueName: string): Promise { + // queueName = "testqeue"; + + if (this.initializedQueue[queueName] !== undefined) { + return this.initializedQueue[queueName]; + } + const { retryAttempts, redis } = this.config; const queue = new Queue(queueName, { @@ -62,12 +102,14 @@ export class BullQueue }); const events = new QueueEvents(queueName, { connection: redis }); - await queue.drain(); - - return { + const instantiatedQueue = { name: queueName, + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + listeners: [] as ((payload: TaskPayload) => Promise)[], + async addTask(payload: TaskPayload): Promise<{ taskId: string }> { + log.debug("Adding task: ", payload); const job = await queue.add(queueName, payload, { attempts: retryAttempts ?? 2, }); @@ -75,18 +117,43 @@ export class BullQueue }, async onCompleted(listener: (payload: TaskPayload) => Promise) { - events.on("completed", async (result) => { - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - await listener(result.returnvalue as unknown as TaskPayload); - }); + if (this.listeners.length === 0) { + events.on("completed", async (result) => { + log.debug("Completed task: ", result); + try { + await mapSequential( + this.listeners, + async (listener2) => + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + await listener2(result.returnvalue as unknown as TaskPayload) + ); + } catch (e) { + // Catch error explicitly since this promise is dangling, + // therefore any error will be voided as well + log.error(e); + } + }); + events.on("error", async (error) => { + log.error("Error in worker", error); + }); + } + + this.listeners.push(listener); await events.waitUntilReady(); }, async close(): Promise { await events.close(); + await queue.drain(); await queue.close(); }, }; + + this.initializedQueue[queueName] = instantiatedQueue; + + await queue.drain(); + + return instantiatedQueue; } public async start() { diff --git a/packages/indexer/test/IndexerNotifier.test.ts b/packages/indexer/test/IndexerNotifier.test.ts index e59d27710..d65858e12 100644 --- a/packages/indexer/test/IndexerNotifier.test.ts +++ b/packages/indexer/test/IndexerNotifier.test.ts @@ -137,8 +137,6 @@ async function sendTransactions( { nonce: i } ); - console.log("tx nonce", tx.transaction?.nonce.toBigInt()); - await tx.sign(); await tx.send(); } diff --git a/packages/persistance/src/services/prisma/PrismaSettlementStorage.ts b/packages/persistance/src/services/prisma/PrismaSettlementStorage.ts index 0748e932a..a40f4a09f 100644 --- a/packages/persistance/src/services/prisma/PrismaSettlementStorage.ts +++ b/packages/persistance/src/services/prisma/PrismaSettlementStorage.ts @@ -1,5 +1,6 @@ import { Settlement, SettlementStorage } from "@proto-kit/sequencer"; import { inject, injectable } from "tsyringe"; +import { Provable } from "o1js"; import type { PrismaConnection } from "../../PrismaDatabaseConnection"; @@ -17,6 +18,8 @@ export class PrismaSettlementStorage implements SettlementStorage { const dbSettlement = this.settlementMapper.mapOut(settlement); + Provable.log("db settlement", settlement); + await prismaClient.settlement.create({ data: { ...dbSettlement[0], diff --git a/packages/persistance/src/services/prisma/mappers/SettlementMapper.ts b/packages/persistance/src/services/prisma/mappers/SettlementMapper.ts index 5bfc77cae..b4a117dfa 100644 --- a/packages/persistance/src/services/prisma/mappers/SettlementMapper.ts +++ b/packages/persistance/src/services/prisma/mappers/SettlementMapper.ts @@ -13,7 +13,7 @@ export class SettlementMapper return { batches, // TODO Add this back in after we make the transactions traceable - // transactionHash: settlement.transactionHash, + transactionHash: settlement.transactionHash, promisedMessagesHash: settlement.promisedMessagesHash, }; } @@ -22,7 +22,7 @@ export class SettlementMapper return [ { promisedMessagesHash: input.promisedMessagesHash, - transactionHash: "", // input.transactionHash, + transactionHash: input.transactionHash, }, input.batches, ]; diff --git a/packages/protocol/src/settlement/contracts/SettlementSmartContract.ts b/packages/protocol/src/settlement/contracts/SettlementSmartContract.ts index a0966ce37..8ef4e76ec 100644 --- a/packages/protocol/src/settlement/contracts/SettlementSmartContract.ts +++ b/packages/protocol/src/settlement/contracts/SettlementSmartContract.ts @@ -178,6 +178,7 @@ export abstract class SettlementSmartContractBase extends TokenContract { const escapeHatchActivated = lastSettlementL1BlockHeight .add(UInt32.from(escapeHatchSlotsInterval)) .lessThan(minBlockHeightIncluded); + signatureValid .or(escapeHatchActivated) .assertTrue( diff --git a/packages/sequencer/src/protocol/production/BatchProducerModule.ts b/packages/sequencer/src/protocol/production/BatchProducerModule.ts index 967d34ac4..cbd0c83e9 100644 --- a/packages/sequencer/src/protocol/production/BatchProducerModule.ts +++ b/packages/sequencer/src/protocol/production/BatchProducerModule.ts @@ -71,7 +71,7 @@ const errors = { */ @sequencerModule() export class BatchProducerModule extends SequencerModule { - private productionInProgress = false; + public productionInProgress = false; public constructor( @inject("AsyncStateService") @@ -135,7 +135,9 @@ export class BatchProducerModule extends SequencerModule { try { this.productionInProgress = true; + log.trace("Producing new batch with blocks", blocks); const batch = await this.produceBatch(blocks, height); + log.trace("Produced new batch", batch); this.productionInProgress = false; @@ -274,7 +276,13 @@ export class BatchProducerModule extends SequencerModule { blockTraces.push(blockTrace); } + log.provable.trace("Executing flow with traces", blockTraces); const proof = await this.blockFlowService.executeFlow(blockTraces, blockId); + log.provable.trace( + "Flow executed with the following output proof", + proof.publicInput, + proof.publicOutput + ); const fromNetworkState = blocks[0].block.block.networkState.before; const toNetworkState = blocks.at(-1)!.block.result.afterNetworkState; diff --git a/packages/sequencer/src/protocol/production/BlockTaskFlowService.ts b/packages/sequencer/src/protocol/production/BlockTaskFlowService.ts index 336d9cdba..5e3f1c28c 100644 --- a/packages/sequencer/src/protocol/production/BlockTaskFlowService.ts +++ b/packages/sequencer/src/protocol/production/BlockTaskFlowService.ts @@ -214,10 +214,12 @@ export class BlockTaskFlowService { }, this.flowCreator ); + blockMergingFlow.onCompletion(async (result) => { log.debug(`Block generation finished, with proof ${result.proof}`); // TODO Remove result logging flow.resolve(result); }); + blockMergingFlow.deferErrorsTo(flow); return await flow.withFlow(async () => { diff --git a/packages/sequencer/src/protocol/production/flow/ReductionTaskFlow.ts b/packages/sequencer/src/protocol/production/flow/ReductionTaskFlow.ts index 63a5b720b..ed5215528 100644 --- a/packages/sequencer/src/protocol/production/flow/ReductionTaskFlow.ts +++ b/packages/sequencer/src/protocol/production/flow/ReductionTaskFlow.ts @@ -213,8 +213,17 @@ export class ReductionTaskFlow { this.flow.resolve(result); } else { this.flow.state.queue.push(result); + // await this.resolveReduction(); + } + if (this.options.inputLength === this.flow.state.queue.length) { await this.resolveReduction(); } + // if (this.options.inputLength === 1) { + // this.flow.resolve(result); + // } else { + // this.flow.state.queue.push(result); + // await this.resolveReduction(); + // } } ); } diff --git a/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts b/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts index 106833f48..609d0ad8c 100644 --- a/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts +++ b/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts @@ -31,7 +31,7 @@ export interface BlockConfig { @sequencerModule() export class BlockProducerModule extends SequencerModule { - private productionInProgress = false; + public productionInProgress = false; public constructor( @inject("Mempool") private readonly mempool: Mempool, diff --git a/packages/sequencer/src/protocol/production/trigger/BlockTrigger.ts b/packages/sequencer/src/protocol/production/trigger/BlockTrigger.ts index 4a1862cb4..0aea93d19 100644 --- a/packages/sequencer/src/protocol/production/trigger/BlockTrigger.ts +++ b/packages/sequencer/src/protocol/production/trigger/BlockTrigger.ts @@ -103,11 +103,20 @@ export class BlockTriggerBase< "SettlementStorage module not configured, check provided database moduel" ); } - const settlement = await this.settlementModule.settleBatch(batch); - await this.settlementStorage.pushSettlement(settlement); + const settlement = await this.settlementModule.trySettleBatch(batch); + if (settlement !== undefined) { + await this.settlementStorage.pushSettlement(settlement); + } + return settlement; } + protected async rollupOutgoingMessages() { + if (this.settlementModule) { + await this.settlementModule.sendRollupTransactions(); + } + } + public async start(): Promise { noop(); } diff --git a/packages/sequencer/src/protocol/production/trigger/TimedBlockTrigger.ts b/packages/sequencer/src/protocol/production/trigger/TimedBlockTrigger.ts index 5c892329b..fada55bf8 100644 --- a/packages/sequencer/src/protocol/production/trigger/TimedBlockTrigger.ts +++ b/packages/sequencer/src/protocol/production/trigger/TimedBlockTrigger.ts @@ -101,6 +101,15 @@ export class TimedBlockTrigger try { // Trigger unproven blocks if (totalTime % blockInterval === 0) { + if ( + this.blockProducerModule.productionInProgress === true || + this.batchProducerModule?.productionInProgress === true + ) { + log.info( + "batch or block still being produced, skipping unproven block prod!" + ); + return; + } await this.produceUnprovenBlock(); } @@ -111,9 +120,23 @@ export class TimedBlockTrigger settlementInterval !== undefined && totalTime % settlementInterval === 0 ) { + if ( + this.batchProducerModule?.productionInProgress === true || + this.settlementModule?.settlementInProgress === true + ) { + log.info( + "Previous batch or settlement still in progress, skipping" + ); + return; + } const batch = await this.produceBatch(); if (batch !== undefined) { + log.info("Settling batch", batch.height); await this.settle(batch); + log.info("Batch settled"); + log.info("Rolling up outgoing messages"); + await this.rollupOutgoingMessages(); + log.info("Outgoing messages rolled up"); } } } catch (error) { diff --git a/packages/sequencer/src/settlement/SettlementModule.ts b/packages/sequencer/src/settlement/SettlementModule.ts index 85887757b..1e7dcf62b 100644 --- a/packages/sequencer/src/settlement/SettlementModule.ts +++ b/packages/sequencer/src/settlement/SettlementModule.ts @@ -22,6 +22,7 @@ import { Signature, Transaction, fetchAccount, + fetchLastBlock, } from "o1js"; import { inject } from "tsyringe"; import { @@ -54,10 +55,18 @@ import type { OutgoingMessageQueue } from "./messages/WithdrawalQueue"; import { MinaTransactionSender } from "./transactions/MinaTransactionSender"; import { ProvenSettlementPermissions } from "./permissions/ProvenSettlementPermissions"; import { SignedSettlementPermissions } from "./permissions/SignedSettlementPermissions"; +import { MinaTransactionSimulator } from "./transactions/MinaTransactionSimulator"; export interface SettlementModuleConfig { feepayer: PrivateKey; - address?: PublicKey; + addresses?: { + settlement: PublicKey; + dispatch: PublicKey; + }; + keys?: { + settlement: PrivateKey; + dispatch: PrivateKey; + }; } export type SettlementModuleEvents = { @@ -88,6 +97,8 @@ export class SettlementModule public events = new EventEmitter(); + public settlementInProgress = false; + public constructor( @inject("BaseLayer") private readonly baseLayer: MinaBaseLayer, @@ -109,6 +120,7 @@ export class SettlementModule private readonly blockProofSerializer: BlockProofSerializer, @inject("TransactionSender") private readonly transactionSender: MinaTransactionSender, + private readonly simulator: MinaTransactionSimulator, @inject("AreProofsEnabled") private readonly areProofsEnabled: AreProofsEnabled, @inject("FeeStrategy") @@ -191,14 +203,13 @@ export class SettlementModule } /* eslint-disable no-await-in-loop */ - public async sendRollupTransactions(options: { nonce: number }): Promise< + public async sendRollupTransactions(options?: { nonce: number }): Promise< { tx: Transaction; }[] > { const length = this.outgoingMessageQueue.length(); const { feepayer } = this.config; - let { nonce } = options; const txs: { tx: Transaction; @@ -213,6 +224,12 @@ export class SettlementModule this.getSettlementModuleConfig().withdrawalStatePath.split("."); const basePath = Path.fromProperty(withdrawalModule, withdrawalStateName); + const feePayerAccount = await fetchAccount({ + publicKey: feepayer.toPublicKey(), + }); + const feePayerNonce = Number(feePayerAccount.account?.nonce.toBigint()); + let nonce = options?.nonce ?? feePayerNonce; + for (let i = 0; i < length; i += OUTGOING_MESSAGE_BATCH_SIZE) { const batch = this.outgoingMessageQueue.peek(OUTGOING_MESSAGE_BATCH_SIZE); @@ -269,92 +286,127 @@ export class SettlementModule contracts !== undefined && this.baseLayer.config.network.type !== "local" ) { - await fetchAccount({ - publicKey: contracts.settlement.address, - tokenId: contracts.settlement.tokenId, - }); - await fetchAccount({ - publicKey: contracts.dispatch.address, - tokenId: contracts.dispatch.tokenId, - }); + await this.simulator.getAccount( + contracts.settlement.address, + contracts.settlement.tokenId + ); + + await this.simulator.getAccount( + contracts.dispatch.address, + contracts.dispatch.tokenId + ); } } - public async settleBatch( + public async trySettleBatch( batch: SettleableBatch, options: { nonce?: number; } = {} - ): Promise { - await this.fetchContractAccounts(); - const { settlement, dispatch } = this.getContracts(); - const { feepayer } = this.config; + ): Promise { + if (this.settlementInProgress) { + log.info("Settlement already in progress, skipping"); + return; + } - log.debug("Preparing settlement"); + // eslint-disable-next-line consistent-return + return await this.settleBatch(batch, options); + } - const lastSettlementL1BlockHeight = - settlement.lastSettlementL1BlockHeight.get().value; - const signature = Signature.create(feepayer, [ - BATCH_SIGNATURE_PREFIX, - lastSettlementL1BlockHeight, - ]); + public async settleBatch( + batch: SettleableBatch, + options: { + nonce?: number; + } = {} + ): Promise { + this.settlementInProgress = true; + try { + await this.fetchContractAccounts(); + const { settlement, dispatch } = this.getContracts(); + const { feepayer } = this.config; - const fromSequenceStateHash = BlockProverPublicOutput.fromFields( - batch.proof.publicOutput.map((x) => Field(x)) - ).incomingMessagesHash; - const latestSequenceStateHash = dispatch.account.actionState.get(); + log.debug("Preparing settlement"); - // Fetch actions and store them into the messageStorage - const actions = await this.incomingMessagesAdapter.getPendingMessages( - dispatch.address, - { - fromActionHash: fromSequenceStateHash.toString(), - toActionHash: latestSequenceStateHash.toString(), - fromL1BlockHeight: Number(lastSettlementL1BlockHeight.toString()), - } - ); - await this.messageStorage.pushMessages( - actions.from, - actions.to, - actions.messages - ); + const lastSettlementL1BlockHeight = + settlement.lastSettlementL1BlockHeight.get().value; - const blockProof = await this.blockProofSerializer - .getBlockProofSerializer() - .fromJSONProof(batch.proof); + const blockchainLength = ( + await fetchLastBlock() + ).blockchainLength.toBigint(); - const tx = await Mina.transaction( - { - sender: feepayer.toPublicKey(), - nonce: options?.nonce, - fee: this.feeStrategy.getFee(), - memo: "Protokit settle", - }, - async () => { - await settlement.settle( - blockProof, - signature, - dispatch.address, - feepayer.toPublicKey(), - batch.fromNetworkState, - batch.toNetworkState, - latestSequenceStateHash + if (blockchainLength < lastSettlementL1BlockHeight.toBigInt()) { + log.info( + "Skipping settlement due to a previous settlement not being included on the L1 yet" ); + + return; } - ); - this.signTransaction(tx, [feepayer]); + const signature = Signature.create(feepayer, [ + BATCH_SIGNATURE_PREFIX, + lastSettlementL1BlockHeight, + ]); - await this.transactionSender.proveAndSendTransaction(tx, "included"); + const fromSequenceStateHash = BlockProverPublicOutput.fromFields( + batch.proof.publicOutput.map((x) => Field(x)) + ).incomingMessagesHash; + const latestSequenceStateHash = dispatch.account.actionState.get(); - log.info("Settlement transaction send queued"); + // Fetch actions and store them into the messageStorage + const actions = await this.incomingMessagesAdapter.getPendingMessages( + dispatch.address, + { + fromActionHash: fromSequenceStateHash.toString(), + toActionHash: latestSequenceStateHash.toString(), + fromL1BlockHeight: Number(lastSettlementL1BlockHeight.toString()), + } + ); + await this.messageStorage.pushMessages( + actions.from, + actions.to, + actions.messages + ); - this.events.emit("settlement-submitted", batch); + const blockProof = await this.blockProofSerializer + .getBlockProofSerializer() + .fromJSONProof(batch.proof); - return { - batches: [batch.height], - promisedMessagesHash: latestSequenceStateHash.toString(), - }; + const tx = await Mina.transaction( + { + sender: feepayer.toPublicKey(), + nonce: options?.nonce, + fee: this.feeStrategy.getFee(), + memo: "Protokit settle", + }, + async () => { + await settlement.settle( + blockProof, + signature, + dispatch.address, + feepayer.toPublicKey(), + batch.fromNetworkState, + batch.toNetworkState, + latestSequenceStateHash + ); + } + ); + + this.signTransaction(tx, [feepayer]); + + await this.transactionSender.proveAndSendTransaction(tx, "included"); + + this.events.emit("settlement-submitted", batch); + + // eslint-disable-next-line consistent-return + return { + // TODO: use real transaction hash instead of timestamp + transactionHash: Date.now().toString(), + batches: [batch.height], + promisedMessagesHash: latestSequenceStateHash.toString(), + }; + } finally { + this.settlementInProgress = false; + } } public async deploy( @@ -439,6 +491,8 @@ export class SettlementModule } public async start(): Promise { + this.addresses = this.config.addresses; + this.keys = this.config.keys; noop(); } } diff --git a/packages/sequencer/src/settlement/messages/WithdrawalQueue.ts b/packages/sequencer/src/settlement/messages/WithdrawalQueue.ts index d7849498e..7f94d30a8 100644 --- a/packages/sequencer/src/settlement/messages/WithdrawalQueue.ts +++ b/packages/sequencer/src/settlement/messages/WithdrawalQueue.ts @@ -94,6 +94,8 @@ export class WithdrawalQueue // TODO Replace by stateservice call? if (settlementModule.addresses !== undefined) { const { settlement } = settlementModule.getContracts(); + + await settlement.outgoingMessageCursor.fetch(); this.currentIndex = Number( settlement.outgoingMessageCursor.get().toBigInt() ); diff --git a/packages/sequencer/src/settlement/transactions/MinaTransactionSimulator.ts b/packages/sequencer/src/settlement/transactions/MinaTransactionSimulator.ts index 5f00ea36f..69ddcad6c 100644 --- a/packages/sequencer/src/settlement/transactions/MinaTransactionSimulator.ts +++ b/packages/sequencer/src/settlement/transactions/MinaTransactionSimulator.ts @@ -111,7 +111,7 @@ export class MinaTransactionSimulator { Object.entries(accounts).forEach(([, account]) => { addCachedAccount(account); - this.loaded[account.publicKey.toBase58()] = account; + this.loaded[this.cacheKey(account.publicKey, account.tokenId)] = account; }); } @@ -256,6 +256,7 @@ export class MinaTransactionSimulator { if (account.zkapp !== undefined) { const { appState } = update; + for (let i = 0; i < 8; i++) { if (appState[i].isSome.toBoolean()) { account.zkapp.appState[i] = appState[i].value; diff --git a/packages/sequencer/src/storage/model/Settlement.ts b/packages/sequencer/src/storage/model/Settlement.ts index a54c089ac..08782ef79 100644 --- a/packages/sequencer/src/storage/model/Settlement.ts +++ b/packages/sequencer/src/storage/model/Settlement.ts @@ -1,7 +1,7 @@ export interface Settlement { // transaction: string; // TODO Re-add some way to link dispatched L1 transactions with proven and sent txs - // transactionHash: string; + transactionHash: string; promisedMessagesHash: string; batches: number[]; }