Skip to content

Commit

Permalink
Router: more granularity when updating edges because of price change (#7
Browse files Browse the repository at this point in the history
)
  • Loading branch information
farnyser authored Oct 1, 2024
1 parent c49de6c commit 9bd763f
Showing 1 changed file with 51 additions and 78 deletions.
129 changes: 51 additions & 78 deletions bin/autobahn-router/src/edge_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ struct EdgeUpdaterState {
pub latest_slot_processed: u64,
pub slot_excessive_lagging_since: Option<Instant>,
pub dirty_prices: bool,

Check warning on line 74 in bin/autobahn-router/src/edge_updater.rs

View workflow job for this annotation

GitHub Actions / Router full build

field `dirty_prices` is never read

Check warning on line 74 in bin/autobahn-router/src/edge_updater.rs

View workflow job for this annotation

GitHub Actions / Router full build

field `dirty_prices` is never read
pub dirty_pools: HashSet<Pubkey>,
pub received_account: HashSet<Pubkey>,
pub dirty_programs: HashSet<Pubkey>,
pub dirty_token_accounts_for_owners: bool,
pub dirty_edges: HashMap<(Pubkey, Pubkey), Arc<Edge>>,
pub edges_per_mint: HashMap<Pubkey, Vec<Arc<Edge>>>,
}

struct EdgeUpdater {
Expand Down Expand Up @@ -109,6 +111,18 @@ pub fn spawn_updater_job(
let config = config.clone();
let edges = dex.edges();

let mut edges_per_mint = HashMap::<Pubkey, Vec<Arc<Edge>>>::default();
for edge in &edges {
edges_per_mint
.entry(edge.input_mint)
.or_default()
.push(edge.clone());
edges_per_mint
.entry(edge.output_mint)
.or_default()
.push(edge.clone());
}

match &dex.subscription_mode {
DexSubscriptionMode::Accounts(x) => info!(
dex_name = dex.name,
Expand Down Expand Up @@ -138,8 +152,8 @@ pub fn spawn_updater_job(
),
};

let snapshot_timeout_in_seconds = config.snapshot_timeout_in_seconds.unwrap_or(60 * 5);
let snapshot_timeout = Instant::now() + Duration::from_secs(snapshot_timeout_in_seconds);
let init_timeout_in_seconds = config.snapshot_timeout_in_seconds.unwrap_or(60 * 5);
let init_timeout = Instant::now() + Duration::from_secs(init_timeout_in_seconds);
let listener_job = tokio_spawn(format!("edge_updater_{}", dex.name).as_str(), async move {
let mut updater = EdgeUpdater {
dex,
Expand All @@ -150,14 +164,13 @@ pub fn spawn_updater_job(
ready_sender,
config,
state: EdgeUpdaterState {
edges_per_mint,
..EdgeUpdaterState::default()
},
path_warming_amounts,
};

let mut refresh_all_interval = tokio::time::interval(Duration::from_secs(1));
let mut refresh_one_interval = tokio::time::interval(Duration::from_millis(10));
refresh_all_interval.tick().await;
refresh_one_interval.tick().await;

'drain_loop: loop {
Expand Down Expand Up @@ -190,20 +203,20 @@ pub fn spawn_updater_job(
break 'batch_loop;
}
}

},
Ok(_) = price_updates.recv() => {
updater.state.dirty_prices = true;
Ok(price_upd) = price_updates.recv() => {
if let Some(impacted_edges) = updater.state.edges_per_mint.get(&price_upd.mint) {
for edge in impacted_edges {
updater.state.dirty_edges.insert(edge.unique_id(), edge.clone());
}
};
},
_ = refresh_all_interval.tick() => {
updater.refresh_all(&edges);

if !updater.state.is_ready && snapshot_timeout < Instant::now() {
_ = refresh_one_interval.tick() => {
if !updater.state.is_ready && init_timeout < Instant::now() {
error!("Failed to init '{}' before timeout", updater.dex.name);
break;
}
}
_ = refresh_one_interval.tick() => {

updater.refresh_some();
}
}
Expand Down Expand Up @@ -293,7 +306,7 @@ impl EdgeUpdater {
match res {
Ok(v) => match v {
FeedMetadata::InvalidAccount(key) => {
state.dirty_pools.insert(key);
state.received_account.insert(key);
self.check_readiness();
}
FeedMetadata::SnapshotStart(_) => {}
Expand Down Expand Up @@ -335,7 +348,13 @@ impl EdgeUpdater {
}
};

state.dirty_pools.insert(pk);
if let Some(impacted_edges) = self.dex.edges_per_pk.get(&pk) {
for edge in impacted_edges {
state.dirty_edges.insert(edge.unique_id(), edge.clone());
}
};

state.received_account.insert(pk);
state.latest_slot_pending = slot;

self.check_readiness();
Expand All @@ -352,14 +371,14 @@ impl EdgeUpdater {

match &self.dex.subscription_mode {
DexSubscriptionMode::Accounts(accounts) => {
state.is_ready = state.dirty_pools.is_superset(&accounts);
state.is_ready = state.received_account.is_superset(&accounts);
}
DexSubscriptionMode::Disabled => {}
DexSubscriptionMode::Programs(programs) => {
state.is_ready = state.dirty_programs.is_superset(&programs);
}
DexSubscriptionMode::Mixed(m) => {
state.is_ready = state.dirty_pools.is_superset(&m.accounts)
state.is_ready = state.received_account.is_superset(&m.accounts)
&& state.dirty_programs.is_superset(&m.programs)
&& (state.dirty_token_accounts_for_owners
|| m.token_accounts_for_owner.is_empty());
Expand All @@ -373,84 +392,38 @@ impl EdgeUpdater {

fn refresh_some(&mut self) {
let state = &mut self.state;
if state.dirty_pools.is_empty() || !state.is_ready {
if state.dirty_edges.is_empty() || !state.is_ready {
return;
}

let started_at = Instant::now();
let mut refreshed_edges = HashSet::new();

for pk in state.dirty_pools.iter() {
let Some(edges_for_pk) = self.dex.edges_per_pk.get(&pk) else {
// TODO Is that a new market/pool ?
// Could have two list
// - edges_per_pk
// - ignored_pk
// To check if that's a new pk (coming from gPa subscription)
// And react accordingly (add to either of the two)
continue;
};

trace!(
"- Updating {} slot={}",
pk,
self.chain_data.account(pk).unwrap().slot
);

for edge in edges_for_pk {
if !refreshed_edges.insert(edge.unique_id()) {
continue;
}
let mut refreshed_edges = vec![];

edge.update(
&self.chain_data,
&self.token_cache,
&self.price_cache,
&self.path_warming_amounts,
);
}
}

state.latest_slot_processed = state.latest_slot_pending;
state.dirty_pools.clear();

if started_at.elapsed() > Duration::from_millis(100) {
info!(
"{} - refresh {} - took - {:?}",
self.dex.name,
refreshed_edges.len(),
started_at.elapsed()
)
}
}

// called once when startup is completed
#[tracing::instrument(skip_all, level = "trace")]
fn refresh_all(&mut self, edges: &Vec<Arc<Edge>>) {
let state = &mut self.state;
if !state.dirty_prices || !state.is_ready {
return;
}

let started_at = Instant::now();

for edge in edges {
for (unique_id, edge) in &state.dirty_edges {
edge.update(
&self.chain_data,
&self.token_cache,
&self.price_cache,
&self.path_warming_amounts,
);
refreshed_edges.push(unique_id.clone());

// Do not process for too long or we could miss update in account write queue
if started_at.elapsed() > Duration::from_millis(100) {
break;
}
}
for unique_id in &refreshed_edges {
state.dirty_edges.remove(&unique_id);
}

state.latest_slot_processed = state.latest_slot_pending;
state.dirty_prices = false;
state.dirty_pools.clear();

if started_at.elapsed() > Duration::from_millis(100) {
info!(
"{} - refresh_all - took - {:?}",
"{} - refresh {} - took - {:?}",
self.dex.name,
refreshed_edges.len(),
started_at.elapsed()
)
}
Expand Down

0 comments on commit 9bd763f

Please sign in to comment.