From d47f88b399bc32f2d2900832e3acfe66ab770f49 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 23 Oct 2023 15:33:25 +0200 Subject: [PATCH 1/7] send data to pingthing --- Cargo.lock | 17 +++++++ bench/Cargo.toml | 1 + bench/src/bin/pingthing-tester.rs | 61 ++++++++++++++++++++++++ bench/src/cli.rs | 4 ++ bench/src/lib.rs | 1 + bench/src/main.rs | 30 +++++++++--- bench/src/pingthing.rs | 77 +++++++++++++++++++++++++++++++ 7 files changed, 185 insertions(+), 6 deletions(-) create mode 100644 bench/src/bin/pingthing-tester.rs create mode 100644 bench/src/pingthing.rs diff --git a/Cargo.lock b/Cargo.lock index 159187f1..b44d533d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -585,6 +585,7 @@ dependencies = [ "log", "rand 0.8.5", "rand_chacha 0.3.1", + "reqwest", "serde", "serde_json", "solana-rpc-client", @@ -2050,6 +2051,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "iana-time-zone" version = "0.1.57" @@ -3551,10 +3565,12 @@ dependencies = [ "http-body", "hyper", "hyper-rustls", + "hyper-tls", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -3564,6 +3580,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "tokio", + "tokio-native-tls", "tokio-rustls 0.24.1", "tokio-util", "tower-service", diff --git a/bench/Cargo.toml b/bench/Cargo.toml index 77fc461d..57daaffb 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -23,6 +23,7 @@ lazy_static = "1.4.0" bincode = { workspace = true } itertools = "0.10.5" spl-memo = "4.0.0" +reqwest = "0.11" [dev-dependencies] bincode = { workspace = true } diff --git a/bench/src/bin/pingthing-tester.rs b/bench/src/bin/pingthing-tester.rs new file mode 100644 index 00000000..363acf5e --- /dev/null +++ b/bench/src/bin/pingthing-tester.rs @@ -0,0 +1,61 @@ +use std::collections::HashMap; +use std::error::Error; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; +use clap::Parser; +use log::info; +use reqwest::Response; +use serde::{Deserialize, Serialize}; +use solana_rpc_client::rpc_client::RpcClient; +use solana_sdk::clock::Slot; +use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; +use solana_sdk::genesis_config::ClusterType; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::Signature; +use bench::pingthing::PingThing; + +/// Simple program to greet a person +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Name of the person to greet + #[arg(long)] + va_api_key: String, +} + + +/// https://www.validators.app/ping-thing +/// see https://github.com/Block-Logic/ping-thing-client/blob/main/ping-thing-client.mjs#L161C10-L181C17 +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let args = Args::parse(); + let va_api_key = args.va_api_key; + + let rpc_client = Arc::new(RpcClient::new_with_commitment( + "http://api.mainnet-beta.solana.com/", + CommitmentConfig::confirmed(), + )); + + let current_slot = rpc_client.get_slot().unwrap(); + info!("Current slot: {}", current_slot); + + // some random transaction + let hardcoded_example = Signature::from_str("3yKgzowsEUnXXv7TdbLcHFRqrFvn4CtaNKMELzfqrvokp8Pgw4LGFVAZqvnSLp92B9oY4HGhSEZhSuYqLzkT9sC8").unwrap(); + + let tx_success = true; + let target_slot = current_slot + 0; + info!("Target slot: {}", current_slot); + + + let pingthing = PingThing { + va_api_key, + }; + pingthing.submit_stats(Duration::from_millis(5555), hardcoded_example, tx_success, target_slot, target_slot).await??; + + Ok(()) +} + + diff --git a/bench/src/cli.rs b/bench/src/cli.rs index 11ce7ac7..e8aa433e 100644 --- a/bench/src/cli.rs +++ b/bench/src/cli.rs @@ -23,4 +23,8 @@ pub struct Args { // choose between small (179 bytes) and large (1186 bytes) transactions #[arg(short = 'L', long, default_value_t = false)] pub large_transactions: bool, + #[arg(long, default_value_t = false)] + pub pingthing_enable: bool, + #[arg(long,)] + pub pingthing_va_api_key: Option, } diff --git a/bench/src/lib.rs b/bench/src/lib.rs index 2733319e..c917d93f 100644 --- a/bench/src/lib.rs +++ b/bench/src/lib.rs @@ -1,3 +1,4 @@ pub mod cli; pub mod helpers; pub mod metrics; +pub mod pingthing; \ No newline at end of file diff --git a/bench/src/main.rs b/bench/src/main.rs index 04fdbf1f..6bf66dc4 100644 --- a/bench/src/main.rs +++ b/bench/src/main.rs @@ -1,12 +1,8 @@ -use bench::{ - cli::Args, - helpers::BenchHelper, - metrics::{AvgMetric, Metric, TxMetricData}, -}; +use bench::{cli::Args, helpers::BenchHelper, metrics::{AvgMetric, Metric, TxMetricData}, pingthing}; use clap::Parser; use dashmap::DashMap; use futures::future::join_all; -use log::{error, info, warn}; +use log::{debug, error, info, warn}; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::signature::Signature; use solana_sdk::{ @@ -21,6 +17,7 @@ use tokio::{ sync::{mpsc::UnboundedSender, RwLock}, time::{Duration, Instant}, }; +use bench::pingthing::PingThing; #[tokio::main(flavor = "multi_thread", worker_threads = 16)] async fn main() { @@ -34,6 +31,8 @@ async fn main() { lite_rpc_addr, transaction_save_file, large_transactions, + pingthing_enable, + pingthing_va_api_key, } = Args::parse(); let mut run_interval_ms = tokio::time::interval(Duration::from_millis(run_interval_ms)); @@ -44,6 +43,15 @@ async fn main() { TransactionSize::Small }; + let pingthing_config = Arc::new(if pingthing_enable { + Some(PingThing { + va_api_key: pingthing_va_api_key.expect("va_api_key must be provided - see https://github.com/Block-Logic/ping-thing-client/tree/main#install-notes") + }) + } else { + None + }); + + info!("Connecting to LiteRPC using {lite_rpc_addr}"); let mut avg_metric = AvgMetric::default(); @@ -113,6 +121,7 @@ async fn main() { tx_log_sx.clone(), log_transactions, transaction_size, + pingthing_config.clone(), ))); // wait for an interval run_interval_ms.tick().await; @@ -169,6 +178,7 @@ async fn bench( tx_metric_sx: UnboundedSender, log_txs: bool, transaction_size: TransactionSize, + pingthing_config: Arc>, ) -> Metric { let map_of_txs: Arc> = Arc::new(DashMap::new()); // transaction sender task @@ -254,6 +264,14 @@ async fn bench( time_to_confirm_in_millis: time_to_confirm.as_millis() as u64, }); } + + if let Some(pingthing) = pingthing_config.as_ref() { + pingthing.submit_stats( + time_to_confirm, signature.clone(), true, tx_data.sent_slot, current_slot.load(Ordering::Relaxed)); + // TODO check + + } + drop(tx_data); map_of_txs.remove(signature); confirmed_count += 1; diff --git a/bench/src/pingthing.rs b/bench/src/pingthing.rs new file mode 100644 index 00000000..6c606596 --- /dev/null +++ b/bench/src/pingthing.rs @@ -0,0 +1,77 @@ +use std::collections::HashMap; +use std::error::Error; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; +use futures::FutureExt; +use log::{debug, info}; +use reqwest::{Response, StatusCode}; +use serde::{Deserialize, Serialize}; +use solana_rpc_client::rpc_client::RpcClient; +use solana_sdk::clock::Slot; +use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; +use solana_sdk::genesis_config::ClusterType; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::Signature; +use tokio::task::JoinHandle; + + +pub struct PingThing { + pub va_api_key: String, +} + +#[derive(Debug, Serialize, Deserialize)] +struct Request { + time: u128, + signature: String, // Tx sig + transaction_type: String, // 'transfer', + success: bool, // txSuccess + application: String, // e.g. 'web3' + commitment_level: String, // e.g. 'confirmed' + slot_sent: Slot, + slot_landed: Slot, +} + + +impl PingThing { + pub fn submit_stats(&self, tx_elapsed: Duration, tx_sig: Signature, tx_success: bool, slot_sent: Slot, slot_landed: Slot) -> JoinHandle> { + let jh = tokio::spawn(submit_stats_to_ping_thing( + self.va_api_key.clone(), + tx_elapsed, tx_sig, tx_success, slot_sent, slot_landed)); + jh + } + +} + +// subit to https://www.validators.app/ping-thing?network=mainnet +async fn submit_stats_to_ping_thing(va_api_key: String, tx_elapsed: Duration, tx_sig: Signature, tx_success: bool, slot_sent: Slot, slot_landed: Slot) + -> anyhow::Result<()> { + // TODO only works on mainnet - skip for all others + + let foo = Request { + time: tx_elapsed.as_millis(), + signature: tx_sig.to_string(), + transaction_type: "transfer".to_string(), + success: tx_success, + application: "LiteRPC.bench".to_string(), + commitment_level: "confirmed".to_string(), + slot_sent, + slot_landed, + }; + + let client = reqwest::Client::new(); + let response = client.post("https://www.validators.app/api/v1/ping-thing/mainnet") + .header("Content-Type", "application/json") + .header("Token", va_api_key) + .json(&foo) + .send() + .await? + .error_for_status()?; + + assert_eq!(response.status(), StatusCode::CREATED); + + debug!("Sent data for tx {} to ping-thing server", tx_sig); + Ok(()) +} + + From 460a9814e96f9aa2f1518eb504186465de71f5e7 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 23 Oct 2023 15:51:23 +0200 Subject: [PATCH 2/7] make cluster configurable --- bench/src/bin/pingthing-tester.rs | 10 ++++--- bench/src/cli.rs | 4 ++- bench/src/main.rs | 2 ++ bench/src/pingthing.rs | 45 ++++++++++++++++++++++++++----- 4 files changed, 50 insertions(+), 11 deletions(-) diff --git a/bench/src/bin/pingthing-tester.rs b/bench/src/bin/pingthing-tester.rs index 363acf5e..a546ebc2 100644 --- a/bench/src/bin/pingthing-tester.rs +++ b/bench/src/bin/pingthing-tester.rs @@ -13,7 +13,7 @@ use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; use solana_sdk::genesis_config::ClusterType; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; -use bench::pingthing::PingThing; +use bench::pingthing::{ClusterKeys, PingThing}; /// Simple program to greet a person #[derive(Parser, Debug)] @@ -38,10 +38,15 @@ async fn main() -> Result<(), Box> { "http://api.mainnet-beta.solana.com/", CommitmentConfig::confirmed(), )); + let pingthing = PingThing { + cluster: ClusterKeys::Mainnet, + va_api_key, + }; let current_slot = rpc_client.get_slot().unwrap(); info!("Current slot: {}", current_slot); + // some random transaction let hardcoded_example = Signature::from_str("3yKgzowsEUnXXv7TdbLcHFRqrFvn4CtaNKMELzfqrvokp8Pgw4LGFVAZqvnSLp92B9oY4HGhSEZhSuYqLzkT9sC8").unwrap(); @@ -50,9 +55,6 @@ async fn main() -> Result<(), Box> { info!("Target slot: {}", current_slot); - let pingthing = PingThing { - va_api_key, - }; pingthing.submit_stats(Duration::from_millis(5555), hardcoded_example, tx_success, target_slot, target_slot).await??; Ok(()) diff --git a/bench/src/cli.rs b/bench/src/cli.rs index e8aa433e..463d6e20 100644 --- a/bench/src/cli.rs +++ b/bench/src/cli.rs @@ -25,6 +25,8 @@ pub struct Args { pub large_transactions: bool, #[arg(long, default_value_t = false)] pub pingthing_enable: bool, - #[arg(long,)] + #[arg(long)] + pub pingthing_cluster: Option, + #[arg(long)] pub pingthing_va_api_key: Option, } diff --git a/bench/src/main.rs b/bench/src/main.rs index 6bf66dc4..e99d36fe 100644 --- a/bench/src/main.rs +++ b/bench/src/main.rs @@ -32,6 +32,7 @@ async fn main() { transaction_save_file, large_transactions, pingthing_enable, + pingthing_cluster, pingthing_va_api_key, } = Args::parse(); @@ -45,6 +46,7 @@ async fn main() { let pingthing_config = Arc::new(if pingthing_enable { Some(PingThing { + cluster: pingthing::ClusterKeys::from_arg(pingthing_cluster.expect("cluster must be set")), va_api_key: pingthing_va_api_key.expect("va_api_key must be provided - see https://github.com/Block-Logic/ping-thing-client/tree/main#install-notes") }) } else { diff --git a/bench/src/pingthing.rs b/bench/src/pingthing.rs index 6c606596..b65602f9 100644 --- a/bench/src/pingthing.rs +++ b/bench/src/pingthing.rs @@ -16,7 +16,39 @@ use solana_sdk::signature::Signature; use tokio::task::JoinHandle; +/// note: do not change - used on Rest Url +#[derive(Clone, Debug)] +pub enum ClusterKeys { + Mainnet, + Testnet, + Devnet, +} + +impl ClusterKeys { + pub fn from_arg(cluster: String) -> Self { + match cluster.to_lowercase().as_str() { + "mainnet" => ClusterKeys::Mainnet, + "testnet" => ClusterKeys::Testnet, + "devnet" => ClusterKeys::Devnet, + _ => panic!("incorrect cluster name"), + } + } +} + +impl ClusterKeys { + pub fn to_url_part(&self) -> String { + match self { + ClusterKeys::Mainnet => "mainnet", + ClusterKeys::Testnet => "testnet", + ClusterKeys::Devnet => "devnet", + }.to_string() + } + +} + pub struct PingThing { + // e.g. mainnet + pub cluster: ClusterKeys, pub va_api_key: String, } @@ -36,6 +68,7 @@ struct Request { impl PingThing { pub fn submit_stats(&self, tx_elapsed: Duration, tx_sig: Signature, tx_success: bool, slot_sent: Slot, slot_landed: Slot) -> JoinHandle> { let jh = tokio::spawn(submit_stats_to_ping_thing( + self.cluster.clone(), self.va_api_key.clone(), tx_elapsed, tx_sig, tx_success, slot_sent, slot_landed)); jh @@ -44,11 +77,10 @@ impl PingThing { } // subit to https://www.validators.app/ping-thing?network=mainnet -async fn submit_stats_to_ping_thing(va_api_key: String, tx_elapsed: Duration, tx_sig: Signature, tx_success: bool, slot_sent: Slot, slot_landed: Slot) - -> anyhow::Result<()> { - // TODO only works on mainnet - skip for all others +async fn submit_stats_to_ping_thing(cluster: ClusterKeys, va_api_key: String, tx_elapsed: Duration, tx_sig: Signature, tx_success: bool, slot_sent: Slot, slot_landed: Slot) + -> anyhow::Result<()> { - let foo = Request { + let submit_data_request = Request { time: tx_elapsed.as_millis(), signature: tx_sig.to_string(), transaction_type: "transfer".to_string(), @@ -60,10 +92,11 @@ async fn submit_stats_to_ping_thing(va_api_key: String, tx_elapsed: Duration, tx }; let client = reqwest::Client::new(); - let response = client.post("https://www.validators.app/api/v1/ping-thing/mainnet") + // cluster: 'mainnet' + let response = client.post(format!("https://www.validators.app/api/v1/ping-thing/{}", cluster.to_url_part())) .header("Content-Type", "application/json") .header("Token", va_api_key) - .json(&foo) + .json(&submit_data_request) .send() .await? .error_for_status()?; From d66286d509feb1b2d6ebff3530d2e994e6a0dd9b Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 23 Oct 2023 15:57:01 +0200 Subject: [PATCH 3/7] clippy+fix --- bench/src/bin/pingthing-tester.rs | 39 +++++++++--------- bench/src/lib.rs | 2 +- bench/src/main.rs | 20 ++++++--- bench/src/pingthing.rs | 67 +++++++++++++++++-------------- 4 files changed, 70 insertions(+), 58 deletions(-) diff --git a/bench/src/bin/pingthing-tester.rs b/bench/src/bin/pingthing-tester.rs index a546ebc2..240a974b 100644 --- a/bench/src/bin/pingthing-tester.rs +++ b/bench/src/bin/pingthing-tester.rs @@ -1,19 +1,12 @@ -use std::collections::HashMap; -use std::error::Error; -use std::str::FromStr; -use std::sync::Arc; -use std::time::Duration; +use bench::pingthing::{ClusterKeys, PingThing}; use clap::Parser; use log::info; -use reqwest::Response; -use serde::{Deserialize, Serialize}; use solana_rpc_client::rpc_client::RpcClient; -use solana_sdk::clock::Slot; -use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; -use solana_sdk::genesis_config::ClusterType; -use solana_sdk::pubkey::Pubkey; +use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::signature::Signature; -use bench::pingthing::{ClusterKeys, PingThing}; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; /// Simple program to greet a person #[derive(Parser, Debug)] @@ -24,7 +17,6 @@ struct Args { va_api_key: String, } - /// https://www.validators.app/ping-thing /// see https://github.com/Block-Logic/ping-thing-client/blob/main/ping-thing-client.mjs#L161C10-L181C17 #[tokio::main] @@ -46,18 +38,23 @@ async fn main() -> Result<(), Box> { let current_slot = rpc_client.get_slot().unwrap(); info!("Current slot: {}", current_slot); - // some random transaction - let hardcoded_example = Signature::from_str("3yKgzowsEUnXXv7TdbLcHFRqrFvn4CtaNKMELzfqrvokp8Pgw4LGFVAZqvnSLp92B9oY4HGhSEZhSuYqLzkT9sC8").unwrap(); + let hardcoded_example = Signature::from_str( + "3yKgzowsEUnXXv7TdbLcHFRqrFvn4CtaNKMELzfqrvokp8Pgw4LGFVAZqvnSLp92B9oY4HGhSEZhSuYqLzkT9sC8", + ) + .unwrap(); let tx_success = true; - let target_slot = current_slot + 0; - info!("Target slot: {}", current_slot); - - pingthing.submit_stats(Duration::from_millis(5555), hardcoded_example, tx_success, target_slot, target_slot).await??; + pingthing + .submit_stats( + Duration::from_millis(5555), + hardcoded_example, + tx_success, + current_slot, + current_slot, + ) + .await??; Ok(()) } - - diff --git a/bench/src/lib.rs b/bench/src/lib.rs index c917d93f..84149809 100644 --- a/bench/src/lib.rs +++ b/bench/src/lib.rs @@ -1,4 +1,4 @@ pub mod cli; pub mod helpers; pub mod metrics; -pub mod pingthing; \ No newline at end of file +pub mod pingthing; diff --git a/bench/src/main.rs b/bench/src/main.rs index e99d36fe..7fe7d7d1 100644 --- a/bench/src/main.rs +++ b/bench/src/main.rs @@ -1,8 +1,14 @@ -use bench::{cli::Args, helpers::BenchHelper, metrics::{AvgMetric, Metric, TxMetricData}, pingthing}; +use bench::pingthing::PingThing; +use bench::{ + cli::Args, + helpers::BenchHelper, + metrics::{AvgMetric, Metric, TxMetricData}, + pingthing, +}; use clap::Parser; use dashmap::DashMap; use futures::future::join_all; -use log::{debug, error, info, warn}; +use log::{error, info, warn}; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::signature::Signature; use solana_sdk::{ @@ -17,7 +23,6 @@ use tokio::{ sync::{mpsc::UnboundedSender, RwLock}, time::{Duration, Instant}, }; -use bench::pingthing::PingThing; #[tokio::main(flavor = "multi_thread", worker_threads = 16)] async fn main() { @@ -53,7 +58,6 @@ async fn main() { None }); - info!("Connecting to LiteRPC using {lite_rpc_addr}"); let mut avg_metric = AvgMetric::default(); @@ -269,9 +273,13 @@ async fn bench( if let Some(pingthing) = pingthing_config.as_ref() { pingthing.submit_stats( - time_to_confirm, signature.clone(), true, tx_data.sent_slot, current_slot.load(Ordering::Relaxed)); + time_to_confirm, + *signature, + true, + tx_data.sent_slot, + current_slot.load(Ordering::Relaxed), + ); // TODO check - } drop(tx_data); diff --git a/bench/src/pingthing.rs b/bench/src/pingthing.rs index b65602f9..16bf3326 100644 --- a/bench/src/pingthing.rs +++ b/bench/src/pingthing.rs @@ -1,21 +1,11 @@ -use std::collections::HashMap; -use std::error::Error; -use std::str::FromStr; -use std::sync::Arc; -use std::time::Duration; -use futures::FutureExt; -use log::{debug, info}; -use reqwest::{Response, StatusCode}; +use log::debug; +use reqwest::StatusCode; use serde::{Deserialize, Serialize}; -use solana_rpc_client::rpc_client::RpcClient; use solana_sdk::clock::Slot; -use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; -use solana_sdk::genesis_config::ClusterType; -use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; +use std::time::Duration; use tokio::task::JoinHandle; - /// note: do not change - used on Rest Url #[derive(Clone, Debug)] pub enum ClusterKeys { @@ -41,9 +31,9 @@ impl ClusterKeys { ClusterKeys::Mainnet => "mainnet", ClusterKeys::Testnet => "testnet", ClusterKeys::Devnet => "devnet", - }.to_string() + } + .to_string() } - } pub struct PingThing { @@ -55,31 +45,46 @@ pub struct PingThing { #[derive(Debug, Serialize, Deserialize)] struct Request { time: u128, - signature: String, // Tx sig + signature: String, // Tx sig transaction_type: String, // 'transfer', - success: bool, // txSuccess - application: String, // e.g. 'web3' + success: bool, // txSuccess + application: String, // e.g. 'web3' commitment_level: String, // e.g. 'confirmed' slot_sent: Slot, slot_landed: Slot, } - impl PingThing { - pub fn submit_stats(&self, tx_elapsed: Duration, tx_sig: Signature, tx_success: bool, slot_sent: Slot, slot_landed: Slot) -> JoinHandle> { - let jh = tokio::spawn(submit_stats_to_ping_thing( + pub fn submit_stats( + &self, + tx_elapsed: Duration, + tx_sig: Signature, + tx_success: bool, + slot_sent: Slot, + slot_landed: Slot, + ) -> JoinHandle> { + tokio::spawn(submit_stats_to_ping_thing( self.cluster.clone(), self.va_api_key.clone(), - tx_elapsed, tx_sig, tx_success, slot_sent, slot_landed)); - jh + tx_elapsed, + tx_sig, + tx_success, + slot_sent, + slot_landed, + )) } - } // subit to https://www.validators.app/ping-thing?network=mainnet -async fn submit_stats_to_ping_thing(cluster: ClusterKeys, va_api_key: String, tx_elapsed: Duration, tx_sig: Signature, tx_success: bool, slot_sent: Slot, slot_landed: Slot) - -> anyhow::Result<()> { - +async fn submit_stats_to_ping_thing( + cluster: ClusterKeys, + va_api_key: String, + tx_elapsed: Duration, + tx_sig: Signature, + tx_success: bool, + slot_sent: Slot, + slot_landed: Slot, +) -> anyhow::Result<()> { let submit_data_request = Request { time: tx_elapsed.as_millis(), signature: tx_sig.to_string(), @@ -93,7 +98,11 @@ async fn submit_stats_to_ping_thing(cluster: ClusterKeys, va_api_key: String, tx let client = reqwest::Client::new(); // cluster: 'mainnet' - let response = client.post(format!("https://www.validators.app/api/v1/ping-thing/{}", cluster.to_url_part())) + let response = client + .post(format!( + "https://www.validators.app/api/v1/ping-thing/{}", + cluster.to_url_part() + )) .header("Content-Type", "application/json") .header("Token", va_api_key) .json(&submit_data_request) @@ -106,5 +115,3 @@ async fn submit_stats_to_ping_thing(cluster: ClusterKeys, va_api_key: String, tx debug!("Sent data for tx {} to ping-thing server", tx_sig); Ok(()) } - - From bff435dcd36962c4d3035ed4940823c55840f09d Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 23 Oct 2023 22:30:20 +0200 Subject: [PATCH 4/7] comment on format --- bench/src/pingthing.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bench/src/pingthing.rs b/bench/src/pingthing.rs index 16bf3326..afa74fee 100644 --- a/bench/src/pingthing.rs +++ b/bench/src/pingthing.rs @@ -42,6 +42,8 @@ pub struct PingThing { pub va_api_key: String, } + +/// request format see https://github.com/Block-Logic/ping-thing-client/blob/4c008c741164702a639c282f1503a237f7d95e64/ping-thing-client.mjs#L160 #[derive(Debug, Serialize, Deserialize)] struct Request { time: u128, From 44a6be88436b485633f5ccd3503daf4ca9aaa858 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 25 Oct 2023 11:31:53 +0200 Subject: [PATCH 5/7] join pingthing send task --- bench/src/main.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/bench/src/main.rs b/bench/src/main.rs index 7fe7d7d1..80896fb6 100644 --- a/bench/src/main.rs +++ b/bench/src/main.rs @@ -272,14 +272,16 @@ async fn bench( } if let Some(pingthing) = pingthing_config.as_ref() { - pingthing.submit_stats( + let pingthing_result = pingthing.submit_stats( time_to_confirm, *signature, true, tx_data.sent_slot, current_slot.load(Ordering::Relaxed), - ); - // TODO check + ).await; + if let Err(err) = pingthing_result { + error!("pingthing submit error {} - continue", err); + } } drop(tx_data); From b79b4156f0d4f1218e287d6493c57a58bb05ce81 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 25 Oct 2023 11:40:33 +0200 Subject: [PATCH 6/7] unwrap inner error --- bench/src/main.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/bench/src/main.rs b/bench/src/main.rs index 80896fb6..ef95022b 100644 --- a/bench/src/main.rs +++ b/bench/src/main.rs @@ -279,8 +279,15 @@ async fn bench( tx_data.sent_slot, current_slot.load(Ordering::Relaxed), ).await; - if let Err(err) = pingthing_result { - error!("pingthing submit error {} - continue", err); + match pingthing_result { + Err(err) => { + error!("pingthing thread error {} - continue", err); + } + Ok(inner) => { + if let Err(err) = inner { + error!("pingthing submit error {} - continue", err); + } + } } } From a4d59b06520d4137a9b475237d53580235b86964 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 25 Oct 2023 13:24:38 +0200 Subject: [PATCH 7/7] do not block main loop --- bench/src/main.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/bench/src/main.rs b/bench/src/main.rs index ef95022b..3f42c3cb 100644 --- a/bench/src/main.rs +++ b/bench/src/main.rs @@ -19,10 +19,12 @@ use std::sync::{ atomic::{AtomicU64, Ordering}, Arc, }; +use futures::join; use tokio::{ sync::{mpsc::UnboundedSender, RwLock}, time::{Duration, Instant}, }; +use tokio::task::JoinHandle; #[tokio::main(flavor = "multi_thread", worker_threads = 16)] async fn main() { @@ -249,6 +251,9 @@ async fn bench( } if let Ok(res) = rpc_client.get_signature_statuses(&signatures).await { + + let mut pingthing_tasks: Vec>> = vec![]; + for (i, signature) in signatures.iter().enumerate() { let tx_status = &res.value[i]; if tx_status.is_some() { @@ -272,30 +277,25 @@ async fn bench( } if let Some(pingthing) = pingthing_config.as_ref() { - let pingthing_result = pingthing.submit_stats( + let pingthing_jh = pingthing.submit_stats( time_to_confirm, *signature, true, tx_data.sent_slot, current_slot.load(Ordering::Relaxed), - ).await; - match pingthing_result { - Err(err) => { - error!("pingthing thread error {} - continue", err); - } - Ok(inner) => { - if let Err(err) = inner { - error!("pingthing submit error {} - continue", err); - } - } - } + ); + + pingthing_tasks.push(pingthing_jh); } drop(tx_data); map_of_txs.remove(signature); confirmed_count += 1; } - } + } // -- for all signatures + + join_all(pingthing_tasks).await; + } }