Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cost-estimator+proposer): split range based on optimism_safeHeadAtL1Block #256

Merged
merged 5 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions proposer/op/proposer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type ProposerConfig struct {
UseCachedDb bool
SlackToken string
BeaconRpc string
RollupRpc string
TxCacheOutDir string
MaxBlockRangePerSpanProof uint64
L2ChainID uint64
Expand Down Expand Up @@ -118,6 +119,7 @@ func (ps *ProposerService) initFromCLIConfig(ctx context.Context, version string
ps.UseCachedDb = cfg.UseCachedDb
ps.SlackToken = cfg.SlackToken
ps.BeaconRpc = cfg.BeaconRpc
ps.RollupRpc = cfg.RollupRpc
ps.TxCacheOutDir = cfg.TxCacheOutDir
ps.MaxBlockRangePerSpanProof = cfg.MaxBlockRangePerSpanProof
ps.OPSuccinctServerUrl = cfg.OPSuccinctServerUrl
Expand Down
135 changes: 130 additions & 5 deletions proposer/op/proposer/span_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package proposer
import (
"context"
"fmt"
"slices"

"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/succinctlabs/op-succinct-go/proposer/db/ent"
"github.com/succinctlabs/op-succinct-go/proposer/db/ent/proofrequest"
Expand All @@ -14,8 +17,113 @@ type Span struct {
End uint64
}

// GetL1HeadForL2Block returns the L1 block from which the L2 block can be derived.
func (l *L2OutputSubmitter) GetL1HeadForL2Block(ctx context.Context, rollupClient *sources.RollupClient, l2End uint64) (uint64, error) {
status, err := rollupClient.SyncStatus(ctx)
if err != nil {
return 0, fmt.Errorf("failed to get sync status: %w", err)
}
latestL1Block := status.HeadL1.Number

// Get the L1 origin of the end block.
outputResponse, err := rollupClient.OutputAtBlock(ctx, l2End)
if err != nil {
return 0, fmt.Errorf("failed to get l1 origin: %w", err)
}
L2EndL1Origin := outputResponse.BlockRef.L1Origin.Number

// Search forward from the L1 origin of the L2 end block until we find a safe head greater than the L2 end block.
for currentL1Block := L2EndL1Origin; currentL1Block <= latestL1Block; currentL1Block++ {
safeHead, err := rollupClient.SafeHeadAtL1Block(ctx, currentL1Block)
if err != nil {
return 0, fmt.Errorf("failed to get safe head: %w", err)
}
// If the safe head is greater than or equal to the L2 end block at this L1 block, then we can derive the L2 end block from this L1 block.
if safeHead.SafeHead.Number >= l2End {
return currentL1Block, nil
}
}

return 0, fmt.Errorf("could not find an L1 block with an L2 safe head greater than the L2 end block")
}

func (l *L2OutputSubmitter) isSafeDBActivated(ctx context.Context, rollupClient *sources.RollupClient) (bool, error) {
// Get the sync status of the rollup node.
status, err := rollupClient.SyncStatus(ctx)
if err != nil {
return false, fmt.Errorf("failed to get sync status: %w", err)
}

// Attempt querying the safe head at the latest L1 block.
_, err = rollupClient.SafeHeadAtL1Block(ctx, status.HeadL1.Number)
if err != nil {
return false, fmt.Errorf("failed to get safe head: %w", err)
}

return true, nil
}

// SplitRangeBasedOnSafeHeads splits a range into spans based on safe head boundaries.
// This is useful when we want to ensure that each span aligns with L2 safe head boundaries.
func (l *L2OutputSubmitter) SplitRangeBasedOnSafeHeads(ctx context.Context, l2Start, l2End uint64) ([]Span, error) {
spans := []Span{}
currentStart := l2Start

rollupClient, err := dial.DialRollupClientWithTimeout(ctx, dial.DefaultDialTimeout, l.Log, l.Cfg.RollupRpc)
if err != nil {
return nil, err
}

L1Head, err := l.GetL1HeadForL2Block(ctx, rollupClient, l2End)
if err != nil {
return nil, fmt.Errorf("failed to get l1 head for l2 block: %w", err)
}

l2StartOutput, err := rollupClient.OutputAtBlock(ctx, l2Start)
if err != nil {
return nil, fmt.Errorf("failed to get l2 start output: %w", err)
}
L2StartL1Origin := l2StartOutput.BlockRef.L1Origin.Number

// Get all the unique safe heads between l1_start and l1_head
safeHeads := make(map[uint64]struct{})
for currentL1Block := L2StartL1Origin; currentL1Block <= L1Head; currentL1Block++ {
safeHead, err := rollupClient.SafeHeadAtL1Block(ctx, currentL1Block)
if err != nil {
return nil, fmt.Errorf("failed to get safe head: %w", err)
}
safeHeads[safeHead.SafeHead.Number] = struct{}{}
}
uniqueSafeHeads := make([]uint64, 0, len(safeHeads))
for safeHead := range safeHeads {
uniqueSafeHeads = append(uniqueSafeHeads, safeHead)
}
slices.Sort(uniqueSafeHeads)

// Loop over all of the safe heads and create spans.
for _, safeHead := range uniqueSafeHeads {
if safeHead > currentStart {
rangeStart := currentStart
for rangeStart+l.Cfg.MaxBlockRangePerSpanProof < min(l2End, safeHead) {
spans = append(spans, Span{
Start: rangeStart,
End: rangeStart + l.Cfg.MaxBlockRangePerSpanProof,
})
rangeStart += l.Cfg.MaxBlockRangePerSpanProof
}
spans = append(spans, Span{
Start: rangeStart,
End: min(l2End, safeHead),
})
currentStart = safeHead
}
}

return spans, nil
}

// CreateSpans creates a list of spans of size MaxBlockRangePerSpanProof from start to end. Note: The end of span i = start of span i+1.
func (l *L2OutputSubmitter) CreateSpans(start, end uint64) []Span {
func (l *L2OutputSubmitter) SplitRangeBasic(start, end uint64) []Span {
spans := []Span{}
// Create spans of size MaxBlockRangePerSpanProof from start to end.
// Each span starts where the previous one ended.
Expand Down Expand Up @@ -44,9 +152,9 @@ func (l *L2OutputSubmitter) DeriveNewSpanBatches(ctx context.Context) error {
}
newL2StartBlock := latestL2EndBlock

rollupClient, err := l.RollupProvider.RollupClient(ctx)
rollupClient, err := dial.DialRollupClientWithTimeout(ctx, dial.DefaultDialTimeout, l.Log, l.Cfg.RollupRpc)
if err != nil {
return fmt.Errorf("failed to get rollup client: %w", err)
return err
}

// Get the latest finalized L2 block.
Expand All @@ -58,8 +166,25 @@ func (l *L2OutputSubmitter) DeriveNewSpanBatches(ctx context.Context) error {
// Note: Originally, this used the L1 finalized block. However, to satisfy the new API, we now use the L2 finalized block.
newL2EndBlock := status.FinalizedL2.Number

// Create spans of size MaxBlockRangePerSpanProof from newL2StartBlock to newL2EndBlock.
spans := l.CreateSpans(newL2StartBlock, newL2EndBlock)
// Check if the safeDB is activated on the L2 node. If it is, we use the safeHead based range
// splitting algorithm. Otherwise, we use the simple range splitting algorithm.
safeDBActivated, err := l.isSafeDBActivated(ctx, rollupClient)
if err != nil {
return fmt.Errorf("failed to check if safeDB is activated: %w", err)
}

var spans []Span
// If the safeDB is activated, we use the safeHead based range splitting algorithm.
// Otherwise, we use the simple range splitting algorithm.
if safeDBActivated {
spans, err = l.SplitRangeBasedOnSafeHeads(ctx, newL2StartBlock, newL2EndBlock)
if err != nil {
return fmt.Errorf("failed to split range based on safe heads: %w", err)
}
} else {
spans = l.SplitRangeBasic(newL2StartBlock, newL2EndBlock)
}

// Add each span to the DB. If there are no spans, we will not create any proofs.
for _, span := range spans {
err := l.db.NewEntry(proofrequest.TypeSPAN, span.Start, span.End)
Expand Down
2 changes: 1 addition & 1 deletion proposer/op/proposer/span_batches_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestCreateSpans(t *testing.T) {
l := &L2OutputSubmitter{}
l.Cfg = ProposerConfig{MaxBlockRangePerSpanProof: tt.maxBlockRange}

spans := l.CreateSpans(tt.start, tt.end)
spans := l.SplitRangeBasic(tt.start, tt.end)

assert.Equal(t, tt.expectedSpansCount, len(spans), "Unexpected number of spans")

Expand Down
2 changes: 1 addition & 1 deletion scripts/utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ futures.workspace = true
rayon = "1.10.0"
serde_json.workspace = true


# kona
kona-host = { workspace = true }

Expand All @@ -56,6 +55,7 @@ op-succinct-client-utils.workspace = true
# op-alloy
op-alloy-consensus.workspace = true
op-alloy-network.workspace = true
op-alloy-rpc-types.workspace = true

# sp1
sp1-sdk = { workspace = true }
Expand Down
96 changes: 93 additions & 3 deletions scripts/utils/bin/cost_estimator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use clap::Parser;
use futures::StreamExt;
use kona_host::HostCli;
use log::info;
use op_alloy_rpc_types::{OutputResponse, SafeHeadResponse};
use op_succinct_host_utils::{
block_range::{get_rolling_block_range, get_validated_block_range},
fetcher::{CacheMode, OPSuccinctDataFetcher},
fetcher::{CacheMode, OPSuccinctDataFetcher, RPCMode},
get_proof_stdin,
stats::ExecutionStats,
witnessgen::WitnessGenExecutor,
Expand All @@ -16,6 +17,7 @@ use serde::{Deserialize, Serialize};
use sp1_sdk::{utils, ProverClient};
use std::{
cmp::{max, min},
collections::HashSet,
fs::{self, OpenOptions},
io::Seek,
path::PathBuf,
Expand Down Expand Up @@ -74,7 +76,9 @@ fn get_max_span_batch_range_size(l2_chain_id: u64, supplied_range_size: Option<u
}

/// Split a range of blocks into a list of span batch ranges.
fn split_range(
///
/// This is a simple implementation used when the safeDB is not activated on the L2 Node.
fn split_range_basic(
start: u64,
end: u64,
l2_chain_id: u64,
Expand All @@ -96,6 +100,83 @@ fn split_range(
ranges
}

/// Split a range of blocks into a list of span batch ranges based on L2 safeHeads.
///
/// 1. Get the L1 block range [L1 origin of l2_start, L1Head] where L1Head is the block from which l2_end can be derived
/// 2. Loop over L1 blocks to get safeHead increases (batch posts) which form a step function
/// 3. Split ranges based on safeHead increases and max batch size
///
/// Example: If safeHeads are [27,49,90] and max_size=30, ranges will be [(0,27), (27,49), (49,69), (69,90)]
async fn split_range_based_on_safe_heads(
l2_start: u64,
l2_end: u64,
l2_chain_id: u64,
supplied_range_size: Option<u64>,
) -> Result<Vec<SpanBatchRange>> {
let data_fetcher = OPSuccinctDataFetcher::default();
let max_size = get_max_span_batch_range_size(l2_chain_id, supplied_range_size);

// Get the L1 origin of l2_start
let l2_start_hex = format!("0x{:x}", l2_start);
let start_output: OutputResponse = data_fetcher
.fetch_rpc_data_with_mode(
RPCMode::L2Node,
"optimism_outputAtBlock",
vec![l2_start_hex.into()],
)
.await?;
let l1_start = start_output.block_ref.l1_origin.number;

// Get the L1Head from which l2_end can be derived
let (_, l1_head_number) = data_fetcher.get_l1_head_with_safe_head(l2_end).await?;

// Get all the unique safeHeads between l1_start and l1_head
let mut ranges = Vec::new();
let mut current_l2_start = l2_start;
let safe_heads = futures::stream::iter(l1_start..=l1_head_number)
.map(|block| async move {
let l1_block_hex = format!("0x{:x}", block);
let data_fetcher = OPSuccinctDataFetcher::default();
let result: SafeHeadResponse = data_fetcher
.fetch_rpc_data_with_mode(
RPCMode::L2Node,
"optimism_safeHeadAtL1Block",
vec![l1_block_hex.into()],
)
.await
.expect("Failed to fetch safe head");
result.safe_head.number
})
.buffered(15)
.collect::<HashSet<_>>()
.await;

// Collect and sort the safe heads.
let mut safe_heads: Vec<_> = safe_heads.into_iter().collect();
safe_heads.sort();

// Loop over all of the safe heads and create ranges.
for safe_head in safe_heads {
if safe_head > current_l2_start {
let mut range_start = current_l2_start;
while range_start + max_size < min(l2_end, safe_head) {
ranges.push(SpanBatchRange {
start: range_start,
end: range_start + max_size,
});
range_start += max_size;
}
ranges.push(SpanBatchRange {
start: range_start,
end: min(l2_end, safe_head),
});
current_l2_start = safe_head;
}
}

Ok(ranges)
}

/// Concurrently run the native data generation process for each split range.
async fn run_native_data_generation(host_clis: &[HostCli]) {
const CONCURRENT_NATIVE_HOST_RUNNERS: usize = 5;
Expand Down Expand Up @@ -283,7 +364,16 @@ async fn main() -> Result<()> {
get_validated_block_range(&data_fetcher, args.start, args.end, args.default_range).await?
};

let split_ranges = split_range(l2_start_block, l2_end_block, l2_chain_id, args.batch_size);
// Check if the safeDB is activated on the L2 node. If it is, we use the safeHead based range
// splitting algorithm. Otherwise, we use the simple range splitting algorithm.
let safe_db_activated = data_fetcher.is_safe_db_activated().await?;

let split_ranges = if safe_db_activated {
split_range_based_on_safe_heads(l2_start_block, l2_end_block, l2_chain_id, args.batch_size)
.await?
} else {
split_range_basic(l2_start_block, l2_end_block, l2_chain_id, args.batch_size)
};

info!(
"The span batch ranges which will be executed: {:?}",
Expand Down
Loading
Loading