Skip to content

Commit

Permalink
export analytics
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Nov 30, 2024
1 parent f73ca16 commit af99cbb
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 28 deletions.
34 changes: 20 additions & 14 deletions src/analytics/exchange_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,29 @@ use tokio::sync::Semaphore;

/// One row of the exchange RMS (root-mean-square) price analytics.
///
/// Produced by the Cypher query in `query_rms_analytics_chunk`:
/// `id` is the Neo4j `elementId()` of the swap relationship, `time` is its
/// `filled_at` timestamp string, `matching_trades` counts the other swaps
/// found in the trailing time window, and `rms` is
/// `sqrt(avg(price * price))` over those matching swaps.
#[derive(Debug, Serialize, Deserialize)]
pub struct RMSResults {
    /// Neo4j element id of the swap this row describes (string-valued,
    /// as returned by Cypher `elementId()`).
    pub id: String,
    /// `filled_at` timestamp of the swap, as stored in the graph.
    pub time: String,
    /// Number of other swaps matched in the trailing window.
    pub matching_trades: u64,
    /// Root mean square of the matching swaps' prices.
    pub rms: f64,
}

/// Number of rows fetched per query batch when the caller does not supply
/// an explicit `batch_size` (see `query_rms_analytics_concurrent`).
/// A plain `const` is idiomatic for a simple immutable value.
const DEFAULT_BATCH_SIZE: u64 = 100;

pub async fn query_rms_analytics(pool: &Graph, threads: Option<usize>) -> Result<Vec<RMSResults>> {
pub async fn query_rms_analytics_concurrent(
pool: &Graph,
threads: Option<usize>,
batch_size: Option<u64>,
) -> Result<Vec<RMSResults>> {
let threads = threads.unwrap_or(available_parallelism().unwrap().get());

let n = query_trades_count(pool).await?;

let mut batches = 1;
if n > BATCH_SIZE {
batches = (n / BATCH_SIZE) + 1
let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
if n > batch_size {
batches = (n / batch_size) + 1
};


let semaphore = Arc::new(Semaphore::new(threads)); // Semaphore to limit concurrency
let mut tasks = vec![];

Expand All @@ -37,7 +41,8 @@ pub async fn query_rms_analytics(pool: &Graph, threads: Option<usize>) -> Result
let task = tokio::spawn(async move {
let _permit = semaphore.acquire().await; // Acquire semaphore permit
info!("PROGRESS: {batch_sequence}/{n}");
query_rms_analytics_chunk(&pool, batches).await // Perform the task
let skip_to = batch_sequence * batch_size;
query_rms_analytics_chunk(&pool, skip_to, batch_size).await // Perform the task
});

tasks.push(task);
Expand All @@ -57,21 +62,22 @@ pub async fn query_rms_analytics(pool: &Graph, threads: Option<usize>) -> Result
// get rms analytics on transaction
pub async fn query_rms_analytics_chunk(
pool: &Graph,
batch_sequence: u64,
skip_to: u64,
limit: u64,
) -> Result<Vec<RMSResults>> {
let cypher_string = format!(
r#"
MATCH (from_user:SwapAccount)-[t:Swap]->(to_accepter:SwapAccount)
ORDER BY t.filled_at
SKIP 100 * {batch_sequence} LIMIT 100
SKIP {skip_to} LIMIT {limit}
WITH DISTINCT t as txs, from_user, to_accepter, t.filled_at AS current_time
MATCH (from_user2:SwapAccount)-[other:Swap]->(to_accepter2:SwapAccount)
WHERE datetime(other.filled_at) >= datetime(current_time) - duration({{ hours: 1 }})
WHERE datetime(other.filled_at) >= datetime(current_time) - duration({{hours: 6}})
AND datetime(other.filled_at) < datetime(current_time)
AND (from_user2 <> from_user OR from_user2 <> to_accepter OR to_accepter2 <> from_user OR to_accepter2 <> to_accepter) // Exclude same from_user and to_accepter
// WITH txs, other, sqrt(avg(other.price * other.price)) AS rms
RETURN id(txs) AS id, txs.filled_at AS time, COUNT(other) AS matching_trades, sqrt(avg(other.price * other.price)) AS rms
RETURN DISTINCT(elementId(txs)) AS id, txs.filled_at AS time, COUNT(other) AS matching_trades, sqrt(avg(other.price * other.price)) AS rms
"#
);
let cypher_query = neo4rs::query(&cypher_string);
Expand Down
6 changes: 4 additions & 2 deletions src/warehouse_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,10 @@ impl WarehouseCli {
AnalyticsSub::ExchangeRMS { commit } => {
info!("ExchangeRMS: {}", commit);
let pool = try_db_connection_pool(self).await?;
let results =
analytics::exchange_stats::query_rms_analytics(&pool, None).await?;
let results = analytics::exchange_stats::query_rms_analytics_concurrent(
&pool, None, None,
)
.await?;
println!("{:#}", json!(&results).to_string());
}
},
Expand Down
47 changes: 35 additions & 12 deletions tests/test_analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,38 @@ use libra_forensic_db::{
};
use support::neo4j_testcontainer::start_neo4j_container;

/// Loads a fixture of exchange orders into a throwaway Neo4j container and
/// checks that a single-row analytics chunk (skip 900, limit 1) returns
/// exactly one deterministic result.
#[tokio::test]
async fn test_rms_single() -> Result<()> {
    libra_forensic_db::log_setup();

    // Spin up an ephemeral Neo4j instance and prepare the schema indexes.
    let c = start_neo4j_container();
    let port = c.get_host_port_ipv4(7687);
    let graph = get_neo4j_localhost_pool(port).await?;
    maybe_create_indexes(&graph).await?;

    // Read the order fixture shipped with the repository.
    let path = env!("CARGO_MANIFEST_DIR");
    let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json");
    // Propagate the error instead of unwrap(): the test already returns Result.
    let orders = extract_exchange_orders::read_orders_from_file(buf)?;
    assert_eq!(orders.len(), 25450);

    // load 1000 orders
    load_exchange_orders::swap_batch(&orders[..1000], &graph, 1000).await?;

    // get just one analytics result, never more than one (but can be empty)
    let list = analytics::exchange_stats::query_rms_analytics_chunk(&graph, 900, 1).await?;

    assert_eq!(list.len(), 1);
    let first = list.first().expect("length checked above");
    assert_eq!(first.time, "2024-05-15T17:41:34+00:00");
    assert_eq!(first.matching_trades, 1);
    // Exact f64 equality is brittle; compare within a small tolerance instead.
    assert!((first.rms - 0.00403).abs() < 1e-9);

    Ok(())
}

#[tokio::test]
async fn test_rms() -> Result<()> {
async fn test_rms_batch() -> Result<()> {
libra_forensic_db::log_setup();

let c = start_neo4j_container();
Expand All @@ -36,10 +56,13 @@ async fn test_rms() -> Result<()> {
// load 1000 orders
load_exchange_orders::swap_batch(&orders[..1000], &graph, 1000).await?;

let list = analytics::exchange_stats::query_rms_analytics(&graph, None).await?;
dbg!(&list);
let list =
analytics::exchange_stats::query_rms_analytics_concurrent(&graph, None, None).await?;

// NOTE: this list is incomplete, the rms is dropping
// cases where there are no matches.

// assert!(n.len() == 1000);
assert!(list.len() == 800);

Ok(())
}

0 comments on commit af99cbb

Please sign in to comment.