Skip to content

Commit

Permalink
feat: grpc ingest monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola committed Oct 29, 2024
1 parent 2748927 commit dcc939c
Show file tree
Hide file tree
Showing 11 changed files with 214 additions and 74 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 20 additions & 21 deletions bubblegum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod tree;
use das_core::{MetadataJsonDownloadWorkerArgs, Rpc};
pub use error::ErrorKind;
mod verify;
pub use verify::ProofReport;

use anyhow::Result;
use backfill::worker::{ProgramTransformerWorkerArgs, SignatureWorkerArgs, TreeWorkerArgs};
Expand All @@ -18,7 +19,7 @@ use sea_orm::{EntityTrait, QueryFilter};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use std::str::FromStr;
use tracing::{error, info};
use tracing::error;
use tree::TreeResponse;

#[derive(Clone)]
Expand Down Expand Up @@ -50,16 +51,6 @@ pub struct BackfillArgs {
pub tree_worker: TreeWorkerArgs,
}

#[derive(Debug, Parser, Clone)]
pub struct VerifyArgs {
/// The list of trees to verify. If not specified, all trees will be crawled.
#[arg(long, env, use_value_delimiter = true)]
pub only_trees: Option<Vec<String>>,

#[arg(long, env, default_value = "20")]
pub max_concurrency: usize,
}

pub async fn start_backfill(context: BubblegumContext, args: BackfillArgs) -> Result<()> {
let trees = if let Some(ref only_trees) = args.only_trees {
TreeResponse::find(&context.solana_rpc, only_trees.clone()).await?
Expand Down Expand Up @@ -161,25 +152,33 @@ pub async fn start_bubblegum_replay(
Ok(())
}

pub async fn verify_bubblegum(context: BubblegumContext, args: VerifyArgs) -> Result<()> {
#[derive(Debug, Parser, Clone)]
pub struct VerifyArgs {
/// The list of trees to verify. If not specified, all trees will be crawled.
#[arg(long, env, use_value_delimiter = true)]
pub only_trees: Option<Vec<String>>,

#[arg(long, env, default_value = "20")]
pub max_concurrency: usize,
}

pub async fn verify_bubblegum(
context: BubblegumContext,
args: VerifyArgs,
) -> Result<Vec<verify::ProofReport>> {
let trees = if let Some(ref only_trees) = args.only_trees {
TreeResponse::find(&context.solana_rpc, only_trees.clone()).await?
} else {
TreeResponse::all(&context.solana_rpc).await?
};

let mut reports = Vec::new();

for tree in trees {
let report = verify::check(context.clone(), tree, args.max_concurrency).await?;

info!(
"Tree: {}, Total Leaves: {}, Incorrect Proofs: {}, Not Found Proofs: {}, Correct Proofs: {}",
report.tree_pubkey,
report.total_leaves,
report.incorrect_proofs,
report.not_found_proofs,
report.correct_proofs
);
reports.push(report);
}

Ok(())
Ok(reports)
}
73 changes: 33 additions & 40 deletions bubblegum/src/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,14 @@ use super::BubblegumContext;
use crate::error::ErrorKind;
use crate::tree::TreeResponse;
use anyhow::{anyhow, Result};
use borsh::BorshDeserialize;
use digital_asset_types::dapi::get_proof_for_asset;
use digital_asset_types::rpc::AssetProof;
use futures::stream::{FuturesUnordered, StreamExt};
use mpl_bubblegum::accounts::TreeConfig;
use sea_orm::SqlxPostgresConnector;
use sha3::{Digest, Keccak256};
use solana_sdk::{pubkey::Pubkey, syscalls::MAX_CPI_INSTRUCTION_ACCOUNTS};
use spl_account_compression::{
canopy::fill_in_proof_from_canopy,
concurrent_tree_wrapper::ProveLeafArgs,
state::{
merkle_tree_get_size, ConcurrentMerkleTreeHeader, CONCURRENT_MERKLE_TREE_HEADER_SIZE_V1,
},
};
use solana_sdk::pubkey::Pubkey;
use spl_account_compression::concurrent_tree_wrapper::ProveLeafArgs;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::error;
Expand Down Expand Up @@ -65,7 +58,7 @@ fn hash(left: &[u8], right: &[u8]) -> [u8; 32] {
hash
}

fn verify_merkle_proof(root: [u8; 32], proof: &ProveLeafArgs) -> bool {
fn verify_merkle_proof(proof: &ProveLeafArgs) -> bool {
let mut node = proof.leaf;
for (i, sibling) in proof.proof_vec.iter().enumerate() {
if (proof.index >> i) & 1 == 0 {
Expand All @@ -74,22 +67,31 @@ fn verify_merkle_proof(root: [u8; 32], proof: &ProveLeafArgs) -> bool {
node = hash(sibling, &node);
}
}
node == root
node == proof.current_root
}
fn leaf_proof_result(proof: AssetProof) -> Result<ProofResult, anyhow::Error> {
match ProveLeafArgs::try_from_asset_proof(proof) {
Ok(proof) if verify_merkle_proof(&proof) => Ok(ProofResult::Correct),
Ok(_) => Ok(ProofResult::Incorrect),
Err(_) => Ok(ProofResult::Corrupt),
}
}

#[derive(Debug)]
#[derive(Debug, Default)]
pub struct ProofReport {
pub tree_pubkey: Pubkey,
pub total_leaves: usize,
pub incorrect_proofs: usize,
pub not_found_proofs: usize,
pub correct_proofs: usize,
pub corrupt_proofs: usize,
}

enum ProofResult {
Correct,
Incorrect,
NotFound,
Corrupt,
}

pub async fn check(
Expand All @@ -111,9 +113,7 @@ pub async fn check(
let report = Arc::new(Mutex::new(ProofReport {
tree_pubkey: tree.pubkey,
total_leaves: tree_config.num_minted as usize,
incorrect_proofs: 0,
not_found_proofs: 0,
correct_proofs: 0,
..ProofReport::default()
}));

let mut tasks = FuturesUnordered::new();
Expand All @@ -124,41 +124,34 @@ pub async fn check(
}

let db = SqlxPostgresConnector::from_sqlx_postgres_pool(pool.clone());
let tree_pubkey = tree.pubkey.clone();
let tree_pubkey = tree.pubkey;
let report = Arc::clone(&report);

tasks.push(tokio::spawn(async move {
let (asset, _) = Pubkey::find_program_address(
&[b"asset", &tree_pubkey.to_bytes(), &i.to_le_bytes()],
&mpl_bubblegum::ID,
);
let result: Result<ProofResult, anyhow::Error> =
match get_proof_for_asset(&db, asset.to_bytes().to_vec()).await {
Ok(proof) => match ProveLeafArgs::try_from_asset_proof(proof) {
Ok(prove_leaf_args) => {
if verify_merkle_proof(prove_leaf_args.current_root, &prove_leaf_args) {
Ok(ProofResult::Correct)
} else {
Ok(ProofResult::Incorrect)
}
}
Err(_) => Ok(ProofResult::Incorrect),
},
Err(_) => Ok(ProofResult::NotFound),
};

if let Ok(proof_result) = result {
let proof_lookup: Result<ProofResult, anyhow::Error> = get_proof_for_asset(&db, asset.to_bytes().to_vec())
.await
.map_or_else(|_| Ok(ProofResult::NotFound), leaf_proof_result);

if let Ok(proof_result) = proof_lookup {
let mut report = report.lock().await;
match proof_result {
ProofResult::Correct => report.correct_proofs += 1,
ProofResult::Incorrect => {
report.incorrect_proofs += 1;
error!(tree = %tree_pubkey, leaf_index = i, asset = %asset, "Incorrect proof found");
}
ProofResult::NotFound => {
report.not_found_proofs += 1;
error!(tree = %tree_pubkey, leaf_index = i, asset = %asset, "Proof not found");
}
ProofResult::Incorrect => report.incorrect_proofs += 1,
ProofResult::NotFound => report.not_found_proofs += 1,
ProofResult::Corrupt => report.corrupt_proofs += 1,
}
if let ProofResult::Incorrect | ProofResult::NotFound | ProofResult::Corrupt = proof_result {
let error_message = match proof_result {
ProofResult::Incorrect => "Incorrect proof found",
ProofResult::NotFound => "Proof not found",
ProofResult::Corrupt => "Corrupt proof found",
_ => unreachable!(),
};
error!(tree = %tree_pubkey, leaf_index = i, asset = %asset, "{}", error_message);
}
}
}));
Expand Down
1 change: 1 addition & 0 deletions grpc-ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ publish = { workspace = true }
anyhow = { workspace = true }
async-stream = { workspace = true }
atty = { workspace = true }
das-bubblegum = { workspace = true }
sqlx = { workspace = true, features = [
"macros",
"runtime-tokio-rustls",
Expand Down
8 changes: 8 additions & 0 deletions grpc-ingest/config-monitor.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
prometheus: 127.0.0.1:8876
rpc: http://127.0.0.1:8899
postgres:
url: postgres://solana:solana@localhost/solana
min_connections: 10
max_connections: 50
bubblegum:
only_trees: null
45 changes: 37 additions & 8 deletions grpc-ingest/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ where
#[derive(Debug, Clone, Deserialize)]
pub struct ConfigIngester {
pub redis: String,
pub postgres: ConfigIngesterPostgres,
pub postgres: ConfigPostgres,
pub download_metadata: ConfigIngesterDownloadMetadata,
pub snapshots: ConfigIngestStream,
pub accounts: ConfigIngestStream,
Expand All @@ -231,31 +231,31 @@ pub enum ConfigIngesterRedisStreamType {
}

#[derive(Debug, Clone, Deserialize)]
pub struct ConfigIngesterPostgres {
pub struct ConfigPostgres {
pub url: String,
#[serde(
default = "ConfigIngesterPostgres::default_min_connections",
default = "ConfigPostgres::default_min_connections",
deserialize_with = "deserialize_usize_str"
)]
pub min_connections: usize,
#[serde(
default = "ConfigIngesterPostgres::default_max_connections",
default = "ConfigPostgres::default_max_connections",
deserialize_with = "deserialize_usize_str"
)]
pub max_connections: usize,
#[serde(
default = "ConfigIngesterPostgres::default_idle_timeout",
default = "ConfigPostgres::default_idle_timeout",
deserialize_with = "deserialize_duration_str"
)]
pub idle_timeout: Duration,
#[serde(
default = "ConfigIngesterPostgres::default_max_lifetime",
default = "ConfigPostgres::default_max_lifetime",
deserialize_with = "deserialize_duration_str"
)]
pub max_lifetime: Duration,
}

impl ConfigIngesterPostgres {
impl ConfigPostgres {
pub const fn default_min_connections() -> usize {
10
}
Expand Down Expand Up @@ -337,4 +337,33 @@ impl ConfigIngesterDownloadMetadata {
}

#[derive(Debug, Clone, Deserialize)]
pub struct ConfigMonitor {}
pub struct ConfigMonitor {
pub postgres: ConfigPostgres,
pub rpc: String,
pub bubblegum: ConfigBubblegumVerify,
}

#[derive(Debug, Clone, Deserialize)]
pub struct ConfigBubblegumVerify {
#[serde(
default = "ConfigBubblegumVerify::default_report_interval",
deserialize_with = "deserialize_duration_str"
)]
pub report_interval: Duration,
#[serde(default)]
pub only_trees: Option<Vec<String>>,
#[serde(
default = "ConfigBubblegumVerify::default_max_concurrency",
deserialize_with = "deserialize_usize_str"
)]
pub max_concurrency: usize,
}

impl ConfigBubblegumVerify {
pub const fn default_report_interval() -> Duration {
Duration::from_millis(5 * 60 * 1000)
}
pub const fn default_max_concurrency() -> usize {
20
}
}
44 changes: 43 additions & 1 deletion grpc-ingest/src/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,47 @@
use crate::config::ConfigMonitor;
use crate::postgres::create_pool;
use crate::util::create_shutdown;
use crate::{config::ConfigMonitor, prom::update_tree_proof_report};
use das_bubblegum::{verify_bubblegum, BubblegumContext, VerifyArgs};
use das_core::{Rpc, SolanaRpcArgs};
use futures::stream::StreamExt;
use tracing::error;

pub async fn run(config: ConfigMonitor) -> anyhow::Result<()> {
let mut shutdown = create_shutdown()?;
let database_pool = create_pool(config.postgres).await?;
let rpc = Rpc::from_config(&SolanaRpcArgs {
solana_rpc_url: config.rpc,
});

let bubblegum_verify = tokio::spawn(async move {
loop {
let bubblegum_context = BubblegumContext::new(database_pool.clone(), rpc.clone());
let verify_args = VerifyArgs {
only_trees: config.bubblegum.only_trees.clone(),
max_concurrency: config.bubblegum.max_concurrency,
};

match verify_bubblegum(bubblegum_context, verify_args).await {
Ok(reports) => {
for report in reports {
update_tree_proof_report(&report);
}
}
Err(e) => {
error!(
message = "Error verifying bubblegum",
error = ?e
);
}
}

tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;
}
});

if let Some(_signal) = shutdown.next().await {}

bubblegum_verify.abort();

Ok(())
}
4 changes: 2 additions & 2 deletions grpc-ingest/src/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::{
config::ConfigIngesterPostgres,
config::ConfigPostgres,
prom::{pgpool_connections_set, PgpoolConnectionsKind},
},
sqlx::{
Expand All @@ -9,7 +9,7 @@ use {
},
};

pub async fn create_pool(config: ConfigIngesterPostgres) -> anyhow::Result<PgPool> {
pub async fn create_pool(config: ConfigPostgres) -> anyhow::Result<PgPool> {
let options: PgConnectOptions = config.url.parse()?;
PgPoolOptions::new()
.min_connections(config.min_connections.try_into()?)
Expand Down
Loading

0 comments on commit dcc939c

Please sign in to comment.