Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable account update batching #33

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 1 addition & 17 deletions bin/autobahn-router/src/edge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ impl EdgeState {
}

pub fn cached_price_exact_out_for(&self, out_amount: u64) -> Option<(f64, f64)> {
if !self.is_valid_out() {
if !self.is_valid() {
return None;
}

Expand Down Expand Up @@ -304,22 +304,6 @@ impl EdgeState {
true
}

pub fn is_valid_out(&self) -> bool {
if !self.is_valid {
return false;
}

if self.cooldown_until.is_some() {
// Do not check time here !
// We will reset "cooldown until" on first account update coming after cooldown
// So if this is not reset yet, it means that we didn't change anything
// No reason to be working again
return false;
}

true
}

pub fn reset_cooldown(&mut self) {
self.cooldown_event += 0;
self.cooldown_until = None;
Expand Down
42 changes: 23 additions & 19 deletions bin/autobahn-router/src/edge_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, warn};

#[derive(Clone)]
pub struct Dex {
Expand Down Expand Up @@ -167,20 +167,19 @@ pub fn spawn_updater_job(
if !updater.invalidate_one(res) {
break 'drain_loop;
}

let mut batchsize: u32 = 0;
let started_at = Instant::now();
'batch_loop: while let Ok(res) = account_updates.try_recv() {
batchsize += 1;
if !updater.invalidate_one(Ok(res)) {
break 'drain_loop;
}

// budget for microbatch
if batchsize > 10 || started_at.elapsed() > Duration::from_micros(500) {
break 'batch_loop;
}
}
// let mut batchsize: u32 = 0;
// let started_at = Instant::now();
// 'batch_loop: while let Ok(res) = account_updates.try_recv() {
// batchsize += 1;
// if !updater.invalidate_one(Ok(res)) {
// break 'drain_loop;
// }

// // budget for microbatch
// if batchsize > 10 || started_at.elapsed() > Duration::from_micros(500) {
// break 'batch_loop;
// }
// }
},
Ok(price_upd) = price_updates.recv() => {
if let Some(impacted_edges) = updater.state.edges_per_mint.get(&price_upd.mint) {
Expand Down Expand Up @@ -242,14 +241,17 @@ impl EdgeUpdater {
Some(since) => {
if since.elapsed() > max_lag_duration {
panic!(
"Lagging a lot {} for more than {}s, exiting..",
"Lagging a lot {} for more than {}s, for dex {}..",
lag,
max_lag_duration.as_secs()
max_lag_duration.as_secs(),
self.dex.name,
);
}
}
}
return;
} else if state.slot_excessive_lagging_since.is_some() {
state.slot_excessive_lagging_since = None;
}
}
}
Expand Down Expand Up @@ -333,7 +335,9 @@ impl EdgeUpdater {
};

state.received_account.insert(pk);
state.latest_slot_pending = slot;
if state.latest_slot_pending < slot {
state.latest_slot_pending = slot;
}

self.check_readiness();

Expand Down Expand Up @@ -398,7 +402,7 @@ impl EdgeUpdater {
state.latest_slot_processed = state.latest_slot_pending;

if started_at.elapsed() > Duration::from_millis(100) {
info!(
debug!(
"{} - refresh {} - took - {:?}",
self.dex.name,
refreshed_edges.len(),
Expand Down
16 changes: 8 additions & 8 deletions bin/autobahn-router/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ async fn main() -> anyhow::Result<()> {
let config = Config::load(&args[1])?;
let router_version = RouterVersion::OverestimateAmount;

if config.metrics.output_http {
let prom_bind_addr = config
.metrics
.prometheus_address
.clone()
.expect("prometheus_address must be set");
PrometheusSync::sync(prom_bind_addr);
}
let hot_mints = Arc::new(RwLock::new(HotMintsCache::new(&config.hot_mints)));

let mango_data = match mango::mango_fetcher::fetch_mango_data().await {
Expand Down Expand Up @@ -200,14 +208,6 @@ async fn main() -> anyhow::Result<()> {
exit(-1);
};

if config.metrics.output_http {
let prom_bind_addr = config
.metrics
.prometheus_address
.clone()
.expect("prometheus_address must be set");
let _prometheus = PrometheusSync::sync(prom_bind_addr);
}
if config.metrics.output_stdout {
warn!("metrics output to stdout is not supported yet");
}
Expand Down
41 changes: 16 additions & 25 deletions bin/autobahn-router/src/prometheus_sync.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
use std::time::Duration;

use axum::{routing, Router};
use prometheus::{Encoder, TextEncoder};
use tokio::net::{TcpListener, ToSocketAddrs};
use tokio::task::JoinHandle;
use tokio::{
io::AsyncWriteExt,
net::{TcpListener, TcpStream, ToSocketAddrs},
};
use tracing::error;
use tracing::{error, info};

Check warning on line 5 in bin/autobahn-router/src/prometheus_sync.rs

View workflow job for this annotation

GitHub Actions / Router full build

unused import: `error`

Check warning on line 5 in bin/autobahn-router/src/prometheus_sync.rs

View workflow job for this annotation

GitHub Actions / Router full build

unused import: `error`

Check warning on line 5 in bin/autobahn-router/src/prometheus_sync.rs

View workflow job for this annotation

GitHub Actions / Router full build

unused import: `error`

Check warning on line 5 in bin/autobahn-router/src/prometheus_sync.rs

View workflow job for this annotation

GitHub Actions / Router full build

unused import: `error`

use crate::server::errors::AppError;

pub struct PrometheusSync;

impl PrometheusSync {
fn create_response(payload: &str) -> String {
fn create_response(payload: String) -> String {
format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
payload.len(),
payload
)
}

async fn handle_stream(stream: &mut TcpStream) -> anyhow::Result<()> {
async fn get_prometheus_stream() -> Result<String, AppError> {
let mut metrics_buffer = Vec::new();
let encoder = TextEncoder::new();

Expand All @@ -29,29 +27,22 @@
.unwrap();

let metrics_buffer = String::from_utf8(metrics_buffer).unwrap();
let response = Self::create_response(&metrics_buffer);

stream.writable().await?;
stream.write_all(response.as_bytes()).await?;

stream.flush().await?;

Ok(())
Ok(Self::create_response(metrics_buffer))
}

pub fn sync(addr: impl ToSocketAddrs + Send + 'static) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
let listener = TcpListener::bind(addr).await?;

loop {
let Ok((mut stream, _addr)) = listener.accept().await else {
error!("Error accepting prometheus stream");
tokio::time::sleep(Duration::from_millis(1)).await;
continue;
};
let mut router: Router<()> = Router::new();
router = router.route("/metrics", routing::get(Self::get_prometheus_stream));

let handle = axum::serve(listener, router);

info!("Prometheus Server started");

let _ = Self::handle_stream(&mut stream).await;
}
handle.await.expect("Prometheus Server failed");
Ok(())
})
}
}
2 changes: 1 addition & 1 deletion bin/autobahn-router/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod alt_provider;
mod errors;
pub mod errors;
pub mod hash_provider;
pub mod http_server;
pub mod live_account_provider;
Expand Down
41 changes: 39 additions & 2 deletions bin/autobahn-router/template-config.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
snapshot_timeout_in_seconds = 900

[infinity]
enabled = true

Expand All @@ -22,8 +24,8 @@ add_mango_tokens = false
[raydium]
enabled = true
mints = []
take_all_mints = false
add_mango_tokens = true
take_all_mints = true
add_mango_tokens = false

[raydium_cp]
enabled = true
Expand Down Expand Up @@ -77,26 +79,61 @@ dedup_queue_size = 50000
rpc_http_url = "$RPC_HTTP_URL"
rpc_support_compression = true
re_snapshot_interval_secs = 1200
request_timeout_in_seconds = 300

[[sources.grpc_sources]]
name = "router-other"
connection_string = "$RPC_HTTP_URL_WITHOUT_TOKEN"
token = "$RPC_TOKEN"
retry_connection_sleep_secs = 30

[[sources.quic_sources]]
name = "quic-client"
connection_string = "$RPC_QUIC_URL"
retry_connection_sleep_secs = 1
enable_gso = false

[[sources]]
region = "dfw"
dedup_queue_size = 50000
rpc_http_url = "$DFW_RPC_HTTP_URL"
rpc_support_compression = true
re_snapshot_interval_secs = 1200
request_timeout_in_seconds = 300

[[sources.grpc_sources]]
name = "router-dfw"
connection_string = "$DFW_RPC_HTTP_URL_WITHOUT_TOKEN"
token = "$AMS_RPC_TOKEN"
retry_connection_sleep_secs = 30

[[sources.quic_sources]]
name = "quic-client-dfw"
connection_string = "$DFW_RPC_QUIC_URL"
retry_connection_sleep_secs = 1
enable_gso = false


[[sources]]
region = "ams"
dedup_queue_size = 50000
rpc_http_url = "$AMS_RPC_HTTP_URL"
rpc_support_compression = true
re_snapshot_interval_secs = 1200
request_timeout_in_seconds = 300

[[sources.grpc_sources]]
name = "router-ams"
connection_string = "$AMS_RPC_HTTP_URL_WITHOUT_TOKEN"
token = "$AMS_RPC_TOKEN"
retry_connection_sleep_secs = 30

[[sources.quic_sources]]
name = "quic-client-ams "
connection_string = "$AMS_RPC_QUIC_URL"
retry_connection_sleep_secs = 1
enable_gso = false

[price_feed]
birdeye_token = "$BIRDEYE_TOKEN"
refresh_interval_secs = 1200 # every 20 min
Expand Down
4 changes: 2 additions & 2 deletions fly.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ kill_timeout = "30s"
cmd = ["autobahn-router", "/usr/local/bin/template-config.toml"]

[[vm]]
size = "shared-cpu-4x"
memory = "8gb"
size = "performance-4x"
memory = "16gb"

[[restart]]
policy = "always"
Expand Down
1 change: 1 addition & 0 deletions lib/router-config-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct GrpcSourceConfig {
#[derive(Clone, Debug, Default, serde_derive::Deserialize)]
pub struct QuicSourceConfig {
pub name: String,
#[serde(deserialize_with = "serde_string_or_env")]
pub connection_string: String,
pub retry_connection_sleep_secs: u64,
pub enable_gso: Option<bool>,
Expand Down
Loading