Skip to content

Commit

Permalink
add offline analytics queries
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Dec 8, 2024
1 parent 88cf89f commit c481e53
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 15 deletions.
14 changes: 4 additions & 10 deletions src/analytics/enrich_account_funding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use log::{error, trace};
use neo4rs::{Graph, Query};
// use log::trace;
// use neo4rs::{Graph, Query};
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeMap,
Expand All @@ -13,6 +11,9 @@ use std::{

use crate::schema_exchange_orders::ExchangeOrder;

#[cfg(test)]
use crate::date_util::parse_date;

#[derive(Default, Debug, Clone, Deserialize, Serialize)]
pub struct AccountDataAlt {
pub current_balance: f64,
Expand Down Expand Up @@ -248,14 +249,6 @@ pub fn generate_cypher_query(map: String) -> String {
)
}

/// Helper function to parse "YYYY-MM-DD" into `DateTime<Utc>`
pub fn parse_date(date_str: &str) -> DateTime<Utc> {
let datetime_str = format!("{date_str}T00:00:00Z"); // Append time and UTC offset
DateTime::parse_from_rfc3339(&datetime_str)
.expect("Invalid date format; expected YYYY-MM-DD")
.with_timezone(&Utc)
}

#[test]
fn test_replay_transactions() {
let mut orders = vec![
Expand Down Expand Up @@ -364,6 +357,7 @@ fn test_replay_transactions() {
#[test]
fn test_example_user() -> Result<()> {
use crate::extract_exchange_orders;

use std::path::PathBuf;
let path = env!("CARGO_MANIFEST_DIR");
let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json");
Expand Down
1 change: 1 addition & 0 deletions src/analytics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod enrich_account_funding;
pub mod enrich_rms;
pub mod exchange_stats;
pub mod offline_matching;
101 changes: 101 additions & 0 deletions src/analytics/offline_matching.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use anyhow::Result;
use chrono::{DateTime, Utc};
use diem_types::account_address::AccountAddress;
use neo4rs::Graph;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Deposit {
account: AccountAddress,
deposited: f64,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct MinFunding {
user_id: u32,
funded: f64,
}

pub async fn get_date_range_deposits(
pool: &Graph,
top_n: u64,
start: DateTime<Utc>,
end: DateTime<Utc>,
) -> Result<Vec<Deposit>> {
let mut top_deposits = vec![];

let q = format!(
r#"
WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" as exchange_deposit
MATCH
(u:Account)-[tx:Tx]->(onboard:Account {{address: exchange_deposit}})
WHERE
tx.`block_datetime` > datetime("{}")
AND tx.`block_datetime` < datetime("{}")
WITH
u,
SUM(tx.V7_OlAccountTransfer_amount) AS totalTxAmount
ORDER BY totalTxAmount DESCENDING
RETURN u.address AS account, toFloat(totalTxAmount) / 1000000 AS deposited
LIMIT {}
"#,
start.to_rfc3339(),
end.to_rfc3339(),
top_n,
);
let cypher_query = neo4rs::query(&q);

// Execute the query
let mut result = pool.execute(cypher_query).await?;

// Fetch the first row only
while let Some(r) = result.next().await? {
let account_str = r.get::<String>("account").unwrap_or("unknown".to_string());
let deposited = r.get::<f64>("deposited").unwrap_or(0.0);
let d = Deposit {
account: account_str.parse().unwrap_or(AccountAddress::ZERO),
deposited,
};
top_deposits.push(d);
// dbg!(&d);
}
Ok(top_deposits)
}

pub async fn get_min_funding(
pool: &Graph,
top_n: u64,
start: DateTime<Utc>,
end: DateTime<Utc>,
) -> Result<Vec<MinFunding>> {
let mut min_funding = vec![];

let q = format!(
r#"
MATCH p=(e:SwapAccount)-[d:DailyLedger]-(ul:UserLedger)
WHERE d.date > datetime("{}")
AND d.date < datetime("{}")
WITH e.swap_id AS user_id, toFloat(max(ul.`total_funded`)) as funded
RETURN user_id, funded
ORDER BY funded DESC
LIMIT {}
"#,
start.to_rfc3339(),
end.to_rfc3339(),
top_n,
);
let cypher_query = neo4rs::query(&q);

// Execute the query
let mut result = pool.execute(cypher_query).await?;

// Fetch the first row only
while let Some(r) = result.next().await? {
let user_id = r.get::<u32>("user_id").unwrap_or(0);
let funded = r.get::<f64>("funded").unwrap_or(0.0);
let d = MinFunding { user_id, funded };
min_funding.push(d);
// dbg!(&d);
}
Ok(min_funding)
}
8 changes: 8 additions & 0 deletions src/date_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use chrono::{DateTime, Utc};
/// Helper function to parse "YYYY-MM-DD" into `DateTime<Utc>`
pub fn parse_date(date_str: &str) -> DateTime<Utc> {
let datetime_str = format!("{date_str}T00:00:00Z"); // Append time and UTC offset
DateTime::parse_from_rfc3339(&datetime_str)
.expect("Invalid date format; expected YYYY-MM-DD")
.with_timezone(&Utc)
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod analytics;
pub mod batch_tx_type;
pub mod cypher_templates;
pub mod date_util;
pub mod decode_entry_function;
pub mod enrich_exchange_onboarding;
pub mod enrich_whitepages;
Expand Down
26 changes: 21 additions & 5 deletions tests/test_analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ use anyhow::Result;
use std::path::PathBuf;

use libra_forensic_db::{
analytics::{
self,
enrich_account_funding::{parse_date, BalanceTracker},
},
analytics::{self, enrich_account_funding::BalanceTracker, offline_matching},
date_util::parse_date,
extract_exchange_orders, load_exchange_orders,
neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes},
neo4j_init::{self, get_neo4j_localhost_pool, maybe_create_indexes},
};
use support::neo4j_testcontainer::start_neo4j_container;

Expand Down Expand Up @@ -230,5 +228,23 @@ async fn test_submit_exchange_ledger_all() -> Result<()> {

assert!(i == user.0.len());

Ok(())
}

#[tokio::test]
async fn test_offline_analytics() -> Result<()> {
libra_forensic_db::log_setup();
let (uri, user, pass) = neo4j_init::get_credentials_from_env()?;
let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?;

let start_time = parse_date("2024-01-01");
let end_time = parse_date("2024-01-10");

let r = offline_matching::get_date_range_deposits(&pool, 20, start_time, end_time).await?;
// dbg!(&r);

let r = offline_matching::get_min_funding(&pool, 20, start_time, end_time).await?;


Ok(())
}

0 comments on commit c481e53

Please sign in to comment.