diff --git a/Cargo.lock b/Cargo.lock index 449214364d..34e2b9baad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1830,6 +1830,7 @@ dependencies = [ "sov-modules-stf-blueprint", "sov-prover-storage-manager", "sov-rollup-interface", + "sov-schema-db", "sov-state", "sp1-helper", "tempfile", diff --git a/bin/citrea/Cargo.toml b/bin/citrea/Cargo.toml index 5f77bcb1da..8559688319 100644 --- a/bin/citrea/Cargo.toml +++ b/bin/citrea/Cargo.toml @@ -74,6 +74,7 @@ citrea-risc0-adapter = { path = "../../crates/risc0", features = ["native", "tes sov-mock-da = { path = "../../crates/sovereign-sdk/adapters/mock-da", default-features = false } sov-prover-storage-manager = { path = "../../crates/sovereign-sdk/full-node/sov-prover-storage-manager", features = ["test-utils"] } sov-rollup-interface = { path = "../../crates/sovereign-sdk/rollup-interface", features = ["testing"] } +sov-schema-db = { path = "../../crates/sovereign-sdk/full-node/db/sov-schema-db" } alloy = { workspace = true, features = ["hyper", "consensus", "rpc-types-eth", "provider-http", "signers", "signer-local"] } alloy-rlp = { workspace = true } diff --git a/bin/citrea/src/main.rs b/bin/citrea/src/main.rs index 268b25c370..a34c07f302 100644 --- a/bin/citrea/src/main.rs +++ b/bin/citrea/src/main.rs @@ -15,7 +15,7 @@ use citrea_common::{from_toml_path, FromEnv, FullNodeConfig}; use citrea_light_client_prover::da_block_handler::StartVariant; use citrea_stf::genesis_config::GenesisPaths; use citrea_stf::runtime::{CitreaRuntime, DefaultContext}; -use citrea_storage_ops::pruning::types::PruningNodeType; +use citrea_storage_ops::pruning::types::StorageNodeType; use clap::Parser; use metrics_exporter_prometheus::PrometheusBuilder; use metrics_util::MetricKindMask; @@ -380,7 +380,7 @@ where if let Some(pruner_service) = pruner_service { task_manager.spawn(|cancellation_token| async move { pruner_service - .run(PruningNodeType::FullNode, cancellation_token) + .run(StorageNodeType::FullNode, cancellation_token) .await }); } 
diff --git a/bin/citrea/tests/common/helpers.rs b/bin/citrea/tests/common/helpers.rs index 3a3640a76e..a2427e007d 100644 --- a/bin/citrea/tests/common/helpers.rs +++ b/bin/citrea/tests/common/helpers.rs @@ -17,7 +17,7 @@ use citrea_common::{ use citrea_light_client_prover::da_block_handler::StartVariant; use citrea_primitives::TEST_PRIVATE_KEY; use citrea_stf::genesis_config::GenesisPaths; -use citrea_storage_ops::pruning::types::PruningNodeType; +use citrea_storage_ops::pruning::types::StorageNodeType; use citrea_storage_ops::pruning::PruningConfig; use short_header_proof_provider::{ NativeShortHeaderProofProviderService, SHORT_HEADER_PROOF_PROVIDER, @@ -373,7 +373,7 @@ pub async fn start_rollup( if let Some(pruner) = pruner { task_manager.spawn(|cancellation_token| async move { pruner - .run(PruningNodeType::FullNode, cancellation_token) + .run(StorageNodeType::FullNode, cancellation_token) .await }); } diff --git a/bin/citrea/tests/mock/mod.rs b/bin/citrea/tests/mock/mod.rs index 1a0dbf6380..9ce05a858a 100644 --- a/bin/citrea/tests/mock/mod.rs +++ b/bin/citrea/tests/mock/mod.rs @@ -31,6 +31,7 @@ mod mempool; mod proving; mod pruning; mod reopen; +mod rollback; mod sequencer_behaviour; mod sequencer_replacement; mod soft_confirmation_rule_enforcer; diff --git a/bin/citrea/tests/mock/rollback.rs b/bin/citrea/tests/mock/rollback.rs new file mode 100644 index 0000000000..41dbf8d707 --- /dev/null +++ b/bin/citrea/tests/mock/rollback.rs @@ -0,0 +1,848 @@ +use std::net::SocketAddr; +use std::panic::{self, AssertUnwindSafe}; +use std::path::Path; +use std::str::FromStr; +use std::sync::Arc; + +use alloy_primitives::{Address, U256}; +use citrea_common::tasks::manager::TaskManager; +use citrea_common::{BatchProverConfig, SequencerConfig}; +use citrea_stf::genesis_config::GenesisPaths; +use citrea_storage_ops::pruning::types::StorageNodeType; +use citrea_storage_ops::rollback::Rollback; +use futures::FutureExt; +use reth_primitives::{BlockId, BlockNumberOrTag}; +use 
sov_db::ledger_db::migrations::copy_db_dir_recursive; +use sov_db::ledger_db::{LedgerDB, SharedLedgerOps}; +use sov_db::native_db::NativeDB; +use sov_db::rocks_db_config::RocksdbConfig; +use sov_db::schema::tables::{ + ProofsBySlotNumberV2, VerifiedBatchProofsBySlotNumber, BATCH_PROVER_LEDGER_TABLES, + FULL_NODE_LEDGER_TABLES, SEQUENCER_LEDGER_TABLES, +}; +use sov_db::schema::types::SlotNumber; +use sov_db::state_db::StateDB; +use sov_mock_da::{MockAddress, MockDaService}; +use sov_rollup_interface::rpc::SequencerCommitmentResponse; + +use crate::common::client::TestClient; +use crate::common::helpers::{ + create_default_rollup_config, start_rollup, tempdir_with_children, wait_for_l1_block, + wait_for_l2_block, wait_for_proof, NodeMode, +}; +use crate::common::{make_test_client, TEST_DATA_GENESIS_PATH}; +use crate::mock::evm::init_test_rollup; + +fn instantiate_dbs( + db_path: &Path, + tables: &[&str], +) -> anyhow::Result<(LedgerDB, Arc, Arc)> { + let tables = tables.iter().map(|x| x.to_string()).collect::>(); + let rocksdb_config = RocksdbConfig::new(db_path, None, Some(tables.to_vec())); + let ledger_db = LedgerDB::with_config(&rocksdb_config)?; + let native_db = Arc::new(NativeDB::setup_schema_db(&rocksdb_config)?); + let state_db = Arc::new(StateDB::setup_schema_db(&rocksdb_config)?); + + Ok((ledger_db, native_db, state_db)) +} + +async fn start_sequencer( + sequencer_db_dir: &Path, + da_db_dir: &Path, + restart: bool, +) -> (TaskManager<()>, Box, SocketAddr) { + let sequencer_config = SequencerConfig { + min_soft_confirmations_per_commitment: 10, + test_mode: true, + ..Default::default() + }; + let (seq_port_tx, seq_port_rx) = tokio::sync::oneshot::channel(); + let rollup_config = create_default_rollup_config( + true, + sequencer_db_dir, + da_db_dir, + NodeMode::SequencerNode, + None, + ); + + let seq_task_manager = start_rollup( + seq_port_tx, + GenesisPaths::from_dir(TEST_DATA_GENESIS_PATH), + None, + None, + rollup_config, + Some(sequencer_config), + None, 
+ restart, + ) + .await; + + let seq_port = seq_port_rx.await.unwrap(); + let seq_test_client = if restart { + make_test_client(seq_port).await.unwrap() + } else { + init_test_rollup(seq_port).await + }; + + (seq_task_manager, seq_test_client, seq_port) +} + +async fn start_full_node( + full_node_db_dir: &Path, + da_db_dir: &Path, + seq_port: SocketAddr, + restart: bool, +) -> (TaskManager<()>, Box) { + let (full_node_port_tx, full_node_port_rx) = tokio::sync::oneshot::channel(); + let rollup_config = create_default_rollup_config( + true, + full_node_db_dir, + da_db_dir, + NodeMode::FullNode(seq_port), + None, + ); + let full_node_task_manager = start_rollup( + full_node_port_tx, + GenesisPaths::from_dir(TEST_DATA_GENESIS_PATH), + None, + None, + rollup_config, + None, + None, + false, + ) + .await; + let full_node_port = full_node_port_rx.await.unwrap(); + let full_node_test_client = if restart { + make_test_client(full_node_port).await.unwrap() + } else { + init_test_rollup(full_node_port).await + }; + + (full_node_task_manager, full_node_test_client) +} + +async fn start_batch_prover( + batch_prover_db_dir: &Path, + da_db_dir: &Path, + seq_port: SocketAddr, + restart: bool, +) -> (TaskManager<()>, Box) { + let (batch_prover_port_tx, batch_prover_port_rx) = tokio::sync::oneshot::channel(); + let rollup_config = create_default_rollup_config( + true, + batch_prover_db_dir, + da_db_dir, + NodeMode::Prover(seq_port), + None, + ); + let batch_prover_task_manager = start_rollup( + batch_prover_port_tx, + GenesisPaths::from_dir(TEST_DATA_GENESIS_PATH), + Some(BatchProverConfig { + proving_mode: citrea_common::ProverGuestRunConfig::Execute, + proof_sampling_number: 0, + enable_recovery: true, + }), + None, + rollup_config, + None, + None, + false, + ) + .await; + + let batch_prover_port = batch_prover_port_rx.await.unwrap(); + let batch_prover_test_client = if restart { + make_test_client(batch_prover_port).await.unwrap() + } else { + 
init_test_rollup(batch_prover_port).await + }; + + (batch_prover_task_manager, batch_prover_test_client) +} + +async fn rollback_node( + node_type: StorageNodeType, + tables: &[&str], + old_path: &Path, + new_path: &Path, + rollback_l2_height: u64, + rollback_l1_height: u64, + commitment_l2_height: u64, +) -> anyhow::Result<()> { + copy_db_dir_recursive(old_path, new_path).unwrap(); + + let (ledger_db, native_db, state_db) = instantiate_dbs(new_path, tables).unwrap(); + let rollback = Rollback::new(ledger_db.inner(), state_db.clone(), native_db.clone()); + + rollback + .execute( + node_type, + 50, + rollback_l2_height, + rollback_l1_height, + commitment_l2_height, + ) + .await + .unwrap(); + + drop(rollback); + drop(state_db); + drop(native_db); + drop(ledger_db); + + Ok(()) +} + +async fn fill_blocks( + test_client: &TestClient, + da_service: &MockDaService, + addr: &Address, + fullnode_test_client: Option<&TestClient>, +) { + for i in 1..=50 { + // send one ether to some address + let _ = test_client + .send_eth(*addr, None, None, None, 1e18 as u128) + .await + .unwrap(); + + test_client.spam_publish_batch_request().await.unwrap(); + + if i % 10 == 0 { + wait_for_l2_block(test_client, i, None).await; + wait_for_l1_block(da_service, 3 + (i / 10), None).await; + if let Some(fullnode_test_client) = fullnode_test_client { + wait_for_proof(fullnode_test_client, 3 + ((i / 10) * 2), None).await; + } + } + } +} + +async fn assert_dbs( + test_client: &TestClient, + addr: Address, + check_l1_block: Option, + check_l2_block: u64, + balance_at_l2_height: u128, +) { + // Check soft confirmations have been rolled back in Ledger DB + wait_for_l2_block(test_client, check_l2_block, None).await; + + // Suppress output of panics + let prev_hook = panic::take_hook(); + panic::set_hook(Box::new(|_| {})); + + // Check state DB is rolled back. 
+ let get_balance_result = test_client + .eth_get_balance( + addr, + Some(BlockId::Number(BlockNumberOrTag::Number(check_l2_block))), + ) + .await; + assert!(get_balance_result.is_ok()); + assert_eq!( + get_balance_result.unwrap(), + U256::from(balance_at_l2_height) + ); + + // Check native DB is rolled back + let check_block_by_number_result = + AssertUnwindSafe(test_client.eth_get_block_by_number_with_detail(Some( + BlockNumberOrTag::Number(check_l2_block + 1), + ))) + .catch_unwind() + .await; + assert!(check_block_by_number_result.is_err()); + panic::set_hook(prev_hook); + + // Should NOT panic as the data we're requesting here is correct + test_client + .eth_get_block_by_number_with_detail(Some(BlockNumberOrTag::Number(check_l2_block))) + .await; + + let Some(check_l1_block) = check_l1_block else { + return; + }; + let commitments: Vec = test_client + .ledger_get_sequencer_commitments_on_slot_by_number(check_l1_block) + .await + .unwrap() + .unwrap(); + assert_eq!(commitments.len(), 1); +} + +/// Trigger rollback DB data. 
+#[tokio::test(flavor = "multi_thread")] +async fn test_sequencer_rollback() -> Result<(), anyhow::Error> { + // citrea::initialize_logging(tracing::Level::DEBUG); + + let storage_dir = tempdir_with_children(&["DA", "sequencer"]); + let da_db_dir = storage_dir.path().join("DA").to_path_buf(); + let sequencer_db_dir = storage_dir.path().join("sequencer").to_path_buf(); + + let da_service = MockDaService::new(MockAddress::default(), &da_db_dir.clone()); + + // start rollup on da block 3 + for _ in 0..3 { + da_service.publish_test_block().await.unwrap(); + } + wait_for_l1_block(&da_service, 3, None).await; + + let (seq_task_manager, seq_test_client, _seq_port) = + start_sequencer(&sequencer_db_dir, &da_db_dir, false).await; + + let addr = Address::from_str("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92265").unwrap(); + + fill_blocks(&seq_test_client, &da_service, &addr, None).await; + + wait_for_l2_block(&seq_test_client, 50, None).await; + + let get_balance_result = seq_test_client + .eth_get_balance(addr, Some(BlockId::Number(BlockNumberOrTag::Number(50)))) + .await; + assert!(get_balance_result.is_ok()); + assert_eq!( + get_balance_result.unwrap(), + U256::from(50000000000000000000u128) + ); + + seq_task_manager.abort().await; + + // rollback 10 L2 blocks + let rollback_l2_height = 30; + // We have 8 L1 blocks by now and we want to rollback + // the last one. + let rollback_l1_height = 6; + let new_sequencer_db_dir = storage_dir.path().join("sequencer2").to_path_buf(); + rollback_node( + StorageNodeType::Sequencer, + SEQUENCER_LEDGER_TABLES, + &sequencer_db_dir, + &new_sequencer_db_dir, + rollback_l2_height, + rollback_l1_height, + rollback_l2_height, + ) + .await + .unwrap(); + + let (seq_task_manager, seq_test_client, _) = + start_sequencer(&new_sequencer_db_dir, &da_db_dir, true).await; + + assert_dbs(&seq_test_client, addr, None, 30, 30000000000000000000).await; + + seq_task_manager.abort().await; + + Ok(()) +} + +/// Trigger rollback DB data. 
+#[tokio::test(flavor = "multi_thread")] +async fn test_fullnode_rollback() -> Result<(), anyhow::Error> { + // citrea::initialize_logging(tracing::Level::DEBUG); + + let storage_dir = tempdir_with_children(&["DA", "sequencer", "full-node"]); + let da_db_dir = storage_dir.path().join("DA").to_path_buf(); + let sequencer_db_dir = storage_dir.path().join("sequencer").to_path_buf(); + let full_node_db_dir = storage_dir.path().join("full-node").to_path_buf(); + + let da_service = MockDaService::new(MockAddress::default(), &da_db_dir.clone()); + + // start rollup on da block 3 + for _ in 0..3 { + da_service.publish_test_block().await.unwrap(); + } + wait_for_l1_block(&da_service, 3, None).await; + + //------------------ + // Start nodes + //------------------ + let (seq_task_manager, seq_test_client, seq_port) = + start_sequencer(&sequencer_db_dir, &da_db_dir, false).await; + + let (full_node_task_manager, full_node_test_client) = + start_full_node(&full_node_db_dir, &da_db_dir, seq_port, false).await; + + let addr = Address::from_str("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92265").unwrap(); + + //------------------ + // Fill blocks + //------------------ + fill_blocks(&seq_test_client, &da_service, &addr, None).await; + + wait_for_l2_block(&seq_test_client, 50, None).await; + wait_for_l2_block(&full_node_test_client, 50, None).await; + + //------------------ + // Assert data + //------------------ + let get_balance_result = seq_test_client + .eth_get_balance(addr, Some(BlockId::Number(BlockNumberOrTag::Number(50)))) + .await; + assert!(get_balance_result.is_ok()); + assert_eq!( + get_balance_result.unwrap(), + U256::from(50000000000000000000u128) + ); + + let get_balance_result = full_node_test_client + .eth_get_balance(addr, Some(BlockId::Number(BlockNumberOrTag::Number(50)))) + .await; + assert!(get_balance_result.is_ok()); + assert_eq!( + get_balance_result.unwrap(), + U256::from(50000000000000000000u128) + ); + + seq_task_manager.abort().await; + 
full_node_task_manager.abort().await; + + //------------------ + // Rollback + //------------------ + // rollback 10 L2 blocks + let rollback_l2_height = 30; + // We have 8 L1 blocks by now and we want to rollback + // the last one. + let rollback_l1_height = 6; + + let new_sequencer_db_dir = storage_dir.path().join("sequencer2").to_path_buf(); + rollback_node( + StorageNodeType::Sequencer, + SEQUENCER_LEDGER_TABLES, + &sequencer_db_dir, + &new_sequencer_db_dir, + rollback_l2_height, + rollback_l1_height, + rollback_l2_height, + ) + .await + .unwrap(); + + //------------------ + // Assert state after rollback + //------------------ + let new_full_node_db_dir = storage_dir.path().join("full-node2").to_path_buf(); + rollback_node( + StorageNodeType::FullNode, + FULL_NODE_LEDGER_TABLES, + &full_node_db_dir, + &new_full_node_db_dir, + rollback_l2_height, + rollback_l1_height, + rollback_l2_height, + ) + .await + .unwrap(); + + //------------------ + // Make sure nodes are able to sync after rollback + //------------------ + let new_sequencer_db_dir = storage_dir.path().join("sequencer3").to_path_buf(); + copy_db_dir_recursive( + &storage_dir.path().join("sequencer2"), + &new_sequencer_db_dir, + ) + .unwrap(); + let (seq_task_manager, seq_test_client, seq_port) = + start_sequencer(&new_sequencer_db_dir, &da_db_dir, true).await; + + let new_full_node_db_dir = storage_dir.path().join("full-node3").to_path_buf(); + copy_db_dir_recursive( + &storage_dir.path().join("full-node2"), + &new_full_node_db_dir, + ) + .unwrap(); + let (full_node_task_manager, full_node_test_client) = + start_full_node(&new_full_node_db_dir, &da_db_dir, seq_port, true).await; + + assert_dbs( + &full_node_test_client, + addr, + Some(rollback_l1_height), + 30, + 30000000000000000000, + ) + .await; + + for _ in 0..10 { + seq_test_client.spam_publish_batch_request().await.unwrap(); + } + wait_for_l2_block(&seq_test_client, 40, None).await; + wait_for_l2_block(&full_node_test_client, 40, None).await; + + 
seq_task_manager.abort().await; + full_node_task_manager.abort().await; + + Ok(()) +} + +/// Trigger rollback DB data. +/// This test makes sure that a rollback on fullnode withour rolling back sequencer +/// enables fullnode to sync from the rollback point up until latest sequencer block. +#[tokio::test(flavor = "multi_thread")] +async fn test_fullnode_rollback_without_sequencer_rollback() -> Result<(), anyhow::Error> { + // citrea::initialize_logging(tracing::Level::DEBUG); + + let storage_dir = tempdir_with_children(&["DA", "sequencer", "full-node"]); + let da_db_dir = storage_dir.path().join("DA").to_path_buf(); + let sequencer_db_dir = storage_dir.path().join("sequencer").to_path_buf(); + let full_node_db_dir = storage_dir.path().join("full-node").to_path_buf(); + + let da_service = MockDaService::new(MockAddress::default(), &da_db_dir.clone()); + + // start rollup on da block 3 + for _ in 0..3 { + da_service.publish_test_block().await.unwrap(); + } + wait_for_l1_block(&da_service, 3, None).await; + + //------------------ + // Start nodes + //------------------ + let (seq_task_manager, seq_test_client, seq_port) = + start_sequencer(&sequencer_db_dir, &da_db_dir, false).await; + + let (full_node_task_manager, full_node_test_client) = + start_full_node(&full_node_db_dir, &da_db_dir, seq_port, false).await; + + let addr = Address::from_str("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92265").unwrap(); + + //------------------ + // Fill blocks + //------------------ + fill_blocks(&seq_test_client, &da_service, &addr, None).await; + + wait_for_l2_block(&seq_test_client, 50, None).await; + wait_for_l2_block(&full_node_test_client, 50, None).await; + + seq_task_manager.abort().await; + full_node_task_manager.abort().await; + + //------------------ + // Rollback + //------------------ + // rollback 10 L2 blocks + let rollback_l2_height = 30; + // We have 8 L1 blocks by now and we want to rollback + // the last one. 
+ let rollback_l1_height = 6; + + let new_full_node_db_dir = storage_dir.path().join("full-node2").to_path_buf(); + rollback_node( + StorageNodeType::FullNode, + FULL_NODE_LEDGER_TABLES, + &full_node_db_dir, + &new_full_node_db_dir, + rollback_l2_height, + rollback_l1_height, + rollback_l2_height, + ) + .await + .unwrap(); + + //------------------ + // Make sure nodes are able to sync after rollback + //------------------ + let new_sequencer_db_dir = storage_dir.path().join("sequencer2").to_path_buf(); + copy_db_dir_recursive(&sequencer_db_dir, &new_sequencer_db_dir).unwrap(); + let (seq_task_manager, seq_test_client, seq_port) = + start_sequencer(&new_sequencer_db_dir, &da_db_dir, true).await; + + let new_full_node_db_dir = storage_dir.path().join("full-node3").to_path_buf(); + copy_db_dir_recursive( + &storage_dir.path().join("full-node2"), + &new_full_node_db_dir, + ) + .unwrap(); + let (full_node_task_manager, full_node_test_client) = + start_full_node(&new_full_node_db_dir, &da_db_dir, seq_port, true).await; + + for _ in 0..10 { + seq_test_client.spam_publish_batch_request().await.unwrap(); + } + wait_for_l2_block(&seq_test_client, 40, None).await; + wait_for_l2_block(&full_node_test_client, 40, None).await; + + let seq_soft_confirmation = seq_test_client + .ledger_get_head_soft_confirmation() + .await + .unwrap() + .unwrap(); + let full_node_soft_confirmation = full_node_test_client + .ledger_get_head_soft_confirmation() + .await + .unwrap() + .unwrap(); + + assert_eq!( + seq_soft_confirmation.state_root, + full_node_soft_confirmation.state_root + ); + + seq_task_manager.abort().await; + full_node_task_manager.abort().await; + + Ok(()) +} + +/// Trigger rollback DB data. 
+#[tokio::test(flavor = "multi_thread")] +async fn test_batch_prover_rollback() -> Result<(), anyhow::Error> { + // citrea::initialize_logging(tracing::Level::DEBUG); + + let storage_dir = tempdir_with_children(&["DA", "sequencer", "full-node", "batch-prover"]); + let da_db_dir = storage_dir.path().join("DA").to_path_buf(); + let sequencer_db_dir = storage_dir.path().join("sequencer").to_path_buf(); + let full_node_db_dir = storage_dir.path().join("full-node").to_path_buf(); + let batch_prover_db_dir = storage_dir.path().join("batch-prover").to_path_buf(); + + let da_service = MockDaService::new(MockAddress::default(), &da_db_dir.clone()); + + // start rollup on da block 3 + for _ in 0..3 { + da_service.publish_test_block().await.unwrap(); + } + wait_for_l1_block(&da_service, 3, None).await; + + //------------------ + // Start nodes + //------------------ + let (seq_task_manager, seq_test_client, seq_port) = + start_sequencer(&sequencer_db_dir, &da_db_dir, false).await; + + let (full_node_task_manager, full_node_test_client) = + start_full_node(&full_node_db_dir, &da_db_dir, seq_port, false).await; + + let (batch_prover_task_manager, batch_prover_test_client) = + start_batch_prover(&batch_prover_db_dir, &da_db_dir, seq_port, false).await; + + let addr = Address::from_str("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92265").unwrap(); + + fill_blocks( + &seq_test_client, + &da_service, + &addr, + Some(&full_node_test_client), + ) + .await; + + wait_for_l2_block(&full_node_test_client, 50, None).await; + wait_for_l2_block(&batch_prover_test_client, 50, None).await; + + //------------------ + // Assert sequencer state + //------------------ + let get_balance_result = seq_test_client + .eth_get_balance(addr, Some(BlockId::Number(BlockNumberOrTag::Number(50)))) + .await; + assert!(get_balance_result.is_ok()); + assert_eq!( + get_balance_result.unwrap(), + U256::from(50000000000000000000u128) + ); + + let get_balance_result = full_node_test_client + .eth_get_balance(addr, 
Some(BlockId::Number(BlockNumberOrTag::Number(50)))) + .await; + assert!(get_balance_result.is_ok()); + assert_eq!( + get_balance_result.unwrap(), + U256::from(50000000000000000000u128) + ); + + let get_balance_result = batch_prover_test_client + .eth_get_balance(addr, Some(BlockId::Number(BlockNumberOrTag::Number(50)))) + .await; + assert!(get_balance_result.is_ok()); + assert_eq!( + get_balance_result.unwrap(), + U256::from(50000000000000000000u128) + ); + + seq_task_manager.abort().await; + full_node_task_manager.abort().await; + batch_prover_task_manager.abort().await; + + //------------------ + // Assert fullnode state + //------------------ + let new_full_node_db_dir = storage_dir.path().join("full-node2").to_path_buf(); + copy_db_dir_recursive(&full_node_db_dir, &new_full_node_db_dir).unwrap(); + + let new_batch_prover_db_dir = storage_dir.path().join("batch-prover2").to_path_buf(); + copy_db_dir_recursive(&batch_prover_db_dir, &new_batch_prover_db_dir).unwrap(); + + // At block 22, full node SHOULD have a verified proof + let (ledger_db, _native_db, _state_db) = + instantiate_dbs(&new_full_node_db_dir, FULL_NODE_LEDGER_TABLES).unwrap(); + let ledger_db = ledger_db.inner(); + assert!(ledger_db + .get::(&SlotNumber(7)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(9)) + .unwrap() + .is_some()); + + // rollback 10 L2 blocks + let rollback_l2_height = 30; + // We have 9 L1 blocks by now and we want to rollback. 
+ let rollback_l1_height = 9; + + let new_full_node_db_dir = storage_dir.path().join("full-node3").to_path_buf(); + copy_db_dir_recursive( + &storage_dir.path().join("full-node2"), + &new_full_node_db_dir, + ) + .unwrap(); + let new_batch_prover_db_dir = storage_dir.path().join("batch-prover3").to_path_buf(); + copy_db_dir_recursive( + &storage_dir.path().join("batch-prover2"), + &new_batch_prover_db_dir, + ) + .unwrap(); + + //------------------ + // Rollback nodes + //------------------ + let new_sequencer_db_dir = storage_dir.path().join("sequencer3").to_path_buf(); + rollback_node( + StorageNodeType::Sequencer, + SEQUENCER_LEDGER_TABLES, + &sequencer_db_dir, + &new_sequencer_db_dir, + rollback_l2_height, + rollback_l1_height, + rollback_l2_height, + ) + .await + .unwrap(); + + rollback_node( + StorageNodeType::FullNode, + FULL_NODE_LEDGER_TABLES, + &full_node_db_dir, + &new_full_node_db_dir, + rollback_l2_height, + rollback_l1_height, + rollback_l2_height, + ) + .await + .unwrap(); + + rollback_node( + StorageNodeType::BatchProver, + BATCH_PROVER_LEDGER_TABLES, + &batch_prover_db_dir, + &new_batch_prover_db_dir, + rollback_l2_height, + rollback_l1_height, + rollback_l2_height, + ) + .await + .unwrap(); + + //------------------ + // Assert state after rollback + //------------------ + let new_sequencer_db_dir = storage_dir.path().join("sequencer4").to_path_buf(); + copy_db_dir_recursive( + &storage_dir.path().join("sequencer3"), + &new_sequencer_db_dir, + ) + .unwrap(); + let new_full_node_db_dir = storage_dir.path().join("full-node4").to_path_buf(); + copy_db_dir_recursive( + &storage_dir.path().join("full-node3"), + &new_full_node_db_dir, + ) + .unwrap(); + let new_batch_prover_db_dir = storage_dir.path().join("batch-prover4").to_path_buf(); + copy_db_dir_recursive( + &storage_dir.path().join("batch-prover3"), + &new_batch_prover_db_dir, + ) + .unwrap(); + + // At block 11, verified proof in full node should have been pruned. 
+ let (fn_ledger_db, _, _) = + instantiate_dbs(&new_full_node_db_dir, FULL_NODE_LEDGER_TABLES).unwrap(); + let fn_ledger_db = fn_ledger_db.inner(); + assert!(fn_ledger_db + .get::(&SlotNumber(9)) + .unwrap() + .is_some()); + assert!(fn_ledger_db + .get::(&SlotNumber(11)) + .unwrap() + .is_none()); + + // At block 11, verified proof in prover should have been pruned. + let (bp_ledger_db, _, _) = + instantiate_dbs(&new_batch_prover_db_dir, BATCH_PROVER_LEDGER_TABLES).unwrap(); + let bp_ledger_db = bp_ledger_db.inner(); + assert!(bp_ledger_db + .get::(&SlotNumber(8)) + .unwrap() + .is_some()); + assert!(bp_ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_none()); + + //------------------ + // Start nodes and make sure they are able to sync + //------------------ + let new_sequencer_db_dir = storage_dir.path().join("sequencer5").to_path_buf(); + copy_db_dir_recursive( + &storage_dir.path().join("sequencer4"), + &new_sequencer_db_dir, + ) + .unwrap(); + let (seq_task_manager, seq_test_client, seq_port) = + start_sequencer(&new_sequencer_db_dir, &da_db_dir, true).await; + + let new_full_node_db_dir = storage_dir.path().join("full-node5").to_path_buf(); + copy_db_dir_recursive( + &storage_dir.path().join("full-node4"), + &new_full_node_db_dir, + ) + .unwrap(); + let (full_node_task_manager, full_node_test_client) = + start_full_node(&new_full_node_db_dir, &da_db_dir, seq_port, true).await; + + let new_batch_prover_db_dir = storage_dir.path().join("batch-prover5").to_path_buf(); + copy_db_dir_recursive( + &storage_dir.path().join("batch-prover4"), + &new_batch_prover_db_dir, + ) + .unwrap(); + let (batch_prover_task_manager, batch_prover_test_client) = + start_batch_prover(&new_batch_prover_db_dir, &da_db_dir, seq_port, true).await; + + assert_dbs( + &batch_prover_test_client, + addr, + None, + 30, + 30000000000000000000, + ) + .await; + + for _ in 0..10 { + seq_test_client.spam_publish_batch_request().await.unwrap(); + } + wait_for_l2_block(&seq_test_client, 40, 
None).await; + wait_for_l2_block(&full_node_test_client, 40, None).await; + wait_for_l2_block(&batch_prover_test_client, 40, None).await; + + seq_task_manager.abort().await; + full_node_task_manager.abort().await; + batch_prover_task_manager.abort().await; + + Ok(()) +} diff --git a/bin/cli/src/commands/mod.rs b/bin/cli/src/commands/mod.rs index 58e864a9b8..7142ae85ba 100644 --- a/bin/cli/src/commands/mod.rs +++ b/bin/cli/src/commands/mod.rs @@ -1,7 +1,43 @@ +pub(crate) use backup::*; +use citrea_storage_ops::pruning::types::StorageNodeType; +use clap::ValueEnum; +pub(crate) use prune::*; +pub(crate) use rollback::*; +use sov_db::schema::tables::{ + BATCH_PROVER_LEDGER_TABLES, FULL_NODE_LEDGER_TABLES, LIGHT_CLIENT_PROVER_LEDGER_TABLES, + SEQUENCER_LEDGER_TABLES, +}; + mod backup; mod prune; mod rollback; -pub(crate) use backup::*; -pub(crate) use prune::*; -pub(crate) use rollback::*; +#[derive(Copy, Clone, ValueEnum)] +pub enum StorageNodeTypeArg { + Sequencer, + FullNode, + BatchProver, + LightClient, +} + +impl From for StorageNodeType { + fn from(value: StorageNodeTypeArg) -> Self { + match value { + StorageNodeTypeArg::Sequencer => StorageNodeType::Sequencer, + StorageNodeTypeArg::FullNode => StorageNodeType::FullNode, + StorageNodeTypeArg::BatchProver => StorageNodeType::BatchProver, + StorageNodeTypeArg::LightClient => StorageNodeType::LightClient, + } + } +} + +pub(crate) fn cfs_from_node_type(node_type: StorageNodeTypeArg) -> Vec { + let cfs = match node_type { + StorageNodeTypeArg::Sequencer => SEQUENCER_LEDGER_TABLES, + StorageNodeTypeArg::FullNode => FULL_NODE_LEDGER_TABLES, + StorageNodeTypeArg::BatchProver => BATCH_PROVER_LEDGER_TABLES, + StorageNodeTypeArg::LightClient => LIGHT_CLIENT_PROVER_LEDGER_TABLES, + }; + + cfs.iter().map(|x| x.to_string()).collect::>() +} diff --git a/bin/cli/src/commands/prune.rs b/bin/cli/src/commands/prune.rs index 60498053b7..b459ed390c 100644 --- a/bin/cli/src/commands/prune.rs +++ b/bin/cli/src/commands/prune.rs @@ 
-1,40 +1,18 @@ use std::path::PathBuf; use std::sync::Arc; -use citrea_storage_ops::pruning::types::PruningNodeType; use citrea_storage_ops::pruning::{Pruner, PruningConfig}; -use clap::ValueEnum; use sov_db::ledger_db::{LedgerDB, SharedLedgerOps}; use sov_db::native_db::NativeDB; use sov_db::rocks_db_config::RocksdbConfig; -use sov_db::schema::tables::{ - BATCH_PROVER_LEDGER_TABLES, FULL_NODE_LEDGER_TABLES, LIGHT_CLIENT_PROVER_LEDGER_TABLES, - SEQUENCER_LEDGER_TABLES, -}; use sov_db::state_db::StateDB; use tracing::{debug, info}; -#[derive(Copy, Clone, ValueEnum)] -pub enum PruningNodeTypeArg { - Sequencer, - FullNode, - BatchProver, - LightClient, -} - -impl From for PruningNodeType { - fn from(value: PruningNodeTypeArg) -> Self { - match value { - PruningNodeTypeArg::Sequencer => PruningNodeType::Sequencer, - PruningNodeTypeArg::FullNode => PruningNodeType::FullNode, - PruningNodeTypeArg::BatchProver => PruningNodeType::BatchProver, - PruningNodeTypeArg::LightClient => PruningNodeType::LightClient, - } - } -} +use super::StorageNodeTypeArg; +use crate::commands::cfs_from_node_type; pub(crate) async fn prune( - node_type: PruningNodeTypeArg, + node_type: StorageNodeTypeArg, db_path: PathBuf, distance: u64, ) -> anyhow::Result<()> { @@ -75,14 +53,3 @@ pub(crate) async fn prune( } Ok(()) } - -fn cfs_from_node_type(node_type: PruningNodeTypeArg) -> Vec { - let cfs = match node_type { - PruningNodeTypeArg::Sequencer => SEQUENCER_LEDGER_TABLES, - PruningNodeTypeArg::FullNode => FULL_NODE_LEDGER_TABLES, - PruningNodeTypeArg::BatchProver => BATCH_PROVER_LEDGER_TABLES, - PruningNodeTypeArg::LightClient => LIGHT_CLIENT_PROVER_LEDGER_TABLES, - }; - - cfs.iter().map(|x| x.to_string()).collect::>() -} diff --git a/bin/cli/src/commands/rollback.rs b/bin/cli/src/commands/rollback.rs index da40db8f4e..040d2bcd27 100644 --- a/bin/cli/src/commands/rollback.rs +++ b/bin/cli/src/commands/rollback.rs @@ -1,3 +1,51 @@ -pub(crate) async fn rollback(_num_block: u32) -> 
anyhow::Result<()> { +use std::path::PathBuf; +use std::sync::Arc; + +use citrea_storage_ops::rollback::Rollback; +use sov_db::ledger_db::{LedgerDB, SharedLedgerOps}; +use sov_db::native_db::NativeDB; +use sov_db::rocks_db_config::RocksdbConfig; +use sov_db::state_db::StateDB; +use tracing::info; + +use super::StorageNodeTypeArg; +use crate::commands::cfs_from_node_type; + +pub(crate) async fn rollback( + node_type: StorageNodeTypeArg, + db_path: PathBuf, + l2_target: u64, + l1_target: u64, + last_sequencer_commitment_l2_height: u64, +) -> anyhow::Result<()> { + info!( + "Rolling back DB at {} down to L2 {}, L1 {}", + db_path.display(), + l2_target, + l1_target, + ); + + let column_families = cfs_from_node_type(node_type); + + let rocksdb_config = RocksdbConfig::new(&db_path, None, Some(column_families.to_vec())); + let ledger_db = LedgerDB::with_config(&rocksdb_config)?; + let native_db = NativeDB::setup_schema_db(&rocksdb_config)?; + let state_db = StateDB::setup_schema_db(&rocksdb_config)?; + + let Some(soft_confirmation_number) = ledger_db.get_head_soft_confirmation_height()? 
else { + return Ok(()); + }; + + let rollback = Rollback::new(ledger_db.inner(), Arc::new(state_db), Arc::new(native_db)); + rollback + .execute( + node_type.into(), + soft_confirmation_number, + l2_target, + l1_target, + last_sequencer_commitment_l2_height, + ) + .await?; + Ok(()) } diff --git a/bin/cli/src/main.rs b/bin/cli/src/main.rs index 77bdd7d6af..6690ca60ae 100644 --- a/bin/cli/src/main.rs +++ b/bin/cli/src/main.rs @@ -1,7 +1,7 @@ use std::path::PathBuf; use clap::{Parser, Subcommand, ValueEnum}; -use commands::PruningNodeTypeArg; +use commands::StorageNodeTypeArg; use tracing_subscriber::fmt; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; @@ -41,7 +41,7 @@ enum Commands { /// Prune old DB entries Prune { #[arg(long)] - node_type: PruningNodeTypeArg, + node_type: StorageNodeTypeArg, /// The path of the database to prune #[arg(long)] db_path: PathBuf, @@ -51,12 +51,20 @@ enum Commands { }, /// Rollback the most recent N blocks Rollback { + #[arg(long)] + node_type: StorageNodeTypeArg, /// The path of the database to prune #[arg(long)] db_path: PathBuf, - /// The number of blocks to rollback + /// The target L2 block number to rollback to (non-inclusive) + #[arg(long)] + l2_target: u64, + /// The target L1 block number to rollback to (non-inclusive) + #[arg(long)] + l1_target: u64, + /// The L2 block number at which there was a sequencer commitment that was sent. 
#[arg(long)] - blocks: u32, + sequencer_commitment_l2_height: u64, }, /// Backup DBs Backup { @@ -89,10 +97,20 @@ async fn main() -> anyhow::Result<()> { commands::prune(node_type, db_path.clone(), distance).await?; } Commands::Rollback { - db_path: _db_path, - blocks, + node_type, + db_path, + l2_target, + l1_target, + sequencer_commitment_l2_height, } => { - commands::rollback(blocks).await?; + commands::rollback( + node_type, + db_path.clone(), + l2_target, + l1_target, + sequencer_commitment_l2_height, + ) + .await?; } Commands::Backup { db_path, diff --git a/crates/storage-ops/src/lib.rs b/crates/storage-ops/src/lib.rs index 3f835e181e..79f0997396 100644 --- a/crates/storage-ops/src/lib.rs +++ b/crates/storage-ops/src/lib.rs @@ -1,5 +1,7 @@ pub mod pruning; pub mod rollback; +pub(crate) mod macros; #[cfg(test)] mod tests; +pub(crate) mod utils; diff --git a/crates/storage-ops/src/macros.rs b/crates/storage-ops/src/macros.rs new file mode 100644 index 0000000000..3bd4512e79 --- /dev/null +++ b/crates/storage-ops/src/macros.rs @@ -0,0 +1,18 @@ +#[macro_export] +macro_rules! 
log_result_or_error { + ($tables_group:literal, $call:expr) => {{ + match $call { + Ok(result) => { + tracing::debug!("Deleted {} records from {} group", result, $tables_group); + } + Err(e) => { + tracing::error!( + "Failed to prune ledger's {} table group: {:?}", + $tables_group, + e + ); + return; + } + } + }}; +} diff --git a/crates/storage-ops/src/pruning/components/ledger_db/mod.rs b/crates/storage-ops/src/pruning/components/ledger_db/mod.rs index cc309e85f2..2321e572bd 100644 --- a/crates/storage-ops/src/pruning/components/ledger_db/mod.rs +++ b/crates/storage-ops/src/pruning/components/ledger_db/mod.rs @@ -1,59 +1,43 @@ use std::sync::Arc; +use slots::prune_slots; +use soft_confirmations::prune_soft_confirmations; use sov_schema_db::DB; -use tracing::{debug, error}; +use tracing::debug; -use self::slots::prune_slots; -use self::soft_confirmations::prune_soft_confirmations; -use crate::pruning::types::PruningNodeType; +use crate::log_result_or_error; +use crate::pruning::types::StorageNodeType; mod slots; mod soft_confirmations; -macro_rules! 
log_result_or_error { - ($tables_group:literal, $call:expr) => {{ - match $call { - Ok(result) => { - debug!("Deleted {} records from {} group", result, $tables_group); - } - Err(e) => { - error!( - "Failed to prune ledger's {} table group: {:?}", - $tables_group, e - ); - return; - } - } - }}; -} - /// Prune ledger -pub(crate) fn prune_ledger(node_type: PruningNodeType, ledger_db: Arc, up_to_block: u64) { +pub(crate) fn prune_ledger(node_type: StorageNodeType, ledger_db: Arc, up_to_block: u64) { debug!("Pruning Ledger, up to L2 block {}", up_to_block); match node_type { - PruningNodeType::Sequencer => { + StorageNodeType::Sequencer => { log_result_or_error!( "soft_confirmations", prune_soft_confirmations(node_type, &ledger_db, up_to_block) ); log_result_or_error!("slots", prune_slots(node_type, &ledger_db, up_to_block)); } - PruningNodeType::FullNode => { + StorageNodeType::FullNode => { log_result_or_error!( "soft_confirmations", prune_soft_confirmations(node_type, &ledger_db, up_to_block) ); log_result_or_error!("slots", prune_slots(node_type, &ledger_db, up_to_block)); } - PruningNodeType::BatchProver => { + StorageNodeType::BatchProver => { log_result_or_error!( "soft_confirmations", prune_soft_confirmations(node_type, &ledger_db, up_to_block) ); log_result_or_error!("slots", prune_slots(node_type, &ledger_db, up_to_block)); } - PruningNodeType::LightClient => { + StorageNodeType::LightClient => { log_result_or_error!( "soft_confirmations", prune_soft_confirmations(node_type, &ledger_db, up_to_block) diff --git a/crates/storage-ops/src/pruning/components/ledger_db/slots.rs b/crates/storage-ops/src/pruning/components/ledger_db/slots.rs index 0ab8898cbd..c8c76939d6 100644 --- a/crates/storage-ops/src/pruning/components/ledger_db/slots.rs +++ b/crates/storage-ops/src/pruning/components/ledger_db/slots.rs @@ -1,14 +1,14 @@ use sov_db::schema::tables::{ - CommitmentsByNumber, L2RangeByL1Height, LightClientProofBySlotNumber, ProofsBySlotNumber, - 
ProofsBySlotNumberV2, SlotByHash, VerifiedBatchProofsBySlotNumber, + L2RangeByL1Height, ShortHeaderProofBySlotHash, SlotByHash, VerifiedBatchProofsBySlotNumber, }; use sov_db::schema::types::{SlotNumber, SoftConfirmationNumber}; use sov_schema_db::{ScanDirection, DB}; -use crate::pruning::types::PruningNodeType; +use crate::pruning::types::StorageNodeType; +use crate::utils::delete_slots_by_number; pub(crate) fn prune_slots( - node_type: PruningNodeType, + node_type: StorageNodeType, ledger_db: &DB, up_to_block: u64, ) -> anyhow::Result { @@ -28,24 +28,15 @@ pub(crate) fn prune_slots( if slot_range.1 > SoftConfirmationNumber(up_to_block) { break; } - ledger_db.delete::(&slot_height)?; - ledger_db.delete::(&slot_height)?; - if !matches!(node_type, PruningNodeType::Sequencer) { - prune_slot_by_hash(ledger_db, slot_height)?; - } - - if matches!(node_type, PruningNodeType::FullNode) { - ledger_db.delete::(&slot_height)?; - } + delete_slots_by_number(node_type, ledger_db, slot_height)?; - if matches!(node_type, PruningNodeType::BatchProver) { - ledger_db.delete::(&slot_height)?; - ledger_db.delete::(&slot_height)?; + if !matches!(node_type, StorageNodeType::Sequencer) { + prune_slot_by_hash(node_type, ledger_db, slot_height)?; } - if matches!(node_type, PruningNodeType::LightClient) { - ledger_db.delete::(&slot_height)?; + if matches!(node_type, StorageNodeType::FullNode) { + prune_verified_proofs_by_slot_number(ledger_db, slot_height)?; } deleted += 1; @@ -54,7 +45,11 @@ pub(crate) fn prune_slots( Ok(deleted) } -fn prune_slot_by_hash(ledger_db: &DB, slot_height: SlotNumber) -> anyhow::Result<()> { +fn prune_slot_by_hash( + node_type: StorageNodeType, + ledger_db: &DB, + slot_number: SlotNumber, +) -> anyhow::Result<()> { let mut slots = ledger_db.iter_with_direction::(Default::default(), ScanDirection::Forward)?; slots.seek_to_first(); @@ -64,9 +59,41 @@ fn prune_slot_by_hash(ledger_db: &DB, slot_height: SlotNumber) -> anyhow::Result continue; }; - if record.value < 
slot_height { - ledger_db.delete::(&record.key)?; + if record.value > slot_number { + break; + } + + if !matches!(node_type, StorageNodeType::LightClient) { + ledger_db.delete::(&record.key)?; + } + + ledger_db.delete::(&record.key)?; + } + + Ok(()) +} + +fn prune_verified_proofs_by_slot_number( + ledger_db: &DB, + slot_number: SlotNumber, +) -> anyhow::Result<()> { + let mut verified_proofs_by_number = ledger_db + .iter_with_direction::( + Default::default(), + ScanDirection::Forward, + )?; + verified_proofs_by_number.seek_to_first(); + + for record in verified_proofs_by_number { + let Ok(record) = record else { + continue; + }; + + if record.key > slot_number { + break; } + + ledger_db.delete::(&record.key)?; } Ok(()) diff --git a/crates/storage-ops/src/pruning/components/ledger_db/soft_confirmations.rs b/crates/storage-ops/src/pruning/components/ledger_db/soft_confirmations.rs index a2e1bf51b5..3ec596b74c 100644 --- a/crates/storage-ops/src/pruning/components/ledger_db/soft_confirmations.rs +++ b/crates/storage-ops/src/pruning/components/ledger_db/soft_confirmations.rs @@ -1,14 +1,12 @@ -use sov_db::schema::tables::{ - L2Witness, ProverStateDiffs, SoftConfirmationByHash, SoftConfirmationByNumber, - SoftConfirmationStatus, -}; +use sov_db::schema::tables::SoftConfirmationByNumber; use sov_db::schema::types::SoftConfirmationNumber; use sov_schema_db::{ScanDirection, DB}; -use crate::pruning::types::PruningNodeType; +use crate::pruning::types::StorageNodeType; +use crate::utils::delete_soft_confirmations_by_number; pub(crate) fn prune_soft_confirmations( - node_type: PruningNodeType, + node_type: StorageNodeType, ledger_db: &DB, up_to_block: u64, ) -> anyhow::Result { @@ -29,20 +27,13 @@ pub(crate) fn prune_soft_confirmations( if soft_confirmation_number > SoftConfirmationNumber(up_to_block) { break; } - ledger_db.delete::(&soft_confirmation_number)?; - if matches!(node_type, PruningNodeType::LightClient) { - continue; - } - - let soft_confirmation = record.value; 
- ledger_db.delete::(&soft_confirmation.hash)?; - ledger_db.delete::(&soft_confirmation_number)?; - - if matches!(node_type, PruningNodeType::BatchProver) { - ledger_db.delete::(&soft_confirmation_number)?; - ledger_db.delete::(&soft_confirmation_number)?; - } + delete_soft_confirmations_by_number( + node_type, + ledger_db, + soft_confirmation_number, + record.value.hash, + )?; deleted += 1; } diff --git a/crates/storage-ops/src/pruning/mod.rs b/crates/storage-ops/src/pruning/mod.rs index 53ca22b33b..61ab016cc5 100644 --- a/crates/storage-ops/src/pruning/mod.rs +++ b/crates/storage-ops/src/pruning/mod.rs @@ -4,7 +4,7 @@ use futures::future; use serde::{Deserialize, Serialize}; use sov_db::schema::tables::{LastPrunedBlock, LastPrunedL2Height}; use tracing::info; -use types::PruningNodeType; +use types::StorageNodeType; use self::components::{prune_ledger, prune_native_db}; use self::criteria::{Criteria, DistanceCriteria}; @@ -73,7 +73,7 @@ impl Pruner { } /// Prune everything - pub async fn prune(&self, node_type: PruningNodeType, up_to_block: u64) { + pub async fn prune(&self, node_type: StorageNodeType, up_to_block: u64) { info!("Pruning up to L2 block: {}", up_to_block); let ledger_db = self.ledger_db.clone(); diff --git a/crates/storage-ops/src/pruning/service.rs b/crates/storage-ops/src/pruning/service.rs index 65aec00c8f..a4d1691d59 100644 --- a/crates/storage-ops/src/pruning/service.rs +++ b/crates/storage-ops/src/pruning/service.rs @@ -3,7 +3,7 @@ use tokio::sync::broadcast; use tokio_util::sync::CancellationToken; use tracing::{debug, error}; -use super::types::PruningNodeType; +use super::types::StorageNodeType; use super::Pruner; pub struct PrunerService { @@ -27,7 +27,7 @@ impl PrunerService { } } - pub async fn run(mut self, node_type: PruningNodeType, cancellation_token: CancellationToken) { + pub async fn run(mut self, node_type: StorageNodeType, cancellation_token: CancellationToken) { loop { select! 
{ biased; diff --git a/crates/storage-ops/src/pruning/types.rs b/crates/storage-ops/src/pruning/types.rs index 1213a4a453..68f802e372 100644 --- a/crates/storage-ops/src/pruning/types.rs +++ b/crates/storage-ops/src/pruning/types.rs @@ -1,5 +1,5 @@ #[derive(Copy, Clone)] -pub enum PruningNodeType { +pub enum StorageNodeType { Sequencer, FullNode, BatchProver, diff --git a/crates/storage-ops/src/rollback/components/ledger_db/mod.rs b/crates/storage-ops/src/rollback/components/ledger_db/mod.rs new file mode 100644 index 0000000000..636b17d887 --- /dev/null +++ b/crates/storage-ops/src/rollback/components/ledger_db/mod.rs @@ -0,0 +1,46 @@ +use std::sync::Arc; + +use slots::{rollback_light_client_slots, rollback_slots}; +use soft_confirmations::rollback_soft_confirmations; +use tracing::debug; + +use crate::log_result_or_error; +use crate::pruning::types::StorageNodeType; + +mod slots; +mod soft_confirmations; + +/// Rollback ledger DB +pub(crate) fn rollback_ledger_db( + node_type: StorageNodeType, + ledger_db: Arc, + target_l2: u64, + target_l1: u64, + last_sequencer_commitment_l2_height: u64, +) { + debug!( + "Rolling back Ledger, down to L2 block {}, L1 block {}", + target_l2, target_l1 + ); + + log_result_or_error!( + "soft_confirmations", + rollback_soft_confirmations( + node_type, + &ledger_db, + target_l2, + last_sequencer_commitment_l2_height, + ) + ); + match node_type { + StorageNodeType::LightClient => { + log_result_or_error!( + "slots", + rollback_light_client_slots(node_type, &ledger_db, target_l1,) + ); + } + _ => { + log_result_or_error!("slots", rollback_slots(node_type, &ledger_db, target_l1,)); + } + } +} diff --git a/crates/storage-ops/src/rollback/components/ledger_db/slots.rs b/crates/storage-ops/src/rollback/components/ledger_db/slots.rs new file mode 100644 index 0000000000..fca2f999f1 --- /dev/null +++ b/crates/storage-ops/src/rollback/components/ledger_db/slots.rs @@ -0,0 +1,131 @@ +use sov_db::schema::tables::{ + CommitmentsByNumber,
LightClientProofBySlotNumber, ShortHeaderProofBySlotHash, SlotByHash, + VerifiedBatchProofsBySlotNumber, +}; +use sov_db::schema::types::SlotNumber; +use sov_schema_db::{ScanDirection, DB}; + +use crate::pruning::types::StorageNodeType; +use crate::utils::delete_slots_by_number; + +pub(crate) fn rollback_slots( + node_type: StorageNodeType, + ledger_db: &DB, + target_l1: u64, +) -> anyhow::Result { + let mut commitments_by_number = ledger_db + .iter_with_direction::(Default::default(), ScanDirection::Backward)?; + commitments_by_number.seek_to_last(); + + let mut deleted = 0; + for record in commitments_by_number { + let Ok(record) = record else { + continue; + }; + + let slot_height = record.key; + + if slot_height <= SlotNumber(target_l1) { + break; + } + + delete_slots_by_number(node_type, ledger_db, slot_height)?; + + if !matches!(node_type, StorageNodeType::Sequencer) { + rollback_slot_by_hash(node_type, ledger_db, slot_height)?; + } + + if matches!(node_type, StorageNodeType::FullNode) { + rollback_verified_proofs_by_slot_number(ledger_db, slot_height)?; + } + + deleted += 1; + } + + Ok(deleted) +} + +pub(crate) fn rollback_light_client_slots( + node_type: StorageNodeType, + ledger_db: &DB, + target_l1: u64, +) -> anyhow::Result { + let mut proof_by_slot_number = ledger_db.iter_with_direction::( + Default::default(), + ScanDirection::Backward, + )?; + proof_by_slot_number.seek_to_last(); + + let mut deleted = 0; + for record in proof_by_slot_number { + let Ok(record) = record else { + continue; + }; + + let slot_height = record.key; + + if slot_height <= SlotNumber(target_l1) { + break; + } + + delete_slots_by_number(node_type, ledger_db, slot_height)?; + + deleted += 1; + } + + Ok(deleted) +} + +fn rollback_slot_by_hash( + node_type: StorageNodeType, + ledger_db: &DB, + slot_number: SlotNumber, +) -> anyhow::Result<()> { + let mut slots = + ledger_db.iter_with_direction::(Default::default(), ScanDirection::Backward)?; + slots.seek_to_last(); + + for record 
in slots { + let Ok(record) = record else { + continue; + }; + + if record.value < slot_number { + break; + } + + if !matches!(node_type, StorageNodeType::LightClient) { + ledger_db.delete::(&record.key)?; + } + + ledger_db.delete::(&record.key)?; + } + + Ok(()) +} + +fn rollback_verified_proofs_by_slot_number( + ledger_db: &DB, + slot_number: SlotNumber, +) -> anyhow::Result<()> { + let mut verified_proofs_by_number = ledger_db + .iter_with_direction::( + Default::default(), + ScanDirection::Backward, + )?; + verified_proofs_by_number.seek_to_last(); + + for record in verified_proofs_by_number { + let Ok(record) = record else { + continue; + }; + + if record.key < slot_number { + break; + } + + ledger_db.delete::(&record.key)?; + } + + Ok(()) +} diff --git a/crates/storage-ops/src/rollback/components/ledger_db/soft_confirmations.rs b/crates/storage-ops/src/rollback/components/ledger_db/soft_confirmations.rs new file mode 100644 index 0000000000..abeaae433a --- /dev/null +++ b/crates/storage-ops/src/rollback/components/ledger_db/soft_confirmations.rs @@ -0,0 +1,52 @@ +use sov_db::schema::tables::{LastSequencerCommitmentSent, SoftConfirmationByNumber}; +use sov_db::schema::types::SoftConfirmationNumber; +use sov_schema_db::{ScanDirection, DB}; + +use crate::pruning::types::StorageNodeType; +use crate::utils::delete_soft_confirmations_by_number; + +pub(crate) fn rollback_soft_confirmations( + node_type: StorageNodeType, + ledger_db: &DB, + target_l2: u64, + last_sequencer_commitment_l2_height: u64, +) -> anyhow::Result { + let mut soft_confirmations = ledger_db.iter_with_direction::( + Default::default(), + ScanDirection::Backward, + )?; + soft_confirmations.seek_to_last(); + + let mut deleted = 0; + for record in soft_confirmations { + let Ok(record) = record else { + continue; + }; + + let soft_confirmation_number = record.key; + + if soft_confirmation_number <= SoftConfirmationNumber(target_l2) { + break; + } + + delete_soft_confirmations_by_number( + node_type, + 
ledger_db, + soft_confirmation_number, + record.value.hash, + )?; + + deleted += 1; + } + + if matches!(node_type, StorageNodeType::Sequencer) + || matches!(node_type, StorageNodeType::FullNode) + { + ledger_db.put::( + &(), + &SoftConfirmationNumber(last_sequencer_commitment_l2_height), + )?; + } + + Ok(deleted) +} diff --git a/crates/storage-ops/src/rollback/components/mod.rs b/crates/storage-ops/src/rollback/components/mod.rs new file mode 100644 index 0000000000..f1decd0dd2 --- /dev/null +++ b/crates/storage-ops/src/rollback/components/mod.rs @@ -0,0 +1,7 @@ +mod ledger_db; +mod native_db; +mod state_db; + +pub(crate) use ledger_db::*; +pub(crate) use native_db::*; +pub(crate) use state_db::*; diff --git a/crates/storage-ops/src/rollback/components/native_db.rs b/crates/storage-ops/src/rollback/components/native_db.rs new file mode 100644 index 0000000000..a1fb372d60 --- /dev/null +++ b/crates/storage-ops/src/rollback/components/native_db.rs @@ -0,0 +1,38 @@ +use std::sync::Arc; + +use sov_db::schema::tables::ModuleAccessoryState; +use sov_schema_db::ScanDirection; +use tracing::{debug, error}; + +/// Rollback native DB +pub(crate) fn rollback_native_db(native_db: Arc, down_to_block: u64) { + debug!("Rolling back native DB, down to L2 block {}", down_to_block); + + let target_version = down_to_block + 1; + + let Ok(mut iter) = native_db + .iter_with_direction::(Default::default(), ScanDirection::Backward) + else { + return; + }; + + iter.seek_to_last(); + + let mut counter = 0u32; + let mut keys_to_delete = vec![]; + while let Some(Ok(entry)) = iter.next() { + let version = entry.key.1; + // The version value is always ahead of block number by one. 
+ if version > target_version { + keys_to_delete.push(entry.key); + counter += 1; + } + } + + if let Err(e) = native_db.delete_batch::(keys_to_delete) { + error!("Failed to delete native DB entry {:?}", e); + return; + } + + debug!("Rolled back {} native DB records", counter); +} diff --git a/crates/storage-ops/src/rollback/components/state_db.rs b/crates/storage-ops/src/rollback/components/state_db.rs new file mode 100644 index 0000000000..5b2b2b86ad --- /dev/null +++ b/crates/storage-ops/src/rollback/components/state_db.rs @@ -0,0 +1,82 @@ +use std::sync::Arc; + +use jmt::storage::Node; +use sov_db::schema::tables::{JmtNodes, JmtValues, KeyHashToKey}; +use sov_schema_db::{ScanDirection, SchemaBatch}; +use tracing::{error, info}; + +/// Rollback state DB +pub(crate) fn rollback_state_db(state_db: Arc, down_to_block: u64) { + info!("Rolling back state DB, down to L2 block {}", down_to_block); + + let target_version = down_to_block + 1; + + let mut indices = state_db + .iter_with_direction::(Default::default(), ScanDirection::Backward) + .expect("Tried to rollback state DB but could not obtain an iterator"); + + indices.seek_to_last(); + + let mut deletions = 0; + + let mut batch = SchemaBatch::new(); + for index in indices { + let Ok(index) = index else { + continue; + }; + + let node_key = index.key; + let node = index.value; + + // Exit loop if we go down below the target block + if node_key.version() <= target_version { + break; + } + + let key_hash = match node { + Node::Null => continue, + Node::Internal(_) => { + if let Err(e) = batch.delete::(&node_key) { + error!( + "Could not add JMT node deletion to schema batch operation: {:?}", + e + ); + } + + deletions += 1; + continue; + } + Node::Leaf(leaf) => leaf.key_hash(), + }; + + let key_preimage = match state_db.get::(&key_hash.0) { + Ok(Some(key)) => key, + _ => { + error!("Could not read key from key hash"); + continue; + } + }; + + if let Err(e) = batch.delete::(&(key_preimage, node_key.version())) { + 
error!( + "Could not add JMT value deletion to schema batch operation: {:?}", + e + ); + } + + if let Err(e) = batch.delete::(&node_key) { + error!( + "Could not add JMT node deletion to schema batch operation: {:?}", + e + ); + } + + deletions += 2; + } + + if let Err(e) = state_db.write_schemas(batch) { + error!("Could not delete state data: {:?}", e); + } + + info!("Rolled back {} records from state DB", deletions); +} diff --git a/crates/storage-ops/src/rollback/mod.rs b/crates/storage-ops/src/rollback/mod.rs index ee48880323..60190888b6 100644 --- a/crates/storage-ops/src/rollback/mod.rs +++ b/crates/storage-ops/src/rollback/mod.rs @@ -1,11 +1,74 @@ +use std::sync::Arc; + +use components::{rollback_ledger_db, rollback_native_db, rollback_state_db}; +use futures::future; +use tracing::info; + +use crate::pruning::types::StorageNodeType; + +mod components; pub mod service; -pub struct Rollback {} +pub struct Rollback { + /// Access to ledger tables. + ledger_db: Arc, + /// Access to native DB. + native_db: Arc, + /// Access to state DB. + state_db: Arc, +} impl Rollback { - /// Rollback the provided number of blocks - pub fn execute(&self, _num_blocks: u32) -> anyhow::Result<()> { - // Do something + pub fn new( + ledger_db: Arc, + state_db: Arc, + native_db: Arc, + ) -> Self { + // Store handles to the ledger, state and native DBs used by rollback. + Self { + ledger_db, + state_db, + native_db, + } + } + + /// Rollback the provided L2/L1 block combination.
+ pub async fn execute( + &self, + node_type: StorageNodeType, + _current_l2_height: u64, + l2_target: u64, + l1_target: u64, + last_sequencer_commitment_l2_height: u64, + ) -> anyhow::Result<()> { + info!("Rolling back until L2 {}, L1 {}", l2_target, l1_target); + + let ledger_db = self.ledger_db.clone(); + let native_db = self.native_db.clone(); + let state_db = self.state_db.clone(); + + let ledger_rollback_handle = tokio::task::spawn_blocking(move || { + rollback_ledger_db( + node_type, + ledger_db, + l2_target, + l1_target, + last_sequencer_commitment_l2_height, + ) + }); + + let state_db_rollback_handle = + tokio::task::spawn_blocking(move || rollback_state_db(state_db, l2_target)); + + let native_db_rollback_handle = + tokio::task::spawn_blocking(move || rollback_native_db(native_db, l2_target)); + + future::join_all([ + ledger_rollback_handle, + state_db_rollback_handle, + native_db_rollback_handle, + ]) + .await; Ok(()) } diff --git a/crates/storage-ops/src/rollback/service.rs b/crates/storage-ops/src/rollback/service.rs index 2fbd05e6c8..1932375a5c 100644 --- a/crates/storage-ops/src/rollback/service.rs +++ b/crates/storage-ops/src/rollback/service.rs @@ -4,28 +4,36 @@ use tokio_util::sync::CancellationToken; use tracing::info; use super::Rollback; +use crate::pruning::types::StorageNodeType; + +pub struct RollbackSignal { + current_l2_height: u64, + target_l2: u64, + target_l1: u64, + last_commitment_l2_height: u64, +} pub struct RollbackService { rollback: Rollback, - receiver: Receiver, + receiver: Receiver, } impl RollbackService { - pub fn new(rollback: Rollback, receiver: Receiver) -> Self { + pub fn new(rollback: Rollback, receiver: Receiver) -> Self { Self { rollback, receiver } } /// Run service to rollback when instructed to - pub async fn run(mut self, cancellation_token: CancellationToken) { + pub async fn run(mut self, node_type: StorageNodeType, cancellation_token: CancellationToken) { loop { select! 
{ biased; _ = cancellation_token.cancelled() => { return; }, - Some(num_blocks) = self.receiver.recv() => { - info!("Received signal to rollback {num_blocks} blocks"); - if let Err(e) = self.rollback.execute(num_blocks) { + Some(signal) = self.receiver.recv() => { + info!("Received signal to rollback to L2 {}, L1 {}", signal.target_l2, signal.target_l1); + if let Err(e) = self.rollback.execute(node_type, signal.current_l2_height, signal.target_l2, signal.target_l1, signal.last_commitment_l2_height).await { panic!("Could not rollback blocks: {:?}", e); } } diff --git a/crates/storage-ops/src/tests.rs b/crates/storage-ops/src/tests.rs index 56ac71625a..0ff36f165a 100644 --- a/crates/storage-ops/src/tests.rs +++ b/crates/storage-ops/src/tests.rs @@ -24,7 +24,7 @@ use tokio_util::sync::CancellationToken; use crate::pruning::components::prune_ledger; use crate::pruning::criteria::{Criteria, DistanceCriteria}; -use crate::pruning::types::PruningNodeType; +use crate::pruning::types::StorageNodeType; use crate::pruning::{Pruner, PrunerService, PruningConfig}; #[tokio::test(flavor = "multi_thread")] @@ -47,7 +47,7 @@ async fn test_pruning_simple_run() { ); let pruner_service = PrunerService::new(pruner, 0, receiver); - tokio::spawn(pruner_service.run(PruningNodeType::Sequencer, cancellation_token.clone())); + tokio::spawn(pruner_service.run(StorageNodeType::Sequencer, cancellation_token.clone())); sleep(Duration::from_secs(1)); @@ -181,7 +181,7 @@ pub fn test_pruning_ledger_db_soft_confirmations() { .unwrap() .is_some()); - prune_ledger(PruningNodeType::Sequencer, ledger_db.clone(), 10); + prune_ledger(StorageNodeType::Sequencer, ledger_db.clone(), 10); // Pruned assert!(ledger_db @@ -333,7 +333,7 @@ pub fn test_pruning_ledger_db_batch_prover_soft_confirmations() { .unwrap() .is_some()); - prune_ledger(PruningNodeType::BatchProver, ledger_db.clone(), 10); + prune_ledger(StorageNodeType::BatchProver, ledger_db.clone(), 10); // Pruned assert!(ledger_db @@ -517,7 +517,7 @@ pub 
fn test_pruning_ledger_db_fullnode_slots() { prepare_slots_data(&ledger_db); - prune_ledger(PruningNodeType::FullNode, ledger_db.clone(), 10); + prune_ledger(StorageNodeType::FullNode, ledger_db.clone(), 10); // SHOULD NOT CHANGE assert!(ledger_db @@ -608,7 +608,7 @@ pub fn test_pruning_ledger_db_light_client_slots() { prepare_slots_data(&ledger_db); - prune_ledger(PruningNodeType::LightClient, ledger_db.clone(), 10); + prune_ledger(StorageNodeType::LightClient, ledger_db.clone(), 10); // SHOULD NOT CHANGE assert!(ledger_db @@ -699,7 +699,7 @@ pub fn test_pruning_ledger_db_batch_prover_slots() { prepare_slots_data(&ledger_db); - prune_ledger(PruningNodeType::BatchProver, ledger_db.clone(), 10); + prune_ledger(StorageNodeType::BatchProver, ledger_db.clone(), 10); // SHOULD NOT CHANGE assert!(ledger_db diff --git a/crates/storage-ops/src/utils.rs b/crates/storage-ops/src/utils.rs new file mode 100644 index 0000000000..d84e054857 --- /dev/null +++ b/crates/storage-ops/src/utils.rs @@ -0,0 +1,52 @@ +use sov_db::schema::tables::{ + CommitmentsByNumber, L2RangeByL1Height, L2Witness, LightClientProofBySlotNumber, + ProofsBySlotNumber, ProofsBySlotNumberV2, ProverStateDiffs, SoftConfirmationByHash, + SoftConfirmationByNumber, SoftConfirmationStatus, +}; +use sov_db::schema::types::{DbHash, SlotNumber, SoftConfirmationNumber}; +use sov_schema_db::DB; + +use crate::pruning::types::StorageNodeType; + +pub(crate) fn delete_soft_confirmations_by_number( + node_type: StorageNodeType, + ledger_db: &DB, + soft_confirmation_number: SoftConfirmationNumber, + soft_confirmation_hash: DbHash, +) -> anyhow::Result<()> { + ledger_db.delete::(&soft_confirmation_number)?; + + if matches!(node_type, StorageNodeType::LightClient) { + return Ok(()); + } + + ledger_db.delete::(&soft_confirmation_hash)?; + ledger_db.delete::(&soft_confirmation_number)?; + + if matches!(node_type, StorageNodeType::BatchProver) { + ledger_db.delete::(&soft_confirmation_number)?; + 
ledger_db.delete::(&soft_confirmation_number)?; + } + + Ok(()) +} + +pub(crate) fn delete_slots_by_number( + node_type: StorageNodeType, + ledger_db: &DB, + slot_number: SlotNumber, +) -> anyhow::Result<()> { + ledger_db.delete::(&slot_number)?; + ledger_db.delete::(&slot_number)?; + + if matches!(node_type, StorageNodeType::BatchProver) { + ledger_db.delete::(&slot_number)?; + ledger_db.delete::(&slot_number)?; + } + + if matches!(node_type, StorageNodeType::LightClient) { + ledger_db.delete::(&slot_number)?; + } + + Ok(()) +}