Skip to content

Commit

Permalink
Add rpc call burst for huge broker load testing
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Oct 30, 2024
1 parent 9da28cd commit f40953e
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "shvcall"
version = "3.2.2"
version = "3.3.2"
edition = "2021"

[features]
Expand Down
85 changes: 64 additions & 21 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ use shvrpc::{client, RpcMessage, RpcMessageMetaTags};
use simple_logger::SimpleLogger;
use url::Url;
use async_std::channel::{Sender};
use async_std::stream::StreamExt;
use shvrpc::rpcframe::RpcFrame;
use futures_time::future::FutureExt as ff;
use futures_time::time::Duration;
use futures::FutureExt;
use futures::{FutureExt, StreamExt};

#[cfg(feature = "readline")]
use crossterm::tty::IsTty;
Expand All @@ -32,6 +31,8 @@ use rustyline_async::ReadlineEvent;
use shvproto::{Map, RpcValue};
#[cfg(feature = "readline")]
use std::io::Write;
use futures::stream::FuturesUnordered;
use shvrpc::rpc::ShvRI;

type Result = shvrpc::Result<()>;

Expand All @@ -55,6 +56,9 @@ struct Opts {
/// Create TCP tunnel, SSH like syntax, example: -L 2222:some.host.org:22
#[arg(short = 'L', long)]
tunnel: Option<String>,
/// Send N request in M threads, format is N[,M], default M == 1
#[arg(long)]
burst: Option<String>,
/// Verbose mode (module, .)
#[arg(short, long)]
verbose: Option<String>,
Expand Down Expand Up @@ -259,27 +263,24 @@ async fn make_call(mut frame_reader: BoxedFrameReader, mut frame_writer: BoxedFr
Ok(stdout.flush().await?)
}

fn parse_line(line: &str) -> std::result::Result<(&str, &str, &str), String> {
let line = line.trim();
let method_ix = match line.find(':') {
None => {
return Err(format!("Invalid line format, method not found: {line}"));
}
Some(ix) => ix,
};
let param_ix = line.find(' ');
let path = line[..method_ix].trim();
let (method, param) = match param_ix {
None => (line[method_ix + 1..].trim(), ""),
Some(ix) => (line[method_ix + 1..ix].trim(), line[ix + 1..].trim()),
};
Ok((path, method, param))
}
if opts.method.is_none() {
return Err("--method parameter missing".into());
}
let mut stdout = io::stdout();
if opts.method.is_none() {
fn parse_line(line: &str) -> std::result::Result<(&str, &str, &str), String> {
let line = line.trim();
let method_ix = match line.find(':') {
None => {
return Err(format!("Invalid line format, method not found: {line}"));
}
Some(ix) => ix,
};
let param_ix = line.find(' ');
let path = line[..method_ix].trim();
let (method, param) = match param_ix {
None => (line[method_ix + 1..].trim(), ""),
Some(ix) => (line[method_ix + 1..ix].trim(), line[ix + 1..].trim()),
};
Ok((path, method, param))
}
if is_tty() {
#[cfg(feature = "readline")]
{
Expand Down Expand Up @@ -414,6 +415,45 @@ async fn make_call(mut frame_reader: BoxedFrameReader, mut frame_writer: BoxedFr
}
Ok(())
}
async fn receive_response(frame_reader: &mut BoxedFrameReader, rq_id: RqId) -> shvrpc::Result<RpcFrame> {
loop {
let frame = frame_reader.receive_frame().await?;
if frame.is_response() && frame.request_id().unwrap_or_default() == rq_id {
return Ok(frame);
}
}
}
async fn make_burst_call(url: &Url, opts: &Opts) -> Result {
if opts.method.is_none() {
return Err("--method parameter missing".into());
}
let burst = opts.burst.clone().unwrap();
let (nmsg, ntask) = {
let mut s = burst.split(',');
let nmsg = s.next().unwrap();
let nmsg = nmsg.parse::<i32>().unwrap();
let ntask = s.next().unwrap_or("1");
let ntask = ntask.parse::<i32>().unwrap();
(nmsg, ntask)
};
let method = opts.method.clone().unwrap();
let ri = ShvRI::try_from(method)?;
let param = opts.param.clone().map(|p| RpcValue::from_cpon(&p).unwrap());
async fn burst_task(url: Url, path: String, method: String, param: Option<RpcValue>, taskno: i32, count: i32) {
println!("Starting burst task #{taskno}, {count} calls of {path}:{method}");
let (mut frame_reader, mut frame_writer) = login(&url).await.unwrap();
for _ in 0 .. count {
let rqid = frame_writer.send_request(&path, &method, param.clone()).await.unwrap();
receive_response(&mut frame_reader, rqid).await.unwrap();
}
println!("Burst task #{taskno} finished, after {count} calls made successfully.");
}
(0..ntask).map(|taskno| {
task::spawn(burst_task(url.clone(), ri.path().to_owned(), ri.method().to_owned(), param.clone(), taskno + 1, nmsg))
}).collect::<FuturesUnordered<_>>().collect::<Vec<_>>().await;

Ok(())
}
fn split_quoted(s: &str) -> Vec<&str> {
let mut wrapped = false;
let ret = s.split(|c| {
Expand Down Expand Up @@ -574,6 +614,9 @@ async fn make_tunnel(mut frame_reader: BoxedFrameReader, mut frame_writer: Boxed
Ok(())
}
async fn try_main(url: &Url, opts: Opts) -> Result {
if opts.burst.is_some() {
return make_burst_call(url, &opts).await
}
let (frame_reader, frame_writer) = login(url).await?;
let res = if opts.tunnel.is_some() {
make_tunnel(frame_reader, frame_writer, &opts).await
Expand Down

0 comments on commit f40953e

Please sign in to comment.