From f73ca16af93d3c67f400861e739eddefda548a23 Mon Sep 17 00:00:00 2001
From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com>
Date: Fri, 29 Nov 2024 21:46:33 -0500
Subject: [PATCH] run batches of analytical queries

---
Review notes (changes relative to the original patch):
- each task now queries its own `batch_sequence`; the original passed
  `batches`, so every task ran the same out-of-range SKIP
- batch count is rounded up with `div_ceil` (no trailing empty batch)
- `threads` no longer panics when parallelism is unknown and is clamped to >= 1
- SKIP/LIMIT in the Cypher query are derived from `BATCH_SIZE`
- `query_trades_count` uses a checked i64 -> u64 conversion
- CLI pretty-prints the JSON value instead of a pre-rendered string
- dead commented-out test code removed
- NOTE(review): `INDEX_SWAP_TIME` is declared here, but this patch does not
  show it being registered in `maybe_create_indexes` - confirm.
- Blob hashes on the `index` lines are carried over from the original patch
  and need regenerating.

 src/analytics/exchange_stats.rs               | 135 ++++++++++++++++++++
 src/analytics/mod.rs                          |   1 +
 src/lib.rs                                    |   1 +
 src/neo4j_init.rs                             |   3 +
 src/warehouse_cli.rs                          |  24 ++++
 tests/test_analytics.rs                       |  34 +++++
 ...orting_data.rs => test_enrich_exchange.rs} |   0
 7 files changed, 198 insertions(+)
 create mode 100644 src/analytics/exchange_stats.rs
 create mode 100644 src/analytics/mod.rs
 create mode 100644 tests/test_analytics.rs
 rename tests/{test_supporting_data.rs => test_enrich_exchange.rs} (100%)

diff --git a/src/analytics/exchange_stats.rs b/src/analytics/exchange_stats.rs
new file mode 100644
index 0000000..36e44ce
--- /dev/null
+++ b/src/analytics/exchange_stats.rs
@@ -0,0 +1,135 @@
+use std::{sync::Arc, thread::available_parallelism};
+
+use anyhow::{Context, Result};
+use log::{info, warn};
+use neo4rs::Graph;
+use serde::{Deserialize, Serialize};
+use tokio::sync::Semaphore;
+
+/// RMS price statistics computed for a single swap.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct RMSResults {
+    id: u64,
+    time: String,
+    matching_trades: u64,
+    rms: f64,
+}
+
+/// Number of swaps covered by each chunk query.
+static BATCH_SIZE: u64 = 100;
+
+/// Runs the RMS analytics over every swap, one batch of `BATCH_SIZE` swaps per
+/// query, with at most `threads` queries in flight at a time.
+///
+/// When `threads` is `None` the host's available parallelism is used, falling
+/// back to one query at a time if that cannot be determined.
+pub async fn query_rms_analytics(pool: &Graph, threads: Option<usize>) -> Result<Vec<RMSResults>> {
+    // A semaphore with zero permits would stall every task, so use at least one.
+    let threads = threads
+        .unwrap_or_else(|| available_parallelism().map_or(1, |p| p.get()))
+        .max(1);
+
+    let n = query_trades_count(pool).await?;
+    // Round up so a final, partially filled batch is still queried.
+    let batches = n.div_ceil(BATCH_SIZE);
+
+    let semaphore = Arc::new(Semaphore::new(threads)); // Semaphore to limit concurrency
+    let mut tasks = vec![];
+
+    for batch_sequence in 0..batches {
+        let pool = pool.clone(); // Clone pool for each task
+        let semaphore = Arc::clone(&semaphore); // Clone semaphore for each task
+
+        let task = tokio::spawn(async move {
+            // The permit is held until the chunk query completes.
+            let _permit = semaphore.acquire().await.context("semaphore closed")?;
+            info!("PROGRESS: {batch_sequence}/{batches}");
+            query_rms_analytics_chunk(&pool, batch_sequence).await // Perform the task
+        });
+
+        tasks.push(task);
+    }
+
+    // Await all tasks; `??` surfaces both task join errors and query errors.
+    let results = futures::future::join_all(tasks).await;
+
+    let mut rms_vec = vec![];
+    for el in results {
+        rms_vec.extend(el??);
+    }
+    Ok(rms_vec)
+}
+
+/// Runs the RMS analytics query for one batch of swaps.
+///
+/// `batch_sequence` is the zero-based batch index; the query covers `BATCH_SIZE`
+/// swaps, ordered by `filled_at`, starting at offset `batch_sequence * BATCH_SIZE`.
+/// Rows that cannot be deserialized into `RMSResults` are logged and skipped.
+pub async fn query_rms_analytics_chunk(
+    pool: &Graph,
+    batch_sequence: u64,
+) -> Result<Vec<RMSResults>> {
+    let skip = batch_sequence * BATCH_SIZE;
+    let limit = BATCH_SIZE;
+    // NOTE(review): the OR chain in the WHERE clause holds for nearly every pair of
+    // swaps, so it probably does not exclude the current swap's accounts as its
+    // trailing comment intends; confirm whether AND was meant.
+    let cypher_string = format!(
+        r#"
+MATCH (from_user:SwapAccount)-[t:Swap]->(to_accepter:SwapAccount)
+ORDER BY t.filled_at
+SKIP {skip} LIMIT {limit}
+WITH DISTINCT t as txs, from_user, to_accepter, t.filled_at AS current_time
+
+MATCH (from_user2:SwapAccount)-[other:Swap]->(to_accepter2:SwapAccount)
+WHERE datetime(other.filled_at) >= datetime(current_time) - duration({{ hours: 1 }})
+  AND datetime(other.filled_at) < datetime(current_time)
+  AND (from_user2 <> from_user OR from_user2 <> to_accepter OR to_accepter2 <> from_user OR to_accepter2 <> to_accepter) // Exclude same from_user and to_accepter
+// WITH txs, other, sqrt(avg(other.price * other.price)) AS rms
+RETURN id(txs) AS id, txs.filled_at AS time, COUNT(other) AS matching_trades, sqrt(avg(other.price * other.price)) AS rms
+        "#
+    );
+    let cypher_query = neo4rs::query(&cypher_string);
+
+    let mut res = pool
+        .execute(cypher_query)
+        .await
+        .context("execute query error")?;
+
+    let mut results = vec![];
+    while let Some(row) = res.next().await? {
+        match row.to::<RMSResults>() {
+            Ok(r) => results.push(r),
+            Err(e) => {
+                warn!("unknown row returned {}", e)
+            }
+        }
+    }
+
+    Ok(results)
+}
+
+/// Counts the distinct `Swap` relationships between `SwapAccount` nodes.
+pub async fn query_trades_count(pool: &Graph) -> Result<u64> {
+    let cypher_string = r#"
+MATCH (:SwapAccount)-[t:Swap]->(:SwapAccount)
+RETURN COUNT(DISTINCT t) as trades_count
+        "#;
+    let cypher_query = neo4rs::query(cypher_string);
+
+    let mut res = pool
+        .execute(cypher_query)
+        .await
+        .context("execute query error")?;
+
+    while let Some(row) = res.next().await? {
+        match row.get::<i64>("trades_count") {
+            Ok(r) => return u64::try_from(r).context("negative trades_count"),
+            Err(e) => {
+                warn!("unknown row returned {}", e);
+            }
+        }
+    }
+
+    anyhow::bail!("no trades_count found");
+}
diff --git a/src/analytics/mod.rs b/src/analytics/mod.rs
new file mode 100644
index 0000000..f3b75c9
--- /dev/null
+++ b/src/analytics/mod.rs
@@ -0,0 +1 @@
+pub mod exchange_stats;
diff --git a/src/lib.rs b/src/lib.rs
index 52621e7..46f3ac6 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,4 @@
+pub mod analytics;
 pub mod batch_tx_type;
 pub mod cypher_templates;
 pub mod enrich_exchange_onboarding;
diff --git a/src/neo4j_init.rs b/src/neo4j_init.rs
index e62139e..6d19eb8 100644
--- a/src/neo4j_init.rs
+++ b/src/neo4j_init.rs
@@ -32,6 +32,9 @@ pub static INDEX_TX_FUNCTION: &str =
 pub static INDEX_SWAP_ID: &str =
     "CREATE INDEX swap_account_id IF NOT EXISTS FOR (n:SwapAccount) ON (n.swap_id)";
 
+pub static INDEX_SWAP_TIME: &str =
+    "CREATE INDEX swap_time IF NOT EXISTS FOR ()-[r:Swap]-() ON (r.filled_at)";
+
 /// get the testing neo4j connection
 pub async fn get_neo4j_localhost_pool(port: u16) -> Result<Graph> {
     let uri = format!("127.0.0.1:{port}");
diff --git a/src/warehouse_cli.rs b/src/warehouse_cli.rs
index fdcf321..4e93ce2 100644
--- a/src/warehouse_cli.rs
+++ b/src/warehouse_cli.rs
@@ -2,9 +2,11 @@ use anyhow::{bail, Result};
 use clap::{Parser, Subcommand};
 use log::info;
 use neo4rs::Graph;
+use serde_json::json;
 use std::path::PathBuf;
 
 use crate::{
+    analytics,
     enrich_exchange_onboarding::{self, ExchangeOnRamp},
     enrich_whitepages::{self, Whitepages},
     json_rescue_v5_load,
@@ -97,6 +99,18 @@ pub enum Sub {
         /// starting path for v5 .tgz files
         archive_dir: PathBuf,
     },
+    #[clap(subcommand)]
+    Analytics(AnalyticsSub),
+}
+
+#[derive(Subcommand)]
+
+pub enum AnalyticsSub {
+    ExchangeRMS {
+        #[clap(long)]
+        /// commits the analytics to the db
+        commit: bool,
+    },
 }
 
 impl WarehouseCli {
@@ -185,6 +199,16 @@ impl WarehouseCli {
                 )
                 .await?;
             }
+            Sub::Analytics(analytics_sub) => match analytics_sub {
+                AnalyticsSub::ExchangeRMS { commit } => {
+                    // NOTE: `commit` is only logged here; results are printed, not stored.
+                    info!("ExchangeRMS: {}", commit);
+                    let pool = try_db_connection_pool(self).await?;
+                    let results =
+                        analytics::exchange_stats::query_rms_analytics(&pool, None).await?;
+                    println!("{:#}", json!(&results));
+                }
+            },
         };
         Ok(())
     }
diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs
new file mode 100644
index 0000000..b3a0447
--- /dev/null
+++ b/tests/test_analytics.rs
@@ -0,0 +1,34 @@
+mod support;
+use anyhow::Result;
+use std::path::PathBuf;
+
+use libra_forensic_db::{
+    analytics, extract_exchange_orders, load_exchange_orders,
+    neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes},
+};
+use support::neo4j_testcontainer::start_neo4j_container;
+
+/// Loads 1000 fixture orders into a fresh Neo4j container and runs the RMS analytics.
+#[tokio::test]
+async fn test_rms() -> Result<()> {
+    libra_forensic_db::log_setup();
+
+    let c = start_neo4j_container();
+    let port = c.get_host_port_ipv4(7687);
+    let graph = get_neo4j_localhost_pool(port).await?;
+    maybe_create_indexes(&graph).await?;
+
+    let path = env!("CARGO_MANIFEST_DIR");
+    let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json");
+    let orders = extract_exchange_orders::read_orders_from_file(buf).unwrap();
+
+    assert_eq!(orders.len(), 25450);
+
+    // load 1000 orders
+    load_exchange_orders::swap_batch(&orders[..1000], &graph, 1000).await?;
+
+    let list = analytics::exchange_stats::query_rms_analytics(&graph, None).await?;
+    dbg!(&list);
+
+    Ok(())
+}
diff --git a/tests/test_supporting_data.rs b/tests/test_enrich_exchange.rs
similarity index 100%
rename from tests/test_supporting_data.rs
rename to tests/test_enrich_exchange.rs