Implement Phoenix Network Coordinates (#854)
* Use bounded channels to prevent backpressure

* Add datacenter discovery

This commit adds the concept of "datacenter" to our xDS protocol. Unlike
other resources, there's slightly different behaviour between proxies
to relays and relays to agents. When a relay queries an agent for a
datacenter, the agent returns its own QCMP port and ICAO code. The relay
then adds the address of the remote agent to identify the datacenter.
When a proxy then queries for datacenters, it gets all of the agents'
addresses, ports, and ICAO codes.

This will be used with a future commit to measure the distance between
proxies and agents.
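
The two-step flow described above can be sketched as follows. This is a minimal illustration only, not the code from this commit: the type and function names (`AgentReport`, `DatacenterEntry`, `record_agent`, `list_for_proxy`) are hypothetical, and the fields simply mirror the `Datacenter` proto message (`host`, `qcmp_port`, `icao_code`).

```rust
use std::collections::BTreeMap;
use std::net::IpAddr;

/// What an agent reports about itself (hypothetical type for illustration).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AgentReport {
    pub qcmp_port: u16,
    pub icao_code: String,
}

/// What a proxy receives: the agent's report plus the agent's address.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DatacenterEntry {
    pub host: IpAddr,
    pub qcmp_port: u16,
    pub icao_code: String,
}

/// Relay-side step: the agent does not send its own address, so the relay
/// keys the report by the remote address the agent connected from.
pub fn record_agent(
    known: &mut BTreeMap<IpAddr, AgentReport>,
    remote_addr: IpAddr,
    report: AgentReport,
) {
    known.insert(remote_addr, report);
}

/// Proxy-facing step: flatten everything the relay knows into full entries
/// containing address, QCMP port, and ICAO code.
pub fn list_for_proxy(known: &BTreeMap<IpAddr, AgentReport>) -> Vec<DatacenterEntry> {
    known
        .iter()
        .map(|(host, report)| DatacenterEntry {
            host: *host,
            qcmp_port: report.qcmp_port,
            icao_code: report.icao_code.clone(),
        })
        .collect()
}
```

Keying by remote address matches the test in this commit, which looks datacenters up by `Ipv4Addr::LOCALHOST` or `Ipv6Addr::LOCALHOST`.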

* Implement Phoenix Network Coordinates

This commit uses the previous commit of datacentre discovery with an
implementation of Phoenix Network Coordinates[1] to build a
decentralised network coordinate map of all the datacentres that the
proxies know about. Using that information, we expose a new HTTP server
at the same QCMP port that a client can use to query which datacentres
are the closest to the proxy.

For identifying datacentres we use ICAO codes. This allows us to
consolidate multiple logical datacentres into a single location. For
example, if you have multiple zones allocated in the same physical
datacentre, they're consolidated under their ICAO code in order to
reduce the number of options a client has to pick from.

This allows clients to get their full latency to datacentres, which
allows operators to allocate datacentres dynamically, in the closest
location available to players.

[1]: https://www.researchgate.net/publication/221198962_Phoenix_Towards_an_Accurate_Practical_and_Decentralized_Network_Coordinate_System
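
For readers unfamiliar with the technique, here is a rough sketch of how a factorisation-based coordinate system such as Phoenix estimates a distance once coordinates are known. It assumes, as a reading of the paper and not of this commit's code, that each node holds an outgoing and an incoming vector and that the estimate from one node to another is the dot product of the first node's outgoing vector and the second node's incoming vector. The names `Coordinates` and `estimate_distance` are hypothetical.

```rust
/// Coordinates of one node (hypothetical type for illustration).
#[derive(Clone, Debug, PartialEq)]
pub struct Coordinates {
    /// Used when this node is the source of the measurement.
    pub outgoing: Vec<f64>,
    /// Used when this node is the target of the measurement.
    pub incoming: Vec<f64>,
}

/// Estimate the distance from `from` to `to` as the dot product of
/// `from.outgoing` and `to.incoming`.
///
/// Returns `None` when the two vectors have different dimensions, since
/// the dot product is undefined in that case.
pub fn estimate_distance(from: &Coordinates, to: &Coordinates) -> Option<f64> {
    if from.outgoing.len() != to.incoming.len() {
        return None;
    }

    Some(
        from.outgoing
            .iter()
            .zip(&to.incoming)
            .map(|(x, y)| x * y)
            .sum(),
    )
}
```

This only covers the estimation step. How the coordinates themselves are fitted from measured latencies is the substance of the paper and is not sketched here.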

* Spawn phoenix task on load balancers

* add update watcher to phoenix

* Add shutdown handler

* formatting

* Slight adjustments

* unbounded

* Fix compilation

* Oops

* Ugly, but it works now

* Increase timeout

* Add documentation

---------

Co-authored-by: Jake Shadle <[email protected]>
XAMPPRocky and Jake-Shadle authored Feb 29, 2024
1 parent 80741fb commit d22a444
Showing 20 changed files with 1,512 additions and 137 deletions.
38 changes: 22 additions & 16 deletions benches/cluster_map.rs
@@ -229,7 +229,15 @@ mod serde {
for cluster in cm.iter() {
resources.push(
resource_type
.encode_to_any(&Cluster::try_from((cluster.key(), cluster.value())).unwrap())
.encode_to_any(&Cluster {
locality: cluster.key().clone().map(|l| l.try_into().unwrap()),
endpoints: cluster
.endpoints
.iter()
.map(TryFrom::try_from)
.collect::<Result<_, _>>()
.unwrap(),
})
.unwrap(),
);
}
@@ -246,7 +254,7 @@ mod serde {
let quilkin::net::xds::Resource::Cluster(cluster) = c else {
unreachable!()
};
cm.merge(
cm.insert(
cluster.locality.map(From::from),
cluster
.endpoints
@@ -313,20 +321,18 @@ mod ops {
let mut total_endpoints = 0;

for kv in gc.cm.iter() {
for _ep in kv.value() {
total_endpoints += 1;
}
total_endpoints += kv.endpoints.len();
}

assert_eq!(total_endpoints, gc.total_endpoints);
total_endpoints
}

#[allow(clippy::eq_op)]
fn is_equal(gc: &GenCluster) -> usize {
assert_eq!(gc.cm, gc.cm);
gc.total_endpoints
}
// #[allow(clippy::eq_op)]
// fn is_equal(gc: &GenCluster) -> usize {
// assert_eq!(gc.cm, gc.cm);
// gc.total_endpoints
// }

#[divan::bench(consts = SEEDS)]
fn iterate<const S: u64>(b: Bencher) {
@@ -346,13 +352,13 @@ mod ops {
.bench(|| divan::black_box(compute_hash::<S>(&cm)))
}

#[divan::bench(consts = SEEDS)]
fn partial_eq<const S: u64>(b: Bencher) {
let cm = gen_cluster_map::<S>();
// #[divan::bench(consts = SEEDS)]
// fn partial_eq<const S: u64>(b: Bencher) {
// let cm = gen_cluster_map::<S>();

b.counter(cm.total_endpoints)
.bench(|| divan::black_box(is_equal(&cm)))
}
// b.counter(cm.total_endpoints)
// .bench(|| divan::black_box(is_equal(&cm)))
// }
}

fn main() {
5 changes: 4 additions & 1 deletion benches/compression.rs
@@ -1,7 +1,10 @@
mod shared;

use divan::Bencher;
use quilkin::{filters::compress::*, pool::*};
use quilkin::{
filters::compress::{Compressor, Mode},
pool::BufferPool,
};
use shared::*;
use std::sync::Arc;

40 changes: 36 additions & 4 deletions docs/src/services/proxy/qcmp.md
@@ -2,12 +2,12 @@

| services | ports | Protocol |
|----------|-------|-----------|
| QCMP | 7600 | UDP (IPv4 OR IPv6) |
| QCMP | 7600 | UDP AND TCP (IPv4 OR IPv6) |

In addition to the TCP based administration API, Quilkin provides a meta API
over UDP. The purpose of this API is to provide meta operations that can be
used by untrusted clients. Currently the API is focuses on providing pings for
latency measurement but that may change overtime as new features are added.
over UDP and TCP. The purpose of this API is to provide meta operations that can
be used by untrusted clients. Currently the API focuses on providing pings for
latency measurement, but that may change over time as new features are added.

## Ping
The main functionality currently in QCMP is pinging, measuring the latency from
@@ -70,3 +70,35 @@ types:
- id: server_sent_timestamp
type: u8
```

## Datacenter Latency

In addition to being able to ping Quilkin to get the latency between the client
and proxy, Quilkin also includes the ability to get a proxy's latency to each of
its connected datacentres. This allows clients to send information to services
like a matchmaker about which datacentre they are closest to.

> Note: This requires a multi-cluster relay setup, as when you set up proxies
in the same cluster as gameservers, this measurement is redundant.

All that is required to set this up is to provide an ICAO code to the agent in
the gameserver cluster (e.g. through the environment variable `ICAO_CODE`).
No further setup is required. **You can use duplicate ICAO codes**; Quilkin will
choose the best result amongst the duplicates to return. Quilkin assumes that
multiple instances of the same ICAO code refer to the same physical datacentre,
so latency between them should be negligible.
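
As an illustration of how duplicate ICAO codes can collapse into one result, here is a minimal sketch. It assumes that "best" means the lowest measured latency, which the text above does not state explicitly, and the function name `consolidate_by_icao` is hypothetical and not part of Quilkin's API.

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// Collapse per-agent measurements into one latency per ICAO code,
/// keeping the lowest latency seen for each code (assumed meaning of "best").
pub fn consolidate_by_icao<'a>(
    measurements: impl IntoIterator<Item = (&'a str, Duration)>,
) -> BTreeMap<String, Duration> {
    let mut best = BTreeMap::new();

    for (icao, latency) in measurements {
        best.entry(icao.to_string())
            // A code we have already seen: keep whichever latency is lower.
            .and_modify(|current: &mut Duration| *current = (*current).min(latency))
            // First time we see this code: record the measurement as-is.
            .or_insert(latency);
    }

    best
}
```

With two agents reporting `EIDW` at 12 ms and 9 ms, the consolidated map would hold a single `EIDW` entry of 9 ms.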

> Why ICAO? ICAO is an international standard for airport codes. Airport codes
are short and human readable, which makes them easy to use with
geo-visualisations in tools like Grafana and easy to group by. IATA codes only
cover major airports, whereas ICAO codes cover practically every airport, so
they can more accurately represent the location of any datacentre.


### API And Schema

Currently the datacentre latency can be retrieved by sending a `GET /` HTTP
request to the QCMP port.

The returned data is a JSON object with each key being the ICAO code for the
datacentre, and the value being the latency in nanoseconds.
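
A client might use the response as in the sketch below. It assumes the JSON body has already been fetched and deserialised (with whatever HTTP and JSON libraries the client uses) into a map from ICAO code to latency in nanoseconds. The function name `closest_datacenter` is hypothetical.

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// Pick the datacentre with the lowest latency from an already-parsed
/// response, where keys are ICAO codes and values are nanoseconds.
///
/// Returns `None` when the proxy reported no datacentres.
pub fn closest_datacenter(latencies_ns: &BTreeMap<String, u64>) -> Option<(&str, Duration)> {
    latencies_ns
        .iter()
        .min_by_key(|(_, ns)| **ns)
        .map(|(icao, ns)| (icao.as_str(), Duration::from_nanos(*ns)))
}
```

Converting to `Duration` at the edge keeps the nanosecond unit from leaking into the rest of the client, for example when the result is forwarded to a matchmaker.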
6 changes: 6 additions & 0 deletions proto/quilkin/config/v1alpha1/config.proto
@@ -39,3 +39,9 @@ message Endpoint {
uint32 port = 2;
google.protobuf.Struct metadata = 3;
}

message Datacenter {
string host = 1;
uint32 qcmp_port = 2;
string icao_code = 3;
}
122 changes: 119 additions & 3 deletions src/cli.rs
@@ -265,6 +265,118 @@ mod tests {
test::{create_socket, AddressType, TestConfig, TestHelper},
};

#[tokio::test]
async fn datacenter_discovery() {
let relay_xds_port = crate::test::available_addr(&AddressType::Random)
.await
.port();
let relay_mds_port = crate::test::available_addr(&AddressType::Random)
.await
.port();
let relay_config = Arc::new(Config::default());
let relay = Relay {
xds_port: relay_xds_port,
mds_port: relay_mds_port,
..<_>::default()
};

let agent_file = tempfile::NamedTempFile::new().unwrap();
let config = Config::default();

std::fs::write(agent_file.path(), serde_yaml::to_string(&config).unwrap()).unwrap();

let agent_qcmp_port = crate::test::available_addr(&AddressType::Random)
.await
.port();

let icao_code: crate::config::IcaoCode = "EIDW".parse().unwrap();

let agent_config = Arc::new(Config::default());
let agent = Agent {
relay: vec![format!("http://localhost:{relay_mds_port}")
.parse()
.unwrap()],
region: None,
sub_zone: None,
zone: None,
idle_request_interval_secs: admin::idle_request_interval_secs(),
qcmp_port: agent_qcmp_port,
icao_code: icao_code.clone(),
provider: Some(Providers::File {
path: agent_file.path().to_path_buf(),
}),
};

let proxy_config = Arc::new(Config::default());
let proxy = Proxy {
management_server: vec![format!("http://localhost:{relay_xds_port}")
.parse()
.unwrap()],
..<_>::default()
};

let (_tx, shutdown_rx) = crate::make_shutdown_channel(Default::default());
tokio::spawn({
let config = relay_config.clone();
let shutdown_rx = shutdown_rx.clone();
async move {
relay
.relay(config, Admin::Relay(<_>::default()), shutdown_rx)
.await
}
});
tokio::time::sleep(std::time::Duration::from_millis(150)).await;
tokio::spawn({
let config = agent_config.clone();
let shutdown_rx = shutdown_rx.clone();
async move {
agent
.run(config, Admin::Agent(<_>::default()), shutdown_rx)
.await
}
});
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
let (tx, proxy_init) = tokio::sync::oneshot::channel();
tokio::spawn({
let config = proxy_config.clone();
let shutdown_rx = shutdown_rx.clone();
async move {
proxy
.run(config, Admin::Proxy(<_>::default()), Some(tx), shutdown_rx)
.await
}
});
proxy_init.await.unwrap();
tokio::time::sleep(Duration::from_millis(150)).await;

let datacenter = crate::config::Datacenter {
qcmp_port: agent_qcmp_port,
icao_code,
};

assert!(agent_config.datacenters.read().is_empty());
assert!(!relay_config.datacenters.read().is_empty());
assert!(!proxy_config.datacenters.read().is_empty());
#[track_caller]
fn assert_config(config: &Config, datacenter: &crate::config::Datacenter) {
let dcs = config.datacenters.read();
let ipv4_dc = dcs.get(&std::net::Ipv4Addr::LOCALHOST.into());
let ipv6_dc = dcs.get(&std::net::Ipv6Addr::LOCALHOST.into());

match (ipv4_dc, ipv6_dc) {
(Some(dc), None) => assert_eq!(&*dc, datacenter),
(None, Some(dc)) => assert_eq!(&*dc, datacenter),
(Some(dc1), Some(dc2)) => {
assert_eq!(&*dc1, datacenter);
assert_eq!(&*dc2, datacenter);
}
(None, None) => panic!("No datacenter found"),
};
}
assert_config(&relay_config, &datacenter);
assert_config(&proxy_config, &datacenter);
}

#[tokio::test]
async fn relay_routing() {
let mut t = TestHelper::default();
@@ -348,6 +460,7 @@ mod tests {
provider: Some(Providers::File {
path: endpoints_file.path().to_path_buf(),
}),
..<_>::default()
}),
log_format: LogFormats::default(),
};
@@ -368,14 +481,16 @@ mod tests {
};

tokio::spawn(relay.drive(None));
tokio::time::sleep(Duration::from_millis(150)).await;
tokio::spawn(control_plane.drive(None));
tokio::time::sleep(Duration::from_millis(50)).await;
tokio::time::sleep(Duration::from_millis(150)).await;

let (tx, proxy_init) = tokio::sync::oneshot::channel();

tokio::spawn(proxy.drive(Some(tx)));

proxy_init.await.unwrap();
tokio::time::sleep(Duration::from_millis(150)).await;

let socket = create_socket().await;
let config = TestConfig::default();
@@ -384,6 +499,7 @@ mod tests {
let server_port = server_socket.local_addr().unwrap().port();
for _ in 0..5 {
let token = Token::new();
tokio::time::sleep(Duration::from_millis(50)).await;

{
tracing::info!(%token, "writing new config");
@@ -399,15 +515,15 @@ mod tests {
config.write_to_file(endpoints_file.path());
}

tokio::time::sleep(Duration::from_millis(80)).await;
tokio::time::sleep(Duration::from_millis(280)).await;
let mut msg = b"hello".to_vec();
msg.extend_from_slice(&token.inner);
tracing::info!(%token, "sending packet");
socket.send_to(&msg, &proxy_address).await.unwrap();

assert_eq!(
"hello",
timeout(Duration::from_millis(100), rx.recv())
timeout(Duration::from_millis(1000), rx.recv())
.await
.expect("should have received a packet")
.unwrap()
5 changes: 5 additions & 0 deletions src/cli/admin.rs
@@ -55,6 +55,11 @@ impl Admin {
}
}

#[must_use]
pub fn is_agent(&self) -> bool {
matches!(self, Self::Agent(_))
}

#[track_caller]
pub fn unwrap_proxy(&self) -> &proxy::RuntimeConfig {
match self {
9 changes: 8 additions & 1 deletion src/cli/agent.rs
@@ -55,6 +55,9 @@ pub struct Agent {
/// request from a relay server before restarting the connection.
#[clap(long, env = "QUILKIN_IDLE_REQUEST_INTERVAL_SECS", default_value_t = super::admin::idle_request_interval_secs())]
pub idle_request_interval_secs: u64,
/// The ICAO code for the agent.
#[clap(short, long, env, default_value_t = crate::config::IcaoCode::default())]
pub icao_code: crate::config::IcaoCode,
}

impl Default for Agent {
@@ -67,6 +70,7 @@ impl Default for Agent {
sub_zone: <_>::default(),
provider: <_>::default(),
idle_request_interval_secs: super::admin::idle_request_interval_secs(),
icao_code: <_>::default(),
}
}
}
@@ -87,6 +91,9 @@ impl Agent {
)
});

config.qcmp_port.store(self.qcmp_port.into());
config.icao_code.store(self.icao_code.clone().into());

let runtime_config = mode.unwrap_agent();

let _mds_task = if !self.relay.is_empty() {
@@ -130,7 +137,7 @@ impl Agent {
None
};

crate::codec::qcmp::spawn(self.qcmp_port).await?;
crate::codec::qcmp::spawn(self.qcmp_port, shutdown_rx.clone());
shutdown_rx.changed().await.map_err(From::from)
}
}
2 changes: 1 addition & 1 deletion src/cli/manage.rs
@@ -105,7 +105,7 @@ impl Manage {
None
};

let server_task = tokio::spawn(crate::net::xds::server::spawn(self.port, config))
let server_task = tokio::spawn(crate::net::xds::server::spawn(self.port, mode, config))
.map_err(From::from)
.and_then(std::future::ready);
