Skip to content

Commit

Permalink
Implement load-balancing for outgoing requests
Browse files Browse the repository at this point in the history
  • Loading branch information
NickAcPT committed Nov 2, 2024
1 parent e621000 commit e621000
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 18 deletions.
4 changes: 2 additions & 2 deletions nmsr-aas/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ serde_json = { workspace = true }
base64 = "0.22"

# Hyper - HTTP client
hyper = { version = "1.4", features = ["client"] }
hyper = { version = "1.5", features = ["client"] }
hyper-util = { version = "0.1", features = [
"client",
"client-legacy",
Expand Down Expand Up @@ -91,7 +91,7 @@ thiserror = { workspace = true }
derive_more = { workspace = true }

# Tower - Service framework
tower = { version = "0.5", features = ["buffer", "limit", "timeout", "retry"] }
tower = { version = "0.5", features = ["buffer", "limit", "timeout", "retry", "balance"] }
tower-http = { version = "0.6", features = [
"set-header",
"trace",
Expand Down
2 changes: 1 addition & 1 deletion nmsr-aas/src/model/armor/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl VanillaMinecraftArmorManager {
.explain("Unable to create armor cache folder".to_string())?;

let manager = Self {
client: NmsrHttpClient::new(20, 5 * 60 /* 5 minutes */, 5),
client: NmsrHttpClient::new(20, 5 * 60 /* 5 minutes */, 5, &[]),
material_location,
trims_location,
};
Expand Down
2 changes: 2 additions & 0 deletions nmsr-aas/src/model/resolver/mojang/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ impl MojangClient {
mojank.session_server_rate_limit,
mojank.session_server_timeout,
mojank.session_server_retries,
&mojank.outgoing_addresses
),
name_lookup_client: NmsrHttpClient::new(
mojank
.username_resolve_rate_limit
.unwrap_or(mojank.session_server_rate_limit),
mojank.session_server_timeout,
mojank.session_server_retries,
&mojank.outgoing_addresses
),
mojank_config: mojank,
})
Expand Down
11 changes: 7 additions & 4 deletions nmsr-aas/src/utils/config.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use std::{
collections::HashMap,
fs::Metadata,
path::PathBuf,
time::{Duration, SystemTime},
collections::HashMap, fs::Metadata, net::IpAddr, path::PathBuf, time::{Duration, SystemTime}
};

use chrono::{DateTime, Local};
Expand Down Expand Up @@ -69,6 +66,7 @@ impl Default for ModelCacheConfiguration {
}
}
}

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct MojankConfiguration {
Expand Down Expand Up @@ -118,6 +116,9 @@ pub struct MojankConfiguration {

/// The template to use for resolving player cape textures.
pub textures_server_cape_url_template: String,

/// The outgoing addresses for load-balancing requests to Mojang servers.
pub outgoing_addresses: Vec<IpAddr>,
}

pub const DEFAULT_TEXTURES_SERVER_SKIN_URL_TEMPLATE: &str =
Expand Down Expand Up @@ -146,6 +147,8 @@ impl Default for MojankConfiguration {
.to_string(),
textures_server_cape_url_template: DEFAULT_TEXTURES_SERVER_SKIN_URL_TEMPLATE
.to_string(),

outgoing_addresses: Vec::new(),
}
}
}
Expand Down
134 changes: 123 additions & 11 deletions nmsr-aas/src/utils/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ use hyper_util::{
};
use std::{
future::{ready, Ready},
net::IpAddr,
time::Duration,
};
use tower::{
balance::p2c::Balance,
buffer::Buffer,
discover::ServiceList,
limit::RateLimit,
load::{CompleteOnResponse, PendingRequestsDiscover},
retry::{Policy, Retry},
timeout::{Timeout, TimeoutLayer},
Service, ServiceBuilder, ServiceExt,
Expand Down Expand Up @@ -51,13 +55,42 @@ pub(crate) type NmsrTraceLayer = Trace<
DefaultOnFailure,
>;

pub struct NmsrHttpClient {
inner: Buffer<Request<SyncBody>, <RateLimit<Retry<MojankRetryPolicy, Timeout<NmsrTraceLayer>>> as Service<Request<SyncBody>>>::Future>,
pub(crate) type HttpClientInnerService =
Buffer<
Request<SyncBody>,
<RateLimit<Retry<MojankRetryPolicy, Timeout<NmsrTraceLayer>>> as Service<
Request<SyncBody>,
>>::Future,
>;

pub enum NmsrHttpClient {
SingleIp {
inner: HttpClientInnerService,
},
LoadBalanced {
inner: Buffer<
Request<SyncBody>,
<Balance<
PendingRequestsDiscover<ServiceList<Vec<HttpClientInnerService>>>,
Request<SyncBody>,
> as Service<Request<SyncBody>>>::Future,
>,
},
}

impl NmsrHttpClient {
pub fn new(rate_limit_per_second: u64, request_timeout_seconds: u64, request_retries_count: usize) -> Self {
create_http_client(rate_limit_per_second, request_timeout_seconds, request_retries_count)
pub fn new(
rate_limit_per_second: u64,
request_timeout_seconds: u64,
request_retries_count: usize,
client_ips: &[IpAddr],
) -> Self {
create_http_client(
rate_limit_per_second,
request_timeout_seconds,
request_retries_count,
client_ips,
)
}

#[instrument(skip(self, parent_span, on_error), parent = parent_span, err)]
Expand All @@ -75,8 +108,20 @@ impl NmsrHttpClient {
unreachable!("Empty body should not error: {}", e)
})))?;

let response = {
let mut svc = self.inner.clone();
let response = if let NmsrHttpClient::SingleIp { inner } = self {
let mut svc = inner.clone();

let service = svc
.ready()
.await
.map_err(MojangRequestError::BoxedRequestError)?;

service
.call(request)
.await
.map_err(MojangRequestError::BoxedRequestError)?
} else if let NmsrHttpClient::LoadBalanced { inner } = self {
let mut svc = inner.clone();

let service = svc
.ready()
Expand All @@ -87,6 +132,8 @@ impl NmsrHttpClient {
.call(request)
.await
.map_err(MojangRequestError::BoxedRequestError)?
} else {
unreachable!("Invalid NmsrHttpClient variant")
};

if response.status() != StatusCode::OK {
Expand All @@ -104,17 +151,78 @@ impl NmsrHttpClient {
}
}

fn create_http_client(rate_limit_per_second: u64, request_timeout_seconds: u64, request_retries_count: usize) -> NmsrHttpClient {
fn create_http_client(
rate_limit_per_second: u64,
request_timeout_seconds: u64,
request_retries_count: usize,
client_ips: &[IpAddr],
) -> NmsrHttpClient {
if client_ips.is_empty() {
create_http_client_internal(
rate_limit_per_second,
request_timeout_seconds,
request_retries_count,
None,
)
} else if client_ips.len() == 1 {
create_http_client_internal(
rate_limit_per_second,
request_timeout_seconds,
request_retries_count,
Some(client_ips[0]),
)
} else {
let clients = client_ips
.into_iter()
.map(|ip| {
create_http_client_internal(
rate_limit_per_second,
request_timeout_seconds,
request_retries_count,
Some(*ip),
)
})
.flat_map(|svc| {
if let NmsrHttpClient::SingleIp { inner } = svc {
Some(inner)
} else {
None
}
})
.collect::<Vec<_>>();

let discover = ServiceList::new(clients);
let load = PendingRequestsDiscover::new(discover, CompleteOnResponse::default());
let balanced = Balance::new(load);

let balanced = ServiceBuilder::new()
.buffer(rate_limit_per_second.saturating_mul(2) as usize)
.check_clone()
.service(balanced);

NmsrHttpClient::LoadBalanced { inner: balanced }
}
}

fn create_http_client_internal(
rate_limit_per_second: u64,
request_timeout_seconds: u64,
request_retries_count: usize,
client_ip: Option<IpAddr>,
) -> NmsrHttpClient {
let mut http = HttpConnector::new();
http.set_nodelay(true);
http.enforce_http(false);
http.set_local_address(client_ip);

let tls = TlsConnector::new().expect("Expected TLS connector to be valid");

let https = HttpsConnector::from((http, tls.into()));

// A new higher level client from hyper is in the works, so we gotta use the legacy one
let client = Client::builder(TokioExecutor::new()).build(https);
let client = Client::builder(TokioExecutor::new())
.http2_keep_alive_while_idle(true)
.build(https);

let tracing = TraceLayer::new_for_http().on_body_chunk(()).on_eos(());

Expand All @@ -135,11 +243,11 @@ fn create_http_client(rate_limit_per_second: u64, request_timeout_seconds: u64,
.check_clone()
.service(client);

NmsrHttpClient { inner: service }
NmsrHttpClient::SingleIp { inner: service }
}

#[derive(Copy, Clone, Debug)]
struct MojankRetryPolicy {
pub(crate) struct MojankRetryPolicy {
attempts: usize,
}

Expand All @@ -152,7 +260,11 @@ impl MojankRetryPolicy {
impl<P, Res> Policy<Request<SyncBody>, Res, P> for MojankRetryPolicy {
type Future = Ready<()>;

fn retry(&mut self, _req: &mut Request<SyncBody>, result: &mut Result<Res, P>) -> Option<Self::Future> {
fn retry(
&mut self,
_req: &mut Request<SyncBody>,
result: &mut Result<Res, P>,
) -> Option<Self::Future> {
match result {
Ok(_) => None,
Err(_) => {
Expand Down

0 comments on commit e621000

Please sign in to comment.