Skip to content

Commit

Permalink
fixes based on comments
Browse files Browse the repository at this point in the history
  • Loading branch information
thomas-nguy committed Jan 28, 2025
1 parent 4357e48 commit d1594e2
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 105 deletions.
48 changes: 20 additions & 28 deletions core/node/node_sync/src/external_io.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{collections::HashMap, time::Duration};
use std::{
collections::{HashMap, VecDeque},
time::Duration,
};

use anyhow::Context as _;
use async_trait::async_trait;
Expand Down Expand Up @@ -40,7 +43,7 @@ pub struct ExternalIO {
actions: ActionQueue,
main_node_client: Box<dyn MainNodeClient>,
chain_id: L2ChainId,
last_l2_block_params: Option<L2BlockParams>,
pending_l2_block_actions: VecDeque<L2BlockParams>,
}

impl ExternalIO {
Expand All @@ -57,7 +60,7 @@ impl ExternalIO {
actions,
main_node_client,
chain_id,
last_l2_block_params: None,
pending_l2_block_actions: VecDeque::new(),
})
}

Expand Down Expand Up @@ -338,37 +341,28 @@ impl StateKeeperIO for ExternalIO {
&mut self,
cursor: &IoCursor,
max_wait: Duration,
) -> anyhow::Result<Option<L2BlockParams>> {
// Wait for the next L2 block to appear in the queue.
let Some(action) = self.actions.recv_action(max_wait).await else {
return Ok(None);
};
match action {
SyncAction::L2Block { params, number } => {
anyhow::ensure!(
number == cursor.next_l2_block,
"L2 block number mismatch: expected {}, got {number}",
cursor.next_l2_block
);
self.last_l2_block_params = Some(params);
return Ok(Some(params));
}
other => {
anyhow::bail!(
"Unexpected action in the queue while waiting for the next L2 block: {other:?}"
);
) -> anyhow::Result<Option<(L2BlockParams, Option<Transaction>)>> {
if let Some(params) = self
.wait_for_empty_l2_block_params(cursor, max_wait)
.await?
{
if let Ok(Some(tx)) = self.wait_for_next_tx(max_wait, params.timestamp).await {
return Ok(Some((params, Some(tx))));
}
//if no tx, we save the block param for later
self.pending_l2_block_actions.push_back(params);
return Ok(Some((params, None)));
}
return Ok(None);
}

async fn wait_for_l2_block_params_when_closing_batch(
async fn wait_for_empty_l2_block_params(
&mut self,
cursor: &IoCursor,
max_wait: Duration,
) -> anyhow::Result<Option<L2BlockParams>> {
// Check if there is already a l2 block params, then use it
if let Some(params) = self.last_l2_block_params {
self.last_l2_block_params = None;
// Check if there is a pending l2 block action while waiting for the next tx, if yes process it
if let Some(params) = self.pending_l2_block_actions.pop_front() {
return Ok(Some(params));
} else {
// Alternatively, wait for the next L2 block to appear in the queue.
Expand Down Expand Up @@ -408,8 +402,6 @@ impl StateKeeperIO for ExternalIO {
match action {
SyncAction::Tx(tx) => {
self.actions.pop_action().unwrap();
//reset the flag because a l2 block is generated after receiving the tx
self.last_l2_block_params = None;
return Ok(Some(Transaction::from(*tx)));
}
SyncAction::SealL2Block | SyncAction::SealBatch => {
Expand Down
30 changes: 25 additions & 5 deletions core/node/state_keeper/src/io/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ impl StateKeeperIO for MempoolIO {
&mut self,
cursor: &IoCursor,
max_wait: Duration,
) -> anyhow::Result<Option<L2BlockParams>> {
) -> anyhow::Result<Option<(L2BlockParams, Option<Transaction>)>> {
// We must provide different timestamps for each L2 block.
// If L2 block sealing interval is greater than 1 second then `sleep_past` won't actually sleep.
let timeout_result = tokio::time::timeout(
Expand All @@ -283,19 +283,39 @@ impl StateKeeperIO for MempoolIO {
return Ok(None);
};

Ok(Some(L2BlockParams {
let l2_block_param = L2BlockParams {
timestamp,
// This value is effectively ignored by the protocol.
virtual_blocks: 1,
}))
};

if let Ok(Some(tx)) = self.wait_for_next_tx(max_wait, timestamp).await {
return Ok(Some((l2_block_param, Some(tx))));
}
Ok(Some((l2_block_param, None)))
}

async fn wait_for_l2_block_params_when_closing_batch(
async fn wait_for_empty_l2_block_params(
&mut self,
cursor: &IoCursor,
max_wait: Duration,
) -> anyhow::Result<Option<L2BlockParams>> {
self.wait_for_new_l2_block_params(cursor, max_wait).await
// We must provide different timestamps for each L2 block.
// If L2 block sealing interval is greater than 1 second then `sleep_past` won't actually sleep.
let timeout_result = tokio::time::timeout(
max_wait,
sleep_past(cursor.prev_l2_block_timestamp, cursor.next_l2_block),
)
.await;
let Ok(timestamp) = timeout_result else {
return Ok(None);
};

Ok(Some(L2BlockParams {
timestamp,
// This value is effectively ignored by the protocol.
virtual_blocks: 1,
}))
}

async fn wait_for_next_tx(
Expand Down
4 changes: 2 additions & 2 deletions core/node/state_keeper/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ pub trait StateKeeperIO: 'static + Send + Sync + fmt::Debug + IoSealCriteria {
&mut self,
cursor: &IoCursor,
max_wait: Duration,
) -> anyhow::Result<Option<L2BlockParams>>;
) -> anyhow::Result<Option<(L2BlockParams, Option<Transaction>)>>;

/// Blocks for up to `max_wait` until the parameters for the final L2 block are available.
async fn wait_for_l2_block_params_when_closing_batch(
async fn wait_for_empty_l2_block_params(
&mut self,
cursor: &IoCursor,
max_wait: Duration,
Expand Down
2 changes: 1 addition & 1 deletion core/node/state_keeper/src/io/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ async fn different_timestamp_for_l2_blocks_in_same_batch(commitment_mode: L1Batc
let current_timestamp = seconds_since_epoch();
io_cursor.prev_l2_block_timestamp = current_timestamp;

let l2_block_params = mempool
let (l2_block_params, _) = mempool
.wait_for_new_l2_block_params(&io_cursor, Duration::from_secs(10))
.await
.unwrap()
Expand Down
80 changes: 19 additions & 61 deletions core/node/state_keeper/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl ZkSyncStateKeeper {
// We've sealed the L2 block that we had, but we still need to set up the timestamp
// for the fictive L2 block.
let new_l2_block_params = self
.wait_for_new_l2_block_params(&updates_manager, &stop_receiver)
.wait_for_empty_l2_block_params(&updates_manager, &stop_receiver)
.await?;
Self::start_next_l2_block(
new_l2_block_params,
Expand Down Expand Up @@ -379,23 +379,6 @@ impl ZkSyncStateKeeper {
}
}

#[tracing::instrument(skip_all)]
async fn wait_for_next_tx(
&mut self,
l2_block_timestamp: u64,
) -> anyhow::Result<Option<Transaction>> {
let Some(tx) = self
.io
.wait_for_next_tx(POLL_WAIT_DURATION, l2_block_timestamp)
.instrument(info_span!("wait_for_next_tx"))
.await
.context("error waiting for next transaction")?
else {
return Ok(None);
};
Ok(Some(tx))
}

#[tracing::instrument(
skip_all,
fields(
Expand All @@ -407,21 +390,24 @@ impl ZkSyncStateKeeper {
&mut self,
updates: &UpdatesManager,
stop_receiver: &watch::Receiver<bool>,
) -> Result<L2BlockParams, Error> {
) -> Result<Option<(L2BlockParams, Transaction)>, Error> {
let latency = KEEPER_METRICS.wait_for_l2_block_params.start();
let cursor = updates.io_cursor();
while !is_canceled(stop_receiver) {
if let Some(params) = self
if let Some((params, next_tx)) = self
.io
.wait_for_new_l2_block_params(&cursor, POLL_WAIT_DURATION)
.await
.context("error waiting for new L2 block params")?
{
self.health_updater
.update(StateKeeperHealthDetails::from(&cursor).into());

if let Some(tx) = next_tx {
latency.observe();
return Ok(Some((params, tx)));
}
latency.observe();
return Ok(params);
return Ok(None);
}
}
Err(Error::Canceled)
Expand All @@ -434,44 +420,19 @@ impl ZkSyncStateKeeper {
l2_block = %updates.l2_block.number,
)
)]
async fn wait_for_new_l2_block_params_and_first_tx(
async fn wait_for_empty_l2_block_params(
&mut self,
updates: &mut UpdatesManager,
stop_receiver: &watch::Receiver<bool>,
) -> Result<Option<(L2BlockParams, Transaction)>, Error> {
let new_l2_params = self
.wait_for_new_l2_block_params(updates, stop_receiver)
.await?;
if let Some(tx) = self
.wait_for_next_tx(new_l2_params.timestamp)
.await
.context("error waiting for the first transaction")?
{
return Ok(Some((new_l2_params, tx)));
}
return Ok(None);
}

#[tracing::instrument(
skip_all,
fields(
l1_batch = %updates.l1_batch.number,
l2_block = %updates.l2_block.number,
)
)]
async fn wait_for_l2_block_params_when_closing_batch(
&mut self,
updates: &mut UpdatesManager,
updates: &UpdatesManager,
stop_receiver: &watch::Receiver<bool>,
) -> Result<L2BlockParams, Error> {
let latency = KEEPER_METRICS.wait_for_l2_block_params.start();
let cursor = updates.io_cursor();
while !is_canceled(stop_receiver) {
if let Some(params) = self
.io
.wait_for_l2_block_params_when_closing_batch(&cursor, POLL_WAIT_DURATION)
.wait_for_empty_l2_block_params(&cursor, POLL_WAIT_DURATION)
.await
.context("error waiting for new L2 block params in wait_for_l2_block_params_when_closing_batch")?
.context("error waiting for empty L2 block params")?
{
self.health_updater
.update(StateKeeperHealthDetails::from(&cursor).into());
Expand Down Expand Up @@ -655,14 +616,9 @@ impl ZkSyncStateKeeper {
// Push the current block if it has not been done yet
if is_last_block_sealed {
let new_l2_block_params = self
.wait_for_l2_block_params_when_closing_batch(
updates_manager,
stop_receiver,
)
.wait_for_empty_l2_block_params(updates_manager, stop_receiver)
.await
.map_err(|e| {
e.context("wait_for_l2_block_params_when_closing_batch")
})?;
.map_err(|e| e.context("wait_for_empty_l2_block_params"))?;
Self::start_next_l2_block(new_l2_block_params, updates_manager, batch_executor)
.await?;
}
Expand All @@ -683,9 +639,9 @@ impl ZkSyncStateKeeper {
let next_tx;
if is_last_block_sealed {
let Some((params, tx)) = self
.wait_for_new_l2_block_params_and_first_tx(updates_manager, stop_receiver)
.wait_for_new_l2_block_params(updates_manager, stop_receiver)
.await
.context("error waiting for new l2 block params and next transaction")?
.context("error waiting for new l2 block params")?
else {
waiting_latency.observe();
tracing::trace!("No new transactions. Waiting!");
Expand All @@ -703,7 +659,9 @@ impl ZkSyncStateKeeper {
next_tx = tx;
} else {
let Some(tx) = self
.wait_for_next_tx(updates_manager.l2_block.timestamp)
.io
.wait_for_next_tx(POLL_WAIT_DURATION, updates_manager.l2_block.timestamp)
.instrument(info_span!("wait_for_next_tx"))
.await
.context("error waiting for next transaction")?
else {
Expand Down
25 changes: 17 additions & 8 deletions core/node/state_keeper/src/testonly/test_batch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,23 @@ impl StateKeeperIO for TestIO {
&mut self,
cursor: &IoCursor,
_max_wait: Duration,
) -> anyhow::Result<Option<(L2BlockParams, Option<Transaction>)>> {
if let Some(params) = self
.wait_for_empty_l2_block_params(cursor, _max_wait)
.await?
{
if let Ok(Some(tx)) = self.wait_for_next_tx(_max_wait, params.timestamp).await {
return Ok(Some((params, Some(tx))));
}
return Ok(Some((params, None)));
}
return Ok(None);
}

async fn wait_for_empty_l2_block_params(
&mut self,
cursor: &IoCursor,
_max_wait: Duration,
) -> anyhow::Result<Option<L2BlockParams>> {
assert_eq!(cursor.next_l2_block, self.l2_block_number);
let params = L2BlockParams {
Expand All @@ -720,14 +737,6 @@ impl StateKeeperIO for TestIO {
Ok(Some(params))
}

async fn wait_for_l2_block_params_when_closing_batch(
&mut self,
cursor: &IoCursor,
_max_wait: Duration,
) -> anyhow::Result<Option<L2BlockParams>> {
self.wait_for_new_l2_block_params(cursor, _max_wait).await
}

async fn wait_for_next_tx(
&mut self,
max_wait: Duration,
Expand Down

0 comments on commit d1594e2

Please sign in to comment.