Skip to content

Commit

Permalink
tRevert "perf: Remove async channels for pushing buffers to be sent (#…
Browse files Browse the repository at this point in the history
…1043)"

This reverts commit 632ec6a.
  • Loading branch information
XAMPPRocky committed Dec 6, 2024
1 parent 8e0f703 commit cd86ef5
Show file tree
Hide file tree
Showing 14 changed files with 531 additions and 423 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ jobs:
run: curl -LsSf https://get.nexte.st/latest/linux | tar zxf - -C ${CARGO_HOME:-~/.cargo}/bin
- name: Build
run: cargo build -p qt -p quilkin -p quilkin-xds --tests
- run: cargo nextest run --no-tests=pass -p qt -p quilkin -p quilkin-xds quilkin
- run: cargo nextest run -p qt -p quilkin -p quilkin-xds quilkin

build:
name: Build
Expand Down
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ quilkin-proto.workspace = true

# Crates.io
arc-swap.workspace = true
async-channel.workspace = true
async-stream.workspace = true
base64.workspace = true
base64-serde = "0.8.0"
bytes = { version = "1.8.0", features = ["serde"] }
cached.workspace = true
cfg-if = "1.0"
crossbeam-utils = { version = "0.8", optional = true }
clap = { version = "4.5.21", features = ["cargo", "derive", "env"] }
dashmap = { version = "6.1", features = ["serde"] }
Expand Down Expand Up @@ -153,6 +153,7 @@ hickory-resolver = { version = "0.24", features = [
async-trait = "0.1.83"
strum = "0.26"
strum_macros = "0.26"
cfg-if = "1.0.0"
libflate = "2.1.0"
form_urlencoded = "1.2.1"
enum_dispatch = "0.3.13"
Expand Down Expand Up @@ -193,6 +194,7 @@ edition = "2021"

[workspace.dependencies]
arc-swap = { version = "1.7.1", features = ["serde"] }
async-channel = "2.3.1"
async-stream = "0.3.6"
base64 = "0.22.1"
cached = { version = "0.54", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions crates/test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ publish = false
workspace = true

[dependencies]
async-channel.workspace = true
once_cell.workspace = true
quilkin.workspace = true
rand.workspace = true
Expand Down
47 changes: 28 additions & 19 deletions crates/test/tests/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use qt::*;
use quilkin::{components::proxy, test::TestConfig};
use quilkin::test::TestConfig;
use tracing::Instrument as _;

trace_test!(server, {
Expand Down Expand Up @@ -87,7 +87,8 @@ trace_test!(uring_receiver, {

let (mut packet_rx, endpoint) = sb.server("server");

let (error_sender, mut error_receiver) = tokio::sync::mpsc::channel::<proxy::ErrorMap>(20);
let (error_sender, mut error_receiver) =
tokio::sync::mpsc::channel::<quilkin::components::proxy::ErrorMap>(20);

tokio::task::spawn(
async move {
Expand All @@ -104,32 +105,37 @@ trace_test!(uring_receiver, {
config
.clusters
.modify(|clusters| clusters.insert_default([endpoint.into()].into()));
let (tx, rx) = async_channel::unbounded();
let (_shutdown_tx, shutdown_rx) =
quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing);

let socket = sb.client();
let (ws, addr) = sb.socket();

let pending_sends = proxy::PendingSends::new(1).unwrap();

// we'll test a single DownstreamReceiveWorkerConfig
proxy::packet_router::DownstreamReceiveWorkerConfig {
let ready = quilkin::components::proxy::packet_router::DownstreamReceiveWorkerConfig {
worker_id: 1,
port: addr.port(),
upstream_receiver: rx.clone(),
config: config.clone(),
error_sender,
buffer_pool: quilkin::test::BUFFER_POOL.clone(),
sessions: proxy::SessionPool::new(
sessions: quilkin::components::proxy::SessionPool::new(
config,
vec![pending_sends.0.clone()],
tx,
BUFFER_POOL.clone(),
shutdown_rx.clone(),
),
}
.spawn(pending_sends)
.spawn(shutdown_rx)
.await
.expect("failed to spawn task");

// Drop the socket, otherwise it can
drop(ws);

ready.recv().unwrap();

let msg = "hello-downstream";
tracing::debug!("sending packet");
socket.send_to(msg.as_bytes(), addr).await.unwrap();
Expand All @@ -152,33 +158,36 @@ trace_test!(
.clusters
.modify(|clusters| clusters.insert_default([endpoint.into()].into()));

let pending_sends: Vec<_> = [
proxy::PendingSends::new(1).unwrap(),
proxy::PendingSends::new(1).unwrap(),
proxy::PendingSends::new(1).unwrap(),
]
.into_iter()
.collect();
let (tx, rx) = async_channel::unbounded();
let (_shutdown_tx, shutdown_rx) =
quilkin::make_shutdown_channel(quilkin::ShutdownKind::Testing);

let sessions = proxy::SessionPool::new(
let sessions = quilkin::components::proxy::SessionPool::new(
config.clone(),
pending_sends.iter().map(|ps| ps.0.clone()).collect(),
tx,
BUFFER_POOL.clone(),
shutdown_rx.clone(),
);

const WORKER_COUNT: usize = 3;

let (socket, addr) = sb.socket();
proxy::packet_router::spawn_receivers(
let workers = quilkin::components::proxy::packet_router::spawn_receivers(
config,
socket,
pending_sends,
WORKER_COUNT,
&sessions,
rx,
BUFFER_POOL.clone(),
shutdown_rx,
)
.await
.unwrap();

for wn in workers {
wn.recv().unwrap();
}

let socket = std::sync::Arc::new(sb.client());
let msg = "recv-from";

Expand Down
9 changes: 0 additions & 9 deletions src/collections/ttl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,11 @@ impl<V> Value<V> {
/// Get the expiration time for this value. The returned value is the
/// number of seconds relative to some reference point (e.g UNIX_EPOCH), based
/// on the clock being used.
#[inline]
fn expiration_secs(&self) -> u64 {
self.expires_at.load(Ordering::Relaxed)
}

/// Update the value's expiration time to (now + TTL).
#[inline]
fn update_expiration(&self, ttl: Duration) {
match self.clock.compute_expiration_secs(ttl) {
Ok(new_expiration_time) => {
Expand Down Expand Up @@ -162,7 +160,6 @@ where
/// Returns the current time as the number of seconds relative to some initial
/// reference point (e.g UNIX_EPOCH), based on the clock implementation being used.
/// In tests, this will be driven by [`tokio::time`]
#[inline]
pub(crate) fn now_relative_secs(&self) -> u64 {
self.0.clock.now_relative_secs().unwrap_or_default()
}
Expand Down Expand Up @@ -240,12 +237,6 @@ where
self.0.inner.remove(&key).is_some()
}

/// Removes all entries from the map
#[inline]
pub fn clear(&self) {
self.0.inner.clear();
}

/// Returns an entry for in-place updates of the specified key-value pair.
/// Note: This acquires a write lock on the map's shard that corresponds
/// to the entry.
Expand Down
Loading

0 comments on commit cd86ef5

Please sign in to comment.