diff --git a/Cargo.lock b/Cargo.lock index 4df66b05..7704a1a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6872,6 +6872,7 @@ dependencies = [ "alloy-rpc-types-engine", "alloy-rpc-types-eth 0.11.1", "alloy-serde 0.7.3", + "alloy-sol-types", "alloy-transport", "alloy-transport-http", "async-trait", @@ -6926,6 +6927,7 @@ dependencies = [ "serde", "serde_json", "serde_with", + "thiserror 2.0.11", "tikv-jemallocator", "time", "tokio", @@ -6934,6 +6936,7 @@ dependencies = [ "tower 0.4.13", "tracing", "tracing-subscriber", + "url", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index 6070c111..37d3897c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -110,6 +110,7 @@ ethereum_ssz_derive = "0.8" ethereum_ssz = "0.8" alloy-primitives = { version = "0.8.15", default-features = false } +alloy-sol-types = { version = "0.8.15", default-features = false } alloy-rlp = "0.3.10" alloy-chains = "0.1.33" alloy-provider = { version = "0.11.1", features = ["ipc", "pubsub"] } diff --git a/crates/op-rbuilder/Cargo.toml b/crates/op-rbuilder/Cargo.toml index 16ac91cb..1085ff58 100644 --- a/crates/op-rbuilder/Cargo.toml +++ b/crates/op-rbuilder/Cargo.toml @@ -50,6 +50,7 @@ alloy-rpc-types-eth.workspace = true alloy-rpc-client.workspace = true alloy-transport.workspace = true alloy-network.workspace = true +alloy-sol-types.workspace = true # op op-alloy-consensus.workspace = true @@ -85,6 +86,8 @@ alloy-serde = "0.7" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } rollup-boost = { git = "http://github.com/flashbots/rollup-boost", rev = "e74a1fd01366e4ddd13515da4efda59cdc8fbce0" } +thiserror = "2.0.11" +url = "2.5.3" [target.'cfg(unix)'.dependencies] tikv-jemallocator = { version = "0.6", optional = true } diff --git a/crates/op-rbuilder/src/args.rs b/crates/op-rbuilder/src/args.rs index dd465c1c..c5410021 100644 --- a/crates/op-rbuilder/src/args.rs +++ b/crates/op-rbuilder/src/args.rs @@ -6,6 +6,7 @@ use reth_optimism_node::args::RollupArgs; use crate::tx_signer::Signer; +use alloy_transport_http::reqwest::Url; /// Parameters for rollup configuration #[derive(Debug, Clone, Default, PartialEq, Eq, clap::Args)] @@ -17,4 +18,7 @@ pub struct OpRbuilderArgs { /// Builder secret key for signing last transaction in block #[arg(long = "rollup.builder-secret-key", env = "BUILDER_SECRET_KEY")] pub builder_signer: Option, + /// URL of the supervisor service for transaction validation + #[arg(long = "rollup.supervisor-url", env = "SUPERVISOR_URL")] + pub supervisor_url: Option, } diff --git a/crates/op-rbuilder/src/lib.rs b/crates/op-rbuilder/src/lib.rs index aee80672..211b1ddb 100644 --- a/crates/op-rbuilder/src/lib.rs +++ b/crates/op-rbuilder/src/lib.rs @@ -1,3 +1,4 @@ pub mod integration; +pub mod primitives; pub mod tester; pub mod tx_signer; diff --git a/crates/op-rbuilder/src/main.rs b/crates/op-rbuilder/src/main.rs index 770c3610..d3167f5a 100644 --- a/crates/op-rbuilder/src/main.rs +++ b/crates/op-rbuilder/src/main.rs @@ -21,6 +21,7 @@ mod monitoring; pub mod payload_builder; #[cfg(not(feature = "flashblocks"))] mod payload_builder_vanilla; +mod primitives; #[cfg(test)] mod tester; mod tx_signer; @@ -33,11 +34,10 @@ fn main() { let op_node = OpNode::new(rollup_args.clone()); let handle = builder .with_types::() - .with_components( - op_node - .components() - .payload(CustomOpPayloadBuilder::new(builder_args.builder_signer)), - ) + .with_components(op_node.components().payload(CustomOpPayloadBuilder::new( + builder_args.builder_signer, + builder_args.supervisor_url, + ))) .with_add_ons( OpAddOnsBuilder::default() .with_sequencer(rollup_args.sequencer_http.clone()) diff --git a/crates/op-rbuilder/src/metrics.rs b/crates/op-rbuilder/src/metrics.rs index 4451d0b5..5e8e426f 100644 --- a/crates/op-rbuilder/src/metrics.rs +++ b/crates/op-rbuilder/src/metrics.rs @@ -44,6 +44,14 @@ pub struct OpRBuilderMetrics { pub tx_byte_size: Histogram, /// Number of reverted transactions pub num_reverted_tx: Counter, + /// Number of cross-chain transactions + pub num_cross_chain_tx: Counter, + /// Number of cross-chain transactions that didn't pass supervisor validation + pub num_cross_chain_tx_fail: Counter, + /// Number of cross-chain transactions that weren't verified because of the timeout + pub num_cross_chain_tx_timeout: Counter, + /// Number of cross-chain transactions that weren't verified because of the server error + pub num_cross_chain_tx_server_error: Counter, } impl OpRBuilderMetrics { @@ -70,4 +78,20 @@ impl OpRBuilderMetrics { pub fn set_builder_balance(&self, balance: f64) { self.builder_balance.set(balance); } + + pub fn inc_num_cross_chain_tx_fail(&self) { + self.num_cross_chain_tx_fail.increment(1); + } + + pub fn inc_num_cross_chain_tx(&self) { + self.num_cross_chain_tx.increment(1); + } + + pub fn inc_num_cross_chain_tx_timeout(&self) { + self.num_cross_chain_tx_timeout.increment(1); + } + + pub fn inc_num_cross_chain_tx_server_error(&self) { + self.num_cross_chain_tx_server_error.increment(1); + } } diff --git a/crates/op-rbuilder/src/payload_builder_vanilla.rs b/crates/op-rbuilder/src/payload_builder_vanilla.rs index 03f3e469..22a2233f 100644 --- a/crates/op-rbuilder/src/payload_builder_vanilla.rs +++ b/crates/op-rbuilder/src/payload_builder_vanilla.rs @@ -1,8 +1,10 @@ -use crate::generator::BlockPayloadJobGenerator; -use crate::generator::BuildArguments; use crate::{ - generator::{BlockCell, PayloadBuilder}, + generator::{BlockCell, BlockPayloadJobGenerator, BuildArguments, PayloadBuilder}, metrics::OpRBuilderMetrics, + primitives::kona::{ + ExecutingMessage, ExecutingMessageValidator, ExecutingMessageValidatorError, SafetyLevel, + SupervisorValidator, + }, tx_signer::Signer, }; use alloy_consensus::constants::EMPTY_WITHDRAWALS; @@ -15,6 +17,7 @@ use alloy_primitives::private::alloy_rlp::Encodable; use alloy_primitives::{Address, Bytes, TxHash, TxKind, B256, U256}; use alloy_rpc_types_engine::PayloadId; use alloy_rpc_types_eth::Withdrawals; +use jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; use op_alloy_consensus::{OpDepositReceipt, OpTypedTransaction}; use reth::builder::{components::PayloadServiceBuilder, node::FullNodeTypes, BuilderContext}; use reth::core::primitives::InMemorySize; @@ -78,15 +81,21 @@ use std::{fmt::Display, sync::Arc, time::Instant}; use tokio_util::sync::CancellationToken; use tracing::{info, trace, warn}; -#[derive(Debug, Clone, Copy, Default)] +use url::Url; + +#[derive(Debug, Clone, Default)] #[non_exhaustive] pub struct CustomOpPayloadBuilder { builder_signer: Option, + supervisor_url: Option, } impl CustomOpPayloadBuilder { - pub fn new(builder_signer: Option) -> Self { - Self { builder_signer } + pub fn new(builder_signer: Option, supervisor_url: Option) -> Self { + Self { + builder_signer, + supervisor_url, + } } } @@ -116,6 +125,7 @@ where pool, ctx.provider().clone(), Arc::new(BasicOpReceiptBuilder::default()), + self.supervisor_url.clone(), )) } @@ -197,6 +207,8 @@ pub struct OpPayloadBuilderVanilla>, + /// Client to execute supervisor validation + pub supervisor_client: Option, } impl @@ -209,6 +221,7 @@ impl pool: Pool, client: Client, receipt_builder: Arc>, + supervisor_url: Option, ) -> Self { Self::with_builder_config( evm_config, @@ -216,6 +229,7 @@ impl pool, client, receipt_builder, + supervisor_url, Default::default(), ) } @@ -226,8 +240,14 @@ impl pool: Pool, client: Client, receipt_builder: Arc>, + supervisor_url: Option, config: OpBuilderConfig, ) -> Self { + let supervisor_client = supervisor_url.map(|url| { + HttpClientBuilder::default() + .build(url) + .expect("building supervisor http client") + }); Self { pool, client, @@ -237,6 +257,7 @@ impl best_transactions: (), metrics: Default::default(), builder_signer, + supervisor_client, } } } @@ -270,7 +291,7 @@ where }, |hashes| { #[allow(clippy::unit_arg)] - self.best_transactions.remove_reverted(pool.clone(), hashes) + self.best_transactions.remove_invalid(pool.clone(), hashes) }, )? { BuildOutcome::Better { payload, .. } => { @@ -344,6 +365,7 @@ where receipt_builder: self.receipt_builder.clone(), builder_signer: self.builder_signer, metrics: Default::default(), + supervisor_client: self.supervisor_client.clone(), }; let builder = OpBuilder::new(best, remove_reverted); @@ -406,7 +428,7 @@ pub struct OpBuilder<'a, Txs> { best: Box Txs + 'a>, /// Removes reverted transactions from the tx pool #[debug(skip)] - remove_reverted: Box) + 'a>, + remove_invalid: Box) + 'a>, } impl<'a, Txs> OpBuilder<'a, Txs> { @@ -416,7 +438,7 @@ impl<'a, Txs> OpBuilder<'a, Txs> { ) -> Self { Self { best: Box::new(best), - remove_reverted: Box::new(remove_reverted), + remove_invalid: Box::new(remove_reverted), } } } @@ -438,7 +460,7 @@ impl OpBuilder<'_, Txs> { { let Self { best, - remove_reverted, + remove_invalid, } = self; info!(target: "payload_builder", id=%ctx.payload_id(), parent_header = ?ctx.parent().hash(), parent_number = ctx.parent().number, "building new payload"); @@ -535,7 +557,7 @@ impl OpBuilder<'_, Txs> { None }; - remove_reverted(info.reverted_tx_hashes.iter().copied().collect()); + remove_invalid(info.invalid_tx_hashes.iter().copied().collect()); let payload = ExecutedPayload { info, @@ -704,8 +726,8 @@ pub trait OpPayloadTransactions: Clone + Send + Sync + Unpin + 'sta attr: BestTransactionsAttributes, ) -> impl PayloadTransactions; - /// Removes reverted transactions from the tx pool - fn remove_reverted>( + /// Removes invalid transactions from the tx pool + fn remove_invalid>( &self, pool: Pool, hashes: Vec, @@ -721,7 +743,7 @@ impl OpPayloadTransactions for () { BestPayloadTransactions::new(pool.best_transactions_with_attributes(attr)) } - fn remove_reverted>( + fn remove_invalid>( &self, pool: Pool, hashes: Vec, @@ -755,7 +777,7 @@ pub struct ExecutionInfo { /// Tracks fees from executed mempool transactions pub total_fees: U256, /// Tracks the reverted transaction hashes to remove from the transaction pool - pub reverted_tx_hashes: HashSet, + pub invalid_tx_hashes: HashSet, } impl ExecutionInfo { @@ -768,7 +790,7 @@ impl ExecutionInfo { cumulative_gas_used: 0, cumulative_da_bytes_used: 0, total_fees: U256::ZERO, - reverted_tx_hashes: HashSet::new(), + invalid_tx_hashes: HashSet::new(), } } @@ -820,6 +842,8 @@ pub struct OpPayloadBuilderCtx, /// The metrics for the builder pub metrics: OpRBuilderMetrics, + /// Client to execute supervisor validation + pub supervisor_client: Option, } impl OpPayloadBuilderCtx @@ -1110,6 +1134,27 @@ where return Err(PayloadBuilderError::EvmExecutionError(Box::new(err))); } }; + // op-supervisor validation + match self.validate_supervisor_messages(&result)? { + Ok(()) => (), + Err(err) => match err { + ExecutingMessageValidatorError::SupervisorServerError(err) => { + warn!(target: "payload_builder", %err, ?sequencer_tx, "Supervisor error, skipping."); + self.metrics.inc_num_cross_chain_tx_server_error(); + continue; + } + ExecutingMessageValidatorError::ValidationTimeout(_) => { + trace!(target: "payload_builder", %err, ?sequencer_tx, "Executing message validation timed out, skipping."); + self.metrics.inc_num_cross_chain_tx_timeout(); + continue; + } + err => { + trace!(target: "payload_builder", %err, ?sequencer_tx, "Executing message rejected."); + self.metrics.inc_num_cross_chain_tx_fail(); + continue; + } + }, + } // commit changes evm.db_mut().commit(state); @@ -1208,6 +1253,34 @@ where } }; + match self.validate_supervisor_messages(&result)? { + Ok(()) => (), + Err(err) => { + match err { + ExecutingMessageValidatorError::SupervisorServerError(err) => { + trace!(target: "payload_builder", %err, ?tx, "Supervisor error, skipping."); + self.metrics.inc_num_cross_chain_tx_server_error(); + continue; + } + ExecutingMessageValidatorError::ValidationTimeout(_) => { + trace!(target: "payload_builder", %err, ?tx, "Executing message validation timed out, skipping."); + self.metrics.inc_num_cross_chain_tx_timeout(); + continue; + } + err => { + trace!(target: "payload_builder", %err, ?tx, "Executing message rejected."); + self.metrics.inc_num_cross_chain_tx_fail(); + // It's possible that transaction invalid now, but would be valid later. + // We should keep limited queue for transactions that could become valid. + // We should have the limit to ensure that builder won't get overwhelmed. + best_txs.mark_invalid(tx.signer(), tx.nonce()); + info.invalid_tx_hashes.insert(*tx.tx_hash()); + continue; + } + } + } + } + self.metrics .tx_simulation_duration .record(tx_simulation_start_time.elapsed()); @@ -1219,7 +1292,7 @@ where num_txs_simulated_fail += 1; trace!(target: "payload_builder", ?tx, "skipping reverted transaction"); best_txs.mark_invalid(tx.signer(), tx.nonce()); - info.reverted_tx_hashes.insert(*tx.tx_hash()); + info.invalid_tx_hashes.insert(*tx.tx_hash()); continue; } @@ -1266,6 +1339,36 @@ where Ok(None) } + pub fn validate_supervisor_messages( + &self, + result: &ExecutionResult, + ) -> Result, PayloadBuilderError> { + if let Some(client) = &self.supervisor_client { + let executing_messages = + SupervisorValidator::parse_messages(result.clone().into_logs().as_slice()) + .flatten() + .collect::>(); + if !executing_messages.is_empty() { + self.metrics.inc_num_cross_chain_tx(); + let (channel_tx, rx) = std::sync::mpsc::channel(); + tokio::task::block_in_place(move || { + let res = tokio::runtime::Handle::current().block_on(async { + SupervisorValidator::validate_messages( + client, + executing_messages.as_slice(), + SafetyLevel::CrossUnsafe, + Some(core::time::Duration::from_millis(100)), + ) + .await + }); + let _ = channel_tx.send(res); + }); + return rx.recv().map_err(|_| PayloadBuilderError::ChannelClosed); + } + } + Ok(Ok(())) + } + pub fn add_builder_tx( &self, info: &mut ExecutionInfo, diff --git a/crates/op-rbuilder/src/primitives/kona/api.rs b/crates/op-rbuilder/src/primitives/kona/api.rs new file mode 100644 index 00000000..3bc6fa0a --- /dev/null +++ b/crates/op-rbuilder/src/primitives/kona/api.rs @@ -0,0 +1,15 @@ +//! [Source](https://github.com/op-rs/kona/blob/a1d8ea603960cb4bd3cc19784f7c3365352f1849/crates/node/rpc/src/api.rs) +use crate::primitives::kona::{ExecutingMessage, SafetyLevel}; +use jsonrpsee::{core::RpcResult, proc_macros::rpc}; + +/// Supervisor API for interop. +#[rpc(server, client, namespace = "supervisor")] +pub trait SupervisorApi { + /// Checks if the given messages meet the given minimum safety level. + #[method(name = "checkMessages")] + async fn check_messages( + &self, + messages: Vec, + min_safety: SafetyLevel, + ) -> RpcResult<()>; +} diff --git a/crates/op-rbuilder/src/primitives/kona/constants.rs b/crates/op-rbuilder/src/primitives/kona/constants.rs new file mode 100644 index 00000000..ae08b413 --- /dev/null +++ b/crates/op-rbuilder/src/primitives/kona/constants.rs @@ -0,0 +1,6 @@ +//! [Source](https://github.com/op-rs/kona/blob/a1d8ea603960cb4bd3cc19784f7c3365352f1849/crates/protocol/interop/src/constants.rs) + +use alloy_primitives::{address, Address}; + +/// The address of the L2 cross chain inbox predeploy proxy. +pub const CROSS_L2_INBOX_ADDRESS: Address = address!("4200000000000000000000000000000000000022"); diff --git a/crates/op-rbuilder/src/primitives/kona/message.rs b/crates/op-rbuilder/src/primitives/kona/message.rs new file mode 100644 index 00000000..16360faf --- /dev/null +++ b/crates/op-rbuilder/src/primitives/kona/message.rs @@ -0,0 +1,68 @@ +//! [Source](https://github.com/op-rs/kona/blob/a1d8ea603960cb4bd3cc19784f7c3365352f1849/crates/protocol/interop/src/message.rs) + +use alloy_primitives::{keccak256, Bytes, Log}; +use alloy_sol_types::sol; +use derive_more::{AsRef, From}; + +sol! { + /// @notice The struct for a pointer to a message payload in a remote (or local) chain. + #[derive(Default, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] + struct MessageIdentifier { + address origin; + uint256 blockNumber; + uint256 logIndex; + uint256 timestamp; + #[serde(rename = "chainID")] + uint256 chainId; + } + + /// @notice Emitted when a cross chain message is being executed. + /// @param payloadHash Hash of message payload being executed. + /// @param identifier Encoded Identifier of the message. + /// + /// Parameter names are derived from the `op-supervisor` JSON field names. + /// See the relevant definition in the Optimism repository: + /// [Ethereum-Optimism/op-supervisor](https://github.com/ethereum-optimism/optimism/blob/4ba2eb00eafc3d7de2c8ceb6fd83913a8c0a2c0d/op-supervisor/supervisor/types/types.go#L61-L64). + #[derive(Default, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] + event ExecutingMessage(bytes32 indexed payloadHash, MessageIdentifier identifier); + + /// @notice Executes a cross chain message on the destination chain. + /// @param _id Identifier of the message. + /// @param _target Target address to call. + /// @param _message Message payload to call target with. + function executeMessage( + MessageIdentifier calldata _id, + address _target, + bytes calldata _message + ) external; +} + +/// A [RawMessagePayload] is the raw payload of an initiating message. +#[derive(Debug, Clone, From, AsRef, PartialEq, Eq)] +pub struct RawMessagePayload(Bytes); + +impl From<&Log> for RawMessagePayload { + fn from(log: &Log) -> Self { + let mut data = vec![0u8; log.topics().len() * 32 + log.data.data.len()]; + for (i, topic) in log.topics().iter().enumerate() { + data[i * 32..(i + 1) * 32].copy_from_slice(topic.as_ref()); + } + data[(log.topics().len() * 32)..].copy_from_slice(log.data.data.as_ref()); + data.into() + } +} + +impl From> for RawMessagePayload { + fn from(data: Vec) -> Self { + Self(Bytes::from(data)) + } +} + +impl From for ExecutingMessage { + fn from(call: executeMessageCall) -> Self { + Self { + identifier: call._id, + payloadHash: keccak256(call._message.as_ref()), + } + } +} diff --git a/crates/op-rbuilder/src/primitives/kona/mod.rs b/crates/op-rbuilder/src/primitives/kona/mod.rs new file mode 100644 index 00000000..39d3b6ce --- /dev/null +++ b/crates/op-rbuilder/src/primitives/kona/mod.rs @@ -0,0 +1,12 @@ +pub mod traits; +pub use traits::{ExecutingMessageValidator, ExecutingMessageValidatorError}; +pub mod supervisor; +pub use supervisor::SupervisorValidator; +pub mod safety; +pub use safety::SafetyLevel; +pub mod api; +pub use api::SupervisorApiClient; +pub mod message; +pub use message::ExecutingMessage; +pub mod constants; +pub use constants::CROSS_L2_INBOX_ADDRESS; diff --git a/crates/op-rbuilder/src/primitives/kona/safety.rs b/crates/op-rbuilder/src/primitives/kona/safety.rs new file mode 100644 index 00000000..a55d70fa --- /dev/null +++ b/crates/op-rbuilder/src/primitives/kona/safety.rs @@ -0,0 +1,41 @@ +//! [Source](https://github.com/op-rs/kona/blob/a1d8ea603960cb4bd3cc19784f7c3365352f1849/crates/protocol/interop/src/safety.rs) +use derive_more::Display; +/// The safety level of a message. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Display, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum SafetyLevel { + /// The message is finalized. + Finalized, + /// The message is safe. + Safe, + /// The message is safe locally. + LocalSafe, + /// The message is unsafe across chains. + CrossUnsafe, + /// The message is unsafe. + Unsafe, + /// The message is invalid. + Invalid, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_safety_level_serde() { + let level = SafetyLevel::Finalized; + let json = serde_json::to_string(&level).unwrap(); + assert_eq!(json, r#""finalized""#); + + let level: SafetyLevel = serde_json::from_str(&json).unwrap(); + assert_eq!(level, SafetyLevel::Finalized); + } + + #[test] + fn test_serde_safety_level_fails() { + let json = r#""failed""#; + let level: Result = serde_json::from_str(json); + assert!(level.is_err()); + } +} diff --git a/crates/op-rbuilder/src/primitives/kona/supervisor.rs b/crates/op-rbuilder/src/primitives/kona/supervisor.rs new file mode 100644 index 00000000..8d7df304 --- /dev/null +++ b/crates/op-rbuilder/src/primitives/kona/supervisor.rs @@ -0,0 +1,12 @@ +//! This is our custom implementation of validator struct + +use crate::primitives::kona::ExecutingMessageValidator; +use jsonrpsee::http_client::HttpClient; +use std::time::Duration; + +pub struct SupervisorValidator; + +impl ExecutingMessageValidator for SupervisorValidator { + type SupervisorClient = HttpClient; + const DEFAULT_TIMEOUT: Duration = Duration::from_millis(100); +} diff --git a/crates/op-rbuilder/src/primitives/kona/traits.rs b/crates/op-rbuilder/src/primitives/kona/traits.rs new file mode 100644 index 00000000..4e13a296 --- /dev/null +++ b/crates/op-rbuilder/src/primitives/kona/traits.rs @@ -0,0 +1,191 @@ +//! [Source](https://github.com/op-rs/kona/blob/a1d8ea603960cb4bd3cc19784f7c3365352f1849/crates/node/rpc/src/traits.rs) + +use crate::primitives::kona::{ + ExecutingMessage, SafetyLevel, SupervisorApiClient, CROSS_L2_INBOX_ADDRESS, +}; +use alloy_primitives::Log; +use alloy_sol_types::SolEvent; +use async_trait::async_trait; +use core::time::Duration; +use jsonrpsee::{core::ClientError, types::ErrorObjectOwned}; +use tokio::time::error::Elapsed; + +/// Derived from op-supervisor +const UNKNOWN_CHAIN_MSG: &str = "unknown chain: "; +/// Derived from [op-supervisor](https://github.com/ethereum-optimism/optimism/blob/4ba2eb00eafc3d7de2c8ceb6fd83913a8c0a2c0d/op-supervisor/supervisor/backend/backend.go#L479) +const MINIMUM_SAFETY_MSG: &str = "does not meet the minimum safety"; + +/// Failures occurring during validation of [`ExecutingMessage`]s. +#[derive(thiserror::Error, Debug)] +pub enum ExecutingMessageValidatorError { + /// Message does not meet minimum safety level + #[error("message does not meet min safety level, got: {got}, expected: {expected}")] + MinimumSafety { + /// Actual level of the message + got: SafetyLevel, + /// Minimum acceptable level that was passed to supervisor + expected: SafetyLevel, + }, + /// Invalid chain + #[error("unsupported chain id: {0}")] + UnknownChain(u64), + /// Failure from the [`SupervisorApiClient`] when validating messages. + #[error("supervisor determined messages are invalid: {0}")] + RpcClientError(ClientError), + + /// Message validation against the Supervisor took longer than allowed. + #[error("message validation timed out: {0}")] + ValidationTimeout(#[from] Elapsed), + + /// Catch-all variant for other supervisor server errors. + #[error("unexpected error from supervisor: {0}")] + SupervisorServerError(ErrorObjectOwned), +} + +/// Interacts with a Supervisor to validate [`ExecutingMessage`]s. +#[async_trait] +pub trait ExecutingMessageValidator { + /// The supervisor client type. + type SupervisorClient: SupervisorApiClient + Send + Sync; + + /// Default duration that message validation is not allowed to exceed. + const DEFAULT_TIMEOUT: Duration; + + /// Extracts [`ExecutingMessage`]s from the [`Log`] if there are any. + fn parse_messages(logs: &[Log]) -> impl Iterator> { + logs.iter().map(|log| { + (log.address == CROSS_L2_INBOX_ADDRESS && log.topics().len() == 2) + .then(|| ExecutingMessage::decode_log_data(&log.data, true).ok()) + .flatten() + }) + } + + /// Validates a list of [`ExecutingMessage`]s against a Supervisor. + async fn validate_messages( + supervisor: &Self::SupervisorClient, + messages: &[ExecutingMessage], + safety: SafetyLevel, + timeout: Option, + ) -> Result<(), ExecutingMessageValidatorError> { + // Set timeout duration based on input if provided. + let timeout = timeout.map_or(Self::DEFAULT_TIMEOUT, |t| t); + + // Construct the future to validate all messages using supervisor. + let fut = async { supervisor.check_messages(messages.to_vec(), safety).await }; + + // Await the validation future with timeout. + match tokio::time::timeout(timeout, fut) + .await + .map_err(ExecutingMessageValidatorError::ValidationTimeout)? + { + Ok(_) => Ok(()), + Err(err) => match err { + ClientError::Call(err) => Err(err.into()), + _ => Err(ExecutingMessageValidatorError::RpcClientError(err)), + }, + } + } +} + +impl From for ExecutingMessageValidatorError { + fn from(value: ErrorObjectOwned) -> Self { + // Check if it's invalid message call, message example: + // `failed to check message: failed to check log: unknown chain: 14417` + if value.message().contains(UNKNOWN_CHAIN_MSG) { + if let Ok(chain_id) = value + .message() + .split(' ') + .last() + .expect("message contains chain id") + .parse::() + { + return Self::UnknownChain(chain_id); + } + // Check if it's `does not meet the minimum safety` error, message example: + // `message {0x4200000000000000000000000000000000000023 4 1 1728507701 901} + // (safety level: unsafe) does not meet the minimum safety cross-unsafe"` + } else if value.message().contains(MINIMUM_SAFETY_MSG) { + let message_safety = if value.message().contains("safety level: safe") { + SafetyLevel::Safe + } else if value.message().contains("safety level: local-safe") { + SafetyLevel::LocalSafe + } else if value.message().contains("safety level: cross-unsafe") { + SafetyLevel::CrossUnsafe + } else if value.message().contains("safety level: unsafe") { + SafetyLevel::Unsafe + } else if value.message().contains("safety level: invalid") { + SafetyLevel::Invalid + } else { + // Unexpected level name, return generic error + return Self::SupervisorServerError(value); + }; + let expected_safety = if value.message().contains("safety finalized") { + SafetyLevel::Finalized + } else if value.message().contains("safety safe") { + SafetyLevel::Safe + } else if value.message().contains("safety local-safe") { + SafetyLevel::LocalSafe + } else if value.message().contains("safety cross-unsafe") { + SafetyLevel::CrossUnsafe + } else if value.message().contains("safety unsafe") { + SafetyLevel::Unsafe + } else { + // Unexpected level name, return generic error + return Self::SupervisorServerError(value); + }; + + return Self::MinimumSafety { + expected: expected_safety, + got: message_safety, + }; + } + Self::SupervisorServerError(value) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const MIN_SAFETY_ERROR: &str = r#"{"code":-32000,"message":"message {0x4200000000000000000000000000000000000023 4 1 1728507701 901} (safety level: unsafe) does not meet the minimum safety cross-unsafe"}"#; + const INVALID_CHAIN: &str = r#"{"code":-32000,"message":"failed to check message: failed to check log: unknown chain: 14417"}"#; + const INVALID_LEVEL: &str = r#"{"code":-32000,"message":"message {0x4200000000000000000000000000000000000023 1091637521 4369 0 901} (safety level: invalid) does not meet the minimum safety unsafe"}"#; + const RANDOM_ERROR: &str = r#"{"code":-32000,"message":"gibberish error"}"#; + + #[test] + fn test_op_supervisor_error_parsing() { + assert!(matches!( + ExecutingMessageValidatorError::from( + serde_json::from_str::(MIN_SAFETY_ERROR).unwrap() + ), + ExecutingMessageValidatorError::MinimumSafety { + expected: SafetyLevel::CrossUnsafe, + got: SafetyLevel::Unsafe + } + )); + + assert!(matches!( + ExecutingMessageValidatorError::from( + serde_json::from_str::(INVALID_CHAIN).unwrap() + ), + ExecutingMessageValidatorError::UnknownChain(14417) + )); + + assert!(matches!( + ExecutingMessageValidatorError::from( + serde_json::from_str::(INVALID_LEVEL).unwrap() + ), + ExecutingMessageValidatorError::MinimumSafety { + expected: SafetyLevel::Unsafe, + got: SafetyLevel::Invalid + } + )); + + assert!(matches!( + ExecutingMessageValidatorError::from( + serde_json::from_str::(RANDOM_ERROR).unwrap() + ), + ExecutingMessageValidatorError::SupervisorServerError(_) + )); + } +} diff --git a/crates/op-rbuilder/src/primitives/mod.rs b/crates/op-rbuilder/src/primitives/mod.rs new file mode 100644 index 00000000..f06250f6 --- /dev/null +++ b/crates/op-rbuilder/src/primitives/mod.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "optimism")] +pub mod kona; diff --git a/crates/op-rbuilder/src/tester/mod.rs b/crates/op-rbuilder/src/tester/mod.rs index a17b5228..d50454bb 100644 --- a/crates/op-rbuilder/src/tester/mod.rs +++ b/crates/op-rbuilder/src/tester/mod.rs @@ -26,7 +26,6 @@ use reth_payload_builder::PayloadId; use reth_rpc_layer::{AuthClientLayer, AuthClientService, JwtSecret}; use rollup_boost::flashblocks::FlashblocksService; use rollup_boost::Flashblocks; -use serde_json; use serde_json::Value; use std::str::FromStr; use std::time::SystemTime;