diff --git a/src/config.rs b/src/config.rs index 9081b87..9b314c9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,14 +1,26 @@ use std::{fs, path::Path}; use eyre::{Result, WrapErr}; +use hashbrown::HashMap; use serde::Deserialize; +#[derive(Debug, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum UrlProvider { + Lookahead, + UrlProvider, +} + #[derive(Debug, Deserialize)] pub struct Config { #[serde(rename = "lookahead-providers-relays")] pub lookahead_providers_relays: Vec, #[serde(rename = "beacon-urls")] pub beacon_urls: Vec, + #[serde(rename = "url-provider")] + pub url_provider: UrlProvider, + #[serde(rename = "pubkey-url-map")] + pub url_map: Option>, } #[derive(Debug, Deserialize)] diff --git a/src/forward_service.rs b/src/forward_service.rs index 92e254a..b90cab9 100644 --- a/src/forward_service.rs +++ b/src/forward_service.rs @@ -94,7 +94,7 @@ impl RpcForward { fn router(shared_state: SharedState) -> Router { Router::new() - .route("/:scan_id", post(scan_id_forward_request)) + .route("/:chain_id", post(scan_id_forward_request)) .route("/", post(forward_request)) .layer(TraceLayer::new_for_http()) .with_state(Arc::new(shared_state)) @@ -107,27 +107,32 @@ async fn scan_id_forward_request( body: Bytes, ) -> Result { if let Some(manager) = state.managers.get(&chain_id) { - if let Some(entry) = manager.get_next_elected_preconfer() { - match inner_forward_request(&state.client, &entry.url, body, headers).await { - Ok(res) => Ok(res), - Err(_) => Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "error while forwarding request".to_string(), - )), - } - } else { - Err(( - StatusCode::INTERNAL_SERVER_ERROR, - format!("no preconfer has been elected for chain_id {}", chain_id), - )) + match manager.get_url() { + None => Err(( + StatusCode::BAD_REQUEST, + format!("no lookahead entry found for chain-id {}", chain_id), + )), + Some(url) => match url { + Ok(url) => match inner_forward_request(&state.client, &url, body, headers).await { + Ok(res) => Ok(res), + Err(_) => Err(( + StatusCode::INTERNAL_SERVER_ERROR, + "error while forwarding request".to_string(), + )), + }, + Err(err) => Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())), + }, } } else { - Err((StatusCode::BAD_REQUEST, format!("no lookahead provider found for id {}", chain_id))) + Err(( + StatusCode::BAD_REQUEST, + format!("no lookahead provider found for chain-id {}", chain_id), + )) } } async fn forward_request(State(_state): State>) -> impl IntoResponse { - (StatusCode::BAD_REQUEST, "missing chain_id parameter") + (StatusCode::BAD_REQUEST, "missing chain-id parameter") } async fn inner_forward_request( @@ -148,6 +153,7 @@ mod test { time::Duration, }; + use alloy::rpc::types::beacon::{constants::BLS_PUBLIC_KEY_BYTES_LEN, BlsPublicKey}; use axum::{ extract::State, http::HeaderMap, @@ -163,7 +169,8 @@ mod test { use crate::{ forward_service::{router, SharedState}, - lookahead::{Lookahead, LookaheadEntry, LookaheadManager, LookaheadProvider}, + lookahead::{Lookahead, LookaheadEntry, LookaheadManager, LookaheadProvider, UrlProvider}, + preconf::election::{PreconferElection, SignedPreconferElection}, }; struct DummySharedState { @@ -176,6 +183,7 @@ mod test { let manager = LookaheadManager::new( Lookahead { map: DashMap::new().into() }, LookaheadProvider::None, + UrlProvider::LookaheadEntry, ); let mut managers = HashMap::new(); managers.insert(1u16, manager); @@ -186,7 +194,7 @@ mod test { tokio::time::sleep(Duration::from_secs(1)).await; let res = reqwest::Client::new().post("http://localhost:12001").send().await.unwrap(); assert_eq!(res.status(), StatusCode::BAD_REQUEST); - assert_eq!(res.text().await.unwrap(), "missing chain_id parameter"); + assert_eq!(res.text().await.unwrap(), "missing chain-id parameter"); Ok(()) } @@ -196,6 +204,7 @@ mod test { let manager = LookaheadManager::new( Lookahead { map: DashMap::new().into() }, LookaheadProvider::None, + UrlProvider::LookaheadEntry, ); let mut managers = HashMap::new(); managers.insert(1u16, manager); @@ -206,7 +215,7 @@ mod test { tokio::time::sleep(Duration::from_secs(1)).await; let res = reqwest::Client::new().post("http://localhost:12002/2").send().await.unwrap(); assert_eq!(res.status(), StatusCode::BAD_REQUEST); - assert_eq!(res.text().await.unwrap(), "no lookahead provider found for id 2"); + assert_eq!(res.text().await.unwrap(), "no lookahead provider found for chain-id 2"); Ok(()) } @@ -218,7 +227,36 @@ mod test { url: "http://not-a-valid-url".into(), ..Default::default() }); - let manager = LookaheadManager::new(Lookahead { map }, LookaheadProvider::None); + let manager = LookaheadManager::new( + Lookahead { map }, + LookaheadProvider::None, + UrlProvider::LookaheadEntry, + ); + let mut managers = HashMap::new(); + managers.insert(1u16, manager); + let router = router(SharedState::new(managers).unwrap()); + let listener = tokio::net::TcpListener::bind("localhost:12003").await.unwrap(); + axum::serve(listener, router).await.unwrap(); + }); + tokio::time::sleep(Duration::from_secs(1)).await; + let res = reqwest::Client::new().post("http://localhost:12003/1").send().await.unwrap(); + assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR); + Ok(()) + } + + #[tokio::test] + async fn test_no_pubkey() -> Result<()> { + tokio::spawn(async move { + let signature: BlsPublicKey = BlsPublicKey::from([42u8; BLS_PUBLIC_KEY_BYTES_LEN]); + let map = Arc::new(DashMap::new()); + let mut provider = HashMap::new(); + provider.insert(signature.to_string(), "http:://not-a-valid-http".into()); + map.insert(0, LookaheadEntry { url: "".into(), ..Default::default() }); + let manager = LookaheadManager::new( + Lookahead { map }, + LookaheadProvider::None, + UrlProvider::UrlMap(provider), + ); let mut managers = HashMap::new(); managers.insert(1u16, manager); let router = router(SharedState::new(managers).unwrap()); @@ -228,6 +266,13 @@ mod test { tokio::time::sleep(Duration::from_secs(1)).await; let res = reqwest::Client::new().post("http://localhost:12003/1").send().await.unwrap(); assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR); + assert_eq!( + res.text().await.unwrap(), + format!( + "could not find key for pubkey {}", + BlsPublicKey::from([0u8; BLS_PUBLIC_KEY_BYTES_LEN]).to_string() + ) + ); Ok(()) } @@ -248,7 +293,11 @@ mod test { url: "http://localhost:12004".into(), ..Default::default() }); - let manager = LookaheadManager::new(Lookahead { map }, LookaheadProvider::None); + let manager = LookaheadManager::new( + Lookahead { map }, + LookaheadProvider::None, + UrlProvider::LookaheadEntry, + ); let mut managers = HashMap::new(); managers.insert(1u16, manager); let router = router(SharedState::new(managers).unwrap()); @@ -276,6 +325,63 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_url_map_request() -> Result<()> { + tokio::spawn(async move { + let dst = Arc::new(Mutex::new(DummySharedState { cnt: 0 })); + let router: Router = Router::new() + .route("/", post(handle_request)) + .route("/cnt", get(counter)) + .with_state(dst); + let listener = tokio::net::TcpListener::bind("localhost:12006").await.unwrap(); + axum::serve(listener, router).await.unwrap(); + }); + tokio::spawn(async move { + let map = Arc::new(DashMap::new()); + let signature: BlsPublicKey = BlsPublicKey::from([42u8; BLS_PUBLIC_KEY_BYTES_LEN]); + let mut url_mapping = HashMap::new(); + url_mapping.insert(signature.to_string(), "http://localhost:12006".into()); + map.insert(0, LookaheadEntry { + url: "".into(), + election: SignedPreconferElection { + message: PreconferElection { + preconfer_pubkey: signature.clone(), + ..Default::default() + }, + ..Default::default() + }, + }); + let manager = LookaheadManager::new( + Lookahead { map }, + LookaheadProvider::None, + UrlProvider::UrlMap(url_mapping), + ); + let mut managers = HashMap::new(); + managers.insert(1u16, manager); + let router = router(SharedState::new(managers).unwrap()); + let listener = tokio::net::TcpListener::bind("localhost:12007").await.unwrap(); + axum::serve(listener, router).await.unwrap(); + }); + tokio::time::sleep(Duration::from_secs(1)).await; + for _ in 0..10 { + let mut headers = HeaderMap::new(); + headers.insert("Content-Type", HeaderValue::from_str("application/json").unwrap()); + let res = reqwest::Client::new() + .post("http://localhost:12007/1") + .body("dummy plain body") + .headers(headers) + .headers(HeaderMap::new()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + } + + let cnt_res = reqwest::get("http://localhost:12006/cnt").await.unwrap(); + assert_eq!(StatusCode::OK, cnt_res.status()); + assert_eq!(cnt_res.text().await.unwrap(), "10"); + Ok(()) + } async fn handle_request( State(state): State>>, headers: HeaderMap, diff --git a/src/lookahead/manager.rs b/src/lookahead/manager.rs index 0e909b8..93d8b7c 100644 --- a/src/lookahead/manager.rs +++ b/src/lookahead/manager.rs @@ -1,6 +1,6 @@ use alloy::rpc::types::beacon::events::HeadEvent; use dashmap::DashMap; -use eyre::{bail, Result}; +use eyre::{bail, ContextCompat, Result}; use hashbrown::HashMap; use tokio::sync::broadcast; @@ -16,17 +16,29 @@ enum LookaheadProviderManager { Running, } +#[derive(Debug, Clone)] +pub enum UrlProvider { + LookaheadEntry, + UrlMap(HashMap), +} + /// Manages the lookahead for preconfer elections. pub struct LookaheadManager { lookahead: Lookahead, provider_manager: Option, + url_provider: UrlProvider, } impl LookaheadManager { - pub fn new(lookahead: Lookahead, lookahead_provider: LookaheadProvider) -> Self { + pub fn new( + lookahead: Lookahead, + lookahead_provider: LookaheadProvider, + url_provider: UrlProvider, + ) -> Self { Self { lookahead, provider_manager: Some(LookaheadProviderManager::Initialized(lookahead_provider)), + url_provider, } } @@ -47,9 +59,21 @@ impl LookaheadManager { } } - pub fn get_next_elected_preconfer(&self) -> Option { + fn get_next_elected_preconfer(&self) -> Option { self.lookahead.get_next_elected_preconfer() } + + pub fn get_url(&self) -> Option> { + self.get_next_elected_preconfer().map(|entry| match &self.url_provider { + UrlProvider::LookaheadEntry => Ok(entry.url), + UrlProvider::UrlMap(m) => { + let pubkey = entry.election.preconfer_pubkey().to_string(); + m.get(&pubkey) + .cloned() + .wrap_err(format!("could not find key for pubkey {}", pubkey)) + } + }) + } } /// BBuilds a map of lookahead managers from the configuration, keyed by the chain-id. @@ -57,6 +81,16 @@ pub fn lookahead_managers_from_config( config: Config, beacon_tx: broadcast::Sender, ) -> HashMap { + let url_provider = match config.url_provider { + crate::config::UrlProvider::Lookahead => UrlProvider::LookaheadEntry, + crate::config::UrlProvider::UrlProvider => { + let mut m = HashMap::new(); + for (key, value) in config.url_map.expect("pubkey-url-map not present") { + m.insert(key, value); + } + UrlProvider::UrlMap(m) + } + }; // build managers from relay lookahead providers let mut map = HashMap::new(); for r_c in config.lookahead_providers_relays { @@ -70,7 +104,7 @@ pub fn lookahead_managers_from_config( )), } .build_relay_provider(); - map.insert(r_c.chain_id, LookaheadManager::new(lookahead, provider)); + map.insert(r_c.chain_id, LookaheadManager::new(lookahead, provider, url_provider.clone())); } map } diff --git a/src/preconf/constraints.rs b/src/preconf/constraints.rs index 2307419..f5bb9b2 100644 --- a/src/preconf/constraints.rs +++ b/src/preconf/constraints.rs @@ -60,6 +60,5 @@ mod tests { let s = serde_json::to_string(&singed_constraints).unwrap(); let encode = alloy::primitives::hex::encode(s); - println!("{}", encode); } }