From 1719f70ab013ceeed09a8ade9ee9c33c08d7defb Mon Sep 17 00:00:00 2001 From: Miguel Date: Fri, 10 Mar 2023 17:49:17 -0500 Subject: [PATCH 1/9] Rotate provider list on some duration --- crates/ursa-gateway/src/resolver/mod.rs | 94 ++++++++++++++++++------- 1 file changed, 70 insertions(+), 24 deletions(-) diff --git a/crates/ursa-gateway/src/resolver/mod.rs b/crates/ursa-gateway/src/resolver/mod.rs index d4811dfd..c1ead677 100644 --- a/crates/ursa-gateway/src/resolver/mod.rs +++ b/crates/ursa-gateway/src/resolver/mod.rs @@ -19,7 +19,9 @@ use model::IndexerResponse; use moka::sync::Cache; use ordered_float::OrderedFloat; use serde_json::from_slice; -use std::{collections::VecDeque, net::IpAddr, sync::Arc}; +use std::sync::TryLockError; +use std::{collections::VecDeque, net::IpAddr, sync::Arc, sync::RwLock, time::Duration}; +use tokio::time::Instant; use tracing::{debug, error, warn}; use crate::{ @@ -29,6 +31,7 @@ use crate::{ const FLEEK_NETWORK_FILTER: &[u8] = b"FleekNetwork"; const MAX_DISTANCE: OrderedFloat = OrderedFloat(565_000f64); +const TIME_SHARING_DURATION: Duration = Duration::from_micros(500); type Client = client::Client, Body>; @@ -123,8 +126,11 @@ impl Resolver { Either::Left(address) }); Addresses { - neighbors, - outsiders, + inner: Arc::new(RwLock::new(AddressesState { + neighbors, + outsiders, + stamp: Instant::now(), + })), } } @@ -136,7 +142,7 @@ impl Resolver { anyhow!("Error parsed uri: {endpoint}") })?; - let mut provider_addresses = match self.cache.get(cid) { + let provider_addresses = match self.cache.get(cid) { None => { let body = match self .client @@ -206,9 +212,7 @@ impl Resolver { .collect(); let provider_addresses = self.provider_addresses(providers); - if provider_addresses.neighbors.is_empty() - && provider_addresses.outsiders.is_empty() - { + if provider_addresses.is_empty() { return Err(Error::Internal( "Failed to get a valid address for provider".to_string(), )); @@ -224,10 +228,10 @@ impl Resolver { debug!( "Provider addresses to query: {:?}", - provider_addresses.neighbors + provider_addresses.neighbors() ); - while let Some(addr) = provider_addresses.neighbors.pop_front() { + while let Some(addr) = provider_addresses.next() { let endpoint = format!("{addr}/ursa/v0/{cid}"); let uri = match endpoint.parse::() { Ok(uri) => uri, @@ -237,24 +241,24 @@ impl Resolver { } }; match self.client.get(uri).await { - Ok(resp) => { - // The address is good so we put it back at the end. - provider_addresses.neighbors.push_back(addr); - self.cache.insert(cid.to_string(), provider_addresses); - return Ok(resp); + Ok(resp) => return Ok(resp), + Err(e) => { + provider_addresses.remove(); + error!("Error querying the node provider: {endpoint:?} {e:?}") } - Err(e) => error!("Error querying the node provider: {endpoint:?} {e:?}"), }; } - if !provider_addresses.outsiders.is_empty() { + let outsiders = provider_addresses.outsiders(); + + if !outsiders.is_empty() { debug!( "Failed to get content from neighbors so falling back to {:?}", - provider_addresses.outsiders + outsiders ); } - while let Some(addr) = provider_addresses.outsiders.pop_front() { + for addr in outsiders { let endpoint = format!("{addr}/ursa/v0/{cid}"); let uri = match endpoint.parse::() { Ok(uri) => uri, @@ -264,12 +268,7 @@ impl Resolver { } }; match self.client.get(uri).await { - Ok(resp) => { - // The address is good so we put it back at the end. - provider_addresses.outsiders.push_back(addr); - self.cache.insert(cid.to_string(), provider_addresses); - return Ok(resp); - } + Ok(resp) => return Ok(resp), Err(e) => error!("Error querying the node provider: {endpoint:?} {e:?}"), }; } @@ -282,10 +281,57 @@ impl Resolver { #[derive(Clone)] pub struct Addresses { + inner: Arc>, +} + +pub struct AddressesState { // Nodes within MAX_DISTANCE radius. neighbors: VecDeque, // Nodes outside MAX_DISTANCE radius. outsiders: VecDeque, + // Recently viewed time stamp. + stamp: Instant, +} + +impl Addresses { + fn next(&self) -> Option { + let inner = self.inner.read().unwrap(); + let old_stamp = inner.stamp; + if old_stamp + TIME_SHARING_DURATION >= Instant::now() { + drop(inner); + match self.inner.try_write() { + Ok(mut writer_guard) => { + let front = writer_guard.neighbors.pop_front().unwrap(); + writer_guard.neighbors.push_back(front.clone()); + writer_guard.stamp = Instant::now(); + Some(front) + } + Err(TryLockError::WouldBlock) => { + self.inner.read().unwrap().neighbors.front().cloned() + } + _ => panic!(""), + } + } else { + inner.neighbors.front().cloned() + } + } + + fn neighbors(&self) -> VecDeque { + self.inner.read().unwrap().neighbors.clone() + } + + fn outsiders(&self) -> VecDeque { + self.inner.read().unwrap().outsiders.clone() + } + + fn remove(&self) { + self.inner.write().unwrap().neighbors.pop_front(); + } + + fn is_empty(&self) -> bool { + let inner = self.inner.read().unwrap(); + return inner.neighbors.is_empty() && inner.outsiders.is_empty(); + } } fn get_location(city: City) -> Result { From 98f30eec1758fbc8f496f2a52a5ed0cddc8555b4 Mon Sep 17 00:00:00 2001 From: Miguel Date: Fri, 10 Mar 2023 18:06:56 -0500 Subject: [PATCH 2/9] Add logging --- crates/ursa-gateway/src/resolver/mod.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/ursa-gateway/src/resolver/mod.rs b/crates/ursa-gateway/src/resolver/mod.rs index c1ead677..9369fbf2 100644 --- a/crates/ursa-gateway/src/resolver/mod.rs +++ b/crates/ursa-gateway/src/resolver/mod.rs @@ -31,7 +31,7 @@ use crate::{ const FLEEK_NETWORK_FILTER: &[u8] = b"FleekNetwork"; const MAX_DISTANCE: OrderedFloat = OrderedFloat(565_000f64); -const TIME_SHARING_DURATION: Duration = Duration::from_micros(500); +const TIME_SHARING_DURATION: Duration = Duration::from_millis(500); type Client = client::Client, Body>; @@ -299,6 +299,7 @@ impl Addresses { let old_stamp = inner.stamp; if old_stamp + TIME_SHARING_DURATION >= Instant::now() { drop(inner); + debug!("Time is up! Attempting to grab write lock to rotate"); match self.inner.try_write() { Ok(mut writer_guard) => { let front = writer_guard.neighbors.pop_front().unwrap(); @@ -324,8 +325,10 @@ impl Addresses { self.inner.read().unwrap().outsiders.clone() } - fn remove(&self) { - self.inner.write().unwrap().neighbors.pop_front(); + fn remove(&self) -> Option { + let mut inner = self.inner.write().unwrap(); + inner.stamp = Instant::now(); + inner.neighbors.pop_front() } fn is_empty(&self) -> bool { From 192ec6ed24f3aaafd595ac3f91b2f3baf32111f0 Mon Sep 17 00:00:00 2001 From: Miguel Date: Fri, 10 Mar 2023 18:25:40 -0500 Subject: [PATCH 3/9] Remove bad provider addresses --- crates/ursa-gateway/src/resolver/mod.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/ursa-gateway/src/resolver/mod.rs b/crates/ursa-gateway/src/resolver/mod.rs index 9369fbf2..c56dff96 100644 --- a/crates/ursa-gateway/src/resolver/mod.rs +++ b/crates/ursa-gateway/src/resolver/mod.rs @@ -243,7 +243,7 @@ impl Resolver { match self.client.get(uri).await { Ok(resp) => return Ok(resp), Err(e) => { - provider_addresses.remove(); + provider_addresses.remove(addr); error!("Error querying the node provider: {endpoint:?} {e:?}") } }; @@ -296,8 +296,7 @@ pub struct AddressesState { impl Addresses { fn next(&self) -> Option { let inner = self.inner.read().unwrap(); - let old_stamp = inner.stamp; - if old_stamp + TIME_SHARING_DURATION >= Instant::now() { + if inner.stamp + TIME_SHARING_DURATION >= Instant::now() { drop(inner); debug!("Time is up! Attempting to grab write lock to rotate"); match self.inner.try_write() { @@ -325,10 +324,10 @@ impl Addresses { self.inner.read().unwrap().outsiders.clone() } - fn remove(&self) -> Option { + fn remove(&self, addr: String) { let mut inner = self.inner.write().unwrap(); inner.stamp = Instant::now(); - inner.neighbors.pop_front() + inner.neighbors.retain(|a| a != &addr); } fn is_empty(&self) -> bool { From 7d58caf69faf99bd1e2bb88df6f8b48bc7838848 Mon Sep 17 00:00:00 2001 From: Miguel Date: Fri, 10 Mar 2023 18:31:08 -0500 Subject: [PATCH 4/9] Do not recompute time --- crates/ursa-gateway/src/resolver/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/ursa-gateway/src/resolver/mod.rs b/crates/ursa-gateway/src/resolver/mod.rs index c56dff96..e2ea7854 100644 --- a/crates/ursa-gateway/src/resolver/mod.rs +++ b/crates/ursa-gateway/src/resolver/mod.rs @@ -296,14 +296,14 @@ pub struct AddressesState { impl Addresses { fn next(&self) -> Option { let inner = self.inner.read().unwrap(); - if inner.stamp + TIME_SHARING_DURATION >= Instant::now() { + let now = Instant::now(); + if now.duration_since(inner.stamp) >= TIME_SHARING_DURATION { drop(inner); - debug!("Time is up! Attempting to grab write lock to rotate"); match self.inner.try_write() { Ok(mut writer_guard) => { let front = writer_guard.neighbors.pop_front().unwrap(); writer_guard.neighbors.push_back(front.clone()); - writer_guard.stamp = Instant::now(); + writer_guard.stamp = now; Some(front) } Err(TryLockError::WouldBlock) => { From bb0807ab5a8545d59992bff2aabc0138a0b21c7b Mon Sep 17 00:00:00 2001 From: Miguel Date: Fri, 10 Mar 2023 19:36:28 -0500 Subject: [PATCH 5/9] Derive debug --- crates/ursa-gateway/src/resolver/mod.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/crates/ursa-gateway/src/resolver/mod.rs b/crates/ursa-gateway/src/resolver/mod.rs index e2ea7854..3e54b4b6 100644 --- a/crates/ursa-gateway/src/resolver/mod.rs +++ b/crates/ursa-gateway/src/resolver/mod.rs @@ -228,7 +228,7 @@ impl Resolver { debug!( "Provider addresses to query: {:?}", - provider_addresses.neighbors() + provider_addresses ); while let Some(addr) = provider_addresses.next() { @@ -279,11 +279,12 @@ impl Resolver { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Addresses { inner: Arc>, } +#[derive(Debug)] pub struct AddressesState { // Nodes within MAX_DISTANCE radius. neighbors: VecDeque, @@ -309,17 +310,13 @@ impl Addresses { Err(TryLockError::WouldBlock) => { self.inner.read().unwrap().neighbors.front().cloned() } - _ => panic!(""), + Err(TryLockError::Poisoned(e)) => panic!("{e}"), } } else { inner.neighbors.front().cloned() } } - fn neighbors(&self) -> VecDeque { - self.inner.read().unwrap().neighbors.clone() - } - fn outsiders(&self) -> VecDeque { self.inner.read().unwrap().outsiders.clone() } From b6f60cdd48b257eae99c35a3ba78cf730adbe704 Mon Sep 17 00:00:00 2001 From: Miguel Date: Fri, 10 Mar 2023 19:54:07 -0500 Subject: [PATCH 6/9] Run fmt --- crates/ursa-gateway/src/resolver/mod.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/ursa-gateway/src/resolver/mod.rs b/crates/ursa-gateway/src/resolver/mod.rs index 3e54b4b6..ae5a1455 100644 --- a/crates/ursa-gateway/src/resolver/mod.rs +++ b/crates/ursa-gateway/src/resolver/mod.rs @@ -226,10 +226,7 @@ impl Resolver { Some(provider_addresses) => provider_addresses, }; - debug!( - "Provider addresses to query: {:?}", - provider_addresses - ); + debug!("Provider addresses to query: {:?}", provider_addresses); while let Some(addr) = provider_addresses.next() { let endpoint = format!("{addr}/ursa/v0/{cid}"); From bb33b86972dc3d6274d464856df4c64ee2cde03d Mon Sep 17 00:00:00 2001 From: Miguel Date: Mon, 13 Mar 2023 18:48:04 -0400 Subject: [PATCH 7/9] Add round robin queue --- crates/ursa-gateway/src/resolver/mod.rs | 119 ++++++------------ .../ursa-gateway/src/resolver/round_robin.rs | 65 ++++++++++ 2 files changed, 101 insertions(+), 83 deletions(-) create mode 100644 crates/ursa-gateway/src/resolver/round_robin.rs diff --git a/crates/ursa-gateway/src/resolver/mod.rs b/crates/ursa-gateway/src/resolver/mod.rs index ae5a1455..fee6b322 100644 --- a/crates/ursa-gateway/src/resolver/mod.rs +++ b/crates/ursa-gateway/src/resolver/mod.rs @@ -1,4 +1,5 @@ pub mod model; +mod round_robin; use anyhow::{anyhow, Context}; use axum::{ @@ -19,26 +20,26 @@ use model::IndexerResponse; use moka::sync::Cache; use ordered_float::OrderedFloat; use serde_json::from_slice; -use std::sync::TryLockError; -use std::{collections::VecDeque, net::IpAddr, sync::Arc, sync::RwLock, time::Duration}; -use tokio::time::Instant; +use std::{net::IpAddr, sync::Arc}; use tracing::{debug, error, warn}; use crate::{ - resolver::model::{Metadata, ProviderResult}, + resolver::{ + model::{Metadata, ProviderResult}, + round_robin::Queue, + }, util::error::Error, }; const FLEEK_NETWORK_FILTER: &[u8] = b"FleekNetwork"; const MAX_DISTANCE: OrderedFloat = OrderedFloat(565_000f64); -const TIME_SHARING_DURATION: Duration = Duration::from_millis(500); type Client = client::Client, Body>; pub struct Resolver { indexer_cid_url: String, client: Client, - cache: Cache, + cache: Cache>, maxminddb: Arc>>, location: Location, } @@ -47,7 +48,7 @@ impl Resolver { pub fn new( indexer_cid_url: String, client: Client, - cache: Cache, + cache: Cache>, maxminddb: Arc>>, addr: IpAddr, ) -> Result { @@ -64,8 +65,9 @@ impl Resolver { }) } - /// Returns a set of provider address sorted by their distance relative to the gateway. - fn provider_addresses(&self, providers: Vec<&ProviderResult>) -> Addresses { + /// Partitions the providers into those that are within a MAX_DISTANCE distance + /// from the gateway and those that are outside of that distance. + fn partition_providers(&self, providers: Vec<&ProviderResult>) -> Providers { let (neighbors, outsiders) = providers .into_iter() .flat_map(|provider_result| &provider_result.provider.addrs) @@ -125,12 +127,9 @@ impl Resolver { } Either::Left(address) }); - Addresses { - inner: Arc::new(RwLock::new(AddressesState { - neighbors, - outsiders, - stamp: Instant::now(), - })), + Providers { + neighbors: Queue::new(neighbors), + outsiders: Queue::new(outsiders), } } @@ -211,24 +210,26 @@ impl Resolver { }) .collect(); - let provider_addresses = self.provider_addresses(providers); - if provider_addresses.is_empty() { + let providers = self.partition_providers(providers); + if providers.neighbors.is_empty() && providers.outsiders.is_empty() { return Err(Error::Internal( "Failed to get a valid address for provider".to_string(), )); } + let providers = Arc::new(providers); + self.cache.insert(cid.to_string(), providers.clone()); - self.cache - .insert(cid.to_string(), provider_addresses.clone()); - - provider_addresses + providers } - Some(provider_addresses) => provider_addresses, + Some(providers) => providers, }; - debug!("Provider addresses to query: {:?}", provider_addresses); + debug!( + "Provider addresses to query: {:?}", + provider_addresses.neighbors + ); - while let Some(addr) = provider_addresses.next() { + while let Some(addr) = provider_addresses.neighbors.next() { let endpoint = format!("{addr}/ursa/v0/{cid}"); let uri = match endpoint.parse::() { Ok(uri) => uri, @@ -240,22 +241,20 @@ impl Resolver { match self.client.get(uri).await { Ok(resp) => return Ok(resp), Err(e) => { - provider_addresses.remove(addr); + provider_addresses.neighbors.remove(addr); error!("Error querying the node provider: {endpoint:?} {e:?}") } }; } - let outsiders = provider_addresses.outsiders(); - - if !outsiders.is_empty() { + if !provider_addresses.outsiders.is_empty() { debug!( "Failed to get content from neighbors so falling back to {:?}", - outsiders + provider_addresses.outsiders ); } - for addr in outsiders { + while let Some(addr) = provider_addresses.outsiders.next() { let endpoint = format!("{addr}/ursa/v0/{cid}"); let uri = match endpoint.parse::() { Ok(uri) => uri, @@ -266,7 +265,10 @@ impl Resolver { }; match self.client.get(uri).await { Ok(resp) => return Ok(resp), - Err(e) => error!("Error querying the node provider: {endpoint:?} {e:?}"), + Err(e) => { + provider_addresses.outsiders.remove(addr); + error!("Error querying the node provider: {endpoint:?} {e:?}") + } }; } @@ -276,58 +278,9 @@ impl Resolver { } } -#[derive(Clone, Debug)] -pub struct Addresses { - inner: Arc>, -} - -#[derive(Debug)] -pub struct AddressesState { - // Nodes within MAX_DISTANCE radius. - neighbors: VecDeque, - // Nodes outside MAX_DISTANCE radius. - outsiders: VecDeque, - // Recently viewed time stamp. - stamp: Instant, -} - -impl Addresses { - fn next(&self) -> Option { - let inner = self.inner.read().unwrap(); - let now = Instant::now(); - if now.duration_since(inner.stamp) >= TIME_SHARING_DURATION { - drop(inner); - match self.inner.try_write() { - Ok(mut writer_guard) => { - let front = writer_guard.neighbors.pop_front().unwrap(); - writer_guard.neighbors.push_back(front.clone()); - writer_guard.stamp = now; - Some(front) - } - Err(TryLockError::WouldBlock) => { - self.inner.read().unwrap().neighbors.front().cloned() - } - Err(TryLockError::Poisoned(e)) => panic!("{e}"), - } - } else { - inner.neighbors.front().cloned() - } - } - - fn outsiders(&self) -> VecDeque { - self.inner.read().unwrap().outsiders.clone() - } - - fn remove(&self, addr: String) { - let mut inner = self.inner.write().unwrap(); - inner.stamp = Instant::now(); - inner.neighbors.retain(|a| a != &addr); - } - - fn is_empty(&self) -> bool { - let inner = self.inner.read().unwrap(); - return inner.neighbors.is_empty() && inner.outsiders.is_empty(); - } +pub struct Providers { + neighbors: Queue, + outsiders: Queue, } fn get_location(city: City) -> Result { diff --git a/crates/ursa-gateway/src/resolver/round_robin.rs b/crates/ursa-gateway/src/resolver/round_robin.rs new file mode 100644 index 00000000..ec3b73d0 --- /dev/null +++ b/crates/ursa-gateway/src/resolver/round_robin.rs @@ -0,0 +1,65 @@ +use std::{ + collections::{HashSet, VecDeque}, + sync::{Arc, RwLock, TryLockError}, + time::Duration, +}; +use tokio::time::Instant; + +const TIME_SHARING_DURATION: Duration = Duration::from_millis(500); + +#[derive(Debug)] +pub struct Queue { + inner: Arc>>, +} + +#[derive(Debug)] +struct Inner { + addresses: VecDeque, + stamp: Instant, +} + +impl Queue +where + T: Clone + PartialEq, +{ + pub fn new(addresses: HashSet) -> Self { + Self { + inner: Arc::new(RwLock::new(Inner { + addresses: addresses.into_iter().collect(), + stamp: Instant::now(), + })), + } + } + + pub fn next(&self) -> Option { + let inner = self.inner.read().unwrap(); + let now = Instant::now(); + if now.duration_since(inner.stamp) >= TIME_SHARING_DURATION { + drop(inner); + match self.inner.try_write() { + Ok(mut writer_guard) => { + let front = writer_guard.addresses.pop_front().unwrap(); + writer_guard.addresses.push_back(front.clone()); + writer_guard.stamp = now; + Some(front) + } + Err(TryLockError::WouldBlock) => { + self.inner.read().unwrap().addresses.front().cloned() + } + Err(TryLockError::Poisoned(e)) => panic!("{e}"), + } + } else { + inner.addresses.front().cloned() + } + } + + pub fn remove(&self, addr: T) { + let mut inner = self.inner.write().unwrap(); + inner.stamp = Instant::now(); + inner.addresses.retain(|a| a != &addr); + } + + pub fn is_empty(&self) -> bool { + self.inner.read().unwrap().addresses.is_empty() + } +} From 92a5f94d92f4d9f416e88b0680e6aa78ae7612ce Mon Sep 17 00:00:00 2001 From: Miguel Date: Mon, 13 Mar 2023 18:54:48 -0400 Subject: [PATCH 8/9] Update comment --- crates/ursa-gateway/src/resolver/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/ursa-gateway/src/resolver/mod.rs b/crates/ursa-gateway/src/resolver/mod.rs index 6ec76c7f..662ae161 100644 --- a/crates/ursa-gateway/src/resolver/mod.rs +++ b/crates/ursa-gateway/src/resolver/mod.rs @@ -70,8 +70,9 @@ impl Resolver { }) } - /// Partitions the providers into those that are within a MAX_DISTANCE distance - /// from the gateway and those that are outside of that distance. + /// Partitions the providers into a set containing providers that are within + /// MAX_DISTANCE distance from the gateway and another set of providers that + /// are outside of that distance. fn partition_providers(&self, providers: Vec<&ProviderResult>) -> Providers { let (neighbors, outsiders) = providers .into_iter() From 052daad62fdb7f9ef2076f7424033917c7606d71 Mon Sep 17 00:00:00 2001 From: Miguel Date: Mon, 13 Mar 2023 23:07:09 -0400 Subject: [PATCH 9/9] Naming consistency --- crates/ursa-gateway/src/resolver/mod.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/crates/ursa-gateway/src/resolver/mod.rs b/crates/ursa-gateway/src/resolver/mod.rs index 662ae161..a4f32619 100644 --- a/crates/ursa-gateway/src/resolver/mod.rs +++ b/crates/ursa-gateway/src/resolver/mod.rs @@ -157,7 +157,7 @@ impl Resolver { anyhow!("Error parsed uri: {endpoint}") })?; - let provider_addresses = match self.cache.get(cid) { + let providers = match self.cache.get(cid) { None => { let body = match self .client @@ -240,12 +240,9 @@ impl Resolver { Some(providers) => providers, }; - debug!( - "Provider addresses to query: {:?}", - provider_addresses.neighbors - ); + debug!("Provider addresses to query: {:?}", providers.neighbors); - while let Some(addr) = provider_addresses.neighbors.next() { + while let Some(addr) = providers.neighbors.next() { let endpoint = format!("{addr}/ursa/v0/{cid}"); let uri = match endpoint.parse::() { Ok(uri) => uri, @@ -257,20 +254,20 @@ impl Resolver { match self.client.get(uri).await { Ok(resp) => return Ok(resp), Err(e) => { - provider_addresses.neighbors.remove(addr); + providers.neighbors.remove(addr); error!("Error querying the node provider: {endpoint:?} {e:?}") } }; } - if !provider_addresses.outsiders.is_empty() { + if !providers.outsiders.is_empty() { debug!( "Failed to get content from neighbors so falling back to {:?}", - provider_addresses.outsiders + providers.outsiders ); } - while let Some(addr) = provider_addresses.outsiders.next() { + while let Some(addr) = providers.outsiders.next() { let endpoint = format!("{addr}/ursa/v0/{cid}"); let uri = match endpoint.parse::() { Ok(uri) => uri, @@ -282,7 +279,7 @@ impl Resolver { match self.client.get(uri).await { Ok(resp) => return Ok(resp), Err(e) => { - provider_addresses.outsiders.remove(addr); + providers.outsiders.remove(addr); error!("Error querying the node provider: {endpoint:?} {e:?}") } };