From 5127a2b52efd0599f1aeed8437acc435f74f4fea Mon Sep 17 00:00:00 2001 From: Kyle Espinola Date: Tue, 29 Oct 2024 08:04:55 +0100 Subject: [PATCH] feat: grpc ingest monitor --- Cargo.lock | 2 + bubblegum/src/lib.rs | 41 ++++++++--------- bubblegum/src/verify.rs | 83 ++++++++++++++++++---------------- grpc-ingest/Cargo.toml | 1 + grpc-ingest/config-monitor.yml | 8 ++++ grpc-ingest/src/config.rs | 45 ++++++++++++++---- grpc-ingest/src/monitor.rs | 44 +++++++++++++++++- grpc-ingest/src/postgres.rs | 4 +- grpc-ingest/src/prom.rs | 53 ++++++++++++++++++++++ ops/Cargo.toml | 3 +- ops/src/bubblegum/verify.rs | 14 +++++- 11 files changed, 224 insertions(+), 74 deletions(-) create mode 100644 grpc-ingest/config-monitor.yml diff --git a/Cargo.lock b/Cargo.lock index d47aef9b4..d3563d3b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1892,6 +1892,7 @@ dependencies = [ "cargo-lock", "chrono", "clap 4.4.8", + "das-bubblegum", "das-core", "digital_asset_types", "futures", @@ -1960,6 +1961,7 @@ dependencies = [ "sqlx", "thiserror", "tokio", + "tracing", ] [[package]] diff --git a/bubblegum/src/lib.rs b/bubblegum/src/lib.rs index 09aa52647..f1dc031c6 100644 --- a/bubblegum/src/lib.rs +++ b/bubblegum/src/lib.rs @@ -5,6 +5,7 @@ mod tree; use das_core::{MetadataJsonDownloadWorkerArgs, Rpc}; pub use error::ErrorKind; mod verify; +pub use verify::ProofReport; use anyhow::Result; use backfill::worker::{ProgramTransformerWorkerArgs, SignatureWorkerArgs, TreeWorkerArgs}; @@ -18,7 +19,7 @@ use sea_orm::{EntityTrait, QueryFilter}; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; use std::str::FromStr; -use tracing::{error, info}; +use tracing::error; use tree::TreeResponse; #[derive(Clone)] @@ -50,16 +51,6 @@ pub struct BackfillArgs { pub tree_worker: TreeWorkerArgs, } -#[derive(Debug, Parser, Clone)] -pub struct VerifyArgs { - /// The list of trees to verify. If not specified, all trees will be crawled. - #[arg(long, env, use_value_delimiter = true)] - pub only_trees: Option>, - - #[arg(long, env, default_value = "20")] - pub max_concurrency: usize, -} - pub async fn start_backfill(context: BubblegumContext, args: BackfillArgs) -> Result<()> { let trees = if let Some(ref only_trees) = args.only_trees { TreeResponse::find(&context.solana_rpc, only_trees.clone()).await? @@ -161,25 +152,33 @@ pub async fn start_bubblegum_replay( Ok(()) } -pub async fn verify_bubblegum(context: BubblegumContext, args: VerifyArgs) -> Result<()> { +#[derive(Debug, Parser, Clone)] +pub struct VerifyArgs { + /// The list of trees to verify. If not specified, all trees will be crawled. + #[arg(long, env, use_value_delimiter = true)] + pub only_trees: Option>, + + #[arg(long, env, default_value = "20")] + pub max_concurrency: usize, +} + +pub async fn verify_bubblegum( + context: BubblegumContext, + args: VerifyArgs, +) -> Result> { let trees = if let Some(ref only_trees) = args.only_trees { TreeResponse::find(&context.solana_rpc, only_trees.clone()).await? } else { TreeResponse::all(&context.solana_rpc).await? }; + let mut reports = Vec::new(); + for tree in trees { let report = verify::check(context.clone(), tree, args.max_concurrency).await?; - info!( - "Tree: {}, Total Leaves: {}, Incorrect Proofs: {}, Not Found Proofs: {}, Correct Proofs: {}", - report.tree_pubkey, - report.total_leaves, - report.incorrect_proofs, - report.not_found_proofs, - report.correct_proofs - ); + reports.push(report); } - Ok(()) + Ok(reports) } diff --git a/bubblegum/src/verify.rs b/bubblegum/src/verify.rs index e83297ee9..fae6051d6 100644 --- a/bubblegum/src/verify.rs +++ b/bubblegum/src/verify.rs @@ -2,21 +2,15 @@ use super::BubblegumContext; use crate::error::ErrorKind; use crate::tree::TreeResponse; use anyhow::{anyhow, Result}; -use borsh::BorshDeserialize; use digital_asset_types::dapi::get_proof_for_asset; use digital_asset_types::rpc::AssetProof; use futures::stream::{FuturesUnordered, StreamExt}; use mpl_bubblegum::accounts::TreeConfig; use sea_orm::SqlxPostgresConnector; use sha3::{Digest, Keccak256}; -use solana_sdk::{pubkey::Pubkey, syscalls::MAX_CPI_INSTRUCTION_ACCOUNTS}; -use spl_account_compression::{ - canopy::fill_in_proof_from_canopy, - concurrent_tree_wrapper::ProveLeafArgs, - state::{ - merkle_tree_get_size, ConcurrentMerkleTreeHeader, CONCURRENT_MERKLE_TREE_HEADER_SIZE_V1, - }, -}; +use solana_sdk::pubkey::Pubkey; +use spl_account_compression::concurrent_tree_wrapper::ProveLeafArgs; +use std::fmt; use std::sync::Arc; use tokio::sync::Mutex; use tracing::error; @@ -65,7 +59,7 @@ fn hash(left: &[u8], right: &[u8]) -> [u8; 32] { hash } -fn verify_merkle_proof(root: [u8; 32], proof: &ProveLeafArgs) -> bool { +fn verify_merkle_proof(proof: &ProveLeafArgs) -> bool { let mut node = proof.leaf; for (i, sibling) in proof.proof_vec.iter().enumerate() { if (proof.index >> i) & 1 == 0 { @@ -74,22 +68,43 @@ fn verify_merkle_proof(root: [u8; 32], proof: &ProveLeafArgs) -> bool { node = hash(sibling, &node); } } - node == root + node == proof.current_root } -#[derive(Debug)] +fn leaf_proof_result(proof: AssetProof) -> Result { + match ProveLeafArgs::try_from_asset_proof(proof) { + Ok(proof) if verify_merkle_proof(&proof) => Ok(ProofResult::Correct), + Ok(_) => Ok(ProofResult::Incorrect), + Err(_) => Ok(ProofResult::Corrupt), + } +} + +#[derive(Debug, Default)] pub struct ProofReport { pub tree_pubkey: Pubkey, pub total_leaves: usize, pub incorrect_proofs: usize, pub not_found_proofs: usize, pub correct_proofs: usize, + pub corrupt_proofs: usize, } enum ProofResult { Correct, Incorrect, NotFound, + Corrupt, +} + +impl fmt::Display for ProofResult { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ProofResult::Correct => write!(f, "Correct proof found"), + ProofResult::Incorrect => write!(f, "Incorrect proof found"), + ProofResult::NotFound => write!(f, "Proof not found"), + ProofResult::Corrupt => write!(f, "Corrupt proof found"), + } + } } pub async fn check( @@ -111,9 +126,7 @@ pub async fn check( let report = Arc::new(Mutex::new(ProofReport { tree_pubkey: tree.pubkey, total_leaves: tree_config.num_minted as usize, - incorrect_proofs: 0, - not_found_proofs: 0, - correct_proofs: 0, + ..ProofReport::default() })); let mut tasks = FuturesUnordered::new(); @@ -124,7 +137,7 @@ pub async fn check( } let db = SqlxPostgresConnector::from_sqlx_postgres_pool(pool.clone()); - let tree_pubkey = tree.pubkey.clone(); + let tree_pubkey = tree.pubkey; let report = Arc::clone(&report); tasks.push(tokio::spawn(async move { @@ -132,33 +145,23 @@ pub async fn check( &[b"asset", &tree_pubkey.to_bytes(), &i.to_le_bytes()], &mpl_bubblegum::ID, ); - let result: Result = - match get_proof_for_asset(&db, asset.to_bytes().to_vec()).await { - Ok(proof) => match ProveLeafArgs::try_from_asset_proof(proof) { - Ok(prove_leaf_args) => { - if verify_merkle_proof(prove_leaf_args.current_root, &prove_leaf_args) { - Ok(ProofResult::Correct) - } else { - Ok(ProofResult::Incorrect) - } - } - Err(_) => Ok(ProofResult::Incorrect), - }, - Err(_) => Ok(ProofResult::NotFound), - }; - - if let Ok(proof_result) = result { + let proof_lookup: Result = + get_proof_for_asset(&db, asset.to_bytes().to_vec()) + .await + .map_or_else(|_| Ok(ProofResult::NotFound), leaf_proof_result); + + if let Ok(proof_result) = proof_lookup { let mut report = report.lock().await; match proof_result { ProofResult::Correct => report.correct_proofs += 1, - ProofResult::Incorrect => { - report.incorrect_proofs += 1; - error!(tree = %tree_pubkey, leaf_index = i, asset = %asset, "Incorrect proof found"); - } - ProofResult::NotFound => { - report.not_found_proofs += 1; - error!(tree = %tree_pubkey, leaf_index = i, asset = %asset, "Proof not found"); - } + ProofResult::Incorrect => report.incorrect_proofs += 1, + ProofResult::NotFound => report.not_found_proofs += 1, + ProofResult::Corrupt => report.corrupt_proofs += 1, + } + if let ProofResult::Incorrect | ProofResult::NotFound | ProofResult::Corrupt = + proof_result + { + error!(tree = %tree_pubkey, leaf_index = i, asset = %asset, "{}", proof_result); } } })); diff --git a/grpc-ingest/Cargo.toml b/grpc-ingest/Cargo.toml index b4237fdc2..4036be057 100644 --- a/grpc-ingest/Cargo.toml +++ b/grpc-ingest/Cargo.toml @@ -9,6 +9,7 @@ publish = { workspace = true } anyhow = { workspace = true } async-stream = { workspace = true } atty = { workspace = true } +das-bubblegum = { workspace = true } sqlx = { workspace = true, features = [ "macros", "runtime-tokio-rustls", diff --git a/grpc-ingest/config-monitor.yml b/grpc-ingest/config-monitor.yml new file mode 100644 index 000000000..8b3278864 --- /dev/null +++ b/grpc-ingest/config-monitor.yml @@ -0,0 +1,8 @@ +prometheus: 127.0.0.1:8876 +rpc: http://127.0.0.1:8899 +postgres: + url: postgres://solana:solana@localhost/solana + min_connections: 10 + max_connections: 50 +bubblegum: + only_trees: null diff --git a/grpc-ingest/src/config.rs b/grpc-ingest/src/config.rs index ecad0d46b..cab28ad6c 100644 --- a/grpc-ingest/src/config.rs +++ b/grpc-ingest/src/config.rs @@ -214,7 +214,7 @@ where #[derive(Debug, Clone, Deserialize)] pub struct ConfigIngester { pub redis: String, - pub postgres: ConfigIngesterPostgres, + pub postgres: ConfigPostgres, pub download_metadata: ConfigIngesterDownloadMetadata, pub snapshots: ConfigIngestStream, pub accounts: ConfigIngestStream, @@ -231,31 +231,31 @@ pub enum ConfigIngesterRedisStreamType { } #[derive(Debug, Clone, Deserialize)] -pub struct ConfigIngesterPostgres { +pub struct ConfigPostgres { pub url: String, #[serde( - default = "ConfigIngesterPostgres::default_min_connections", + default = "ConfigPostgres::default_min_connections", deserialize_with = "deserialize_usize_str" )] pub min_connections: usize, #[serde( - default = "ConfigIngesterPostgres::default_max_connections", + default = "ConfigPostgres::default_max_connections", deserialize_with = "deserialize_usize_str" )] pub max_connections: usize, #[serde( - default = "ConfigIngesterPostgres::default_idle_timeout", + default = "ConfigPostgres::default_idle_timeout", deserialize_with = "deserialize_duration_str" )] pub idle_timeout: Duration, #[serde( - default = "ConfigIngesterPostgres::default_max_lifetime", + default = "ConfigPostgres::default_max_lifetime", deserialize_with = "deserialize_duration_str" )] pub max_lifetime: Duration, } -impl ConfigIngesterPostgres { +impl ConfigPostgres { pub const fn default_min_connections() -> usize { 10 } @@ -337,4 +337,33 @@ impl ConfigIngesterDownloadMetadata { } #[derive(Debug, Clone, Deserialize)] -pub struct ConfigMonitor {} +pub struct ConfigMonitor { + pub postgres: ConfigPostgres, + pub rpc: String, + pub bubblegum: ConfigBubblegumVerify, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct ConfigBubblegumVerify { + #[serde( + default = "ConfigBubblegumVerify::default_report_interval", + deserialize_with = "deserialize_duration_str" + )] + pub report_interval: Duration, + #[serde(default)] + pub only_trees: Option>, + #[serde( + default = "ConfigBubblegumVerify::default_max_concurrency", + deserialize_with = "deserialize_usize_str" + )] + pub max_concurrency: usize, +} + +impl ConfigBubblegumVerify { + pub const fn default_report_interval() -> Duration { + Duration::from_millis(5 * 60 * 1000) + } + pub const fn default_max_concurrency() -> usize { + 20 + } +} diff --git a/grpc-ingest/src/monitor.rs b/grpc-ingest/src/monitor.rs index 88e0bf308..ac0c687ec 100644 --- a/grpc-ingest/src/monitor.rs +++ b/grpc-ingest/src/monitor.rs @@ -1,5 +1,47 @@ -use crate::config::ConfigMonitor; +use crate::postgres::create_pool; +use crate::util::create_shutdown; +use crate::{config::ConfigMonitor, prom::update_tree_proof_report}; +use das_bubblegum::{verify_bubblegum, BubblegumContext, VerifyArgs}; +use das_core::{Rpc, SolanaRpcArgs}; +use futures::stream::StreamExt; +use tracing::error; pub async fn run(config: ConfigMonitor) -> anyhow::Result<()> { + let mut shutdown = create_shutdown()?; + let database_pool = create_pool(config.postgres).await?; + let rpc = Rpc::from_config(&SolanaRpcArgs { + solana_rpc_url: config.rpc, + }); + + let bubblegum_verify = tokio::spawn(async move { + loop { + let bubblegum_context = BubblegumContext::new(database_pool.clone(), rpc.clone()); + let verify_args = VerifyArgs { + only_trees: config.bubblegum.only_trees.clone(), + max_concurrency: config.bubblegum.max_concurrency, + }; + + match verify_bubblegum(bubblegum_context, verify_args).await { + Ok(reports) => { + for report in reports { + update_tree_proof_report(&report); + } + } + Err(e) => { + error!( + message = "Error verifying bubblegum", + error = ?e + ); + } + } + + tokio::time::sleep(tokio::time::Duration::from_millis(150)).await; + } + }); + + if let Some(_signal) = shutdown.next().await {} + + bubblegum_verify.abort(); + Ok(()) } diff --git a/grpc-ingest/src/postgres.rs b/grpc-ingest/src/postgres.rs index 1077bc49b..d38c5250c 100644 --- a/grpc-ingest/src/postgres.rs +++ b/grpc-ingest/src/postgres.rs @@ -1,6 +1,6 @@ use { crate::{ - config::ConfigIngesterPostgres, + config::ConfigPostgres, prom::{pgpool_connections_set, PgpoolConnectionsKind}, }, sqlx::{ @@ -9,7 +9,7 @@ use { }, }; -pub async fn create_pool(config: ConfigIngesterPostgres) -> anyhow::Result { +pub async fn create_pool(config: ConfigPostgres) -> anyhow::Result { let options: PgConnectOptions = config.url.parse()?; PgPoolOptions::new() .min_connections(config.min_connections.try_into()?) diff --git a/grpc-ingest/src/prom.rs b/grpc-ingest/src/prom.rs index e70e14671..a82022796 100644 --- a/grpc-ingest/src/prom.rs +++ b/grpc-ingest/src/prom.rs @@ -1,5 +1,6 @@ use { crate::{redis::RedisStreamMessageError, version::VERSION as VERSION_INFO}, + das_bubblegum::ProofReport, das_core::MetadataJsonTaskError, hyper::{ server::conn::AddrStream, @@ -72,6 +73,31 @@ lazy_static::lazy_static! { Opts::new("grpc_tasks", "Number of tasks spawned for writing grpc messages to redis "), &[] ).unwrap(); + + static ref BUBBLEGUM_TREE_TOTAL_LEAVES: IntGaugeVec = IntGaugeVec::new( + Opts::new("bubblegum_tree_total_leaves", "Total number of leaves in the bubblegum tree"), + &["tree"] + ).unwrap(); + + static ref BUBBLEGUM_TREE_INCORRECT_PROOFS: IntGaugeVec = IntGaugeVec::new( + Opts::new("bubblegum_tree_incorrect_proofs", "Number of incorrect proofs in the bubblegum tree"), + &["tree"] + ).unwrap(); + + static ref BUBBLEGUM_TREE_NOT_FOUND_PROOFS: IntGaugeVec = IntGaugeVec::new( + Opts::new("bubblegum_tree_not_found_proofs", "Number of not found proofs in the bubblegum tree"), + &["tree"] + ).unwrap(); + + static ref BUBBLEGUM_TREE_CORRECT_PROOFS: IntGaugeVec = IntGaugeVec::new( + Opts::new("bubblegum_tree_correct_proofs", "Number of correct proofs in the bubblegum tree"), + &["tree"] + ).unwrap(); + + static ref BUBBLEGUM_TREE_CORRUPT_PROOFS: IntGaugeVec = IntGaugeVec::new( + Opts::new("bubblegum_tree_corrupt_proofs", "Number of corrupt proofs in the bubblegum tree"), + &["tree"] + ).unwrap(); } pub fn run_server(address: SocketAddr) -> anyhow::Result<()> { @@ -96,6 +122,11 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> { register!(INGEST_TASKS); register!(ACK_TASKS); register!(GRPC_TASKS); + register!(BUBBLEGUM_TREE_TOTAL_LEAVES); + register!(BUBBLEGUM_TREE_INCORRECT_PROOFS); + register!(BUBBLEGUM_TREE_NOT_FOUND_PROOFS); + register!(BUBBLEGUM_TREE_CORRECT_PROOFS); + register!(BUBBLEGUM_TREE_CORRUPT_PROOFS); VERSION_INFO_METRIC .with_label_values(&[ @@ -319,3 +350,25 @@ pub fn program_transformer_task_status_inc(kind: ProgramTransformerTaskStatusKin .with_label_values(&[kind.to_str()]) .inc() } + +pub fn update_tree_proof_report(report: &ProofReport) { + BUBBLEGUM_TREE_TOTAL_LEAVES + .with_label_values(&[&report.tree_pubkey.to_string()]) + .set(report.total_leaves as i64); + + BUBBLEGUM_TREE_INCORRECT_PROOFS + .with_label_values(&[&report.tree_pubkey.to_string()]) + .set(report.incorrect_proofs as i64); + + BUBBLEGUM_TREE_NOT_FOUND_PROOFS + .with_label_values(&[&report.tree_pubkey.to_string()]) + .set(report.not_found_proofs as i64); + + BUBBLEGUM_TREE_CORRECT_PROOFS + .with_label_values(&[&report.tree_pubkey.to_string()]) + .set(report.correct_proofs as i64); + + BUBBLEGUM_TREE_CORRUPT_PROOFS + .with_label_values(&[&report.tree_pubkey.to_string()]) + .set(report.corrupt_proofs as i64); +} diff --git a/ops/Cargo.toml b/ops/Cargo.toml index d0a3077ea..14f741870 100644 --- a/ops/Cargo.toml +++ b/ops/Cargo.toml @@ -36,5 +36,6 @@ spl-account-compression = { workspace = true } sqlx = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } +tracing = { workspace = true } mpl-token-metadata = { workspace = true } -serde_json = { workspace = true } \ No newline at end of file +serde_json = { workspace = true } diff --git a/ops/src/bubblegum/verify.rs b/ops/src/bubblegum/verify.rs index aa9a362b4..947d2b3ad 100644 --- a/ops/src/bubblegum/verify.rs +++ b/ops/src/bubblegum/verify.rs @@ -2,6 +2,7 @@ use anyhow::Result; use clap::Parser; use das_bubblegum::{verify_bubblegum, BubblegumContext, VerifyArgs}; use das_core::{connect_db, PoolArgs, Rpc, SolanaRpcArgs}; +use tracing::info; #[derive(Debug, Parser, Clone)] pub struct Args { @@ -24,7 +25,18 @@ pub async fn run(config: Args) -> Result<()> { let solana_rpc = Rpc::from_config(&config.solana); let context = BubblegumContext::new(database_pool, solana_rpc); - verify_bubblegum(context, config.verify_bubblegum).await?; + let reports = verify_bubblegum(context, config.verify_bubblegum).await?; + + for report in reports { + info!( + "Tree: {}, Total Leaves: {}, Incorrect Proofs: {}, Not Found Proofs: {}, Correct Proofs: {}", + report.tree_pubkey, + report.total_leaves, + report.incorrect_proofs, + report.not_found_proofs, + report.correct_proofs + ); + } Ok(()) }