run batches of analytical queries
0o-de-lally committed Nov 30, 2024
1 parent 37ab2da commit f73ca16
Showing 7 changed files with 194 additions and 0 deletions.
121 changes: 121 additions & 0 deletions src/analytics/exchange_stats.rs
@@ -0,0 +1,121 @@
use std::{sync::Arc, thread::available_parallelism};

use anyhow::{Context, Result};
use log::{info, warn};
use neo4rs::Graph;
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;

#[derive(Debug, Serialize, Deserialize)]
pub struct RMSResults {
id: u64,
time: String,
matching_trades: u64,
rms: f64,
}

static BATCH_SIZE: u64 = 100;

pub async fn query_rms_analytics(pool: &Graph, threads: Option<usize>) -> Result<Vec<RMSResults>> {
let threads = threads.unwrap_or(available_parallelism()?.get());

let n = query_trades_count(pool).await?;

let batches = if n > BATCH_SIZE { (n / BATCH_SIZE) + 1 } else { 1 };

let semaphore = Arc::new(Semaphore::new(threads)); // Semaphore to limit concurrency
let mut tasks = vec![];

for batch_sequence in 0..batches {
let pool = pool.clone(); // Clone pool for each task
let semaphore = Arc::clone(&semaphore); // Clone semaphore for each task

let task = tokio::spawn(async move {
let _permit = semaphore.acquire().await; // Acquire semaphore permit
info!("PROGRESS: {batch_sequence}/{n}");
query_rms_analytics_chunk(&pool, batches).await // Perform the task
});

tasks.push(task);
}

// Await all tasks and collect the results
let results = futures::future::join_all(tasks).await;

let mut rms_vec = vec![];
for el in results {
let mut v = el??;
rms_vec.append(&mut v);
}
Ok(rms_vec)
}

/// Compute RMS price stats for one batch of swaps (matching trades in the hour before each swap)
pub async fn query_rms_analytics_chunk(
pool: &Graph,
batch_sequence: u64,
) -> Result<Vec<RMSResults>> {
let cypher_string = format!(
r#"
MATCH (from_user:SwapAccount)-[t:Swap]->(to_accepter:SwapAccount)
WITH DISTINCT t AS txs, from_user, to_accepter, t.filled_at AS current_time
ORDER BY current_time
SKIP {BATCH_SIZE} * {batch_sequence} LIMIT {BATCH_SIZE}
MATCH (from_user2:SwapAccount)-[other:Swap]->(to_accepter2:SwapAccount)
WHERE datetime(other.filled_at) >= datetime(current_time) - duration({{ hours: 1 }})
AND datetime(other.filled_at) < datetime(current_time)
// exclude trades that share a party with txs
AND from_user2 <> from_user AND from_user2 <> to_accepter
AND to_accepter2 <> from_user AND to_accepter2 <> to_accepter
RETURN id(txs) AS id, txs.filled_at AS time, COUNT(other) AS matching_trades, sqrt(avg(other.price * other.price)) AS rms
"#
);
let cypher_query = neo4rs::query(&cypher_string);

let mut res = pool
.execute(cypher_query)
.await
.context("execute query error")?;

let mut results = vec![];
while let Some(row) = res.next().await? {
match row.to::<RMSResults>() {
Ok(r) => results.push(r),
Err(e) => {
warn!("unknown row returned {}", e)
}
}
}

Ok(results)
}

/// Count the distinct Swap relationships in the graph
pub async fn query_trades_count(pool: &Graph) -> Result<u64> {
let cypher_string = r#"
MATCH (:SwapAccount)-[t:Swap]->(:SwapAccount)
RETURN COUNT(DISTINCT t) as trades_count
"#
.to_string();
let cypher_query = neo4rs::query(&cypher_string);

let mut res = pool
.execute(cypher_query)
.await
.context("execute query error")?;

while let Some(row) = res.next().await? {
match row.get::<i64>("trades_count") {
Ok(r) => return Ok(r as u64),
Err(e) => {
warn!("unknown row returned {}", e);
}
}
}

anyhow::bail!("no trades_count found");
}
1 change: 1 addition & 0 deletions src/analytics/mod.rs
@@ -0,0 +1 @@
pub mod exchange_stats;
1 change: 1 addition & 0 deletions src/lib.rs
@@ -1,3 +1,4 @@
pub mod analytics;
pub mod batch_tx_type;
pub mod cypher_templates;
pub mod enrich_exchange_onboarding;
3 changes: 3 additions & 0 deletions src/neo4j_init.rs
@@ -32,6 +32,9 @@ pub static INDEX_TX_FUNCTION: &str =
pub static INDEX_SWAP_ID: &str =
"CREATE INDEX swap_account_id IF NOT EXISTS FOR (n:SwapAccount) ON (n.swap_id)";

pub static INDEX_SWAP_TIME: &str =
"CREATE INDEX swap_time IF NOT EXISTS FOR ()-[r:Swap]-() ON (r.filled_at)";

/// get the testing neo4j connection
pub async fn get_neo4j_localhost_pool(port: u16) -> Result<Graph> {
let uri = format!("127.0.0.1:{port}");
23 changes: 23 additions & 0 deletions src/warehouse_cli.rs
@@ -2,9 +2,11 @@ use anyhow::{bail, Result};
use clap::{Parser, Subcommand};
use log::info;
use neo4rs::Graph;
use serde_json::json;
use std::path::PathBuf;

use crate::{
analytics,
enrich_exchange_onboarding::{self, ExchangeOnRamp},
enrich_whitepages::{self, Whitepages},
json_rescue_v5_load,
Expand Down Expand Up @@ -97,6 +99,18 @@ pub enum Sub {
/// starting path for v5 .tgz files
archive_dir: PathBuf,
},
#[clap(subcommand)]
Analytics(AnalyticsSub),
}

#[derive(Subcommand)]
pub enum AnalyticsSub {
ExchangeRMS {
#[clap(long)]
/// commits the analytics to the db
commit: bool,
},
}

impl WarehouseCli {
Expand Down Expand Up @@ -185,6 +199,15 @@ impl WarehouseCli {
)
.await?;
}
Sub::Analytics(analytics_sub) => match analytics_sub {
AnalyticsSub::ExchangeRMS { commit } => {
info!("ExchangeRMS: {}", commit);
let pool = try_db_connection_pool(self).await?;
let results =
analytics::exchange_stats::query_rms_analytics(&pool, None).await?;
println!("{:#}", json!(&results).to_string());
}
},
};
Ok(())
}
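Outside the CLI, the same entry point can be driven directly. A minimal sketch, assuming a Neo4j instance already loaded with Swap data is listening on the local bolt port 7687 (the port and the main wrapper are illustrative, not part of this commit):

use anyhow::Result;
use libra_forensic_db::{
    analytics::exchange_stats::query_rms_analytics,
    neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes},
};

#[tokio::main]
async fn main() -> Result<()> {
    // Hypothetical local bolt port; point this at the running instance.
    let graph = get_neo4j_localhost_pool(7687).await?;
    // Create any missing indexes before querying.
    maybe_create_indexes(&graph).await?;

    // Cap the batch workers at 4 rather than using available_parallelism().
    let results = query_rms_analytics(&graph, Some(4)).await?;
    println!("computed RMS stats for {} swaps", results.len());
    Ok(())
}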
45 changes: 45 additions & 0 deletions tests/test_analytics.rs
@@ -0,0 +1,45 @@
mod support;
use anyhow::Result;
use std::path::PathBuf;

use libra_forensic_db::{
analytics, extract_exchange_orders, load_exchange_orders,
neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes},
};
use support::neo4j_testcontainer::start_neo4j_container;

#[tokio::test]
async fn test_rms() -> Result<()> {
libra_forensic_db::log_setup();

let c = start_neo4j_container();
let port = c.get_host_port_ipv4(7687);
let graph = get_neo4j_localhost_pool(port).await?;
maybe_create_indexes(&graph).await?;

let path = env!("CARGO_MANIFEST_DIR");
let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json");
let orders = extract_exchange_orders::read_orders_from_file(buf).unwrap();

assert_eq!(orders.len(), 25450);

// load 1000 orders
load_exchange_orders::swap_batch(&orders[..1000], &graph, 1000).await?;

let list = analytics::exchange_stats::query_rms_analytics(&graph, None).await?;
dbg!(&list);

// assert!(list.len() == 1000);

Ok(())
}
File renamed without changes.
