Skip to content

Commit

Permalink
Use kanal
Browse files Browse the repository at this point in the history
- Using the GitHub version of the code; waiting for a stable release
hatoo committed Oct 22, 2022
1 parent 91726fd commit b002666
Showing 6 changed files with 54 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Unreleased

- Use `kanal` instead of `flume` as channel library #200
- Support [Ipv6] format requested_host in --connect-to #197

# 0.5.5 (2022-09-19)
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ byte-unit = "4.0.8"
clap = {version = "3.0.0", features = ["derive"]}
crossterm = "0.25.0"
float-ord = "0.3.1"
flume = "0.10"
kanal = { git = "https://github.com/fereidani/kanal.git" }
futures = "0.3.12"
humantime = "2.0.0"
libc = "0.2.67"
@@ -61,3 +61,4 @@ get-port = "4.0.0"
http = "0.2"
lazy_static = "1.4.0"
warp = "0.3"
flume = "0.10"
61 changes: 31 additions & 30 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use futures::future::FutureExt;
use futures::StreamExt;
use kanal::AsyncSender;
use rand::prelude::*;
use std::sync::Arc;
use thiserror::Error;
@@ -542,23 +543,23 @@ fn is_too_many_open_files(res: &Result<RequestResult, ClientError>) -> bool {
/// Run n tasks by m workers
pub async fn work(
client_builder: ClientBuilder,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
report_tx: AsyncSender<Result<RequestResult, ClientError>>,
n_tasks: usize,
n_workers: usize,
) {
use std::sync::atomic::{AtomicUsize, Ordering};
let counter = Arc::new(AtomicUsize::new(0));

let futures = (0..n_workers)
.map(|_| {
.map(move |_| {
let mut w = client_builder.build();
let report_tx = report_tx.clone();
let counter = counter.clone();
tokio::spawn(async move {
while counter.fetch_add(1, Ordering::Relaxed) < n_tasks {
let res = w.work().await;
let is_cancel = is_too_many_open_files(&res);
report_tx.send_async(res).await.unwrap();
report_tx.send(res).await.unwrap();
if is_cancel {
break;
}
@@ -575,12 +576,12 @@ pub async fn work(
/// n tasks by m workers limit to qps works in a second
pub async fn work_with_qps(
client_builder: ClientBuilder,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
report_tx: AsyncSender<Result<RequestResult, ClientError>>,
qps: usize,
n_tasks: usize,
n_workers: usize,
) {
let (tx, rx) = flume::unbounded();
let (tx, rx) = kanal::unbounded_async();

tokio::spawn(async move {
let start = std::time::Instant::now();
@@ -589,21 +590,21 @@ pub async fn work_with_qps(
(start + i as u32 * std::time::Duration::from_secs(1) / qps as u32).into(),
)
.await;
tx.send_async(()).await.unwrap();
tx.send(()).await.unwrap();
}
// tx gone
});

let futures = (0..n_workers)
.map(|_| {
.map(move |_| {
let mut w = client_builder.build();
let report_tx = report_tx.clone();
let rx = rx.clone();
tokio::spawn(async move {
while let Ok(()) = rx.recv_async().await {
while let Ok(()) = rx.recv().await {
let res = w.work().await;
let is_cancel = is_too_many_open_files(&res);
report_tx.send_async(res).await.unwrap();
report_tx.send(res).await.unwrap();
if is_cancel {
break;
}
@@ -620,17 +621,17 @@ pub async fn work_with_qps(
/// n tasks by m workers limit to qps works in a second with latency correction
pub async fn work_with_qps_latency_correction(
client_builder: ClientBuilder,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
report_tx: AsyncSender<Result<RequestResult, ClientError>>,
qps: usize,
n_tasks: usize,
n_workers: usize,
) {
let (tx, rx) = flume::unbounded();
let (tx, rx) = kanal::unbounded_async();

tokio::spawn(async move {
let start = std::time::Instant::now();
for i in 0..n_tasks {
tx.send_async(std::time::Instant::now()).await.unwrap();
tx.send(std::time::Instant::now()).await.unwrap();
tokio::time::sleep_until(
(start + i as u32 * std::time::Duration::from_secs(1) / qps as u32).into(),
)
@@ -640,20 +641,20 @@ pub async fn work_with_qps_latency_correction(
});

let futures = (0..n_workers)
.map(|_| {
.map(move |_| {
let mut w = client_builder.build();
let report_tx = report_tx.clone();
let rx = rx.clone();
tokio::spawn(async move {
while let Ok(start) = rx.recv_async().await {
while let Ok(start) = rx.recv().await {
let mut res = w.work().await;

if let Ok(request_result) = &mut res {
request_result.start = start;
}

let is_cancel = is_too_many_open_files(&res);
report_tx.send_async(res).await.unwrap();
report_tx.send(res).await.unwrap();
if is_cancel {
break;
}
@@ -670,19 +671,19 @@ pub async fn work_with_qps_latency_correction(
/// Run until dead_line by n workers
pub async fn work_until(
client_builder: ClientBuilder,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
report_tx: AsyncSender<Result<RequestResult, ClientError>>,
dead_line: std::time::Instant,
n_workers: usize,
) {
let futures = (0..n_workers)
.map(|_| {
.map(move |_| {
let report_tx = report_tx.clone();
let mut w = client_builder.build();
tokio::spawn(tokio::time::timeout_at(dead_line.into(), async move {
loop {
let res = w.work().await;
let is_cancel = is_too_many_open_files(&res);
report_tx.send_async(res).await.unwrap();
report_tx.send(res).await.unwrap();
if is_cancel {
break;
}
@@ -699,13 +700,13 @@ pub async fn work_until(
/// Run until dead_line by n workers limit to qps works in a second
pub async fn work_until_with_qps(
client_builder: ClientBuilder,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
report_tx: AsyncSender<Result<RequestResult, ClientError>>,
qps: usize,
start: std::time::Instant,
dead_line: std::time::Instant,
n_workers: usize,
) {
let (tx, rx) = flume::bounded(qps);
let (tx, rx) = kanal::bounded_async(qps);

let gen = tokio::spawn(async move {
for i in 0.. {
@@ -716,25 +717,25 @@ pub async fn work_until_with_qps(
(start + i as u32 * std::time::Duration::from_secs(1) / qps as u32).into(),
)
.await;
if tx.send_async(()).await.is_err() {
if tx.send(()).await.is_err() {
break;
}
}
// tx gone
});

let futures = (0..n_workers)
.map(|_| {
.map(move |_| {
let mut w = client_builder.build();
let report_tx = report_tx.clone();
let rx = rx.clone();
tokio::time::timeout_at(
dead_line.into(),
tokio::spawn(async move {
while let Ok(()) = rx.recv_async().await {
while let Ok(()) = rx.recv().await {
let res = w.work().await;
let is_cancel = is_too_many_open_files(&res);
report_tx.send_async(res).await.unwrap();
report_tx.send(res).await.unwrap();
if is_cancel {
break;
}
@@ -754,13 +755,13 @@ pub async fn work_until_with_qps(
/// Run until dead_line by n workers limit to qps works in a second with latency correction
pub async fn work_until_with_qps_latency_correction(
client_builder: ClientBuilder,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
report_tx: AsyncSender<Result<RequestResult, ClientError>>,
qps: usize,
start: std::time::Instant,
dead_line: std::time::Instant,
n_workers: usize,
) {
let (tx, rx) = flume::unbounded();
let (tx, rx) = kanal::unbounded_async();

let gen = tokio::spawn(async move {
for i in 0.. {
@@ -772,30 +773,30 @@ pub async fn work_until_with_qps_latency_correction(
(start + i as u32 * std::time::Duration::from_secs(1) / qps as u32).into(),
)
.await;
if tx.send_async(now).await.is_err() {
if tx.send(now).await.is_err() {
break;
}
}
// tx gone
});

let futures = (0..n_workers)
.map(|_| {
.map(move |_| {
let mut w = client_builder.build();
let report_tx = report_tx.clone();
let rx = rx.clone();
tokio::time::timeout_at(
dead_line.into(),
tokio::spawn(async move {
while let Ok(start) = rx.recv_async().await {
while let Ok(start) = rx.recv().await {
let mut res = w.work().await;

if let Ok(request_result) = &mut res {
request_result.start = start;
}

let is_cancel = is_too_many_open_files(&res);
report_tx.send_async(res).await.unwrap();
report_tx.send(res).await.unwrap();
if is_cancel {
break;
}
10 changes: 5 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -278,33 +278,33 @@ async fn main() -> anyhow::Result<()> {
PrintMode::Text
};

let (result_tx, result_rx) = flume::unbounded();
let (result_tx, result_rx) = kanal::unbounded_async();

let start = std::time::Instant::now();

let data_collector = if opts.no_tui || !std::io::stdout().is_tty() {
// When `--no-tui` is enabled, just collect all data.
tokio::spawn(
async move {
let (ctrl_c_tx, ctrl_c_rx) = flume::unbounded();
let (ctrl_c_tx, ctrl_c_rx) = kanal::unbounded_async();

tokio::spawn(async move {
if let Ok(()) = tokio::signal::ctrl_c().await {
if let Ok(()) = tokio::signal::ctrl_c().await {
let _ = ctrl_c_tx.send(());
}
});

let mut all: Vec<Result<RequestResult, ClientError>> = Vec::new();
loop {
tokio::select! {
report = result_rx.recv_async() => {
report = result_rx.recv() => {
if let Ok(report) = report {
all.push(report);
} else {
break;
}
}
_ = ctrl_c_rx.recv_async() => {
_ = ctrl_c_rx.recv() => {
// User pressed ctrl-c.
let _ = printer::print_result(&mut std::io::stdout(),print_mode,start, &all, start.elapsed());
std::process::exit(libc::EXIT_SUCCESS);
10 changes: 5 additions & 5 deletions src/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use byte_unit::Byte;
use crossterm::event::{Event, KeyCode, KeyEvent, KeyModifiers};
use crossterm::ExecutableCommand;
use flume::TryRecvError;
use kanal::AsyncReceiver;
use std::collections::BTreeMap;
use std::io;
use tui::backend::CrosstermBackend;
@@ -52,7 +52,7 @@ pub struct Monitor {
pub print_mode: PrintMode,
pub end_line: EndLine,
/// All workers send each result to this channel
pub report_receiver: flume::Receiver<Result<RequestResult, ClientError>>,
pub report_receiver: AsyncReceiver<Result<RequestResult, ClientError>>,
// When started
pub start: std::time::Instant,
// Frame per scond of TUI
@@ -97,17 +97,17 @@ impl Monitor {
let frame_start = std::time::Instant::now();
loop {
match self.report_receiver.try_recv() {
Ok(report) => {
Ok(Some(report)) => {
match report.as_ref() {
Ok(report) => *status_dist.entry(report.status).or_default() += 1,
Err(e) => *error_dist.entry(e.to_string()).or_default() += 1,
}
all.push(report);
}
Err(TryRecvError::Empty) => {
Ok(None) => {
break;
}
Err(TryRecvError::Disconnected) => {
Err(_) => {
// Application ends.
break 'outer;
}

0 comments on commit b002666

Please sign in to comment.