Skip to content

Commit

Permalink
Refactored Task trait
Browse files Browse the repository at this point in the history
have an error with lifetime of &self with executor.spawn_blocking
  • Loading branch information
SozinM committed Feb 18, 2025
1 parent b26aec5 commit 9fa8d78
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 75 deletions.
51 changes: 18 additions & 33 deletions crates/op-rbuilder/src/generator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures_util::Future;
use futures_util::FutureExt;
use reth::providers::BlockReaderIdExt;
use reth::{providers::StateProviderFactory, tasks::TaskSpawner};
use reth::providers::StateProviderFactory;
use reth_basic_payload_builder::HeaderForPayload;
use reth_basic_payload_builder::{BasicPayloadJobGeneratorConfig, PayloadConfig};
use reth_node_api::PayloadBuilderAttributes;
Expand Down Expand Up @@ -56,11 +56,10 @@ pub trait PayloadBuilder: Send + Sync + Clone {

/// The generator type that creates new jobs that builds empty blocks.
#[derive(Debug)]
pub struct BlockPayloadJobGenerator<Client, Tasks, Builder> {
pub struct BlockPayloadJobGenerator<Client, Builder> {
/// The client that can interact with the chain.
client: Client,
/// How to spawn building tasks
executor: Tasks,

/// The configuration for the job generator.
_config: BasicPayloadJobGeneratorConfig,
/// The type responsible for building payloads.
Expand All @@ -75,19 +74,17 @@ pub struct BlockPayloadJobGenerator<Client, Tasks, Builder> {

// === impl EmptyBlockPayloadJobGenerator ===

impl<Client, Tasks, Builder> BlockPayloadJobGenerator<Client, Tasks, Builder> {
impl<Client, Builder> BlockPayloadJobGenerator<Client, Builder> {
/// Creates a new [EmptyBlockPayloadJobGenerator] with the given config and custom
/// [PayloadBuilder]
pub fn with_builder(
client: Client,
executor: Tasks,
config: BasicPayloadJobGeneratorConfig,
builder: Builder,
ensure_only_one_payload: bool,
) -> Self {
Self {
client,
executor,
_config: config,
builder,
ensure_only_one_payload,
Expand All @@ -96,20 +93,19 @@ impl<Client, Tasks, Builder> BlockPayloadJobGenerator<Client, Tasks, Builder> {
}
}

impl<Client, Tasks, Builder> PayloadJobGenerator
for BlockPayloadJobGenerator<Client, Tasks, Builder>
impl<Client, Builder> PayloadJobGenerator
for BlockPayloadJobGenerator<Client, Builder>
where
Client: StateProviderFactory
+ BlockReaderIdExt<Header = HeaderForPayload<Builder::BuiltPayload>>
+ Clone
+ Unpin
+ 'static,
Tasks: TaskSpawner + Clone + Unpin + 'static,
Builder: PayloadBuilder + Unpin + 'static,
Builder::Attributes: Unpin + Clone,
Builder::BuiltPayload: Unpin + Clone,
{
type Job = BlockPayloadJob<Tasks, Builder>;
type Job = BlockPayloadJob<Builder>;

/// This is invoked when the node receives payload attributes from the beacon node via
/// `engine_forkchoiceUpdatedV1`
Expand Down Expand Up @@ -161,7 +157,6 @@ where
let config = PayloadConfig::new(Arc::new(parent_header.clone()), attributes);

let mut job = BlockPayloadJob {
executor: self.executor.clone(),
builder: self.builder.clone(),
config,
cell: BlockCell::new(),
Expand All @@ -182,14 +177,12 @@ use std::{
};

/// A [PayloadJob] that builds empty blocks.
pub struct BlockPayloadJob<Tasks, Builder>
pub struct BlockPayloadJob<Builder>
where
Builder: PayloadBuilder,
{
/// The configuration for how the payload will be created.
pub(crate) config: PayloadConfig<Builder::Attributes, HeaderForPayload<Builder::BuiltPayload>>,
/// How to spawn building tasks
pub(crate) executor: Tasks,
/// The type responsible for building payloads.
///
/// See [PayloadBuilder]
Expand All @@ -202,9 +195,8 @@ where
pub(crate) build_complete: Option<oneshot::Receiver<Result<(), PayloadBuilderError>>>,
}

impl<Tasks, Builder> PayloadJob for BlockPayloadJob<Tasks, Builder>
impl<Builder> PayloadJob for BlockPayloadJob<Builder>
where
Tasks: TaskSpawner + Clone + 'static,
Builder: PayloadBuilder + Unpin + 'static,
Builder::Attributes: Unpin + Clone,
Builder::BuiltPayload: Unpin + Clone,
Expand Down Expand Up @@ -245,9 +237,8 @@ pub struct BuildArguments<Attributes, Payload: BuiltPayload> {
}

/// A [PayloadJob] is a future that's being polled by the `PayloadBuilderService`
impl<Tasks, Builder> BlockPayloadJob<Tasks, Builder>
impl<Builder> BlockPayloadJob<Builder>
where
Tasks: TaskSpawner + Clone + 'static,
Builder: PayloadBuilder + Unpin + 'static,
Builder::Attributes: Unpin + Clone,
Builder::BuiltPayload: Unpin + Clone,
Expand All @@ -260,24 +251,20 @@ where

let (tx, rx) = oneshot::channel();
self.build_complete = Some(rx);
let args = BuildArguments {
cached_reads: Default::default(),
config: payload_config,
cancel,
};

self.executor.spawn_blocking(Box::pin(async move {
let args = BuildArguments {
cached_reads: Default::default(),
config: payload_config,
cancel,
};

let result = builder.try_build(args, cell);
let _ = tx.send(result);
}));
let result = builder.try_build(args, cell);
let _ = tx.send(result);
}
}

/// A [PayloadJob] is a a future that's being polled by the `PayloadBuilderService`
impl<Tasks, Builder> Future for BlockPayloadJob<Tasks, Builder>
impl<Builder> Future for BlockPayloadJob<Builder>
where
Tasks: TaskSpawner + Clone + 'static,
Builder: PayloadBuilder + Unpin + 'static,
Builder::Attributes: Unpin + Clone,
Builder::BuiltPayload: Unpin + Clone,
Expand Down Expand Up @@ -601,7 +588,6 @@ mod tests {
let mut rng = thread_rng();

let client = MockEthProvider::default();
let executor = TokioTaskExecutor::default();
let config = BasicPayloadJobGeneratorConfig::default();
let builder = MockBuilder::<OpPrimitives>::new();

Expand All @@ -619,7 +605,6 @@ mod tests {

let generator = BlockPayloadJobGenerator::with_builder(
client.clone(),
executor,
config,
builder.clone(),
false,
Expand Down
108 changes: 66 additions & 42 deletions crates/op-rbuilder/src/payload_builder_vanilla.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use alloy_consensus::constants::EMPTY_WITHDRAWALS;
use alloy_consensus::{
Eip658Value, Header, Transaction, TxEip1559, Typed2718, EMPTY_OMMER_ROOT_HASH,
};
use tokio::sync::oneshot;
use alloy_eips::merge::BEACON_NONCE;
use alloy_primitives::private::alloy_rlp::Encodable;
use alloy_primitives::{Address, Bytes, TxKind, B256, U256};
Expand Down Expand Up @@ -72,6 +73,7 @@ use revm::{
};
use std::error::Error as StdError;
use std::{fmt::Display, sync::Arc, time::Instant};
use reth::tasks::{TaskExecutor, TaskSpawner};
use tokio_util::sync::CancellationToken;
use tracing::{info, trace, warn};

Expand Down Expand Up @@ -100,7 +102,7 @@ where
+ Unpin
+ 'static,
{
type PayloadBuilder = OpPayloadBuilderVanilla<Pool, Node::Provider, OpEvmConfig, OpPrimitives>;
type PayloadBuilder = OpPayloadBuilderVanilla<Pool, Node::Provider, OpEvmConfig, OpPrimitives, TaskExecutor>;

async fn build_payload_builder(
&self,
Expand All @@ -113,6 +115,7 @@ where
pool,
ctx.provider().clone(),
Arc::new(BasicOpReceiptBuilder::default()),
ctx.task_executor().clone(),
))
}

Expand All @@ -126,7 +129,6 @@ where

let payload_generator = BlockPayloadJobGenerator::with_builder(
ctx.provider().clone(),
ctx.task_executor().clone(),
payload_job_config,
payload_builder,
false,
Expand All @@ -144,14 +146,15 @@ where
}
}

impl<Pool, Client, EvmConfig, N, Txs> reth_basic_payload_builder::PayloadBuilder
for OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N, Txs>
impl<Pool, Client, EvmConfig, N, Tasks, Txs> reth_basic_payload_builder::PayloadBuilder
for OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N, Tasks, Txs>
where
Pool: Clone + Send + Sync,
Client: Clone + Send + Sync,
EvmConfig: Clone + Send + Sync,
N: NodePrimitives,
Txs: Clone + Send + Sync,
Tasks: TaskSpawner + Clone + Unpin + 'static,
{
type Attributes = OpPayloadBuilderAttributes<N::SignedTx>;
type BuiltPayload = OpBuiltPayload<N>;
Expand All @@ -176,7 +179,10 @@ where

/// Optimism's payload builder
#[derive(Debug, Clone)]
pub struct OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N: NodePrimitives, Txs = ()> {
pub struct OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N: NodePrimitives, Tasks, Txs = ()>
where
Tasks: TaskSpawner + Clone + Unpin + 'static
{
/// The type responsible for creating the evm.
pub evm_config: EvmConfig,
/// The builder's signer key to use for an end of block tx
Expand All @@ -194,10 +200,14 @@ pub struct OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N: NodePrimitives, T
pub metrics: OpRBuilderMetrics,
/// Node primitive types.
pub receipt_builder: Arc<dyn OpReceiptBuilder<N::SignedTx, Receipt = N::Receipt>>,
/// Executor to spawn tasks
pub executor: Tasks
}

impl<Pool, Client, EvmConfig, N: NodePrimitives>
OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N>
impl<Pool, Client, EvmConfig, N: NodePrimitives, Tasks>
OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N, Tasks>
where
Tasks: TaskSpawner + Clone + Unpin + 'static,
{
/// `OpPayloadBuilder` constructor.
pub fn new(
Expand All @@ -206,13 +216,15 @@ impl<Pool, Client, EvmConfig, N: NodePrimitives>
pool: Pool,
client: Client,
receipt_builder: Arc<dyn OpReceiptBuilder<N::SignedTx, Receipt = N::Receipt>>,
executor: Tasks
) -> Self {
Self::with_builder_config(
evm_config,
builder_signer,
pool,
client,
receipt_builder,
executor,
Default::default(),
)
}
Expand All @@ -223,6 +235,7 @@ impl<Pool, Client, EvmConfig, N: NodePrimitives>
pool: Pool,
client: Client,
receipt_builder: Arc<dyn OpReceiptBuilder<N::SignedTx, Receipt = N::Receipt>>,
executor: Tasks,
config: OpBuilderConfig,
) -> Self {
Self {
Expand All @@ -234,18 +247,20 @@ impl<Pool, Client, EvmConfig, N: NodePrimitives>
best_transactions: (),
metrics: Default::default(),
builder_signer,
executor
}
}
}

impl<EvmConfig, Pool, Client, N, Txs> PayloadBuilder
for OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N, Txs>
impl<EvmConfig, Pool, Client, N, Tasks, Txs> PayloadBuilder
for OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N, Tasks, Txs>
where
Client: StateProviderFactory + ChainSpecProvider<ChainSpec: EthChainSpec + OpHardforks> + Clone,
Client: StateProviderFactory + ChainSpecProvider<ChainSpec: EthChainSpec + OpHardforks> + Clone + 'static ,
N: OpPayloadPrimitives<_TX = OpTransactionSigned>,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = N::SignedTx>>,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
EvmConfig: ConfigureEvmFor<N>,
Txs: OpPayloadTransactions<Pool::Transaction>,
Tasks: TaskSpawner + Clone + Unpin + 'static,
{
type Attributes = OpPayloadBuilderAttributes<N::SignedTx>;
type BuiltPayload = OpBuiltPayload<N>;
Expand All @@ -257,44 +272,53 @@ where
) -> Result<(), PayloadBuilderError> {
let pool = self.pool.clone();
let block_build_start_time = Instant::now();

match self.build_payload(args, |attrs| {
#[allow(clippy::unit_arg)]
self.best_transactions.best_transactions(pool, attrs)
})? {
BuildOutcome::Better { payload, .. } => {
best_payload.set(payload);
self.metrics
.total_block_built_duration
.record(block_build_start_time.elapsed());
self.metrics.block_built_success.increment(1);
Ok(())
}
BuildOutcome::Freeze(payload) => {
best_payload.set(payload);
self.metrics
.total_block_built_duration
.record(block_build_start_time.elapsed());
Ok(())
}
BuildOutcome::Cancelled => {
tracing::warn!("Payload build cancelled");
Err(PayloadBuilderError::MissingPayload)
}
_ => {
tracing::warn!("No better payload found");
Err(PayloadBuilderError::MissingPayload)
let (tx, rx) = oneshot::channel();
let ctx = self.clone();
self.executor.spawn_blocking(Box::pin(async move {
match ctx.build_payload(args, |attrs| {
#[allow(clippy::unit_arg)]
ctx.best_transactions.best_transactions(pool, attrs)
}) {
Ok(BuildOutcome::Better { payload, .. }) => {
best_payload.set(payload);
ctx.metrics
.total_block_built_duration
.record(block_build_start_time.elapsed());
ctx.metrics.block_built_success.increment(1);
let _ = tx.send(Ok(()));
}
Ok(BuildOutcome::Freeze(payload)) => {
best_payload.set(payload);
ctx.metrics
.total_block_built_duration
.record(block_build_start_time.elapsed());
let _ = tx.send(Ok(()));
}
Ok(BuildOutcome::Cancelled) => {
tracing::warn!("Payload build cancelled");
let _ = tx.send(Err(PayloadBuilderError::MissingPayload));
}
Ok(_) => {
tracing::warn!("No better payload found");
let _ = tx.send(Err(PayloadBuilderError::MissingPayload));
}
Err(err) => {
tracing::warn!("Build payload error {}", err);
let _ = tx.send(Err(err));
}
}
}
}));
rx.blocking_recv().map_err(|err| PayloadBuilderError::from(err))?
}
}

impl<Pool, Client, EvmConfig, N, T> OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N, T>
impl<Pool, Client, EvmConfig, N, Tasks, T> OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N, Tasks, T>
where
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = N::SignedTx>>,
Client: StateProviderFactory + ChainSpecProvider<ChainSpec: EthChainSpec + OpHardforks>,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
Client: StateProviderFactory + ChainSpecProvider<ChainSpec: EthChainSpec + OpHardforks> + 'static,
N: OpPayloadPrimitives<_TX = OpTransactionSigned>,
EvmConfig: ConfigureEvmFor<N>,
Tasks: TaskSpawner + Clone + Unpin + 'static,
{
/// Constructs an Optimism payload from the transactions sent via the
/// Payload attributes by the sequencer. If the `no_tx_pool` argument is passed in
Expand Down

0 comments on commit 9fa8d78

Please sign in to comment.