From f40953e6d4bdf46a2c8b81b49ae490d3b66b41b4 Mon Sep 17 00:00:00 2001 From: Fanda Vacek Date: Wed, 30 Oct 2024 20:56:17 +0100 Subject: [PATCH] Add rpc call burst for huge broker load testing --- Cargo.toml | 2 +- src/main.rs | 85 ++++++++++++++++++++++++++++++++++++++++------------- 2 files changed, 65 insertions(+), 22 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a7fc1e1..6933e3f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shvcall" -version = "3.2.2" +version = "3.3.2" edition = "2021" [features] diff --git a/src/main.rs b/src/main.rs index f0b9254..3153bae 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,11 +19,10 @@ use shvrpc::{client, RpcMessage, RpcMessageMetaTags}; use simple_logger::SimpleLogger; use url::Url; use async_std::channel::{Sender}; -use async_std::stream::StreamExt; use shvrpc::rpcframe::RpcFrame; use futures_time::future::FutureExt as ff; use futures_time::time::Duration; -use futures::FutureExt; +use futures::{FutureExt, StreamExt}; #[cfg(feature = "readline")] use crossterm::tty::IsTty; @@ -32,6 +31,8 @@ use rustyline_async::ReadlineEvent; use shvproto::{Map, RpcValue}; #[cfg(feature = "readline")] use std::io::Write; +use futures::stream::FuturesUnordered; +use shvrpc::rpc::ShvRI; type Result = shvrpc::Result<()>; @@ -55,6 +56,9 @@ struct Opts { /// Create TCP tunnel, SSH like syntax, example: -L 2222:some.host.org:22 #[arg(short = 'L', long)] tunnel: Option, + /// Send N request in M threads, format is N[,M], default M == 1 + #[arg(long)] + burst: Option, /// Verbose mode (module, .) #[arg(short, long)] verbose: Option, @@ -259,27 +263,24 @@ async fn make_call(mut frame_reader: BoxedFrameReader, mut frame_writer: BoxedFr Ok(stdout.flush().await?) } - fn parse_line(line: &str) -> std::result::Result<(&str, &str, &str), String> { - let line = line.trim(); - let method_ix = match line.find(':') { - None => { - return Err(format!("Invalid line format, method not found: {line}")); - } - Some(ix) => ix, - }; - let param_ix = line.find(' '); - let path = line[..method_ix].trim(); - let (method, param) = match param_ix { - None => (line[method_ix + 1..].trim(), ""), - Some(ix) => (line[method_ix + 1..ix].trim(), line[ix + 1..].trim()), - }; - Ok((path, method, param)) - } - if opts.method.is_none() { - return Err("--method parameter missing".into()); - } let mut stdout = io::stdout(); if opts.method.is_none() { + fn parse_line(line: &str) -> std::result::Result<(&str, &str, &str), String> { + let line = line.trim(); + let method_ix = match line.find(':') { + None => { + return Err(format!("Invalid line format, method not found: {line}")); + } + Some(ix) => ix, + }; + let param_ix = line.find(' '); + let path = line[..method_ix].trim(); + let (method, param) = match param_ix { + None => (line[method_ix + 1..].trim(), ""), + Some(ix) => (line[method_ix + 1..ix].trim(), line[ix + 1..].trim()), + }; + Ok((path, method, param)) + } if is_tty() { #[cfg(feature = "readline")] { @@ -414,6 +415,45 @@ async fn make_call(mut frame_reader: BoxedFrameReader, mut frame_writer: BoxedFr } Ok(()) } +async fn receive_response(frame_reader: &mut BoxedFrameReader, rq_id: RqId) -> shvrpc::Result { + loop { + let frame = frame_reader.receive_frame().await?; + if frame.is_response() && frame.request_id().unwrap_or_default() == rq_id { + return Ok(frame); + } + } +} +async fn make_burst_call(url: &Url, opts: &Opts) -> Result { + if opts.method.is_none() { + return Err("--method parameter missing".into()); + } + let burst = opts.burst.clone().unwrap(); + let (nmsg, ntask) = { + let mut s = burst.split(','); + let nmsg = s.next().unwrap(); + let nmsg = nmsg.parse::().unwrap(); + let ntask = s.next().unwrap_or("1"); + let ntask = ntask.parse::().unwrap(); + (nmsg, ntask) + }; + let method = opts.method.clone().unwrap(); + let ri = ShvRI::try_from(method)?; + let param = opts.param.clone().map(|p| RpcValue::from_cpon(&p).unwrap()); + async fn burst_task(url: Url, path: String, method: String, param: Option, taskno: i32, count: i32) { + println!("Starting burst task #{taskno}, {count} calls of {path}:{method}"); + let (mut frame_reader, mut frame_writer) = login(&url).await.unwrap(); + for _ in 0 .. count { + let rqid = frame_writer.send_request(&path, &method, param.clone()).await.unwrap(); + receive_response(&mut frame_reader, rqid).await.unwrap(); + } + println!("Burst task #{taskno} finished, after {count} calls made successfully."); + } + (0..ntask).map(|taskno| { + task::spawn(burst_task(url.clone(), ri.path().to_owned(), ri.method().to_owned(), param.clone(), taskno + 1, nmsg)) + }).collect::>().collect::>().await; + + Ok(()) +} fn split_quoted(s: &str) -> Vec<&str> { let mut wrapped = false; let ret = s.split(|c| { @@ -574,6 +614,9 @@ async fn make_tunnel(mut frame_reader: BoxedFrameReader, mut frame_writer: Boxed Ok(()) } async fn try_main(url: &Url, opts: Opts) -> Result { + if opts.burst.is_some() { + return make_burst_call(url, &opts).await + } let (frame_reader, frame_writer) = login(url).await?; let res = if opts.tunnel.is_some() { make_tunnel(frame_reader, frame_writer, &opts).await