diff --git a/crates/ursa-gateway/src/resolver/mod.rs b/crates/ursa-gateway/src/resolver/mod.rs index b1379a4d..a4f32619 100644 --- a/crates/ursa-gateway/src/resolver/mod.rs +++ b/crates/ursa-gateway/src/resolver/mod.rs @@ -1,4 +1,5 @@ pub mod model; +mod round_robin; use anyhow::{anyhow, Context}; use axum::{ @@ -19,11 +20,14 @@ use model::IndexerResponse; use moka::sync::Cache; use ordered_float::OrderedFloat; use serde_json::from_slice; -use std::{collections::VecDeque, net::IpAddr, sync::Arc}; +use std::{net::IpAddr, sync::Arc}; use tracing::{debug, error, warn}; use crate::{ - resolver::model::{Metadata, ProviderResult}, + resolver::{ + model::{Metadata, ProviderResult}, + round_robin::Queue, + }, util::error::Error, }; @@ -35,7 +39,7 @@ type Client = client::Client, Body>; pub struct Resolver { indexer_cid_url: String, client: Client, - cache: Cache, + cache: Cache>, maxminddb: Arc>>, location: GatewayLocation, } @@ -44,7 +48,7 @@ impl Resolver { pub fn new( indexer_cid_url: String, client: Client, - cache: Cache, + cache: Cache>, maxminddb: Arc>>, addr: IpAddr, ) -> Result { @@ -66,8 +70,10 @@ impl Resolver { }) } - /// Returns a set of provider address sorted by their distance relative to the gateway. - fn provider_addresses(&self, providers: Vec<&ProviderResult>) -> Addresses { + /// Partitions the providers into a set containing providers that are within + /// MAX_DISTANCE distance from the gateway and another set of providers that + /// are outside of that distance. + fn partition_providers(&self, providers: Vec<&ProviderResult>) -> Providers { let (neighbors, outsiders) = providers .into_iter() .flat_map(|provider_result| &provider_result.provider.addrs) @@ -137,9 +143,9 @@ impl Resolver { } Either::Left(address) }); - Addresses { - neighbors, - outsiders, + Providers { + neighbors: Queue::new(neighbors), + outsiders: Queue::new(outsiders), } } @@ -151,7 +157,7 @@ impl Resolver { anyhow!("Error parsed uri: {endpoint}") })?; - let mut provider_addresses = match self.cache.get(cid) { + let providers = match self.cache.get(cid) { None => { let body = match self .client @@ -220,29 +226,23 @@ impl Resolver { }) .collect(); - let provider_addresses = self.provider_addresses(providers); - if provider_addresses.neighbors.is_empty() - && provider_addresses.outsiders.is_empty() - { + let providers = self.partition_providers(providers); + if providers.neighbors.is_empty() && providers.outsiders.is_empty() { return Err(Error::Internal( "Failed to get a valid address for provider".to_string(), )); } + let providers = Arc::new(providers); + self.cache.insert(cid.to_string(), providers.clone()); - self.cache - .insert(cid.to_string(), provider_addresses.clone()); - - provider_addresses + providers } - Some(provider_addresses) => provider_addresses, + Some(providers) => providers, }; - debug!( - "Provider addresses to query: {:?}", - provider_addresses.neighbors - ); + debug!("Provider addresses to query: {:?}", providers.neighbors); - while let Some(addr) = provider_addresses.neighbors.pop_front() { + while let Some(addr) = providers.neighbors.next() { let endpoint = format!("{addr}/ursa/v0/{cid}"); let uri = match endpoint.parse::() { Ok(uri) => uri, @@ -252,24 +252,22 @@ impl Resolver { } }; match self.client.get(uri).await { - Ok(resp) => { - // The address is good so we put it back at the end. - provider_addresses.neighbors.push_back(addr); - self.cache.insert(cid.to_string(), provider_addresses); - return Ok(resp); + Ok(resp) => return Ok(resp), + Err(e) => { + providers.neighbors.remove(addr); + error!("Error querying the node provider: {endpoint:?} {e:?}") } - Err(e) => error!("Error querying the node provider: {endpoint:?} {e:?}"), }; } - if !provider_addresses.outsiders.is_empty() { + if !providers.outsiders.is_empty() { debug!( "Failed to get content from neighbors so falling back to {:?}", - provider_addresses.outsiders + providers.outsiders ); } - while let Some(addr) = provider_addresses.outsiders.pop_front() { + while let Some(addr) = providers.outsiders.next() { let endpoint = format!("{addr}/ursa/v0/{cid}"); let uri = match endpoint.parse::() { Ok(uri) => uri, @@ -279,13 +277,11 @@ impl Resolver { } }; match self.client.get(uri).await { - Ok(resp) => { - // The address is good so we put it back at the end. - provider_addresses.outsiders.push_back(addr); - self.cache.insert(cid.to_string(), provider_addresses); - return Ok(resp); + Ok(resp) => return Ok(resp), + Err(e) => { + providers.outsiders.remove(addr); + error!("Error querying the node provider: {endpoint:?} {e:?}") } - Err(e) => error!("Error querying the node provider: {endpoint:?} {e:?}"), }; } @@ -295,20 +291,16 @@ impl Resolver { } } -#[derive(Clone)] +pub struct Providers { + neighbors: Queue, + outsiders: Queue, +} + pub enum GatewayLocation { Private, Public(Location), } -#[derive(Clone)] -pub struct Addresses { - // Nodes within MAX_DISTANCE radius. - neighbors: VecDeque, - // Nodes outside MAX_DISTANCE radius. - outsiders: VecDeque, -} - fn get_location(city: City) -> Result { let location = city.location.ok_or_else(|| anyhow!("missing location"))?; let latitude = location diff --git a/crates/ursa-gateway/src/resolver/round_robin.rs b/crates/ursa-gateway/src/resolver/round_robin.rs new file mode 100644 index 00000000..ec3b73d0 --- /dev/null +++ b/crates/ursa-gateway/src/resolver/round_robin.rs @@ -0,0 +1,65 @@ +use std::{ + collections::{HashSet, VecDeque}, + sync::{Arc, RwLock, TryLockError}, + time::Duration, +}; +use tokio::time::Instant; + +const TIME_SHARING_DURATION: Duration = Duration::from_millis(500); + +#[derive(Debug)] +pub struct Queue { + inner: Arc>>, +} + +#[derive(Debug)] +struct Inner { + addresses: VecDeque, + stamp: Instant, +} + +impl Queue +where + T: Clone + PartialEq, +{ + pub fn new(addresses: HashSet) -> Self { + Self { + inner: Arc::new(RwLock::new(Inner { + addresses: addresses.into_iter().collect(), + stamp: Instant::now(), + })), + } + } + + pub fn next(&self) -> Option { + let inner = self.inner.read().unwrap(); + let now = Instant::now(); + if now.duration_since(inner.stamp) >= TIME_SHARING_DURATION { + drop(inner); + match self.inner.try_write() { + Ok(mut writer_guard) => { + let front = writer_guard.addresses.pop_front().unwrap(); + writer_guard.addresses.push_back(front.clone()); + writer_guard.stamp = now; + Some(front) + } + Err(TryLockError::WouldBlock) => { + self.inner.read().unwrap().addresses.front().cloned() + } + Err(TryLockError::Poisoned(e)) => panic!("{e}"), + } + } else { + inner.addresses.front().cloned() + } + } + + pub fn remove(&self, addr: T) { + let mut inner = self.inner.write().unwrap(); + inner.stamp = Instant::now(); + inner.addresses.retain(|a| a != &addr); + } + + pub fn is_empty(&self) -> bool { + self.inner.read().unwrap().addresses.is_empty() + } +}