Skip to content

Commit

Permalink
Changes to activate all raydium pools on production (#15)
Browse files Browse the repository at this point in the history
* Changes to activate all raydium pools on production

* chaning quic client and repanicking on lags

* fixing prometheus server

* Removing CORs from prometheus server

* removing unwanted error message from prometheus

* Adding region dfw for dallas

* updating name in config file
  • Loading branch information
godmodegalactus authored Nov 4, 2024
1 parent 2d8ada1 commit 65a7a82
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 62 deletions.
11 changes: 9 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 1 addition & 17 deletions bin/autobahn-router/src/edge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ impl EdgeState {
}

pub fn cached_price_exact_out_for(&self, out_amount: u64) -> Option<(f64, f64)> {
if !self.is_valid_out() {
if !self.is_valid() {
return None;
}

Expand Down Expand Up @@ -304,22 +304,6 @@ impl EdgeState {
true
}

pub fn is_valid_out(&self) -> bool {
if !self.is_valid {
return false;
}

if self.cooldown_until.is_some() {
// Do not check time here !
// We will reset "cooldown until" on first account update coming after cooldown
// So if this is not reset yet, it means that we didn't change anything
// No reason to be working again
return false;
}

true
}

pub fn reset_cooldown(&mut self) {
self.cooldown_event += 0;
self.cooldown_until = None;
Expand Down
15 changes: 10 additions & 5 deletions bin/autobahn-router/src/edge_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, warn};

#[derive(Clone)]
pub struct Dex {
Expand Down Expand Up @@ -242,14 +242,17 @@ impl EdgeUpdater {
Some(since) => {
if since.elapsed() > max_lag_duration {
panic!(
"Lagging a lot {} for more than {}s, exiting..",
"Lagging a lot {} for more than {}s, for dex {}..",
lag,
max_lag_duration.as_secs()
max_lag_duration.as_secs(),
self.dex.name,
);
}
}
}
return;
} else if state.slot_excessive_lagging_since.is_some() {
state.slot_excessive_lagging_since = None;
}
}
}
Expand Down Expand Up @@ -333,7 +336,9 @@ impl EdgeUpdater {
};

state.received_account.insert(pk);
state.latest_slot_pending = slot;
if state.latest_slot_pending < slot {
state.latest_slot_pending = slot;
}

self.check_readiness();

Expand Down Expand Up @@ -398,7 +403,7 @@ impl EdgeUpdater {
state.latest_slot_processed = state.latest_slot_pending;

if started_at.elapsed() > Duration::from_millis(100) {
info!(
debug!(
"{} - refresh {} - took - {:?}",
self.dex.name,
refreshed_edges.len(),
Expand Down
16 changes: 8 additions & 8 deletions bin/autobahn-router/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ async fn main() -> anyhow::Result<()> {
let config = Config::load(&args[1])?;
let router_version = RouterVersion::OverestimateAmount;

if config.metrics.output_http {
let prom_bind_addr = config
.metrics
.prometheus_address
.clone()
.expect("prometheus_address must be set");
PrometheusSync::sync(prom_bind_addr);
}
let hot_mints = Arc::new(RwLock::new(HotMintsCache::new(&config.hot_mints)));

let mango_data = match mango::mango_fetcher::fetch_mango_data().await {
Expand Down Expand Up @@ -200,14 +208,6 @@ async fn main() -> anyhow::Result<()> {
exit(-1);
};

if config.metrics.output_http {
let prom_bind_addr = config
.metrics
.prometheus_address
.clone()
.expect("prometheus_address must be set");
let _prometheus = PrometheusSync::sync(prom_bind_addr);
}
if config.metrics.output_stdout {
warn!("metrics output to stdout is not supported yet");
}
Expand Down
41 changes: 16 additions & 25 deletions bin/autobahn-router/src/prometheus_sync.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
use std::time::Duration;

use axum::{routing, Router};
use prometheus::{Encoder, TextEncoder};
use tokio::net::{TcpListener, ToSocketAddrs};
use tokio::task::JoinHandle;
use tokio::{
io::AsyncWriteExt,
net::{TcpListener, TcpStream, ToSocketAddrs},
};
use tracing::error;
use tracing::{error, info};

Check warning on line 5 in bin/autobahn-router/src/prometheus_sync.rs

View workflow job for this annotation

GitHub Actions / Router full build

unused import: `error`

Check warning on line 5 in bin/autobahn-router/src/prometheus_sync.rs

View workflow job for this annotation

GitHub Actions / Router full build

unused import: `error`

use crate::server::errors::AppError;

pub struct PrometheusSync;

impl PrometheusSync {
fn create_response(payload: &str) -> String {
fn create_response(payload: String) -> String {
format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
payload.len(),
payload
)
}

async fn handle_stream(stream: &mut TcpStream) -> anyhow::Result<()> {
async fn get_prometheus_stream() -> Result<String, AppError> {
let mut metrics_buffer = Vec::new();
let encoder = TextEncoder::new();

Expand All @@ -29,29 +27,22 @@ impl PrometheusSync {
.unwrap();

let metrics_buffer = String::from_utf8(metrics_buffer).unwrap();
let response = Self::create_response(&metrics_buffer);

stream.writable().await?;
stream.write_all(response.as_bytes()).await?;

stream.flush().await?;

Ok(())
Ok(Self::create_response(metrics_buffer))
}

pub fn sync(addr: impl ToSocketAddrs + Send + 'static) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
let listener = TcpListener::bind(addr).await?;

loop {
let Ok((mut stream, _addr)) = listener.accept().await else {
error!("Error accepting prometheus stream");
tokio::time::sleep(Duration::from_millis(1)).await;
continue;
};
let mut router: Router<()> = Router::new();
router = router.route("/metrics", routing::get(Self::get_prometheus_stream));

let handle = axum::serve(listener, router);

info!("Prometheus Server started");

let _ = Self::handle_stream(&mut stream).await;
}
handle.await.expect("Prometheus Server failed");
Ok(())
})
}
}
2 changes: 1 addition & 1 deletion bin/autobahn-router/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod alt_provider;
mod errors;
pub mod errors;
pub mod hash_provider;
pub mod http_server;
pub mod live_account_provider;
Expand Down
41 changes: 39 additions & 2 deletions bin/autobahn-router/template-config.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
snapshot_timeout_in_seconds = 900

[infinity]
enabled = true

Expand All @@ -22,8 +24,8 @@ add_mango_tokens = false
[raydium]
enabled = true
mints = []
take_all_mints = false
add_mango_tokens = true
take_all_mints = true
add_mango_tokens = false

[raydium_cp]
enabled = true
Expand Down Expand Up @@ -77,26 +79,61 @@ dedup_queue_size = 50000
rpc_http_url = "$RPC_HTTP_URL"
rpc_support_compression = true
re_snapshot_interval_secs = 1200
request_timeout_in_seconds = 300

[[sources.grpc_sources]]
name = "router-other"
connection_string = "$RPC_HTTP_URL_WITHOUT_TOKEN"
token = "$RPC_TOKEN"
retry_connection_sleep_secs = 30

[[sources.quic_sources]]
name = "quic-client"
connection_string = "$RPC_QUIC_URL"
retry_connection_sleep_secs = 1
enable_gso = false

[[sources]]
region = "dfw"
dedup_queue_size = 50000
rpc_http_url = "$DFW_RPC_HTTP_URL"
rpc_support_compression = true
re_snapshot_interval_secs = 1200
request_timeout_in_seconds = 300

[[sources.grpc_sources]]
name = "router-dfw"
connection_string = "$DFW_RPC_HTTP_URL_WITHOUT_TOKEN"
token = "$AMS_RPC_TOKEN"
retry_connection_sleep_secs = 30

[[sources.quic_sources]]
name = "quic-client-dfw"
connection_string = "$DFW_RPC_QUIC_URL"
retry_connection_sleep_secs = 1
enable_gso = false


[[sources]]
region = "ams"
dedup_queue_size = 50000
rpc_http_url = "$AMS_RPC_HTTP_URL"
rpc_support_compression = true
re_snapshot_interval_secs = 1200
request_timeout_in_seconds = 300

[[sources.grpc_sources]]
name = "router-ams"
connection_string = "$AMS_RPC_HTTP_URL_WITHOUT_TOKEN"
token = "$AMS_RPC_TOKEN"
retry_connection_sleep_secs = 30

[[sources.quic_sources]]
name = "quic-client-ams "
connection_string = "$AMS_RPC_QUIC_URL"
retry_connection_sleep_secs = 1
enable_gso = false

[price_feed]
birdeye_token = "$BIRDEYE_TOKEN"
refresh_interval_secs = 1200 # every 20 min
Expand Down
4 changes: 2 additions & 2 deletions fly.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ kill_timeout = "30s"
cmd = ["autobahn-router", "/usr/local/bin/template-config.toml"]

[[vm]]
size = "shared-cpu-4x"
memory = "8gb"
size = "performance-4x"
memory = "16gb"

[[restart]]
policy = "always"
Expand Down
1 change: 1 addition & 0 deletions lib/router-config-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct GrpcSourceConfig {
#[derive(Clone, Debug, Default, serde_derive::Deserialize)]
pub struct QuicSourceConfig {
pub name: String,
#[serde(deserialize_with = "serde_string_or_env")]
pub connection_string: String,
pub retry_connection_sleep_secs: u64,
pub enable_gso: Option<bool>,
Expand Down

0 comments on commit 65a7a82

Please sign in to comment.