Skip to content

Commit

Permalink
e2e latency bench
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Zaikin committed Jan 15, 2024
1 parent 74f7608 commit d7453ff
Show file tree
Hide file tree
Showing 5 changed files with 4,308 additions and 1 deletion.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions simple-listener/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ roaring.workspace = true
env_logger = "0.10.0"
log = "0.4"
hex = "*"
csv = "1.3.0"

[target.'cfg(not(target_env = "msvc"))'.build-dependencies]
protobuf-src.workspace = true
Expand Down
46 changes: 45 additions & 1 deletion simple-listener/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use clap::Parser;
use std::fs::File;
use std::time::Duration;
use tokio::time::sleep;
use tonic::transport::Channel;
Expand All @@ -21,6 +22,9 @@ struct Args {
/// Subdag id from which to receive updates
#[arg(short, long, default_value_t = 0)]
from_id: u64,
/// Path to csv file to store transaction stats
#[arg(short, long)]
tx_output: Option<String>,
}

#[tokio::main]
Expand All @@ -32,7 +36,7 @@ async fn main() {
match connect(args.endpoint.clone()).await {
Ok(client) => {
info!("Connected. Exporting subdags from #{}...", args.from_id);
match export(client, args.from_id).await {
match export(client, args.from_id, args.tx_output.clone()).await {
Ok(_) => {
info!("Exit");
break;
Expand All @@ -57,10 +61,20 @@ async fn connect(endpoint: String) -> Result<ExporterClient<Channel>, tonic::tra
async fn export(
mut client: ExporterClient<Channel>,
from_id: u64,
tx_output: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut stream = client.export(ExportRequest { from_id }).await?.into_inner();

let mut tx_writer = match tx_output {
Some(path) => Some(csv::Writer::from_path(path).unwrap()),
None => None,
};

while let Some(subdag) = stream.message().await? {
if let Some(ref mut writer) = tx_writer {
write_tx_stats(&subdag, writer);
}

let stats = stats(&subdag);
if stats.num_txs > 0 {
info!(
Expand Down Expand Up @@ -133,3 +147,33 @@ fn stats(subdag: &SubDag) -> Stats {
cert_round_delta: last_cert_round - first_cert_round,
}
}

fn write_tx_stats(subdag: &SubDag, writer: &mut csv::Writer<File>) {
let received_at = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
let first_cert_round = subdag.certificates[0].clone().header.unwrap().round;
let last_cert_round = subdag.leader.clone().unwrap().header.unwrap().round;
let num_rounds = last_cert_round - first_cert_round + 1;
let num_blocks = subdag.payloads.len();

for payload in subdag.payloads.iter() {
for batch in payload.batches.iter() {
for tx in batch.transactions.iter() {
let tx_time_bytes: [u8; 16] = match tx.get(0..16) {
Some(value) => value.try_into().unwrap(),
None => {
warn!("Foreign transaction {}", hex::encode(tx));
continue
}
};
let tx_time = u128::from_be_bytes(tx_time_bytes);

writer.write_field(tx_time.to_string()).unwrap();
writer.write_field(received_at.to_string()).unwrap();
writer.write_field(tx.len().to_string()).unwrap();
writer.write_field(num_rounds.to_string()).unwrap();
writer.write_field(num_blocks.to_string()).unwrap();
writer.write_record(None::<&[u8]>).unwrap();
}
}
}
}
Loading

0 comments on commit d7453ff

Please sign in to comment.