Skip to content

Commit

Permalink
Fix relay listener updates (#966)
Browse files Browse the repository at this point in the history
* Checkpoint

* Fixup rebase

* Fix test

* Don't unneccessarily watch for filter changes
  • Loading branch information
Jake-Shadle authored May 28, 2024
1 parent fe21f08 commit bbd0092
Show file tree
Hide file tree
Showing 17 changed files with 397 additions and 177 deletions.
67 changes: 52 additions & 15 deletions crates/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ pub struct ConfigFile {
}

impl ConfigFile {
pub fn update(&self, update: impl FnOnce(&TestConfig)) {
update(&self.config);
pub fn update(&mut self, update: impl FnOnce(&mut TestConfig)) {
update(&mut self.config);
self.config.write_to_file(&self.path)
}
}
Expand Down Expand Up @@ -204,6 +204,7 @@ pub struct RelayPail {
pub mds_port: u16,
pub task: JoinHandle,
pub shutdown: ShutdownTx,
pub config_file: Option<ConfigFile>,
pub config: Arc<Config>,
}

Expand All @@ -213,7 +214,7 @@ pub struct AgentPail {
pub qcmp_port: u16,
pub task: JoinHandle,
pub shutdown: ShutdownTx,
pub config_file: ConfigFile,
pub config_file: Option<ConfigFile>,
pub config: Arc<Config>,
}

Expand All @@ -226,6 +227,8 @@ pub struct ProxyPail {
pub task: JoinHandle,
pub shutdown: ShutdownTx,
pub config: Arc<Config>,
pub delta_applies:
Option<tokio::sync::mpsc::UnboundedReceiver<quilkin::net::xds::ResourceType>>,
}

abort_task!(ProxyPail);
Expand Down Expand Up @@ -313,14 +316,17 @@ impl Pail {
let mds_port = mds_listener.port();

let path = td.join(spc.name);
if let Some(cfg) = rpc.config {
cfg.write_to_file(&path);
}
let mut tc = rpc.config.unwrap_or_default();
tc.id = spc.name.into();
tc.write_to_file(&path);

let config_path = path.clone();

let (shutdown, shutdown_rx) =
quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing);

let config = Arc::new(Config::default_non_agent());
config.id.store(Arc::new(spc.name.into()));

let task = tokio::spawn(
relay::Relay {
Expand All @@ -340,6 +346,10 @@ impl Pail {
mds_port,
task,
shutdown,
config_file: Some(ConfigFile {
path: config_path,
config: tc,
}),
config,
})
}
Expand All @@ -359,8 +369,9 @@ impl Pail {
));
}

let tc = TestConfig::new();
let mut tc = TestConfig::new();
tc.clusters.insert_default(endpoints);
tc.id = spc.name.into();

let path = td.join(spc.name);
tc.write_to_file(&path);
Expand Down Expand Up @@ -389,7 +400,8 @@ impl Pail {

let config_path = path.clone();
let config = Arc::new(Config::default_agent());
let aconfig = Arc::new(Config::default_agent());
config.id.store(Arc::new(spc.name.into()));
let acfg = config.clone();

let task = tokio::spawn(async move {
components::agent::Agent {
Expand All @@ -401,7 +413,7 @@ impl Pail {
address_selector: None,
}
.run(RunArgs {
config: aconfig,
config,
ready: Default::default(),
shutdown_rx,
})
Expand All @@ -412,11 +424,11 @@ impl Pail {
qcmp_port,
task,
shutdown,
config_file: ConfigFile {
config_file: Some(ConfigFile {
path: config_path,
config: tc,
},
config,
}),
config: acfg,
})
}
PailConfig::Proxy(ppc) => {
Expand Down Expand Up @@ -481,8 +493,11 @@ impl Pail {
.modify(|clusters| clusters.insert_default(endpoints));
}

config.id.store(Arc::new(spc.name.into()));
let pconfig = config.clone();

let (rttx, rtrx) = tokio::sync::mpsc::unbounded_channel();

let task = tokio::spawn(async move {
components::proxy::Proxy {
num_workers: NonZeroUsize::new(1).unwrap(),
Expand All @@ -492,6 +507,7 @@ impl Pail {
socket,
qcmp,
phoenix,
notifier: Some(rttx),
}
.run(
RunArgs {
Expand All @@ -513,6 +529,7 @@ impl Pail {
shutdown,
task,
config,
delta_applies: Some(rtrx),
})
}
};
Expand Down Expand Up @@ -614,11 +631,20 @@ impl SandboxConfig {

impl Sandbox {
#[inline]
pub fn proxy_addr(&self) -> SocketAddr {
let Pail::Proxy(pp) = &self.pails["proxy"] else {
pub fn proxy(
&mut self,
name: &str,
) -> (
SocketAddr,
tokio::sync::mpsc::UnboundedReceiver<quilkin::net::xds::ResourceType>,
) {
let Some(Pail::Proxy(pp)) = self.pails.get_mut(name) else {
unreachable!()
};
SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, pp.port))
(
SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, pp.port)),
pp.delta_applies.take().unwrap(),
)
}

#[inline]
Expand All @@ -640,6 +666,17 @@ impl Sandbox {
)
}

#[inline]
pub fn config_file(&mut self, name: &str) -> ConfigFile {
let pail = self.pails.get_mut(name).unwrap();

match pail {
Pail::Relay(rp) => rp.config_file.take().unwrap(),
Pail::Agent(ap) => ap.config_file.take().unwrap(),
_ => unimplemented!("no config_file for this pail"),
}
}

#[inline]
pub fn socket(&self) -> (socket2::Socket, SocketAddr) {
let socket = quilkin::net::raw_socket_with_reuse(0).unwrap();
Expand Down
141 changes: 128 additions & 13 deletions crates/test/tests/mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,9 @@ trace_test!(relay_routing, {

let mut sandbox = sc.spinup().await;

let (server_port, mut server_rx) = {
let Some(Pail::Server(sp)) = sandbox.pails.get_mut("server") else {
unreachable!()
};

(sp.port, sp.packet_rx.take().unwrap())
};
let proxy_address = sandbox.proxy_addr();
let Pail::Agent(ap) = &sandbox.pails["agent"] else {
unreachable!()
};
let (mut server_rx, server_addr) = sandbox.server("server");
let (proxy_address, _) = sandbox.proxy("proxy");
let mut agent_config = sandbox.config_file("agent");

let client = sandbox.client();

Expand All @@ -105,10 +97,10 @@ trace_test!(relay_routing, {
})
.collect();

ap.config_file.update(|config| {
agent_config.update(|config| {
config.clusters.insert_default(
[Endpoint::with_metadata(
(std::net::Ipv6Addr::LOCALHOST, server_port).into(),
server_addr.into(),
quilkin::net::endpoint::Metadata { tokens },
)]
.into(),
Expand Down Expand Up @@ -205,3 +197,126 @@ trace_test!(datacenter_discovery, {
assert_config(&relay_config, &datacenter);
assert_config(&proxy_config, &datacenter);
});

trace_test!(filter_update, {
let mut sc = qt::sandbox_config!();

sc.push("server", ServerPailConfig::default(), &[]);
sc.push(
"relay",
RelayPailConfig {
config: Some(TestConfig {
filters: FilterChain::try_create([
Capture::as_filter_config(capture::Config {
metadata_key: filters::capture::CAPTURED_BYTES.into(),
strategy: filters::capture::Strategy::Suffix(capture::Suffix {
size: 0,
remove: true,
}),
})
.unwrap(),
HashedTokenRouter::as_filter_config(None).unwrap(),
])
.unwrap(),
..Default::default()
}),
},
&[],
);
sc.push(
"agent",
AgentPailConfig {
endpoints: vec![("server", &[])],
..Default::default()
},
&["server", "relay"],
);
sc.push("proxy", ProxyPailConfig::default(), &["relay"]);

let mut sandbox = sc.spinup().await;

let (mut server_rx, server_addr) = sandbox.server("server");
let (proxy_address, mut proxy_delta_rx) = sandbox.proxy("proxy");

let mut agent_config = sandbox.config_file("agent");
let mut relay_config = sandbox.config_file("relay");

let client = sandbox.client();

let mut token = b"g".to_vec();

sandbox.sleep(1000).await;
loop {
match proxy_delta_rx.try_recv() {
Ok(_rt) => {}
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
Err(_) => unreachable!(),
}
}

for _ in 0..100 {
relay_config.update(|config| {
config.filters = FilterChain::try_create([
Capture::as_labeled_filter_config(
capture::Config {
metadata_key: filters::capture::CAPTURED_BYTES.into(),
strategy: filters::capture::Strategy::Suffix(capture::Suffix {
size: token.len() as _,
remove: true,
}),
},
token.len().to_string(),
)
.unwrap(),
HashedTokenRouter::as_filter_config(None).unwrap(),
])
.unwrap();
});

agent_config.update(|config| {
config.clusters.insert_default(
[Endpoint::with_metadata(
server_addr.into(),
quilkin::net::endpoint::Metadata {
tokens: Some(token.clone()).into_iter().collect(),
},
)]
.into(),
);
});

let mut updates = 0x0;
while (updates & 0x11) != 0x11 {
let rt = sandbox.timeout(10000, proxy_delta_rx.recv()).await.unwrap();

match rt {
quilkin::net::xds::ResourceType::Listener => updates |= 0x1,
quilkin::net::xds::ResourceType::Cluster => updates |= 0x10,
_ => {}
}
}

let mut msg = b"hello".to_vec();
msg.extend_from_slice(&token);

tracing::info!(len = token.len(), "sending packet");
client.send_to(&msg, &proxy_address).await.unwrap();

tracing::info!(len = token.len(), "received packet");
assert_eq!(
"hello",
sandbox.timeout(10000, server_rx.recv()).await.unwrap()
);

tracing::info!(len = token.len(), "sending bad packet");
// send an invalid packet
msg.truncate(5);
msg.extend((0..token.len()).into_iter().map(|_| b'b'));
client.send_to(&msg, &proxy_address).await.unwrap();

sandbox.expect_timeout(50, server_rx.recv()).await;
tracing::info!(len = token.len(), "didn't receive bad packet");

token.push(b'g');
}
});
6 changes: 3 additions & 3 deletions crates/test/tests/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ trace_test!(server, {
let mut server1_rx = sb.packet_rx("server1");
let mut server2_rx = sb.packet_rx("server2");

let addr = sb.proxy_addr();
let (addr, _) = sb.proxy("proxy");

tracing::trace!(%addr, "sending packet");
let msg = "hello";
Expand Down Expand Up @@ -45,7 +45,7 @@ trace_test!(client, {
let mut sb = sc.spinup().await;

let mut dest_rx = sb.packet_rx("dest");
let local_addr = sb.proxy_addr();
let (local_addr, _) = sb.proxy("proxy");
let client = sb.client();

let msg = "hello";
Expand All @@ -68,7 +68,7 @@ trace_test!(with_filter, {
);

let mut sb = sc.spinup().await;
let local_addr = sb.proxy_addr();
let (local_addr, _) = sb.proxy("proxy");
let mut rx = sb.packet_rx("server");
let client = sb.client();

Expand Down
Loading

0 comments on commit bbd0092

Please sign in to comment.