Skip to content
This repository has been archived by the owner on Nov 29, 2023. It is now read-only.

perf(gateway): stable round robin #437

Merged
merged 11 commits into from
Mar 16, 2023
88 changes: 40 additions & 48 deletions crates/ursa-gateway/src/resolver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod model;
mod round_robin;

use anyhow::{anyhow, Context};
use axum::{
Expand All @@ -19,11 +20,14 @@ use model::IndexerResponse;
use moka::sync::Cache;
use ordered_float::OrderedFloat;
use serde_json::from_slice;
use std::{collections::VecDeque, net::IpAddr, sync::Arc};
use std::{net::IpAddr, sync::Arc};
use tracing::{debug, error, warn};

use crate::{
resolver::model::{Metadata, ProviderResult},
resolver::{
model::{Metadata, ProviderResult},
round_robin::Queue,
},
util::error::Error,
};

Expand All @@ -35,7 +39,7 @@ type Client = client::Client<HttpsConnector<HttpConnector>, Body>;
pub struct Resolver {
indexer_cid_url: String,
client: Client,
cache: Cache<String, Addresses>,
cache: Cache<String, Arc<Providers>>,
maxminddb: Arc<Reader<Vec<u8>>>,
location: GatewayLocation,
}
Expand All @@ -44,7 +48,7 @@ impl Resolver {
pub fn new(
indexer_cid_url: String,
client: Client,
cache: Cache<String, Addresses>,
cache: Cache<String, Arc<Providers>>,
maxminddb: Arc<Reader<Vec<u8>>>,
addr: IpAddr,
) -> Result<Self, Error> {
Expand All @@ -66,8 +70,10 @@ impl Resolver {
})
}

/// Returns a set of provider address sorted by their distance relative to the gateway.
fn provider_addresses(&self, providers: Vec<&ProviderResult>) -> Addresses {
/// Partitions the providers into a set containing providers that are within
/// MAX_DISTANCE distance from the gateway and another set of providers that
/// are outside of that distance.
fn partition_providers(&self, providers: Vec<&ProviderResult>) -> Providers {
let (neighbors, outsiders) = providers
.into_iter()
.flat_map(|provider_result| &provider_result.provider.addrs)
Expand Down Expand Up @@ -137,9 +143,9 @@ impl Resolver {
}
Either::Left(address)
});
Addresses {
neighbors,
outsiders,
Providers {
neighbors: Queue::new(neighbors),
outsiders: Queue::new(outsiders),
}
}

Expand All @@ -151,7 +157,7 @@ impl Resolver {
anyhow!("Error parsed uri: {endpoint}")
})?;

let mut provider_addresses = match self.cache.get(cid) {
let providers = match self.cache.get(cid) {
None => {
let body = match self
.client
Expand Down Expand Up @@ -220,29 +226,23 @@ impl Resolver {
})
.collect();

let provider_addresses = self.provider_addresses(providers);
if provider_addresses.neighbors.is_empty()
&& provider_addresses.outsiders.is_empty()
{
let providers = self.partition_providers(providers);
if providers.neighbors.is_empty() && providers.outsiders.is_empty() {
return Err(Error::Internal(
"Failed to get a valid address for provider".to_string(),
));
}
let providers = Arc::new(providers);
self.cache.insert(cid.to_string(), providers.clone());

self.cache
.insert(cid.to_string(), provider_addresses.clone());

provider_addresses
providers
}
Some(provider_addresses) => provider_addresses,
Some(providers) => providers,
};

debug!(
"Provider addresses to query: {:?}",
provider_addresses.neighbors
);
debug!("Provider addresses to query: {:?}", providers.neighbors);

while let Some(addr) = provider_addresses.neighbors.pop_front() {
while let Some(addr) = providers.neighbors.next() {
let endpoint = format!("{addr}/ursa/v0/{cid}");
let uri = match endpoint.parse::<Uri>() {
Ok(uri) => uri,
Expand All @@ -252,24 +252,22 @@ impl Resolver {
}
};
match self.client.get(uri).await {
Ok(resp) => {
// The address is good so we put it back at the end.
provider_addresses.neighbors.push_back(addr);
self.cache.insert(cid.to_string(), provider_addresses);
return Ok(resp);
Ok(resp) => return Ok(resp),
Err(e) => {
providers.neighbors.remove(addr);
error!("Error querying the node provider: {endpoint:?} {e:?}")
}
Err(e) => error!("Error querying the node provider: {endpoint:?} {e:?}"),
};
}

if !provider_addresses.outsiders.is_empty() {
if !providers.outsiders.is_empty() {
debug!(
"Failed to get content from neighbors so falling back to {:?}",
provider_addresses.outsiders
providers.outsiders
);
}

while let Some(addr) = provider_addresses.outsiders.pop_front() {
while let Some(addr) = providers.outsiders.next() {
let endpoint = format!("{addr}/ursa/v0/{cid}");
let uri = match endpoint.parse::<Uri>() {
Ok(uri) => uri,
Expand All @@ -279,13 +277,11 @@ impl Resolver {
}
};
match self.client.get(uri).await {
Ok(resp) => {
// The address is good so we put it back at the end.
provider_addresses.outsiders.push_back(addr);
self.cache.insert(cid.to_string(), provider_addresses);
return Ok(resp);
Ok(resp) => return Ok(resp),
Err(e) => {
providers.outsiders.remove(addr);
error!("Error querying the node provider: {endpoint:?} {e:?}")
}
Err(e) => error!("Error querying the node provider: {endpoint:?} {e:?}"),
};
}

Expand All @@ -295,20 +291,16 @@ impl Resolver {
}
}

/// Provider addresses for a CID, partitioned by their distance from the gateway.
///
/// The resolver queries `neighbors` first and only falls back to `outsiders`
/// when no neighbor returns the content.
#[derive(Clone)]
pub struct Providers {
// Addresses of providers within MAX_DISTANCE of the gateway.
neighbors: Queue<String>,
// Addresses of providers outside MAX_DISTANCE of the gateway.
outsiders: Queue<String>,
}

/// Where the gateway itself is located; stored in `Resolver::location`.
pub enum GatewayLocation {
// NOTE(review): presumably the gateway address is private/non-routable and
// therefore has no geographic location — confirm against `Resolver::new`.
Private,
// The gateway has a known geographic `Location`.
Public(Location),
}

#[derive(Clone)]
pub struct Addresses {
// Nodes within MAX_DISTANCE radius.
neighbors: VecDeque<String>,
// Nodes outside MAX_DISTANCE radius.
outsiders: VecDeque<String>,
}

fn get_location(city: City) -> Result<Location, Error> {
let location = city.location.ok_or_else(|| anyhow!("missing location"))?;
let latitude = location
Expand Down
65 changes: 65 additions & 0 deletions crates/ursa-gateway/src/resolver/round_robin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::{
collections::{HashSet, VecDeque},
sync::{Arc, RwLock, TryLockError},
time::Duration,
};
use tokio::time::Instant;

/// How long the address at the front of a [`Queue`] keeps being handed out
/// before the queue rotates to the next address.
const TIME_SHARING_DURATION: Duration = Duration::from_millis(500);

/// A round-robin queue of addresses with time-sliced rotation.
///
/// Every call to [`Queue::next`] within the same [`TIME_SHARING_DURATION`]
/// window yields the same address; once the window has elapsed the queue
/// rotates by one position. All methods take `&self`; the state lives behind
/// an `Arc<RwLock<_>>`.
#[derive(Debug)]
pub struct Queue<T> {
    inner: Arc<RwLock<Inner<T>>>,
}

/// Lock-protected state of a [`Queue`].
#[derive(Debug)]
struct Inner<T> {
    /// Addresses in rotation order; the front is the one currently handed out.
    addresses: VecDeque<T>,
    /// Start of the current time slice.
    stamp: Instant,
}

impl<T> Queue<T>
where
    T: Clone + PartialEq,
{
    /// Creates a queue from a set of addresses.
    ///
    /// The initial order is whatever order the set iterates in, and the first
    /// time slice starts now.
    pub fn new(addresses: HashSet<T>) -> Self {
        Self {
            inner: Arc::new(RwLock::new(Inner {
                addresses: addresses.into_iter().collect(),
                stamp: Instant::now(),
            })),
        }
    }

    /// Returns the address to use next, or `None` if the queue is empty.
    ///
    /// While the current time slice is still running this is the address at
    /// the front of the queue. The first call after the slice has expired
    /// moves the front address to the back and starts a new slice; that call
    /// still receives the address that was at the front before the rotation.
    /// If another caller holds the lock at that moment, the rotation is left
    /// to it and the current front is returned instead of blocking.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn next(&self) -> Option<T> {
        let now = Instant::now();
        {
            let inner = self.inner.read().unwrap();
            // An empty queue has nothing to hand out or rotate; bail out
            // before ever reaching for the write lock.
            let front = inner.addresses.front()?;
            if now.saturating_duration_since(inner.stamp) < TIME_SHARING_DURATION {
                return Some(front.clone());
            }
        }

        match self.inner.try_write() {
            Ok(mut inner) => {
                // The read lock was released above, so another caller may have
                // rotated the queue or removed addresses in the meantime.
                // Re-check under the write lock to avoid rotating twice.
                if now.saturating_duration_since(inner.stamp) < TIME_SHARING_DURATION {
                    return inner.addresses.front().cloned();
                }
                let front = inner.addresses.pop_front()?;
                inner.addresses.push_back(front.clone());
                inner.stamp = now;
                Some(front)
            }
            Err(TryLockError::WouldBlock) => {
                self.inner.read().unwrap().addresses.front().cloned()
            }
            Err(TryLockError::Poisoned(e)) => panic!("{e}"),
        }
    }

    /// Removes every occurrence of `addr` from the queue and starts a new
    /// time slice.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn remove(&self, addr: T) {
        let mut inner = self.inner.write().unwrap();
        inner.stamp = Instant::now();
        inner.addresses.retain(|a| a != &addr);
    }

    /// Returns `true` if the queue holds no addresses.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn is_empty(&self) -> bool {
        self.inner.read().unwrap().addresses.is_empty()
    }
}