Skip to content

Commit

Permalink
Move settlement updater (#3018)
Browse files Browse the repository at this point in the history
# Description
`OnSettlementEventUpdater` was previously refactored and ended up being
dependent on `domain` and `infra` objects only.

That means it is ready to be moved to a proper place inside `domain`.

# Changes
<!-- List of detailed changes (how the change is accomplished) -->

- [ ] Moved updater to domain::settlement::on_event module
- [ ] Updated EventIndexer to call deletion of settlement dependent data
directly.

## How to test
Existing e2e tests.
  • Loading branch information
sunce86 authored Sep 27, 2024
1 parent 8db2902 commit b585cdd
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 36 deletions.
14 changes: 7 additions & 7 deletions crates/autopilot/src/boundary/events/settlement.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::{database::Postgres, on_settlement_event_updater::OnSettlementEventUpdater},
crate::{database::Postgres, domain::settlement},
anyhow::Result,
ethrpc::block_stream::RangeInclusive,
shared::{event_handling::EventStoring, impl_event_retrieving},
Expand All @@ -11,14 +11,14 @@ impl_event_retrieving! {

pub struct Indexer {
db: Postgres,
settlement_updater: OnSettlementEventUpdater,
settlement_observer: settlement::Observer,
}

impl Indexer {
pub fn new(db: Postgres, settlement_updater: OnSettlementEventUpdater) -> Self {
pub fn new(db: Postgres, settlement_observer: settlement::Observer) -> Self {
Self {
db,
settlement_updater,
settlement_observer,
}
}
}
Expand All @@ -38,10 +38,10 @@ impl EventStoring<contracts::gpv2_settlement::Event> for Indexer {
let mut transaction = self.db.pool.begin().await?;
let from_block = *range.start();
crate::database::events::replace_events(&mut transaction, events, from_block).await?;
OnSettlementEventUpdater::delete_observations(&mut transaction, from_block).await?;
database::settlements::delete(&mut transaction, from_block).await?;
transaction.commit().await?;

self.settlement_updater.update().await;
self.settlement_observer.update().await;
Ok(())
}

Expand All @@ -53,7 +53,7 @@ impl EventStoring<contracts::gpv2_settlement::Event> for Indexer {
crate::database::events::append_events(&mut transaction, events).await?;
transaction.commit().await?;

self.settlement_updater.update().await;
self.settlement_observer.update().await;
Ok(())
}
}
3 changes: 2 additions & 1 deletion crates/autopilot/src/domain/settlement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ use {
};

mod auction;
mod observer;
mod trade;
mod transaction;
pub use {auction::Auction, trade::Trade, transaction::Transaction};
pub use {auction::Auction, observer::Observer, trade::Trade, transaction::Transaction};

/// A settled transaction together with the `Auction`, for which it was executed
/// on-chain.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,22 @@

use {
crate::{domain::settlement, infra},
anyhow::{anyhow, Context, Result},
database::PgTransaction,
anyhow::{anyhow, Result},
};

#[derive(Clone)]
pub struct OnSettlementEventUpdater {
pub struct Observer {
eth: infra::Ethereum,
persistence: infra::Persistence,
}

impl OnSettlementEventUpdater {
/// Creates a new OnSettlementEventUpdater and asynchronously schedules the
/// first update run.
impl Observer {
/// Creates a new Observer and asynchronously schedules the first update
/// run.
pub fn new(eth: infra::Ethereum, persistence: infra::Persistence) -> Self {
Self { eth, persistence }
}

/// Deletes settlement_observations and order executions for the given range
pub async fn delete_observations(
transaction: &mut PgTransaction<'_>,
from_block: u64,
) -> Result<()> {
database::settlements::delete(transaction, from_block)
.await
.context("delete_settlement_observations")?;

Ok(())
}

/// Fetches all the available missing data needed for bookkeeping.
/// This needs to get called after indexing a new settlement event
/// since this code needs that data to already be present in the DB.
Expand Down Expand Up @@ -75,8 +62,7 @@ impl OnSettlementEventUpdater {

tracing::debug!(tx = ?event.transaction, "updating settlement details");

// Reconstruct the settlement transaction based on the blockchain transaction
// hash
// Reconstruct the settlement transaction based on the transaction hash
let transaction = match self.eth.transaction(event.transaction).await {
Ok(transaction) => {
let separator = self.eth.contracts().settlement_domain_separator();
Expand Down Expand Up @@ -128,7 +114,7 @@ impl OnSettlementEventUpdater {
}
}

/// Whether OnSettlementEventUpdater loop should retry on the given error.
/// Whether Observer loop should retry on the given error.
fn retryable(err: &settlement::Error) -> bool {
match err {
settlement::Error::Infra(_) => true,
Expand Down
1 change: 0 additions & 1 deletion crates/autopilot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ pub mod domain;
pub mod event_updater;
pub mod infra;
mod maintenance;
pub mod on_settlement_event_updater;
pub mod periodic_db_cleanup;
pub mod run;
pub mod run_loop;
Expand Down
9 changes: 3 additions & 6 deletions crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,16 +362,13 @@ pub async fn run(args: Arguments) {

let persistence =
infra::persistence::Persistence::new(args.s3.into().unwrap(), Arc::new(db.clone())).await;
let on_settlement_event_updater =
crate::on_settlement_event_updater::OnSettlementEventUpdater::new(
eth.clone(),
persistence.clone(),
);
let settlement_observer =
crate::domain::settlement::Observer::new(eth.clone(), persistence.clone());
let settlement_event_indexer = EventUpdater::new(
boundary::events::settlement::GPv2SettlementContract::new(
eth.contracts().settlement().clone(),
),
boundary::events::settlement::Indexer::new(db.clone(), on_settlement_event_updater),
boundary::events::settlement::Indexer::new(db.clone(), settlement_observer),
block_retriever.clone(),
skip_event_sync_start,
);
Expand Down
1 change: 1 addition & 0 deletions crates/database/src/settlements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ WHERE block_number = $2 AND log_index = $3
.map(|_| ())
}

/// Deletes all database data that referenced the deleted settlement events.
pub async fn delete(
ex: &mut PgTransaction<'_>,
delete_from_block_number: u64,
Expand Down

0 comments on commit b585cdd

Please sign in to comment.