diff --git a/Cargo.lock b/Cargo.lock index 1006752f..f79d6660 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -585,6 +585,7 @@ dependencies = [ "log", "rand 0.8.5", "rand_chacha 0.3.1", + "reqwest", "serde", "serde_json", "solana-rpc-client", @@ -2050,6 +2051,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "iana-time-zone" version = "0.1.57" @@ -3551,10 +3565,12 @@ dependencies = [ "http-body", "hyper", "hyper-rustls", + "hyper-tls", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -3564,6 +3580,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "tokio", + "tokio-native-tls", "tokio-rustls 0.24.1", "tokio-util", "tower-service", diff --git a/bench/Cargo.toml b/bench/Cargo.toml index 77fc461d..57daaffb 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -23,6 +23,7 @@ lazy_static = "1.4.0" bincode = { workspace = true } itertools = "0.10.5" spl-memo = "4.0.0" +reqwest = "0.11" [dev-dependencies] bincode = { workspace = true } diff --git a/bench/src/bin/pingthing-tester.rs b/bench/src/bin/pingthing-tester.rs new file mode 100644 index 00000000..240a974b --- /dev/null +++ b/bench/src/bin/pingthing-tester.rs @@ -0,0 +1,60 @@ +use bench::pingthing::{ClusterKeys, PingThing}; +use clap::Parser; +use log::info; +use solana_rpc_client::rpc_client::RpcClient; +use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::signature::Signature; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; + +/// Simple program to greet a person +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Name of the person to greet + #[arg(long)] + va_api_key: String, +} + +/// https://www.validators.app/ping-thing +/// see https://github.com/Block-Logic/ping-thing-client/blob/main/ping-thing-client.mjs#L161C10-L181C17 +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let args = Args::parse(); + let va_api_key = args.va_api_key; + + let rpc_client = Arc::new(RpcClient::new_with_commitment( + "http://api.mainnet-beta.solana.com/", + CommitmentConfig::confirmed(), + )); + let pingthing = PingThing { + cluster: ClusterKeys::Mainnet, + va_api_key, + }; + + let current_slot = rpc_client.get_slot().unwrap(); + info!("Current slot: {}", current_slot); + + // some random transaction + let hardcoded_example = Signature::from_str( + "3yKgzowsEUnXXv7TdbLcHFRqrFvn4CtaNKMELzfqrvokp8Pgw4LGFVAZqvnSLp92B9oY4HGhSEZhSuYqLzkT9sC8", + ) + .unwrap(); + + let tx_success = true; + + pingthing + .submit_stats( + Duration::from_millis(5555), + hardcoded_example, + tx_success, + current_slot, + current_slot, + ) + .await??; + + Ok(()) +} diff --git a/bench/src/cli.rs b/bench/src/cli.rs index 11ce7ac7..463d6e20 100644 --- a/bench/src/cli.rs +++ b/bench/src/cli.rs @@ -23,4 +23,10 @@ pub struct Args { // choose between small (179 bytes) and large (1186 bytes) transactions #[arg(short = 'L', long, default_value_t = false)] pub large_transactions: bool, + #[arg(long, default_value_t = false)] + pub pingthing_enable: bool, + #[arg(long)] + pub pingthing_cluster: Option, + #[arg(long)] + pub pingthing_va_api_key: Option, } diff --git a/bench/src/lib.rs b/bench/src/lib.rs index 2733319e..84149809 100644 --- a/bench/src/lib.rs +++ b/bench/src/lib.rs @@ -1,3 +1,4 @@ pub mod cli; pub mod helpers; pub mod metrics; +pub mod pingthing; diff --git a/bench/src/main.rs b/bench/src/main.rs index 04fdbf1f..3f42c3cb 100644 --- a/bench/src/main.rs +++ b/bench/src/main.rs @@ -1,7 +1,9 @@ +use bench::pingthing::PingThing; use bench::{ cli::Args, helpers::BenchHelper, metrics::{AvgMetric, Metric, TxMetricData}, + pingthing, }; use clap::Parser; use dashmap::DashMap; @@ -17,10 +19,12 @@ use std::sync::{ atomic::{AtomicU64, Ordering}, Arc, }; +use futures::join; use tokio::{ sync::{mpsc::UnboundedSender, RwLock}, time::{Duration, Instant}, }; +use tokio::task::JoinHandle; #[tokio::main(flavor = "multi_thread", worker_threads = 16)] async fn main() { @@ -34,6 +38,9 @@ async fn main() { lite_rpc_addr, transaction_save_file, large_transactions, + pingthing_enable, + pingthing_cluster, + pingthing_va_api_key, } = Args::parse(); let mut run_interval_ms = tokio::time::interval(Duration::from_millis(run_interval_ms)); @@ -44,6 +51,15 @@ async fn main() { TransactionSize::Small }; + let pingthing_config = Arc::new(if pingthing_enable { + Some(PingThing { + cluster: pingthing::ClusterKeys::from_arg(pingthing_cluster.expect("cluster must be set")), + va_api_key: pingthing_va_api_key.expect("va_api_key must be provided - see https://github.com/Block-Logic/ping-thing-client/tree/main#install-notes") + }) + } else { + None + }); + info!("Connecting to LiteRPC using {lite_rpc_addr}"); let mut avg_metric = AvgMetric::default(); @@ -113,6 +129,7 @@ async fn main() { tx_log_sx.clone(), log_transactions, transaction_size, + pingthing_config.clone(), ))); // wait for an interval run_interval_ms.tick().await; @@ -169,6 +186,7 @@ async fn bench( tx_metric_sx: UnboundedSender, log_txs: bool, transaction_size: TransactionSize, + pingthing_config: Arc>, ) -> Metric { let map_of_txs: Arc> = Arc::new(DashMap::new()); // transaction sender task @@ -233,6 +251,9 @@ async fn bench( } if let Ok(res) = rpc_client.get_signature_statuses(&signatures).await { + + let mut pingthing_tasks: Vec>> = vec![]; + for (i, signature) in signatures.iter().enumerate() { let tx_status = &res.value[i]; if tx_status.is_some() { @@ -254,11 +275,27 @@ async fn bench( time_to_confirm_in_millis: time_to_confirm.as_millis() as u64, }); } + + if let Some(pingthing) = pingthing_config.as_ref() { + let pingthing_jh = pingthing.submit_stats( + time_to_confirm, + *signature, + true, + tx_data.sent_slot, + current_slot.load(Ordering::Relaxed), + ); + + pingthing_tasks.push(pingthing_jh); + } + drop(tx_data); map_of_txs.remove(signature); confirmed_count += 1; } - } + } // -- for all signatures + + join_all(pingthing_tasks).await; + } } diff --git a/bench/src/pingthing.rs b/bench/src/pingthing.rs new file mode 100644 index 00000000..afa74fee --- /dev/null +++ b/bench/src/pingthing.rs @@ -0,0 +1,119 @@ +use log::debug; +use reqwest::StatusCode; +use serde::{Deserialize, Serialize}; +use solana_sdk::clock::Slot; +use solana_sdk::signature::Signature; +use std::time::Duration; +use tokio::task::JoinHandle; + +/// note: do not change - used on Rest Url +#[derive(Clone, Debug)] +pub enum ClusterKeys { + Mainnet, + Testnet, + Devnet, +} + +impl ClusterKeys { + pub fn from_arg(cluster: String) -> Self { + match cluster.to_lowercase().as_str() { + "mainnet" => ClusterKeys::Mainnet, + "testnet" => ClusterKeys::Testnet, + "devnet" => ClusterKeys::Devnet, + _ => panic!("incorrect cluster name"), + } + } +} + +impl ClusterKeys { + pub fn to_url_part(&self) -> String { + match self { + ClusterKeys::Mainnet => "mainnet", + ClusterKeys::Testnet => "testnet", + ClusterKeys::Devnet => "devnet", + } + .to_string() + } +} + +pub struct PingThing { + // e.g. mainnet + pub cluster: ClusterKeys, + pub va_api_key: String, +} + + +/// request format see https://github.com/Block-Logic/ping-thing-client/blob/4c008c741164702a639c282f1503a237f7d95e64/ping-thing-client.mjs#L160 +#[derive(Debug, Serialize, Deserialize)] +struct Request { + time: u128, + signature: String, // Tx sig + transaction_type: String, // 'transfer', + success: bool, // txSuccess + application: String, // e.g. 'web3' + commitment_level: String, // e.g. 'confirmed' + slot_sent: Slot, + slot_landed: Slot, +} + +impl PingThing { + pub fn submit_stats( + &self, + tx_elapsed: Duration, + tx_sig: Signature, + tx_success: bool, + slot_sent: Slot, + slot_landed: Slot, + ) -> JoinHandle> { + tokio::spawn(submit_stats_to_ping_thing( + self.cluster.clone(), + self.va_api_key.clone(), + tx_elapsed, + tx_sig, + tx_success, + slot_sent, + slot_landed, + )) + } +} + +// subit to https://www.validators.app/ping-thing?network=mainnet +async fn submit_stats_to_ping_thing( + cluster: ClusterKeys, + va_api_key: String, + tx_elapsed: Duration, + tx_sig: Signature, + tx_success: bool, + slot_sent: Slot, + slot_landed: Slot, +) -> anyhow::Result<()> { + let submit_data_request = Request { + time: tx_elapsed.as_millis(), + signature: tx_sig.to_string(), + transaction_type: "transfer".to_string(), + success: tx_success, + application: "LiteRPC.bench".to_string(), + commitment_level: "confirmed".to_string(), + slot_sent, + slot_landed, + }; + + let client = reqwest::Client::new(); + // cluster: 'mainnet' + let response = client + .post(format!( + "https://www.validators.app/api/v1/ping-thing/{}", + cluster.to_url_part() + )) + .header("Content-Type", "application/json") + .header("Token", va_api_key) + .json(&submit_data_request) + .send() + .await? + .error_for_status()?; + + assert_eq!(response.status(), StatusCode::CREATED); + + debug!("Sent data for tx {} to ping-thing server", tx_sig); + Ok(()) +}