diff --git a/Cargo.lock b/Cargo.lock index 4edfab8c..6598f60f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4658,6 +4658,7 @@ dependencies = [ "log", "op-alloy-consensus", "op-alloy-network", + "op-alloy-rpc-types", "op-succinct-build-utils", "op-succinct-client-utils", "op-succinct-host-utils", diff --git a/proposer/op/proposer/service.go b/proposer/op/proposer/service.go index 28a3dbdc..bf4cbba0 100644 --- a/proposer/op/proposer/service.go +++ b/proposer/op/proposer/service.go @@ -56,6 +56,7 @@ type ProposerConfig struct { UseCachedDb bool SlackToken string BeaconRpc string + RollupRpc string TxCacheOutDir string MaxBlockRangePerSpanProof uint64 L2ChainID uint64 @@ -118,6 +119,7 @@ func (ps *ProposerService) initFromCLIConfig(ctx context.Context, version string ps.UseCachedDb = cfg.UseCachedDb ps.SlackToken = cfg.SlackToken ps.BeaconRpc = cfg.BeaconRpc + ps.RollupRpc = cfg.RollupRpc ps.TxCacheOutDir = cfg.TxCacheOutDir ps.MaxBlockRangePerSpanProof = cfg.MaxBlockRangePerSpanProof ps.OPSuccinctServerUrl = cfg.OPSuccinctServerUrl diff --git a/proposer/op/proposer/span_batches.go b/proposer/op/proposer/span_batches.go index be5ef618..4a08c6d9 100644 --- a/proposer/op/proposer/span_batches.go +++ b/proposer/op/proposer/span_batches.go @@ -3,7 +3,10 @@ package proposer import ( "context" "fmt" + "slices" + "github.com/ethereum-optimism/optimism/op-service/dial" + "github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/succinctlabs/op-succinct-go/proposer/db/ent" "github.com/succinctlabs/op-succinct-go/proposer/db/ent/proofrequest" @@ -14,8 +17,113 @@ type Span struct { End uint64 } +// GetL1HeadForL2Block returns the L1 block from which the L2 block can be derived. +func (l *L2OutputSubmitter) GetL1HeadForL2Block(ctx context.Context, rollupClient *sources.RollupClient, l2End uint64) (uint64, error) { + status, err := rollupClient.SyncStatus(ctx) + if err != nil { + return 0, fmt.Errorf("failed to get sync status: %w", err) + } + latestL1Block := status.HeadL1.Number + + // Get the L1 origin of the end block. + outputResponse, err := rollupClient.OutputAtBlock(ctx, l2End) + if err != nil { + return 0, fmt.Errorf("failed to get l1 origin: %w", err) + } + L2EndL1Origin := outputResponse.BlockRef.L1Origin.Number + + // Search forward from the L1 origin of the L2 end block until we find a safe head greater than the L2 end block. + for currentL1Block := L2EndL1Origin; currentL1Block <= latestL1Block; currentL1Block++ { + safeHead, err := rollupClient.SafeHeadAtL1Block(ctx, currentL1Block) + if err != nil { + return 0, fmt.Errorf("failed to get safe head: %w", err) + } + // If the safe head is greater than or equal to the L2 end block at this L1 block, then we can derive the L2 end block from this L1 block. + if safeHead.SafeHead.Number >= l2End { + return currentL1Block, nil + } + } + + return 0, fmt.Errorf("could not find an L1 block with an L2 safe head greater than the L2 end block") +} + +func (l *L2OutputSubmitter) isSafeDBActivated(ctx context.Context, rollupClient *sources.RollupClient) (bool, error) { + // Get the sync status of the rollup node. + status, err := rollupClient.SyncStatus(ctx) + if err != nil { + return false, fmt.Errorf("failed to get sync status: %w", err) + } + + // Attempt querying the safe head at the latest L1 block. + _, err = rollupClient.SafeHeadAtL1Block(ctx, status.HeadL1.Number) + if err != nil { + return false, fmt.Errorf("failed to get safe head: %w", err) + } + + return true, nil +} + +// SplitRangeBasedOnSafeHeads splits a range into spans based on safe head boundaries. +// This is useful when we want to ensure that each span aligns with L2 safe head boundaries. +func (l *L2OutputSubmitter) SplitRangeBasedOnSafeHeads(ctx context.Context, l2Start, l2End uint64) ([]Span, error) { + spans := []Span{} + currentStart := l2Start + + rollupClient, err := dial.DialRollupClientWithTimeout(ctx, dial.DefaultDialTimeout, l.Log, l.Cfg.RollupRpc) + if err != nil { + return nil, err + } + + L1Head, err := l.GetL1HeadForL2Block(ctx, rollupClient, l2End) + if err != nil { + return nil, fmt.Errorf("failed to get l1 head for l2 block: %w", err) + } + + l2StartOutput, err := rollupClient.OutputAtBlock(ctx, l2Start) + if err != nil { + return nil, fmt.Errorf("failed to get l2 start output: %w", err) + } + L2StartL1Origin := l2StartOutput.BlockRef.L1Origin.Number + + // Get all the unique safe heads between l1_start and l1_head + safeHeads := make(map[uint64]struct{}) + for currentL1Block := L2StartL1Origin; currentL1Block <= L1Head; currentL1Block++ { + safeHead, err := rollupClient.SafeHeadAtL1Block(ctx, currentL1Block) + if err != nil { + return nil, fmt.Errorf("failed to get safe head: %w", err) + } + safeHeads[safeHead.SafeHead.Number] = struct{}{} + } + uniqueSafeHeads := make([]uint64, 0, len(safeHeads)) + for safeHead := range safeHeads { + uniqueSafeHeads = append(uniqueSafeHeads, safeHead) + } + slices.Sort(uniqueSafeHeads) + + // Loop over all of the safe heads and create spans. + for _, safeHead := range uniqueSafeHeads { + if safeHead > currentStart { + rangeStart := currentStart + for rangeStart+l.Cfg.MaxBlockRangePerSpanProof < min(l2End, safeHead) { + spans = append(spans, Span{ + Start: rangeStart, + End: rangeStart + l.Cfg.MaxBlockRangePerSpanProof, + }) + rangeStart += l.Cfg.MaxBlockRangePerSpanProof + } + spans = append(spans, Span{ + Start: rangeStart, + End: min(l2End, safeHead), + }) + currentStart = safeHead + } + } + + return spans, nil +} + // CreateSpans creates a list of spans of size MaxBlockRangePerSpanProof from start to end. Note: The end of span i = start of span i+1. -func (l *L2OutputSubmitter) CreateSpans(start, end uint64) []Span { +func (l *L2OutputSubmitter) SplitRangeBasic(start, end uint64) []Span { spans := []Span{} // Create spans of size MaxBlockRangePerSpanProof from start to end. // Each span starts where the previous one ended. @@ -44,9 +152,9 @@ func (l *L2OutputSubmitter) DeriveNewSpanBatches(ctx context.Context) error { } newL2StartBlock := latestL2EndBlock - rollupClient, err := l.RollupProvider.RollupClient(ctx) + rollupClient, err := dial.DialRollupClientWithTimeout(ctx, dial.DefaultDialTimeout, l.Log, l.Cfg.RollupRpc) if err != nil { - return fmt.Errorf("failed to get rollup client: %w", err) + return err } // Get the latest finalized L2 block. @@ -58,8 +166,25 @@ func (l *L2OutputSubmitter) DeriveNewSpanBatches(ctx context.Context) error { // Note: Originally, this used the L1 finalized block. However, to satisfy the new API, we now use the L2 finalized block. newL2EndBlock := status.FinalizedL2.Number - // Create spans of size MaxBlockRangePerSpanProof from newL2StartBlock to newL2EndBlock. - spans := l.CreateSpans(newL2StartBlock, newL2EndBlock) + // Check if the safeDB is activated on the L2 node. If it is, we use the safeHead based range + // splitting algorithm. Otherwise, we use the simple range splitting algorithm. + safeDBActivated, err := l.isSafeDBActivated(ctx, rollupClient) + if err != nil { + return fmt.Errorf("failed to check if safeDB is activated: %w", err) + } + + var spans []Span + // If the safeDB is activated, we use the safeHead based range splitting algorithm. + // Otherwise, we use the simple range splitting algorithm. + if safeDBActivated { + spans, err = l.SplitRangeBasedOnSafeHeads(ctx, newL2StartBlock, newL2EndBlock) + if err != nil { + return fmt.Errorf("failed to split range based on safe heads: %w", err) + } + } else { + spans = l.SplitRangeBasic(newL2StartBlock, newL2EndBlock) + } + // Add each span to the DB. If there are no spans, we will not create any proofs. for _, span := range spans { err := l.db.NewEntry(proofrequest.TypeSPAN, span.Start, span.End) diff --git a/proposer/op/proposer/span_batches_test.go b/proposer/op/proposer/span_batches_test.go index 1118fcf5..a3115d5b 100644 --- a/proposer/op/proposer/span_batches_test.go +++ b/proposer/op/proposer/span_batches_test.go @@ -59,7 +59,7 @@ func TestCreateSpans(t *testing.T) { l := &L2OutputSubmitter{} l.Cfg = ProposerConfig{MaxBlockRangePerSpanProof: tt.maxBlockRange} - spans := l.CreateSpans(tt.start, tt.end) + spans := l.SplitRangeBasic(tt.start, tt.end) assert.Equal(t, tt.expectedSpansCount, len(spans), "Unexpected number of spans") diff --git a/scripts/utils/Cargo.toml b/scripts/utils/Cargo.toml index cf9d3937..96985ae4 100644 --- a/scripts/utils/Cargo.toml +++ b/scripts/utils/Cargo.toml @@ -45,7 +45,6 @@ futures.workspace = true rayon = "1.10.0" serde_json.workspace = true - # kona kona-host = { workspace = true } @@ -56,6 +55,7 @@ op-succinct-client-utils.workspace = true # op-alloy op-alloy-consensus.workspace = true op-alloy-network.workspace = true +op-alloy-rpc-types.workspace = true # sp1 sp1-sdk = { workspace = true } diff --git a/scripts/utils/bin/cost_estimator.rs b/scripts/utils/bin/cost_estimator.rs index b4e1c0f4..24762871 100644 --- a/scripts/utils/bin/cost_estimator.rs +++ b/scripts/utils/bin/cost_estimator.rs @@ -3,9 +3,10 @@ use clap::Parser; use futures::StreamExt; use kona_host::HostCli; use log::info; +use op_alloy_rpc_types::{OutputResponse, SafeHeadResponse}; use op_succinct_host_utils::{ block_range::{get_rolling_block_range, get_validated_block_range}, - fetcher::{CacheMode, OPSuccinctDataFetcher}, + fetcher::{CacheMode, OPSuccinctDataFetcher, RPCMode}, get_proof_stdin, stats::ExecutionStats, witnessgen::WitnessGenExecutor, @@ -16,6 +17,7 @@ use serde::{Deserialize, Serialize}; use sp1_sdk::{utils, ProverClient}; use std::{ cmp::{max, min}, + collections::HashSet, fs::{self, OpenOptions}, io::Seek, path::PathBuf, @@ -74,7 +76,9 @@ fn get_max_span_batch_range_size(l2_chain_id: u64, supplied_range_size: Option, +) -> Result> { + let data_fetcher = OPSuccinctDataFetcher::default(); + let max_size = get_max_span_batch_range_size(l2_chain_id, supplied_range_size); + + // Get the L1 origin of l2_start + let l2_start_hex = format!("0x{:x}", l2_start); + let start_output: OutputResponse = data_fetcher + .fetch_rpc_data_with_mode( + RPCMode::L2Node, + "optimism_outputAtBlock", + vec![l2_start_hex.into()], + ) + .await?; + let l1_start = start_output.block_ref.l1_origin.number; + + // Get the L1Head from which l2_end can be derived + let (_, l1_head_number) = data_fetcher.get_l1_head_with_safe_head(l2_end).await?; + + // Get all the unique safeHeads between l1_start and l1_head + let mut ranges = Vec::new(); + let mut current_l2_start = l2_start; + let safe_heads = futures::stream::iter(l1_start..=l1_head_number) + .map(|block| async move { + let l1_block_hex = format!("0x{:x}", block); + let data_fetcher = OPSuccinctDataFetcher::default(); + let result: SafeHeadResponse = data_fetcher + .fetch_rpc_data_with_mode( + RPCMode::L2Node, + "optimism_safeHeadAtL1Block", + vec![l1_block_hex.into()], + ) + .await + .expect("Failed to fetch safe head"); + result.safe_head.number + }) + .buffered(15) + .collect::>() + .await; + + // Collect and sort the safe heads. + let mut safe_heads: Vec<_> = safe_heads.into_iter().collect(); + safe_heads.sort(); + + // Loop over all of the safe heads and create ranges. + for safe_head in safe_heads { + if safe_head > current_l2_start { + let mut range_start = current_l2_start; + while range_start + max_size < min(l2_end, safe_head) { + ranges.push(SpanBatchRange { + start: range_start, + end: range_start + max_size, + }); + range_start += max_size; + } + ranges.push(SpanBatchRange { + start: range_start, + end: min(l2_end, safe_head), + }); + current_l2_start = safe_head; + } + } + + Ok(ranges) +} + /// Concurrently run the native data generation process for each split range. async fn run_native_data_generation(host_clis: &[HostCli]) { const CONCURRENT_NATIVE_HOST_RUNNERS: usize = 5; @@ -283,7 +364,16 @@ async fn main() -> Result<()> { get_validated_block_range(&data_fetcher, args.start, args.end, args.default_range).await? }; - let split_ranges = split_range(l2_start_block, l2_end_block, l2_chain_id, args.batch_size); + // Check if the safeDB is activated on the L2 node. If it is, we use the safeHead based range + // splitting algorithm. Otherwise, we use the simple range splitting algorithm. + let safe_db_activated = data_fetcher.is_safe_db_activated().await?; + + let split_ranges = if safe_db_activated { + split_range_based_on_safe_heads(l2_start_block, l2_end_block, l2_chain_id, args.batch_size) + .await? + } else { + split_range_basic(l2_start_block, l2_end_block, l2_chain_id, args.batch_size) + }; info!( "The span batch ranges which will be executed: {:?}", diff --git a/utils/host/src/fetcher.rs b/utils/host/src/fetcher.rs index fab86d2a..e19a5764 100644 --- a/utils/host/src/fetcher.rs +++ b/utils/host/src/fetcher.rs @@ -785,6 +785,7 @@ impl OPSuccinctDataFetcher { } /// Get the L1 block time in seconds. + #[allow(dead_code)] async fn get_l1_block_time(&self) -> Result { let l1_head = self.get_l1_header(BlockId::latest()).await?; @@ -803,9 +804,7 @@ impl OPSuccinctDataFetcher { } /// Get the L1 block from which the `l2_end_block` can be derived. - async fn get_l1_head_with_safe_head(&self, l2_end_block: u64) -> Result<(B256, u64)> { - let l1_block_time_secs = self.get_l1_block_time().await?; - + pub async fn get_l1_head_with_safe_head(&self, l2_end_block: u64) -> Result<(B256, u64)> { let latest_l1_header = self.get_l1_header(BlockId::latest()).await?; // Get the l1 origin of the l2 end block. @@ -820,7 +819,7 @@ impl OPSuccinctDataFetcher { let l1_origin = optimism_output_data.block_ref.l1_origin; - // Search forward from the l1Origin, skipping forward in 5 minute increments until an L1 block with an L2 safe head greater than the l2_end_block is found. + // Search forward from the l1Origin, checking each L1 block until we find one with an L2 safe head greater than l2_end_block let mut current_l1_block_number = l1_origin.number; loop { // If the current L1 block number is greater than the latest L1 header number, then return an error. @@ -839,13 +838,12 @@ impl OPSuccinctDataFetcher { ) .await?; let l2_safe_head = result.safe_head.number; - if l2_safe_head > l2_end_block { + // If the safe head is greater than or equal to the L2 end block at this L1 block, then we can derive the L2 end block from this L1 block. + if l2_safe_head >= l2_end_block { return Ok((result.l1_block.hash, result.l1_block.number)); } - // Move forward in 5 minute increments. - const SKIP_MINS: u64 = 5; - current_l1_block_number += SKIP_MINS * (60 / l1_block_time_secs); + current_l1_block_number += 1; } } @@ -916,6 +914,20 @@ impl OPSuccinctDataFetcher { Ok(result.safe_head.number) } + /// Check if the safeDB is activated on the L2 node. + pub async fn is_safe_db_activated(&self) -> Result { + let l1_block = self.get_l1_header(BlockId::latest()).await?; + let l1_block_number_hex = format!("0x{:x}", l1_block.number); + let result: Result = self + .fetch_rpc_data_with_mode( + RPCMode::L2Node, + "optimism_safeHeadAtL1Block", + vec![l1_block_number_hex.into()], + ) + .await; + Ok(result.is_ok()) + } + /// Get the l2_end_block number given the l2_start_block number and the ideal block interval. /// Picks the l2 end block that minimizes the derivation cost by picking the l2 block that can be derived from the same batch as the l2_start_block. pub async fn get_l2_end_block(