Generic data prims EngineSyncController #12825

Open
Tracked by #12578
emhane opened this issue Nov 24, 2024 · 0 comments
Labels
A-consensus: Related to the consensus engine
A-sdk: Related to reth's use as a library
C-debt: Refactor of code section that is hard to understand or maintain
D-good-first-issue: Nice and easy! A great choice to get started
S-blocked: This cannot move forward until something else changes

Describe the feature

In the EngineSyncController type definition, replace BeaconConsensusEngineEvent with BeaconConsensusEngineEvent<N::Primitives>, and OrderedSealedBlock with OrderedSealedBlock<reth_node_types::HeaderTy<N>, reth_node_types::BodyTy<N>>. Update the impl block accordingly. The current code is:

/// Manages syncing under the control of the engine.
///
/// This type controls the [Pipeline] and supports (single) full block downloads.
///
/// Caution: If the pipeline is running, this type will not emit blocks downloaded from the network
/// [`EngineSyncEvent::FetchedFullBlock`] until the pipeline is idle to prevent commits to the
/// database while the pipeline is still active.
pub(crate) struct EngineSyncController<N, Client>
where
    N: ProviderNodeTypes,
    Client: EthBlockClient,
{
    /// A downloader that can download full blocks from the network.
    full_block_client: FullBlockClient<Client>,
    /// The type that can spawn the pipeline task.
    pipeline_task_spawner: Box<dyn TaskSpawner>,
    /// The current state of the pipeline.
    /// The pipeline is used for large ranges.
    pipeline_state: PipelineState<N>,
    /// Pending target block for the pipeline to sync
    pending_pipeline_target: Option<PipelineTarget>,
    /// In-flight full block requests in progress.
    inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
    /// In-flight full block _range_ requests in progress.
    inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
    /// Sender for engine events.
    event_sender: EventSender<BeaconConsensusEngineEvent>,
    /// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for
    /// ordering. This means the blocks will be popped from the heap with ascending block numbers.
    range_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlock>>,
    /// Max block after which the consensus engine would terminate the sync. Used for debugging
    /// purposes.
    max_block: Option<BlockNumber>,
    /// Engine sync metrics.
    metrics: EngineSyncMetrics,
}

impl<N, Client> EngineSyncController<N, Client>
where
    N: ProviderNodeTypes,
    Client: EthBlockClient + 'static,
{
    /// Create a new instance
    pub(crate) fn new(
        pipeline: Pipeline<N>,
        client: Client,
        pipeline_task_spawner: Box<dyn TaskSpawner>,
        max_block: Option<BlockNumber>,
        chain_spec: Arc<N::ChainSpec>,
        event_sender: EventSender<BeaconConsensusEngineEvent>,
    ) -> Self {
        Self {
            full_block_client: FullBlockClient::new(
                client,
                Arc::new(EthBeaconConsensus::new(chain_spec)),
            ),
            pipeline_task_spawner,
            pipeline_state: PipelineState::Idle(Some(pipeline)),
            pending_pipeline_target: None,
            inflight_full_block_requests: Vec::new(),
            inflight_block_range_requests: Vec::new(),
            range_buffered_blocks: BinaryHeap::new(),
            event_sender,
            max_block,
            metrics: EngineSyncMetrics::default(),
        }
    }

    /// Sets the metrics for the active downloads
    fn update_block_download_metrics(&self) {
        self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64);
        // TODO: full block range metrics
    }

    /// Sets the max block value for testing
    #[cfg(test)]
    pub(crate) fn set_max_block(&mut self, block: BlockNumber) {
        self.max_block = Some(block);
    }

    /// Cancels all download requests that are in progress and buffered blocks.
    pub(crate) fn clear_block_download_requests(&mut self) {
        self.inflight_full_block_requests.clear();
        self.inflight_block_range_requests.clear();
        self.range_buffered_blocks.clear();
        self.update_block_download_metrics();
    }

    /// Cancels the full block request with the given hash.
    pub(crate) fn cancel_full_block_request(&mut self, hash: B256) {
        self.inflight_full_block_requests.retain(|req| *req.hash() != hash);
        self.update_block_download_metrics();
    }

    /// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`.
    #[allow(dead_code)]
    pub(crate) const fn is_pipeline_sync_pending(&self) -> bool {
        self.pending_pipeline_target.is_some() && self.pipeline_state.is_idle()
    }

    /// Returns `true` if the pipeline is idle.
    pub(crate) const fn is_pipeline_idle(&self) -> bool {
        self.pipeline_state.is_idle()
    }

    /// Returns `true` if the pipeline is active.
    pub(crate) const fn is_pipeline_active(&self) -> bool {
        !self.is_pipeline_idle()
    }

    /// Returns true if there's already a request for the given hash.
    pub(crate) fn is_inflight_request(&self, hash: B256) -> bool {
        self.inflight_full_block_requests.iter().any(|req| *req.hash() == hash)
    }

    /// Starts requesting a range of blocks from the network, in reverse from the given hash.
    ///
    /// If the `count` is 1, this will use the `download_full_block` method instead, because it
    /// downloads headers and bodies for the block concurrently.
    pub(crate) fn download_block_range(&mut self, hash: B256, count: u64) {
        if count == 1 {
            self.download_full_block(hash);
        } else {
            trace!(
                target: "consensus::engine",
                ?hash,
                ?count,
                "start downloading full block range."
            );
            // notify listeners that we're downloading a block range
            self.event_sender.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
                ConsensusEngineLiveSyncProgress::DownloadingBlocks {
                    remaining_blocks: count,
                    target: hash,
                },
            ));
            let request = self.full_block_client.get_full_block_range(hash, count);
            self.inflight_block_range_requests.push(request);
        }
        // // TODO: need more metrics for block ranges
        // self.update_block_download_metrics();
    }

    /// Starts requesting a full block from the network.
    ///
    /// Returns `true` if the request was started, `false` if there's already a request for the
    /// given hash.
    pub(crate) fn download_full_block(&mut self, hash: B256) -> bool {
        if self.is_inflight_request(hash) {
            return false
        }
        trace!(
            target: "consensus::engine::sync",
            ?hash,
            "Start downloading full block"
        );
        // notify listeners that we're downloading a block
        self.event_sender.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
            ConsensusEngineLiveSyncProgress::DownloadingBlocks {
                remaining_blocks: 1,
                target: hash,
            },
        ));
        let request = self.full_block_client.get_full_block(hash);
        self.inflight_full_block_requests.push(request);
        self.update_block_download_metrics();
        true
    }

    /// Sets a new target to sync the pipeline to.
    ///
    /// But ensures the target is not the zero hash.
    pub(crate) fn set_pipeline_sync_target(&mut self, target: PipelineTarget) {
        if target.sync_target().is_some_and(|target| target.is_zero()) {
            trace!(
                target: "consensus::engine::sync",
                "Pipeline target cannot be zero hash."
            );
            // precaution to never sync to the zero hash
            return
        }
        self.pending_pipeline_target = Some(target);
    }

    /// Check if the engine reached max block as specified by `max_block` parameter.
    ///
    /// Note: this is mainly for debugging purposes.
    pub(crate) fn has_reached_max_block(&self, progress: BlockNumber) -> bool {
        let has_reached_max_block = self.max_block.is_some_and(|target| progress >= target);
        if has_reached_max_block {
            trace!(
                target: "consensus::engine::sync",
                ?progress,
                max_block = ?self.max_block,
                "Consensus engine reached max block"
            );
        }
        has_reached_max_block
    }

    /// Advances the pipeline state.
    ///
    /// This checks for the result in the channel, or returns pending if the pipeline is idle.
    fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll<EngineSyncEvent> {
        let res = match self.pipeline_state {
            PipelineState::Idle(_) => return Poll::Pending,
            PipelineState::Running(ref mut fut) => {
                ready!(fut.poll_unpin(cx))
            }
        };
        let ev = match res {
            Ok((pipeline, result)) => {
                let minimum_block_number = pipeline.minimum_block_number();
                let reached_max_block =
                    self.has_reached_max_block(minimum_block_number.unwrap_or_default());
                self.pipeline_state = PipelineState::Idle(Some(pipeline));
                EngineSyncEvent::PipelineFinished { result, reached_max_block }
            }
            Err(_) => {
                // failed to receive the pipeline
                EngineSyncEvent::PipelineTaskDropped
            }
        };
        Poll::Ready(ev)
    }

    /// This will spawn the pipeline if it is idle and a target is set or if the pipeline is set to
    /// run continuously.
    fn try_spawn_pipeline(&mut self) -> Option<EngineSyncEvent> {
        match &mut self.pipeline_state {
            PipelineState::Idle(pipeline) => {
                let target = self.pending_pipeline_target.take()?;
                let (tx, rx) = oneshot::channel();
                let pipeline = pipeline.take().expect("exists");
                self.pipeline_task_spawner.spawn_critical_blocking(
                    "pipeline task",
                    Box::pin(async move {
                        let result = pipeline.run_as_fut(Some(target)).await;
                        let _ = tx.send(result);
                    }),
                );
                self.pipeline_state = PipelineState::Running(rx);
                // we also clear any pending full block requests because we expect them to be
                // outdated (included in the range the pipeline is syncing anyway)
                self.clear_block_download_requests();
                Some(EngineSyncEvent::PipelineStarted(Some(target)))
            }
            PipelineState::Running(_) => None,
        }
    }

    /// Advances the sync process.
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<EngineSyncEvent> {
        // try to spawn a pipeline if a target is set
        if let Some(event) = self.try_spawn_pipeline() {
            return Poll::Ready(event)
        }
        // make sure we poll the pipeline if it's active, and return any ready pipeline events
        if !self.is_pipeline_idle() {
            // advance the pipeline
            if let Poll::Ready(event) = self.poll_pipeline(cx) {
                return Poll::Ready(event)
            }
        }
        // advance all full block requests
        for idx in (0..self.inflight_full_block_requests.len()).rev() {
            let mut request = self.inflight_full_block_requests.swap_remove(idx);
            if let Poll::Ready(block) = request.poll_unpin(cx) {
                trace!(
                    target: "consensus::engine",
                    block = ?block.num_hash(),
                    "Received single full block, buffering"
                );
                self.range_buffered_blocks.push(Reverse(OrderedSealedBlock(block)));
            } else {
                // still pending
                self.inflight_full_block_requests.push(request);
            }
        }
        // advance all full block range requests
        for idx in (0..self.inflight_block_range_requests.len()).rev() {
            let mut request = self.inflight_block_range_requests.swap_remove(idx);
            if let Poll::Ready(blocks) = request.poll_unpin(cx) {
                trace!(
                    target: "consensus::engine",
                    len = ?blocks.len(),
                    first = ?blocks.first().map(|b| b.num_hash()),
                    last = ?blocks.last().map(|b| b.num_hash()),
                    "Received full block range, buffering"
                );
                self.range_buffered_blocks
                    .extend(blocks.into_iter().map(OrderedSealedBlock).map(Reverse));
            } else {
                // still pending
                self.inflight_block_range_requests.push(request);
            }
        }
        self.update_block_download_metrics();
        // drain an element of the block buffer if there are any
        if let Some(block) = self.range_buffered_blocks.pop() {
            // peek ahead and pop duplicates
            while let Some(peek) = self.range_buffered_blocks.peek_mut() {
                if peek.0 .0.hash() == block.0 .0.hash() {
                    PeekMut::pop(peek);
                } else {
                    break
                }
            }
            return Poll::Ready(EngineSyncEvent::FetchedFullBlock(block.0 .0))
        }
        Poll::Pending
    }
}
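
For anyone picking this up, here is a rough, self-contained sketch of what the generic block buffer could look like once the blocking issues land. It is not the reth implementation: BlockHeader, NodePrimitives, NodeTypes, HeaderTy, BodyTy, SealedBlock and SyncBuffer below are simplified stand-ins defined only for this example, the hash is a plain u64, and the Eth* types are hypothetical. It only illustrates how the header and body types can be derived from N while the min-heap ordering and duplicate handling stay the same.

```rust
use std::cmp::{Ordering, Reverse};
use std::collections::{binary_heap::PeekMut, BinaryHeap};

/// Stand-in header abstraction (assumption): anything that exposes a block number.
trait BlockHeader { fn number(&self) -> u64; }

/// Stand-in for node primitives (assumption): names the header and body types.
trait NodePrimitives { type Header: BlockHeader; type Body; }

/// Stand-in for the node types trait (assumption).
trait NodeTypes { type Primitives: NodePrimitives; }

// Helper aliases that project the header and body types out of `N`.
type HeaderTy<N> = <<N as NodeTypes>::Primitives as NodePrimitives>::Header;
type BodyTy<N> = <<N as NodeTypes>::Primitives as NodePrimitives>::Body;

/// Simplified sealed block; the hash is a `u64` here purely for illustration.
struct SealedBlock<H, B> {
    hash: u64,
    header: H,
    #[allow(dead_code)]
    body: B,
}

/// Wrapper that orders blocks by block number, generic over header and body.
struct OrderedSealedBlock<H, B>(SealedBlock<H, B>);

impl<H: BlockHeader, B> PartialEq for OrderedSealedBlock<H, B> {
    fn eq(&self, other: &Self) -> bool { self.0.header.number() == other.0.header.number() }
}
impl<H: BlockHeader, B> Eq for OrderedSealedBlock<H, B> {}
impl<H: BlockHeader, B> PartialOrd for OrderedSealedBlock<H, B> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other)) }
}
impl<H: BlockHeader, B> Ord for OrderedSealedBlock<H, B> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.header.number().cmp(&other.0.header.number())
    }
}

/// Only the buffering part of the controller, generic over the node types.
struct SyncBuffer<N: NodeTypes> {
    range_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlock<HeaderTy<N>, BodyTy<N>>>>,
}

impl<N: NodeTypes> SyncBuffer<N> {
    fn new() -> Self { Self { range_buffered_blocks: BinaryHeap::new() } }

    fn push(&mut self, block: SealedBlock<HeaderTy<N>, BodyTy<N>>) {
        self.range_buffered_blocks.push(Reverse(OrderedSealedBlock(block)));
    }

    /// Pops the lowest block, then drops directly following entries with the same hash.
    fn pop_next(&mut self) -> Option<SealedBlock<HeaderTy<N>, BodyTy<N>>> {
        let block = self.range_buffered_blocks.pop()?;
        while let Some(peek) = self.range_buffered_blocks.peek_mut() {
            if peek.0 .0.hash == block.0 .0.hash {
                PeekMut::pop(peek);
            } else {
                break
            }
        }
        Some(block.0 .0)
    }
}

// Hypothetical concrete node types, used only to exercise the sketch.
struct EthHeader { number: u64 }
impl BlockHeader for EthHeader { fn number(&self) -> u64 { self.number } }
struct EthPrimitives;
impl NodePrimitives for EthPrimitives { type Header = EthHeader; type Body = Vec<u8>; }
struct EthNode;
impl NodeTypes for EthNode { type Primitives = EthPrimitives; }

/// Buffers blocks out of order, including one duplicate, and drains them.
fn drain_demo() -> Vec<u64> {
    let mut buffer = SyncBuffer::<EthNode>::new();
    for (hash, number) in [(30, 3), (10, 1), (20, 2), (10, 1)] {
        buffer.push(SealedBlock { hash, header: EthHeader { number }, body: Vec::new() });
    }
    let mut numbers = Vec::new();
    while let Some(block) = buffer.pop_next() {
        numbers.push(block.header.number());
    }
    numbers
}

fn main() {
    println!("{:?}", drain_demo()); // prints [1, 2, 3]
}
```

The Ord impl only needs the header bound, so the body type stays unconstrained. In the real change the event sender would similarly become EventSender<BeaconConsensusEngineEvent<N::Primitives>>, which this sketch leaves out.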

Blocked by #12824, #12823 and #12822.

emhane added the A-sdk, C-debt, D-good-first-issue and S-blocked labels Nov 24, 2024
emhane added the A-consensus label Nov 24, 2024