Skip to content

Commit

Permalink
refactor: Change replay back mechanism (#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
gshep authored Aug 13, 2024
1 parent 4d53d1f commit a2eb187
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 64 deletions.
9 changes: 0 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 1 addition & 12 deletions gear-programs/checkpoint-light-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,8 @@ tree_hash_derive.workspace = true
gear-wasm-builder.workspace = true
checkpoint_light_client-io.workspace = true

[dev-dependencies]
gclient.workspace = true
ark-bls12-381 = { workspace = true, features = ["std"] }
serde = { workspace = true, features = ["std"] }
futures.workspace = true
tokio.workspace = true
hex = { workspace = true, features = ["std"] }
reqwest.workspace = true
serde_json.workspace = true
anyhow.workspace = true

[target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies]
getrandom = { version = "0.2", default-features = false, features = ["js"] }
getrandom = { version = "0.2", default-features = false, features = ["custom"] }
lazy_static = { version = "1.1", features = ["spin_no_std"] }

[features]
Expand Down
41 changes: 14 additions & 27 deletions relayer/src/ethereum_checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use super::*;
use anyhow::{anyhow, Result as AnyResult};
use checkpoint_light_client_io::{
ethereum_common::{utils as eth_utils, SLOTS_PER_EPOCH},
meta::ReplayBack,
tree_hash::Hash256,
Handle, HandleResult, Slot, SyncCommitteeUpdate, G2,
};
use futures::{
future::{self, Either},
pin_mut,
};
use gclient::{EventListener, EventProcessor, GearApi, WSAddress};
use gclient::{EventProcessor, GearApi, WSAddress};
use parity_scale_codec::Decode;
use reqwest::Client;
use tokio::{
Expand Down Expand Up @@ -75,27 +76,14 @@ pub async fn relay(args: RelayCheckpointsArgs) {
let gas_limit = gas_limit_block / 100 * 95;
log::info!("Gas limit for extrinsics: {gas_limit}");

let mut listener = client
.subscribe()
.await
.expect("Events listener should be created");

let sync_update = receiver
.recv()
.await
.expect("Updates receiver should be open before the loop");

let mut slot_last = sync_update.finalized_header.slot;

match sync_update::try_to_apply(
&client,
&mut listener,
program_id,
sync_update.clone(),
gas_limit,
)
.await
{
match sync_update::try_to_apply(&client, program_id, sync_update.clone(), gas_limit).await {
Err(e) => {
log::error!("{e:?}");
return;
Expand All @@ -108,21 +96,17 @@ pub async fn relay(args: RelayCheckpointsArgs) {
&client_http,
&beacon_endpoint,
&client,
&mut listener,
program_id,
gas_limit,
replay_back.map(|r| r.last_header),
replay_back,
checkpoint,
sync_update,
)
.await
{
log::error!("{e:?}");
log::error!("{e:?}. Exiting");
return;
}

log::info!("Exiting");

return;
}
Ok(Ok(_) | Err(sync_update::Error::NotActual)) => (),
_ => {
Expand Down Expand Up @@ -171,9 +155,7 @@ pub async fn relay(args: RelayCheckpointsArgs) {
}

let committee_update = sync_update.sync_committee_next_pub_keys.is_some();
match sync_update::try_to_apply(&client, &mut listener, program_id, sync_update, gas_limit)
.await
{
match sync_update::try_to_apply(&client, program_id, sync_update, gas_limit).await {
Ok(Ok(_)) => {
slot_last = slot;

Expand All @@ -186,10 +168,15 @@ pub async fn relay(args: RelayCheckpointsArgs) {
}
}
Ok(Err(sync_update::Error::ReplayBackRequired { .. })) => {
log::info!("Replay back within the main loop. Exiting");
log::error!("Replay back within the main loop. Exiting");
return;
}
Ok(Err(e)) => log::info!("The program failed with: {e:?}. Skipping"),
Ok(Err(e)) => {
log::error!("The program failed with: {e:?}. Skipping");
if let sync_update::Error::NotActual = e {
slot_last = slot;
}
}
Err(e) => {
log::error!("{e:?}");
return;
Expand Down
26 changes: 11 additions & 15 deletions relayer/src/ethereum_checkpoints/replay_back.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,34 @@ pub async fn execute(
client_http: &Client,
beacon_endpoint: &str,
client: &GearApi,
listener: &mut EventListener,
program_id: [u8; 32],
gas_limit: u64,
replayed_slot: Option<Slot>,
replay_back: Option<ReplayBack>,
checkpoint: (Slot, Hash256),
sync_update: SyncCommitteeUpdate,
) -> AnyResult<()> {
log::info!("Replaying back started");

let (mut slot_start, _) = checkpoint;
if let Some(slot_end) = replayed_slot {
if let Some(ReplayBack {
finalized_header,
last_header: slot_end,
}) = replay_back
{
let slots_batch_iter = SlotsBatchIter::new(slot_start, slot_end, SIZE_BATCH)
.ok_or(anyhow!("Failed to create slots_batch::Iter with slot_start = {slot_start}, slot_end = {slot_end}."))?;

replay_back_slots(
client_http,
beacon_endpoint,
client,
listener,
program_id,
gas_limit,
slots_batch_iter,
)
.await?;

log::info!("The ongoing replaying back finished");

return Ok(());
slot_start = finalized_header;
}

let period_start = 1 + eth_utils::calculate_period(slot_start);
Expand Down Expand Up @@ -65,7 +65,6 @@ pub async fn execute(
client_http,
beacon_endpoint,
client,
listener,
program_id,
gas_limit,
slots_batch_iter.next(),
Expand All @@ -77,7 +76,6 @@ pub async fn execute(
client_http,
beacon_endpoint,
client,
listener,
program_id,
gas_limit,
slots_batch_iter,
Expand All @@ -97,7 +95,6 @@ pub async fn execute(
client_http,
beacon_endpoint,
client,
listener,
program_id,
gas_limit,
slots_batch_iter.next(),
Expand All @@ -109,7 +106,6 @@ pub async fn execute(
client_http,
beacon_endpoint,
client,
listener,
program_id,
gas_limit,
slots_batch_iter,
Expand All @@ -125,7 +121,6 @@ async fn replay_back_slots(
client_http: &Client,
beacon_endpoint: &str,
client: &GearApi,
listener: &mut EventListener,
program_id: [u8; 32],
gas_limit: u64,
slots_batch_iter: SlotsBatchIter,
Expand All @@ -135,7 +130,6 @@ async fn replay_back_slots(
client_http,
beacon_endpoint,
client,
listener,
program_id,
slot_start,
slot_end,
Expand All @@ -152,7 +146,6 @@ async fn replay_back_slots_inner(
client_http: &Client,
beacon_endpoint: &str,
client: &GearApi,
listener: &mut EventListener,
program_id: [u8; 32],
slot_start: Slot,
slot_end: Slot,
Expand All @@ -162,6 +155,8 @@ async fn replay_back_slots_inner(
request_headers(client_http, beacon_endpoint, slot_start, slot_end).await?,
);

let mut listener = client.subscribe().await?;

let (message_id, _) = client
.send_message(program_id.into(), payload, gas_limit, 0)
.await
Expand Down Expand Up @@ -190,7 +185,6 @@ async fn replay_back_slots_start(
client_http: &Client,
beacon_endpoint: &str,
client: &GearApi,
listener: &mut EventListener,
program_id: [u8; 32],
gas_limit: u64,
slots: Option<(Slot, Slot)>,
Expand All @@ -205,6 +199,8 @@ async fn replay_back_slots_start(
headers: request_headers(client_http, beacon_endpoint, slot_start, slot_end).await?,
};

let mut listener = client.subscribe().await?;

let (message_id, _) = client
.send_message(program_id.into(), payload, gas_limit, 0)
.await
Expand Down
3 changes: 2 additions & 1 deletion relayer/src/ethereum_checkpoints/sync_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@ async fn receive(

pub async fn try_to_apply(
client: &GearApi,
listener: &mut EventListener,
program_id: [u8; 32],
sync_update: SyncCommitteeUpdate,
gas_limit: u64,
) -> AnyResult<Result<(), Error>> {
let mut listener = client.subscribe().await?;

let payload = Handle::SyncUpdate(sync_update);
let (message_id, _) = client
.send_message(program_id.into(), payload, gas_limit, 0)
Expand Down

0 comments on commit a2eb187

Please sign in to comment.