From 4725efa8d560fe9fca31a21c45404851c2bb6539 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Thu, 5 Dec 2024 10:40:13 -0500 Subject: [PATCH 01/21] add community wallet flag to snapshot state --- src/extract_snapshot.rs | 10 ++++++++- src/schema_account_state.rs | 8 +++++-- tests/test_load_state.rs | 43 +++++++++++++++++++++++++++++++++++-- 3 files changed, 56 insertions(+), 5 deletions(-) diff --git a/src/extract_snapshot.rs b/src/extract_snapshot.rs index 4f7a812..e492c24 100644 --- a/src/extract_snapshot.rs +++ b/src/extract_snapshot.rs @@ -10,7 +10,10 @@ use libra_backwards_compatibility::version_five::{ use libra_storage::read_snapshot::{accounts_from_snapshot_backup, load_snapshot_manifest}; use libra_types::{ exports::AccountAddress, - move_resource::{libra_coin::LibraCoinStoreResource, wallet::SlowWalletResource}, + move_resource::{ + cumulative_deposits::CumulativeDepositResource, libra_coin::LibraCoinStoreResource, + wallet::SlowWalletResource, + }, }; use log::{error, info, warn}; @@ -112,6 +115,11 @@ pub async fn extract_current_snapshot(archive_path: &Path) -> Result()? { + s.donor_voice_acc = true; + } + warehouse_state.push(s); } } diff --git a/src/schema_account_state.rs b/src/schema_account_state.rs index ad9f858..a44fc3d 100644 --- a/src/schema_account_state.rs +++ b/src/schema_account_state.rs @@ -19,6 +19,7 @@ pub struct WarehouseAccState { pub balance: u64, pub slow_wallet_locked: u64, pub slow_wallet_transferred: u64, + pub donor_voice_acc: bool, } impl Default for WarehouseAccState { @@ -30,6 +31,7 @@ impl Default for WarehouseAccState { balance: Default::default(), slow_wallet_locked: Default::default(), slow_wallet_transferred: Default::default(), + donor_voice_acc: false, } } } @@ -43,6 +45,7 @@ impl WarehouseAccState { balance: 0, slow_wallet_locked: 0, slow_wallet_transferred: 0, + donor_voice_acc: false, } } pub fn set_time(&mut self, timestamp: u64, version: u64, epoch: u64) { @@ -57,7 +60,7 @@ impl WarehouseAccState { /// Note original data was in an RFC rfc3339 with Z for UTC, Cypher seems to prefer with offsets +00000 pub fn to_cypher_object_template(&self) -> String { format!( - r#"{{address: "{}", balance: {}, version: {}, sequence_num: {}, slow_locked: {}, slow_transfer: {}, framework_version: "{}" }}"#, + r#"{{address: "{}", balance: {}, version: {}, sequence_num: {}, slow_locked: {}, slow_transfer: {}, framework_version: "{}", donor_voice: {} }}"#, self.address.to_hex_literal(), self.balance, self.time.version, @@ -65,6 +68,7 @@ impl WarehouseAccState { self.slow_wallet_locked, self.slow_wallet_transferred, self.time.framework_version, + self.donor_voice_acc, ) } @@ -87,7 +91,7 @@ impl WarehouseAccState { UNWIND tx_data AS tx MERGE (addr:Account {{address: tx.address}}) - MERGE (snap:Snapshot {{address: tx.address, balance: tx.balance, framework_version: tx.framework_version, version: tx.version, sequence_num: tx.sequence_num, slow_locked: tx.slow_locked, slow_transfer: tx.slow_transfer }}) + MERGE (snap:Snapshot {{address: tx.address, balance: tx.balance, framework_version: tx.framework_version, version: tx.version, sequence_num: tx.sequence_num, slow_locked: tx.slow_locked, slow_transfer: tx.slow_transfer, donor_voice: tx.donor_voice }}) MERGE (addr)-[rel:State {{version: tx.version}} ]->(snap) RETURN diff --git a/tests/test_load_state.rs b/tests/test_load_state.rs index cb93665..cabea63 100644 --- a/tests/test_load_state.rs +++ b/tests/test_load_state.rs @@ -1,13 +1,14 @@ mod support; use libra_forensic_db::{ - extract_snapshot::extract_v5_snapshot, + extract_snapshot::{extract_current_snapshot, extract_v5_snapshot}, load_account_state::{impl_batch_snapshot_insert, snapshot_batch}, neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes}, schema_account_state::WarehouseAccState, }; use support::{ - fixtures::v5_state_manifest_fixtures_path, neo4j_testcontainer::start_neo4j_container, + fixtures::{v5_state_manifest_fixtures_path, v7_state_manifest_fixtures_path}, + neo4j_testcontainer::start_neo4j_container, }; #[tokio::test] @@ -36,6 +37,44 @@ async fn test_snapshot_unit() -> anyhow::Result<()> { #[tokio::test] async fn test_snapshot_batch() -> anyhow::Result<()> { + libra_forensic_db::log_setup(); + let manifest_file = v7_state_manifest_fixtures_path().join("state.manifest"); + assert!(manifest_file.exists()); + let vec_snap = extract_current_snapshot(&manifest_file).await?; + + let c = start_neo4j_container(); + let port = c.get_host_port_ipv4(7687); + let graph = get_neo4j_localhost_pool(port) + .await + .expect("could not get neo4j connection pool"); + maybe_create_indexes(&graph) + .await + .expect("could start index"); + + let merged_snapshots = impl_batch_snapshot_insert(&graph, &vec_snap[..100]).await?; + dbg!(&merged_snapshots.created_tx); + // assert!(merged_snapshots.created_tx == 100); + + // check DB to see what is persisted + let cypher_query = neo4rs::query( + "MATCH ()-[r:State]->() + RETURN count(r) AS count_state_edges", + ); + + // Execute the query + let mut result = graph.execute(cypher_query).await?; + + // Fetch the first row only + let row = result.next().await?.unwrap(); + let count: i64 = row.get("count_state_edges").unwrap(); + dbg!(&count); + // assert!(count == 100i64); + + Ok(()) +} + +#[tokio::test] +async fn test_v5_snapshot_batch() -> anyhow::Result<()> { libra_forensic_db::log_setup(); let manifest_file = v5_state_manifest_fixtures_path().join("state.manifest"); assert!(manifest_file.exists()); From 65f530e10eaad880d2fbf48b9494328f3e32765e Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Thu, 5 Dec 2024 10:46:44 -0500 Subject: [PATCH 02/21] add asserts --- tests/test_load_state.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/test_load_state.rs b/tests/test_load_state.rs index cabea63..3d8b1a9 100644 --- a/tests/test_load_state.rs +++ b/tests/test_load_state.rs @@ -38,9 +38,9 @@ async fn test_snapshot_unit() -> anyhow::Result<()> { #[tokio::test] async fn test_snapshot_batch() -> anyhow::Result<()> { libra_forensic_db::log_setup(); - let manifest_file = v7_state_manifest_fixtures_path().join("state.manifest"); - assert!(manifest_file.exists()); - let vec_snap = extract_current_snapshot(&manifest_file).await?; + let archive_path = v7_state_manifest_fixtures_path(); + assert!(archive_path.exists()); + let vec_snap = extract_current_snapshot(&archive_path ).await?; let c = start_neo4j_container(); let port = c.get_host_port_ipv4(7687); @@ -52,8 +52,8 @@ async fn test_snapshot_batch() -> anyhow::Result<()> { .expect("could start index"); let merged_snapshots = impl_batch_snapshot_insert(&graph, &vec_snap[..100]).await?; - dbg!(&merged_snapshots.created_tx); - // assert!(merged_snapshots.created_tx == 100); + + assert!(merged_snapshots.created_tx == 100); // check DB to see what is persisted let cypher_query = neo4rs::query( @@ -67,8 +67,8 @@ async fn test_snapshot_batch() -> anyhow::Result<()> { // Fetch the first row only let row = result.next().await?.unwrap(); let count: i64 = row.get("count_state_edges").unwrap(); - dbg!(&count); - // assert!(count == 100i64); + + assert!(count == 100i64); Ok(()) } From 21447467ac907856222acc6b5a09e1a7516b6b59 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:36:12 -0500 Subject: [PATCH 03/21] patch load single archive --- src/load.rs | 9 ++++++--- src/scan.rs | 12 ++++++++++-- src/warehouse_cli.rs | 34 +++++++++++++++------------------- tests/test_load_state.rs | 2 +- 4 files changed, 32 insertions(+), 25 deletions(-) diff --git a/src/load.rs b/src/load.rs index c50dff8..c40af79 100644 --- a/src/load.rs +++ b/src/load.rs @@ -8,8 +8,8 @@ use crate::{ scan::{ArchiveMap, ManifestInfo}, }; -use anyhow::{Context, Result}; -use log::{info, warn}; +use anyhow::{bail, Context, Result}; +use log::{error, info, warn}; use neo4rs::Graph; /// takes all the archives from a map, and tries to load them sequentially @@ -74,7 +74,10 @@ pub async fn try_load_one_archive( crate::scan::BundleContent::Unknown => todo!(), crate::scan::BundleContent::StateSnapshot => { let snaps = match man.version { - crate::scan::FrameworkVersion::Unknown => todo!(), + crate::scan::FrameworkVersion::Unknown => { + error!("no framework version detected"); + bail!("could not load archive from manifest"); + } crate::scan::FrameworkVersion::V5 => extract_v5_snapshot(&man.archive_dir).await?, crate::scan::FrameworkVersion::V6 => { extract_current_snapshot(&man.archive_dir).await? diff --git a/src/scan.rs b/src/scan.rs index c9cb407..bd1844d 100644 --- a/src/scan.rs +++ b/src/scan.rs @@ -36,10 +36,13 @@ impl ManifestInfo { match self.contents { BundleContent::Unknown => return FrameworkVersion::Unknown, BundleContent::StateSnapshot => { + let man_path = self.archive_dir.join(self.contents.filename()); + dbg!(&man_path); + // first check if the v7 manifest will parse - if load_snapshot_manifest(&self.archive_dir).is_ok() { + if let Ok(_bak) = load_snapshot_manifest(&man_path) { self.version = FrameworkVersion::V7; - } + }; if v5_read_from_snapshot_manifest(&self.archive_dir).is_ok() { self.version = FrameworkVersion::V5; @@ -98,15 +101,19 @@ pub fn scan_dir_archive( ) -> Result { let path = parent_dir.canonicalize()?; let filename = content_opt.unwrap_or(BundleContent::Unknown).filename(); + dbg!(&filename); let pattern = format!( "{}/**/{}", path.to_str().context("cannot parse starting dir")?, filename, ); + dbg!(&pattern); + let mut archive = BTreeMap::new(); for entry in glob(&pattern)? { + dbg!(&entry); match entry { Ok(manifest_path) => { let dir = manifest_path @@ -114,6 +121,7 @@ pub fn scan_dir_archive( .context("no parent dir found")? .to_owned(); let contents = test_content(&manifest_path); + dbg!(&contents); let archive_id = dir.file_name().unwrap().to_str().unwrap().to_owned(); let mut m = ManifestInfo { archive_dir: dir.clone(), diff --git a/src/warehouse_cli.rs b/src/warehouse_cli.rs index fe1e220..c0114a7 100644 --- a/src/warehouse_cli.rs +++ b/src/warehouse_cli.rs @@ -1,6 +1,6 @@ use anyhow::{bail, Result}; use clap::{Parser, Subcommand}; -use log::{info, warn}; +use log::{error, info, warn}; use neo4rs::Graph; use serde_json::json; use std::path::PathBuf; @@ -129,29 +129,25 @@ impl WarehouseCli { Sub::LoadOne { archive_dir, batch_size, - } => match scan_dir_archive(archive_dir, None)?.0.get(archive_dir) { - Some(man) => { + } => { + let am = scan_dir_archive(archive_dir, None)?; + if am.0.is_empty() { + error!("cannot find .manifest file under {}", archive_dir.display()); + } + for (_p, man) in am.0 { let pool = try_db_connection_pool(self).await?; neo4j_init::maybe_create_indexes(&pool).await?; - try_load_one_archive(man, &pool, batch_size.unwrap_or(250)).await?; - } - None => { - bail!(format!( - "ERROR: cannot find .manifest file under {}", - archive_dir.display() - )); + try_load_one_archive(&man, &pool, batch_size.unwrap_or(250)).await?; } - }, + } Sub::Check { archive_dir } => { - match scan_dir_archive(archive_dir, None)?.0.get(archive_dir) { - Some(_) => todo!(), - None => { - bail!(format!( - "ERROR: cannot find .manifest file under {}", - archive_dir.display() - )); - } + let am = scan_dir_archive(archive_dir, None)?; + if am.0.is_empty() { + error!("cannot find .manifest file under {}", archive_dir.display()); + } + for (p, man) in am.0 { + info!("manifest found at {} \n {:?}", p.display(), man); } } Sub::EnrichExchange { diff --git a/tests/test_load_state.rs b/tests/test_load_state.rs index 3d8b1a9..2d32694 100644 --- a/tests/test_load_state.rs +++ b/tests/test_load_state.rs @@ -40,7 +40,7 @@ async fn test_snapshot_batch() -> anyhow::Result<()> { libra_forensic_db::log_setup(); let archive_path = v7_state_manifest_fixtures_path(); assert!(archive_path.exists()); - let vec_snap = extract_current_snapshot(&archive_path ).await?; + let vec_snap = extract_current_snapshot(&archive_path).await?; let c = start_neo4j_container(); let port = c.get_host_port_ipv4(7687); From 395acf9a33ab9bde555eb47b0280f02555a04529 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:54:47 -0500 Subject: [PATCH 04/21] rename cli subcommand --- src/warehouse_cli.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/warehouse_cli.rs b/src/warehouse_cli.rs index c0114a7..584e151 100644 --- a/src/warehouse_cli.rs +++ b/src/warehouse_cli.rs @@ -59,7 +59,7 @@ pub enum Sub { batch_size: Option, }, /// process and load a single archive - LoadOne { + IngestOne { #[clap(long, short('d'))] /// location of archive archive_dir: PathBuf, @@ -126,7 +126,7 @@ impl WarehouseCli { neo4j_init::maybe_create_indexes(&pool).await?; ingest_all(&map, &pool, self.clear_queue, batch_size.unwrap_or(250)).await?; } - Sub::LoadOne { + Sub::IngestOne { archive_dir, batch_size, } => { From 02f52df718dfacc39387ab2ae38ad044500519f2 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Thu, 5 Dec 2024 20:38:05 -0500 Subject: [PATCH 05/21] wip --- src/analytics/enrich_account_funding.rs | 734 ++++++++++++++---------- src/load_exchange_orders.rs | 28 +- 2 files changed, 442 insertions(+), 320 deletions(-) diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index fbbf4c6..12d27cf 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use log::trace; use neo4rs::{Graph, Query}; @@ -12,97 +12,70 @@ use std::{ use crate::schema_exchange_orders::ExchangeOrder; #[derive(Default, Debug, Deserialize, Serialize)] -pub struct AccountData { - pub daily_balances: HashMap, f64>, // Map of daily balances - pub daily_funding: HashMap, f64>, // Map of daily funding amounts - pub daily_inflows: HashMap, f64>, // Map of daily inflow amounts - pub daily_outflows: HashMap, f64>, // Map of daily outflow amounts - pub daily_user_flows: HashMap, f64>, // Amount when the account was a `user` - pub daily_accepter_flows: HashMap, f64>, // Amount when the account was an `accepter` -} - -impl AccountData { - pub fn to_cypher_map(&self, id: u32) -> String { - let mut list_literal: String = "".to_owned(); - self.daily_balances.iter().for_each(|(date, _) | { - let obj = format!( - r#"{{ swap_id: {}, date: "{}", balance: {}, funding: {}, inflows: {}, outflows: {}, user_flows: {}, accepter_flows: {} }}"#, - id, - date.to_rfc3339(), - self.daily_balances.get(date).unwrap_or(&0.0), - self.daily_funding.get(date).unwrap_or(&0.0), - self.daily_inflows.get(date).unwrap_or(&0.0), - self.daily_outflows.get(date).unwrap_or(&0.0), - self.daily_user_flows.get(date).unwrap_or(&0.0), - self.daily_accepter_flows.get(date).unwrap_or(&0.0) - ); - - list_literal.push_str(&obj); - list_literal.push(','); - - }); - - list_literal.pop(); // need to drop last comma "," - format!("[{}]", list_literal) - } +pub struct AccountDataAlt { + pub current_balance: f64, + pub total_funded: f64, + pub total_outflows: f64, + pub total_inflows: f64, + pub daily_funding: f64, + pub daily_inflows: f64, + pub daily_outflows: f64, } #[derive(Default, Debug, Deserialize, Serialize)] -pub struct BalanceTracker { - pub accounts: HashMap, // Tracks data for each user -} +pub struct UserLedger(HashMap, AccountDataAlt>); -impl BalanceTracker { - pub fn new() -> Self { - Self { - accounts: HashMap::new(), - } - } - - pub fn process_transaction(&mut self, order: &ExchangeOrder) { +impl UserLedger { + pub fn process_transaction_alt(&mut self, order: &ExchangeOrder) { let date = order.created_at; - let (buyer_id, seller_id, amount) = match order.order_type.as_str() { - "Buy" => (order.user, order.accepter, order.amount * order.price), - "Sell" => (order.accepter, order.user, order.amount * order.price), + match order.order_type.as_str() { + "Buy" => { + // user offered to buy coins (Buyer) + // he sends USD + // accepter sends coins. (Seller) + + self.update_balance_and_flows_alt(order.user, date, order.amount, true); + self.update_balance_and_flows_alt(order.accepter, date, order.amount, false); + } + "Sell" => { + // user offered to sell coins (Seller) + // he sends Coins + // accepter sends USD. (Buyer) + self.update_balance_and_flows_alt(order.accepter, date, order.amount, true); + self.update_balance_and_flows_alt(order.user, date, order.amount, false); + } _ => { println!("ERROR: not a valid Buy/Sell order, {:?}", &order); - return; } - }; - - self.update_balance_and_flows(seller_id, date, -amount, false); - self.update_balance_and_flows(buyer_id, date, amount, true); + } } - - fn update_balance_and_flows( + fn update_balance_and_flows_alt( &mut self, user_id: u32, date: DateTime, amount: f64, - is_user: bool, + credit: bool, ) { - let account = self.accounts.entry(user_id).or_default(); - let daily_balance = account.daily_balances.entry(date).or_insert(0.0); - - if amount > 0.0 { - *account.daily_inflows.entry(date).or_insert(0.0) += amount; - } else { - *account.daily_outflows.entry(date).or_insert(0.0) += -amount; - } - - if is_user { - *account.daily_user_flows.entry(date).or_insert(0.0) += amount; + let account = self.0.entry(&user_id).or_default(); + // let daily_balance = account.current_balance.entry(date).or_insert(0.0); + // let day = account.day.entry(date).or_default(); + if credit { + account.current_balance += amount; + account.total_inflows += amount; + account.daily_inflows += amount; + // *daily_balance += amount } else { - *account.daily_accepter_flows.entry(date).or_insert(0.0) += amount; + // debit + account.current_balance += -amount; + account.total_outflows += -amount; + account.daily_outflows += -amount; } - let new_balance = *daily_balance + amount; - if new_balance < 0.0 { - let funding_needed = -new_balance; - *account.daily_funding.entry(date).or_insert(0.0) += funding_needed; - *daily_balance = 0.0; - } else { - *daily_balance = new_balance; + if account.current_balance < 0.0 { + let negative_balance = account.current_balance; + // funding was needed + account.total_funded += negative_balance; + account.daily_funding += negative_balance; } } @@ -125,77 +98,221 @@ impl BalanceTracker { } None } - /// Generate a Cypher query string to insert data into Neo4j - pub fn generate_cypher_query(&self, map: String) -> String { - // r#"{{ swap_id: {}, date: "{}", balance: {}, funding: {}, inflows: {}, outflows: {}, user_flows: {}, accepter_flows: {} }}"#, - format!( - r#" - UNWIND {map} AS account - MERGE (sa:SwapAccount {{swap_id: account.swap_id}}) - MERGE (ul:UserLedger {{date: datetime(account.date)}}) - SET ul.balance = account.balance, - ul.funding = account.funding, - ul.inflows = account.inflows, - ul.outflows = account.outflows, - ul.user_flows = account.user_flows, - ul.accepter_flows = account.accepter_flows - MERGE (sa)-[r:Daily]->(ul) - SET r.date = datetime(account.date) - RETURN COUNT(r) as merged_relations - "#, - ) - } } -/// Manages cache logic and invokes replay_transactions only if necessary -pub fn get_or_recalculate_balances( - orders: &mut [ExchangeOrder], - cache_file: Option, - force_recalculate: bool, -) -> BalanceTracker { - if !force_recalculate && cache_file.is_some() { - if let Some(cached_tracker) = BalanceTracker::load_from_cache(cache_file.as_ref().unwrap()) - { - return cached_tracker; +#[derive(Default, Debug, Deserialize, Serialize)] +pub struct BalanceTracker { + pub accounts: HashMap, // Tracks data for each user +} + +impl BalanceTracker { + /// Replay all transactions sequentially and return a balance tracker + pub fn replay_transactions(orders: &mut [ExchangeOrder]) -> Self { + let mut tracker: HashMap = HashMap::new(); + + orders.sort_by_key(|order| order.filled_at); + for order in sorted_orders { + tracker.process_transaction_alt(order); } + tracker } + pub fn to_cypher_map(&self, id: u32) -> Result { + let ul = self.accounts.get(&id).context("no user")?; + let mut list_literal: String = "".to_owned(); - let tracker = replay_transactions(orders); - if let Some(p) = cache_file { - tracker.save_to_cache(&p); - } - tracker -} + for (date, _) in &ul.0 { + if let Some(acc) = ul.0.get(date) { + let obj = format!( + r#"{{ swap_id: {}, date: "{}", current_balance: {}, total_funded: {}, total_inflows: {}, total_outflows: {}, daily_funding: {}, daily_inflows: {}, daily_outflows: {} }}"#, + id, + date.to_rfc3339(), + acc.current_balance, + acc.total_funded + acc.total_inflows, + acc.total_outflows, + acc.daily_funding, + acc.daily_inflows, + acc.daily_outflows, + ); + + list_literal.push_str(&obj); + list_literal.push(','); + } else { + continue; + } + } -/// Replay all transactions sequentially and return a balance tracker -pub fn replay_transactions(orders: &mut [ExchangeOrder]) -> BalanceTracker { - let mut tracker = BalanceTracker::new(); - let sorted_orders = orders; - sorted_orders.sort_by_key(|order| order.created_at); - for order in sorted_orders { - tracker.process_transaction(order); + list_literal.pop(); // need to drop last comma "," + Ok(format!("[{}]", list_literal)) } - tracker } -/// submit to db -pub async fn submit_ledger(balances: &BalanceTracker, pool: &Graph) -> Result { - let mut merged_relations = 0u64; - for (id, acc) in balances.accounts.iter() { - let data = acc.to_cypher_map(*id); - let query_literal = balances.generate_cypher_query(data); - let query = Query::new(query_literal); - let mut result = pool.execute(query).await?; - - while let Some(r) = result.next().await? { - if let Ok(i) = r.get::("merged_relations") { - trace!("merged ledger in tx: {i}"); - merged_relations += i; - }; - } - } - Ok(merged_relations) +/// Generate a Cypher query string to insert data into Neo4j +pub fn generate_cypher_query(map: String) -> String { + // r#"{{ swap_id: {}, date: "{}", current_balance: {}, total_funded: {}, total_inflows: {}, total_outflows: {}, daily_funding: {}, daily_inflows: {}, daily_outflows: {} }}"#, + format!( + r#" + UNWIND {map} AS account + MERGE (sa:SwapAccount {{swap_id: account.swap_id}}) + MERGE (ul:UserLedger {{date: datetime(account.date)}}) + SET ul.current_balance = account.current_balance, + ul.total_funded = account.total_funded, + ul.total_inflows = account.total_inflows, + ul.total_outflows = account.total_outflows, + ul.daily_funding = account.daily_funding, + ul.daily_inflows = account.daily_inflows, + ul.daily_outflows = account.daily_outflows, + MERGE (sa)-[r:DailyLedger]->(ul) + SET r.date = datetime(account.date) + RETURN COUNT(r) as merged_relations + "#, + ) } +// pub fn new() -> Self { +// Self { +// accounts: HashMap::new(), +// } +// } + +// pub fn process_transaction(&mut self, order: &ExchangeOrder) { +// let date = order.created_at; +// let (buyer_id, seller_id, amount) = match order.order_type.as_str() { +// "Buy" => (order.user, order.accepter, order.amount * order.price), +// "Sell" => (order.accepter, order.user, order.amount * order.price), +// _ => { +// println!("ERROR: not a valid Buy/Sell order, {:?}", &order); +// return; +// } +// }; + +// self.update_balance_and_flows(seller_id, date, -amount, false); +// self.update_balance_and_flows(buyer_id, date, amount, true); +// } + +// fn update_balance_and_flows( +// &mut self, +// user_id: u32, +// date: DateTime, +// amount: f64, +// credit: bool, +// ) { +// let account = self.accounts.entry(user_id).or_default(); +// let daily_balance = account.daily_balances.entry(date).or_insert(0.0); + +// if credit { +// *account.daily_inflows.entry(date).or_insert(0.0) += amount; +// *daily_balance += amount +// } else { +// // debit +// *account.daily_outflows.entry(date).or_insert(0.0) += -amount; + +// *daily_balance += -amount +// } + +// // if credit { +// // *account.daily_user_flows.entry(date).or_insert(0.0) += amount; +// // } else { +// // *account.daily_accepter_flows.entry(date).or_insert(0.0) += amount; +// // } + +// if *daily_balance < 0.0 { +// // funding was needed +// *account.daily_funding.entry(date).or_insert(0.0) += *daily_balance; +// // reset +// *daily_balance = 0.0; +// } +// } + +// /// Save the balance tracker to a JSON file +// pub fn save_to_cache(&self, file_path: &str) { +// if let Ok(json) = serde_json::to_string(self) { +// let _ = fs::write(file_path, json); +// } +// } + +// /// Load the balance tracker from a JSON file +// pub fn load_from_cache(file_path: &str) -> Option { +// if let Ok(mut file) = File::open(file_path) { +// let mut contents = String::new(); +// if file.read_to_string(&mut contents).is_ok() { +// if let Ok(tracker) = serde_json::from_str(&contents) { +// return Some(tracker); +// } +// } +// } +// None +// } +// /// Generate a Cypher query string to insert data into Neo4j +// pub fn generate_cypher_query(&self, map: String) -> String { +// // r#"{{ swap_id: {}, date: "{}", balance: {}, funding: {}, inflows: {}, outflows: {}, user_flows: {}, accepter_flows: {} }}"#, +// format!( +// r#" +// UNWIND {map} AS account +// MERGE (sa:SwapAccount {{swap_id: account.swap_id}}) +// MERGE (ul:UserLedger {{date: datetime(account.date)}}) +// SET ul.balance = account.balance, +// ul.funding = account.funding, +// ul.inflows = account.inflows, +// ul.outflows = account.outflows, +// ul.user_flows = account.user_flows, +// ul.accepter_flows = account.accepter_flows +// MERGE (sa)-[r:Daily]->(ul) +// SET r.date = datetime(account.date) +// RETURN COUNT(r) as merged_relations +// "#, +// ) +// } +// } + +// /// Manages cache logic and invokes replay_transactions only if necessary +// pub fn get_or_recalculate_balances( +// orders: &mut [ExchangeOrder], +// cache_file: Option, +// force_recalculate: bool, +// ) -> BalanceTracker { +// if !force_recalculate && cache_file.is_some() { +// if let Some(cached_tracker) = BalanceTracker::load_from_cache(cache_file.as_ref().unwrap()) +// { +// return cached_tracker; +// } +// } + +// let tracker = replay_transactions(orders); +// if let Some(p) = cache_file { +// tracker.save_to_cache(&p); +// } +// tracker +// } + +// /// Replay all transactions sequentially and return a balance tracker +// pub fn replay_transactions(orders: &mut [ExchangeOrder]) -> BalanceTracker { +// let mut tracker = BalanceTracker::new(); +// let sorted_orders = orders; +// sorted_orders.sort_by_key(|order| order.filled_at); +// for order in sorted_orders { +// tracker.process_transaction(order); +// } +// tracker +// } + +// /// submit to db +// pub async fn submit_ledger(balances: &BalanceTracker, pool: &Graph) -> Result { +// let mut merged_relations = 0u64; +// for (id, acc) in balances.accounts.iter() { +// let data = acc.to_cypher_map(*id); +// let query_literal = balances.generate_cypher_query(data); +// let query = Query::new(query_literal); +// let mut result = pool.execute(query).await?; + +// while let Some(r) = result.next().await? { +// if let Ok(i) = r.get::("merged_relations") { +// trace!("merged ledger in tx: {i}"); +// merged_relations += i; +// }; +// } +// } +// Ok(merged_relations) +// } /// Helper function to parse "YYYY-MM-DD" into `DateTime` fn parse_date(date_str: &str) -> DateTime { @@ -204,83 +321,84 @@ fn parse_date(date_str: &str) -> DateTime { .expect("Invalid date format; expected YYYY-MM-DD") .with_timezone(&Utc) } -/// Reusable function to print account data -pub fn print_account_data(user_id: u32, data: &AccountData) { - println!("User: {}", user_id); - for (date, balance) in &data.daily_balances { - println!(" Date: {}, Balance: {}", date, balance); - } - for (date, funding) in &data.daily_funding { - println!(" Date: {}, Funding: {}", date, funding); - } - for (date, inflow) in &data.daily_inflows { - println!(" Date: {}, Inflows: {}", date, inflow); - } - for (date, outflow) in &data.daily_outflows { - println!(" Date: {}, Outflows: {}", date, outflow); - } - for (date, user_flow) in &data.daily_user_flows { - println!(" Date: {}, User Flow: {}", date, user_flow); - } - for (date, accepter_flow) in &data.daily_accepter_flows { - println!(" Date: {}, Accepter Flow: {}", date, accepter_flow); - } -} - -/// Display statistics for a specific account within a date range -pub fn display_account_statistics( - user_id: u32, - data: &AccountData, - start_date: &str, - end_date: &str, -) { - let start = parse_date(start_date); - let end = parse_date(end_date); - - println!( - "Statistics for User {} from {} to {}", - user_id, start_date, end_date - ); - - let mut total_balance = 0.0; - let mut total_funding = 0.0; - let mut total_inflows = 0.0; - let mut total_outflows = 0.0; - - for (date, balance) in &data.daily_balances { - if *date >= start && *date <= end { - total_balance += balance; - } - } - - for (date, funding) in &data.daily_funding { - if *date >= start && *date <= end { - total_funding += funding; - } - } - - for (date, inflow) in &data.daily_inflows { - if *date >= start && *date <= end { - total_inflows += inflow; - } - } - - for (date, outflow) in &data.daily_outflows { - if *date >= start && *date <= end { - total_outflows += outflow; - } - } +// /// Reusable function to print account data +// pub fn print_account_data(user_id: u32, data: &AccountData) { +// println!("User: {}", user_id); +// for (date, balance) in &data.daily_balances { +// println!(" Date: {}, Balance: {}", date, balance); +// } +// for (date, funding) in &data.daily_funding { +// println!(" Date: {}, Funding: {}", date, funding); +// } +// for (date, inflow) in &data.daily_inflows { +// println!(" Date: {}, Inflows: {}", date, inflow); +// } +// for (date, outflow) in &data.daily_outflows { +// println!(" Date: {}, Outflows: {}", date, outflow); +// } +// for (date, user_flow) in &data.daily_user_flows { +// println!(" Date: {}, User Flow: {}", date, user_flow); +// } +// for (date, accepter_flow) in &data.daily_accepter_flows { +// println!(" Date: {}, Accepter Flow: {}", date, accepter_flow); +// } +// } - println!(" Total Balance: {:.2}", total_balance); - println!(" Total Funding: {:.2}", total_funding); - println!(" Total Inflows: {:.2}", total_inflows); - println!(" Total Outflows: {:.2}", total_outflows); -} +// /// Display statistics for a specific account within a date range +// pub fn display_account_statistics( +// user_id: u32, +// data: &AccountData, +// start_date: &str, +// end_date: &str, +// ) { +// let start = parse_date(start_date); +// let end = parse_date(end_date); + +// println!( +// "Statistics for User {} from {} to {}", +// user_id, start_date, end_date +// ); + +// let mut total_balance = 0.0; +// let mut total_funding = 0.0; +// let mut total_inflows = 0.0; +// let mut total_outflows = 0.0; + +// for (date, balance) in &data.daily_balances { +// if *date >= start && *date <= end { +// total_balance += balance; +// } +// } + +// for (date, funding) in &data.daily_funding { +// if *date >= start && *date <= end { +// total_funding += funding; +// } +// } + +// for (date, inflow) in &data.daily_inflows { +// if *date >= start && *date <= end { +// total_inflows += inflow; +// } +// } + +// for (date, outflow) in &data.daily_outflows { +// if *date >= start && *date <= end { +// total_outflows += outflow; +// } +// } + +// println!(" Total Balance: {:.2}", total_balance); +// println!(" Total Funding: {:.2}", total_funding); +// println!(" Total Inflows: {:.2}", total_inflows); +// println!(" Total Outflows: {:.2}", total_outflows); +// } #[test] fn test_replay_transactions() { - // Create orders with meaningful data and specific dates let mut orders = vec![ + // user 1 creates an offer to BUY, user 2 accepts. + // user 1 sends USD user 2 move amount of coins. ExchangeOrder { user: 1, order_type: "BUY".to_string(), @@ -296,6 +414,8 @@ fn test_replay_transactions() { shill_bid: None, }, ExchangeOrder { + // user 2 creates an offer to SELL, user 3 accepts. + // user 3 sends USD user 2 moves amount of coins. user: 2, order_type: "SELL".to_string(), amount: 5.0, @@ -309,6 +429,8 @@ fn test_replay_transactions() { price_vs_rms_24hour: 0.0, shill_bid: None, }, + // user 3 creates an offer to BUY, user 1 accepts. + // user 3 sends USD user 1 moves amount of coins. ExchangeOrder { user: 3, order_type: "BUY".to_string(), @@ -325,107 +447,107 @@ fn test_replay_transactions() { }, ]; - let tracker = replay_transactions(&mut orders); + let tracker = BalanceTracker::replay_transactions(&mut orders); - // Analyze results for March 2024 - for (user_id, data) in &tracker.accounts { - print_account_data(*user_id, data); - display_account_statistics(*user_id, data, "2024-03-01", "2024-03-31"); - } -} - -#[ignore] -// TODO: check paths -#[test] -fn test_cache_mechanism() { - let cache_file = "balance_tracker_cache.json".to_string(); - let mut orders = vec![ - ExchangeOrder { - user: 1, - order_type: "BUY".to_string(), - amount: 10.0, - price: 2.0, - created_at: parse_date("2024-03-01"), - filled_at: parse_date("2024-03-02"), - accepter: 2, - rms_hour: 0.0, - rms_24hour: 0.0, - price_vs_rms_hour: 0.0, - price_vs_rms_24hour: 0.0, - shill_bid: None, - }, - ExchangeOrder { - user: 2, - order_type: "SELL".to_string(), - amount: 5.0, - price: 3.0, - created_at: parse_date("2024-03-05"), - filled_at: parse_date("2024-03-06"), - accepter: 3, - rms_hour: 0.0, - rms_24hour: 0.0, - price_vs_rms_hour: 0.0, - price_vs_rms_24hour: 0.0, - shill_bid: None, - }, - ]; - - let tracker = get_or_recalculate_balances(&mut orders, Some(cache_file.clone()), true); - assert!(tracker.accounts.contains_key(&1)); - assert!(tracker.accounts.contains_key(&2)); - - // Test loading from cache - let cached_tracker = get_or_recalculate_balances(&mut orders, Some(cache_file.clone()), false); - assert!(cached_tracker.accounts.contains_key(&1)); - assert!(cached_tracker.accounts.contains_key(&2)); - - // Cleanup - let _ = fs::remove_file(cache_file); + // // Analyze results for March 2024 + // for (user_id, data) in &tracker.accounts { + // print_account_data(*user_id, data); + // display_account_statistics(*user_id, data, "2024-03-01", "2024-03-31"); + // } } +// #[ignore] +// // TODO: check paths // #[test] - -// fn test_cypher_query() { -// let tracker = BalanceTracker::new(); // Assume tracker is populated -// // let params = tracker.generate_cypher_params(); -// let query = tracker.generate_cypher_query(); -// // dbg!(¶ms); -// dbg!(&query); -// } - -// I'm coding some data analysis in rust. - -// I have a vector structs that looks like this: - -// pub struct ExchangeOrder { -// pub user: u32, -// #[serde(rename = "orderType")] -// pub order_type: String, -// #[serde(deserialize_with = "deserialize_amount")] -// pub amount: f64, -// #[serde(deserialize_with = "deserialize_amount")] -// pub price: f64, -// pub created_at: DateTime, -// pub filled_at: DateTime, -// pub accepter: u32, -// #[serde(skip_deserializing)] -// pub rms_hour: f64, -// #[serde(skip_deserializing)] -// pub rms_24hour: f64, -// #[serde(skip_deserializing)] -// pub price_vs_rms_hour: f64, -// #[serde(skip_deserializing)] -// pub price_vs_rms_24hour: f64, -// #[serde(skip_deserializing)] -// pub shill_bid: Option, // New field to indicate if it took the best price +// fn test_cache_mechanism() { +// let cache_file = "balance_tracker_cache.json".to_string(); +// let mut orders = vec![ +// ExchangeOrder { +// user: 1, +// order_type: "BUY".to_string(), +// amount: 10.0, +// price: 2.0, +// created_at: parse_date("2024-03-01"), +// filled_at: parse_date("2024-03-02"), +// accepter: 2, +// rms_hour: 0.0, +// rms_24hour: 0.0, +// price_vs_rms_hour: 0.0, +// price_vs_rms_24hour: 0.0, +// shill_bid: None, +// }, +// ExchangeOrder { +// user: 2, +// order_type: "SELL".to_string(), +// amount: 5.0, +// price: 3.0, +// created_at: parse_date("2024-03-05"), +// filled_at: parse_date("2024-03-06"), +// accepter: 3, +// rms_hour: 0.0, +// rms_24hour: 0.0, +// price_vs_rms_hour: 0.0, +// price_vs_rms_24hour: 0.0, +// shill_bid: None, +// }, +// ]; + +// let tracker = get_or_recalculate_balances(&mut orders, Some(cache_file.clone()), true); +// assert!(tracker.accounts.contains_key(&1)); +// assert!(tracker.accounts.contains_key(&2)); + +// // Test loading from cache +// let cached_tracker = get_or_recalculate_balances(&mut orders, Some(cache_file.clone()), false); +// assert!(cached_tracker.accounts.contains_key(&1)); +// assert!(cached_tracker.accounts.contains_key(&2)); + +// // Cleanup +// let _ = fs::remove_file(cache_file); // } -// My goal is to determine the amount of funding ('amount') that each account required at a given time. We will need to replay all the transaction history sequentially. - -// We need a new data structure to track account balances. Accepting a BUY transaction by another User, would decrease the total balance of the accepter, and increase of the User. Accepting a SELL transaction, would increase the balance of the accepter, and decrease that of the User. - -// We also need a data structure to save when there were funding events to the account. We can assume all accounts start at 0 total_balance. This means that we need to also track a funded_event_amount, for whenever the account would have a negative balance. - -// As for granularity of time we should just track daily balances, and daily funding. - -// How would I do this in Rust? +// // #[test] + +// // fn test_cypher_query() { +// // let tracker = BalanceTracker::new(); // Assume tracker is populated +// // // let params = tracker.generate_cypher_params(); +// // let query = tracker.generate_cypher_query(); +// // // dbg!(¶ms); +// // dbg!(&query); +// // } + +// // I'm coding some data analysis in rust. + +// // I have a vector structs that looks like this: + +// // pub struct ExchangeOrder { +// // pub user: u32, +// // #[serde(rename = "orderType")] +// // pub order_type: String, +// // #[serde(deserialize_with = "deserialize_amount")] +// // pub amount: f64, +// // #[serde(deserialize_with = "deserialize_amount")] +// // pub price: f64, +// // pub created_at: DateTime, +// // pub filled_at: DateTime, +// // pub accepter: u32, +// // #[serde(skip_deserializing)] +// // pub rms_hour: f64, +// // #[serde(skip_deserializing)] +// // pub rms_24hour: f64, +// // #[serde(skip_deserializing)] +// // pub price_vs_rms_hour: f64, +// // #[serde(skip_deserializing)] +// // pub price_vs_rms_24hour: f64, +// // #[serde(skip_deserializing)] +// // pub shill_bid: Option, // New field to indicate if it took the best price +// // } + +// // My goal is to determine the amount of funding ('amount') that each account required at a given time. We will need to replay all the transaction history sequentially. + +// // We need a new data structure to track account balances. Accepting a BUY transaction by another User, would decrease the total balance of the accepter, and increase of the User. Accepting a SELL transaction, would increase the balance of the accepter, and decrease that of the User. + +// // We also need a data structure to save when there were funding events to the account. We can assume all accounts start at 0 total_balance. This means that we need to also track a funded_event_amount, for whenever the account would have a negative balance. + +// // As for granularity of time we should just track daily balances, and daily funding. + +// // How would I do this in Rust? diff --git a/src/load_exchange_orders.rs b/src/load_exchange_orders.rs index ef1a384..cff12c9 100644 --- a/src/load_exchange_orders.rs +++ b/src/load_exchange_orders.rs @@ -84,17 +84,17 @@ pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[ExchangeOrder]) -> Ok((merged as u64, ignored as u64)) } -pub async fn load_from_json(path: &Path, pool: &Graph, batch_size: usize) -> Result<(u64, u64)> { - let mut orders = extract_exchange_orders::read_orders_from_file(path)?; - // add RMS stats to each order - enrich_rms::include_rms_stats(&mut orders); - // find likely shill bids - enrich_rms::process_sell_order_shill(&mut orders); - enrich_rms::process_buy_order_shill(&mut orders); - - let balances = enrich_account_funding::replay_transactions(&mut orders); - let ledger_inserts = enrich_account_funding::submit_ledger(&balances, pool).await?; - info!("exchange ledger relations inserted: {}", ledger_inserts); - - swap_batch(&orders, pool, batch_size).await -} +// pub async fn load_from_json(path: &Path, pool: &Graph, batch_size: usize) -> Result<(u64, u64)> { +// let mut orders = extract_exchange_orders::read_orders_from_file(path)?; +// // add RMS stats to each order +// enrich_rms::include_rms_stats(&mut orders); +// // find likely shill bids +// enrich_rms::process_sell_order_shill(&mut orders); +// enrich_rms::process_buy_order_shill(&mut orders); + +// let balances = enrich_account_funding::replay_transactions(&mut orders); +// let ledger_inserts = enrich_account_funding::submit_ledger(&balances, pool).await?; +// info!("exchange ledger relations inserted: {}", ledger_inserts); + +// swap_batch(&orders, pool, batch_size).await +// } From 3d40f5a99d0529b475d50527be1ddd6789653f50 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 6 Dec 2024 13:01:42 -0500 Subject: [PATCH 06/21] refactor ledger replay --- src/analytics/enrich_account_funding.rs | 385 +++++++++--------------- src/load_exchange_orders.rs | 28 +- tests/test_enrich_exchange.rs | 8 +- 3 files changed, 163 insertions(+), 258 deletions(-) diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index 12d27cf..3da6e04 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -1,7 +1,8 @@ use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; -use log::trace; -use neo4rs::{Graph, Query}; +use log::error; +// use log::trace; +// use neo4rs::{Graph, Query}; use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, @@ -11,7 +12,7 @@ use std::{ use crate::schema_exchange_orders::ExchangeOrder; -#[derive(Default, Debug, Deserialize, Serialize)] +#[derive(Default, Debug, Clone, Deserialize, Serialize)] pub struct AccountDataAlt { pub current_balance: f64, pub total_funded: f64, @@ -25,7 +26,19 @@ pub struct AccountDataAlt { #[derive(Default, Debug, Deserialize, Serialize)] pub struct UserLedger(HashMap, AccountDataAlt>); -impl UserLedger { +#[derive(Default, Debug, Deserialize, Serialize)] +pub struct BalanceTracker(HashMap); // Tracks data for each user + +impl BalanceTracker { + /// Replay all transactions sequentially and return a balance tracker + pub fn replay_transactions(&mut self, orders: &mut [ExchangeOrder]) -> Result<()> { + orders.sort_by_key(|order| order.filled_at); + for o in orders { + self.process_transaction_alt(o); + } + Ok(()) + } + pub fn process_transaction_alt(&mut self, order: &ExchangeOrder) { let date = order.created_at; match order.order_type.as_str() { @@ -56,26 +69,62 @@ impl UserLedger { amount: f64, credit: bool, ) { - let account = self.0.entry(&user_id).or_default(); - // let daily_balance = account.current_balance.entry(date).or_insert(0.0); - // let day = account.day.entry(date).or_default(); + let ul = self.0.entry(user_id).or_default(); + + let most_recent_date = *ul.0.keys().max_by(|x, y| x.cmp(y)).unwrap_or(&date); + + // NOTE the previous record may be today's record from a previous transaction. Need to take care in the aggregation below + + // // TODO: gross, this shouldn't clone + // let previous = if let Some(d) = most_recent_date { + // ul.0.entry(*).or_default().to_owned() + // } else { + // AccountDataAlt::default() + // }; + + if most_recent_date > date { + // don't know what to here + error!("most recent ledger date is higher than current day"); + return; + }; + + let previous = ul.0.get(&most_recent_date).unwrap().clone(); + + let today = ul.0.entry(date).or_default(); + if credit { - account.current_balance += amount; - account.total_inflows += amount; - account.daily_inflows += amount; + today.current_balance = previous.current_balance + amount; + today.total_inflows = previous.total_inflows + amount; + if most_recent_date == date { + today.daily_inflows = previous.daily_inflows + amount; + } else { + today.daily_inflows = amount; + } // *daily_balance += amount } else { // debit - account.current_balance += -amount; - account.total_outflows += -amount; - account.daily_outflows += -amount; + today.current_balance = previous.current_balance - amount; + today.total_outflows = previous.total_outflows + amount; + + if most_recent_date == date { + today.daily_outflows = previous.daily_outflows + amount; + } else { + today.daily_outflows = amount; + } } - if account.current_balance < 0.0 { - let negative_balance = account.current_balance; + // find out if the outflows created a funding requirement on the account + if today.current_balance < 0.0 { + let negative_balance = today.current_balance.abs(); // funding was needed - account.total_funded += negative_balance; - account.daily_funding += negative_balance; + today.total_funded = previous.total_funded + negative_balance; + if most_recent_date == date { + today.daily_funding += negative_balance; + } else { + today.daily_funding = negative_balance; + } + // reset to zero + today.current_balance = 0.0; } } @@ -98,41 +147,24 @@ impl UserLedger { } None } -} - -#[derive(Default, Debug, Deserialize, Serialize)] -pub struct BalanceTracker { - pub accounts: HashMap, // Tracks data for each user -} - -impl BalanceTracker { - /// Replay all transactions sequentially and return a balance tracker - pub fn replay_transactions(orders: &mut [ExchangeOrder]) -> Self { - let mut tracker: HashMap = HashMap::new(); - orders.sort_by_key(|order| order.filled_at); - for order in sorted_orders { - tracker.process_transaction_alt(order); - } - tracker - } pub fn to_cypher_map(&self, id: u32) -> Result { - let ul = self.accounts.get(&id).context("no user")?; + let ul = self.0.get(&id).context("no user")?; let mut list_literal: String = "".to_owned(); - for (date, _) in &ul.0 { + for date in ul.0.keys() { if let Some(acc) = ul.0.get(date) { let obj = format!( - r#"{{ swap_id: {}, date: "{}", current_balance: {}, total_funded: {}, total_inflows: {}, total_outflows: {}, daily_funding: {}, daily_inflows: {}, daily_outflows: {} }}"#, - id, - date.to_rfc3339(), - acc.current_balance, - acc.total_funded - acc.total_inflows, - acc.total_outflows, - acc.daily_funding, - acc.daily_inflows, - acc.daily_outflows, + r#"{{ swap_id: {}, date: "{}", current_balance: {}, total_funded: {}, total_inflows: {}, total_outflows: {}, daily_funding: {}, daily_inflows: {}, daily_outflows: {} }}"#, + id, + date.to_rfc3339(), + acc.current_balance, + acc.total_funded, + acc.total_inflows, + acc.total_outflows, + acc.daily_funding, + acc.daily_inflows, + acc.daily_outflows, ); list_literal.push_str(&obj); @@ -168,132 +200,14 @@ pub fn generate_cypher_query(map: String) -> String { "#, ) } -// pub fn new() -> Self { -// Self { -// accounts: HashMap::new(), -// } -// } - -// pub fn process_transaction(&mut self, order: &ExchangeOrder) { -// let date = order.created_at; -// let (buyer_id, seller_id, amount) = match order.order_type.as_str() { -// "Buy" => (order.user, order.accepter, order.amount * order.price), -// "Sell" => (order.accepter, order.user, order.amount * order.price), -// _ => { -// println!("ERROR: not a valid Buy/Sell order, {:?}", &order); -// return; -// } -// }; - -// self.update_balance_and_flows(seller_id, date, -amount, false); -// self.update_balance_and_flows(buyer_id, date, amount, true); -// } -// fn update_balance_and_flows( -// &mut self, -// user_id: u32, -// date: DateTime, -// amount: f64, -// credit: bool, -// ) { -// let account = self.accounts.entry(user_id).or_default(); -// let daily_balance = account.daily_balances.entry(date).or_insert(0.0); - -// if credit { -// *account.daily_inflows.entry(date).or_insert(0.0) += amount; -// *daily_balance += amount -// } else { -// // debit -// *account.daily_outflows.entry(date).or_insert(0.0) += -amount; - -// *daily_balance += -amount -// } - -// // if credit { -// // *account.daily_user_flows.entry(date).or_insert(0.0) += amount; -// // } else { -// // *account.daily_accepter_flows.entry(date).or_insert(0.0) += amount; -// // } - -// if *daily_balance < 0.0 { -// // funding was needed -// *account.daily_funding.entry(date).or_insert(0.0) += *daily_balance; -// // reset -// *daily_balance = 0.0; -// } -// } - -// /// Save the balance tracker to a JSON file -// pub fn save_to_cache(&self, file_path: &str) { -// if let Ok(json) = serde_json::to_string(self) { -// let _ = fs::write(file_path, json); -// } -// } - -// /// Load the balance tracker from a JSON file -// pub fn load_from_cache(file_path: &str) -> Option { -// if let Ok(mut file) = File::open(file_path) { -// let mut contents = String::new(); -// if file.read_to_string(&mut contents).is_ok() { -// if let Ok(tracker) = serde_json::from_str(&contents) { -// return Some(tracker); -// } -// } -// } -// None -// } -// /// Generate a Cypher query string to insert data into Neo4j -// pub fn generate_cypher_query(&self, map: String) -> String { -// // r#"{{ swap_id: {}, date: "{}", balance: {}, funding: {}, inflows: {}, outflows: {}, user_flows: {}, accepter_flows: {} }}"#, -// format!( -// r#" -// UNWIND {map} AS account -// MERGE (sa:SwapAccount {{swap_id: account.swap_id}}) -// MERGE (ul:UserLedger {{date: datetime(account.date)}}) -// SET ul.balance = account.balance, -// ul.funding = account.funding, -// ul.inflows = account.inflows, -// ul.outflows = account.outflows, -// ul.user_flows = account.user_flows, -// ul.accepter_flows = account.accepter_flows -// MERGE (sa)-[r:Daily]->(ul) -// SET r.date = datetime(account.date) -// RETURN COUNT(r) as merged_relations -// "#, -// ) -// } -// } - -// /// Manages cache logic and invokes replay_transactions only if necessary -// pub fn get_or_recalculate_balances( -// orders: &mut [ExchangeOrder], -// cache_file: Option, -// force_recalculate: bool, -// ) -> BalanceTracker { -// if !force_recalculate && cache_file.is_some() { -// if let Some(cached_tracker) = BalanceTracker::load_from_cache(cache_file.as_ref().unwrap()) -// { -// return cached_tracker; -// } -// } - -// let tracker = replay_transactions(orders); -// if let Some(p) = cache_file { -// tracker.save_to_cache(&p); -// } -// tracker -// } - -// /// Replay all transactions sequentially and return a balance tracker -// pub fn replay_transactions(orders: &mut [ExchangeOrder]) -> BalanceTracker { -// let mut tracker = BalanceTracker::new(); -// let sorted_orders = orders; -// sorted_orders.sort_by_key(|order| order.filled_at); -// for order in sorted_orders { -// tracker.process_transaction(order); -// } -// tracker -// } +/// Helper function to parse "YYYY-MM-DD" into `DateTime` +fn parse_date(date_str: &str) -> DateTime { + let datetime_str = format!("{date_str}T00:00:00Z"); // Append time and UTC offset + DateTime::parse_from_rfc3339(&datetime_str) + .expect("Invalid date format; expected YYYY-MM-DD") + .with_timezone(&Utc) +} // /// submit to db // pub async fn submit_ledger(balances: &BalanceTracker, pool: &Graph) -> Result { @@ -314,13 +228,6 @@ pub fn generate_cypher_query(map: String) -> String { // Ok(merged_relations) // } -/// Helper function to parse "YYYY-MM-DD" into `DateTime` -fn parse_date(date_str: &str) -> DateTime { - let datetime_str = format!("{date_str}T00:00:00Z"); // Append time and UTC offset - DateTime::parse_from_rfc3339(&datetime_str) - .expect("Invalid date format; expected YYYY-MM-DD") - .with_timezone(&Utc) -} // /// Reusable function to print account data // pub fn print_account_data(user_id: u32, data: &AccountData) { // println!("User: {}", user_id); @@ -394,67 +301,67 @@ fn parse_date(date_str: &str) -> DateTime { // println!(" Total Outflows: {:.2}", total_outflows); // } -#[test] -fn test_replay_transactions() { - let mut orders = vec![ - // user 1 creates an offer to BUY, user 2 accepts. - // user 1 sends USD user 2 move amount of coins. - ExchangeOrder { - user: 1, - order_type: "BUY".to_string(), - amount: 10.0, - price: 2.0, - created_at: parse_date("2024-03-01"), - filled_at: parse_date("2024-03-02"), - accepter: 2, - rms_hour: 0.0, - rms_24hour: 0.0, - price_vs_rms_hour: 0.0, - price_vs_rms_24hour: 0.0, - shill_bid: None, - }, - ExchangeOrder { - // user 2 creates an offer to SELL, user 3 accepts. - // user 3 sends USD user 2 moves amount of coins. - user: 2, - order_type: "SELL".to_string(), - amount: 5.0, - price: 3.0, - created_at: parse_date("2024-03-05"), - filled_at: parse_date("2024-03-06"), - accepter: 3, - rms_hour: 0.0, - rms_24hour: 0.0, - price_vs_rms_hour: 0.0, - price_vs_rms_24hour: 0.0, - shill_bid: None, - }, - // user 3 creates an offer to BUY, user 1 accepts. - // user 3 sends USD user 1 moves amount of coins. - ExchangeOrder { - user: 3, - order_type: "BUY".to_string(), - amount: 15.0, - price: 1.5, - created_at: parse_date("2024-03-10"), - filled_at: parse_date("2024-03-11"), - accepter: 1, - rms_hour: 0.0, - rms_24hour: 0.0, - price_vs_rms_hour: 0.0, - price_vs_rms_24hour: 0.0, - shill_bid: None, - }, - ]; - - let tracker = BalanceTracker::replay_transactions(&mut orders); - - // // Analyze results for March 2024 - // for (user_id, data) in &tracker.accounts { - // print_account_data(*user_id, data); - // display_account_statistics(*user_id, data, "2024-03-01", "2024-03-31"); - // } -} +// #[test] +// fn test_replay_transactions() { +// let mut orders = vec![ +// // user 1 creates an offer to BUY, user 2 accepts. +// // user 1 sends USD user 2 move amount of coins. +// ExchangeOrder { +// user: 1, +// order_type: "BUY".to_string(), +// amount: 10.0, +// price: 2.0, +// created_at: parse_date("2024-03-01"), +// filled_at: parse_date("2024-03-02"), +// accepter: 2, +// rms_hour: 0.0, +// rms_24hour: 0.0, +// price_vs_rms_hour: 0.0, +// price_vs_rms_24hour: 0.0, +// shill_bid: None, +// }, +// ExchangeOrder { +// // user 2 creates an offer to SELL, user 3 accepts. +// // user 3 sends USD user 2 moves amount of coins. +// user: 2, +// order_type: "SELL".to_string(), +// amount: 5.0, +// price: 3.0, +// created_at: parse_date("2024-03-05"), +// filled_at: parse_date("2024-03-06"), +// accepter: 3, +// rms_hour: 0.0, +// rms_24hour: 0.0, +// price_vs_rms_hour: 0.0, +// price_vs_rms_24hour: 0.0, +// shill_bid: None, +// }, +// // user 3 creates an offer to BUY, user 1 accepts. +// // user 3 sends USD user 1 moves amount of coins. +// ExchangeOrder { +// user: 3, +// order_type: "BUY".to_string(), +// amount: 15.0, +// price: 1.5, +// created_at: parse_date("2024-03-10"), +// filled_at: parse_date("2024-03-11"), +// accepter: 1, +// rms_hour: 0.0, +// rms_24hour: 0.0, +// price_vs_rms_hour: 0.0, +// price_vs_rms_24hour: 0.0, +// shill_bid: None, +// }, +// ]; + +// let tracker = replay_transactions(&mut orders); + +// // // Analyze results for March 2024 +// // for (user_id, data) in &tracker.accounts { +// // print_account_data(*user_id, data); +// // display_account_statistics(*user_id, data, "2024-03-01", "2024-03-31"); +// // } +// } // #[ignore] // // TODO: check paths diff --git a/src/load_exchange_orders.rs b/src/load_exchange_orders.rs index cff12c9..ce0b242 100644 --- a/src/load_exchange_orders.rs +++ b/src/load_exchange_orders.rs @@ -5,9 +5,7 @@ use log::{error, info, warn}; use neo4rs::{query, Graph}; use crate::{ - analytics::{enrich_account_funding, enrich_rms}, - extract_exchange_orders, queue, - schema_exchange_orders::ExchangeOrder, + analytics::enrich_rms, extract_exchange_orders, queue, schema_exchange_orders::ExchangeOrder, }; pub async fn swap_batch( @@ -84,17 +82,17 @@ pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[ExchangeOrder]) -> Ok((merged as u64, ignored as u64)) } -// pub async fn load_from_json(path: &Path, pool: &Graph, batch_size: usize) -> Result<(u64, u64)> { -// let mut orders = extract_exchange_orders::read_orders_from_file(path)?; -// // add RMS stats to each order -// enrich_rms::include_rms_stats(&mut orders); -// // find likely shill bids -// enrich_rms::process_sell_order_shill(&mut orders); -// enrich_rms::process_buy_order_shill(&mut orders); +pub async fn load_from_json(path: &Path, pool: &Graph, batch_size: usize) -> Result<(u64, u64)> { + let mut orders = extract_exchange_orders::read_orders_from_file(path)?; + // add RMS stats to each order + enrich_rms::include_rms_stats(&mut orders); + // find likely shill bids + enrich_rms::process_sell_order_shill(&mut orders); + enrich_rms::process_buy_order_shill(&mut orders); -// let balances = enrich_account_funding::replay_transactions(&mut orders); -// let ledger_inserts = enrich_account_funding::submit_ledger(&balances, pool).await?; -// info!("exchange ledger relations inserted: {}", ledger_inserts); + // let balances = enrich_account_funding::replay_transactions(&mut orders); + // let ledger_inserts = enrich_account_funding::submit_ledger(&balances, pool).await?; + // info!("exchange ledger relations inserted: {}", ledger_inserts); -// swap_batch(&orders, pool, batch_size).await -// } + swap_batch(&orders, pool, batch_size).await +} diff --git a/tests/test_enrich_exchange.rs b/tests/test_enrich_exchange.rs index 392fdb2..acb8f45 100644 --- a/tests/test_enrich_exchange.rs +++ b/tests/test_enrich_exchange.rs @@ -6,7 +6,7 @@ use std::path::PathBuf; use anyhow::Result; use libra_forensic_db::{ - analytics::{enrich_account_funding, enrich_rms}, + analytics::enrich_rms, extract_exchange_orders, load_exchange_orders, neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes}, schema_exchange_orders::ExchangeOrder, @@ -70,11 +70,11 @@ fn test_sell_order_shill() { fn test_enrich_account_funding() { let path = env!("CARGO_MANIFEST_DIR"); let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); - let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); + let orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); - let balance = enrich_account_funding::replay_transactions(&mut orders); + // let balance = enrich_account_funding::replay_transactions(&mut orders); - dbg!(balance.accounts.len()); + // dbg!(balance.accounts.len()); } #[test] From fb5cffc4506c61e06f7dc3b4bb16dde7f9096c0a Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 6 Dec 2024 13:55:06 -0500 Subject: [PATCH 07/21] submit to db --- src/analytics/enrich_account_funding.rs | 397 +++++++++--------------- src/load_exchange_orders.rs | 11 +- tests/test_enrich_exchange.rs | 9 +- 3 files changed, 153 insertions(+), 264 deletions(-) diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index 3da6e04..5702908 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -1,6 +1,7 @@ use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; -use log::error; +use log::{error, trace}; +use neo4rs::{Graph, Query}; // use log::trace; // use neo4rs::{Graph, Query}; use serde::{Deserialize, Serialize}; @@ -24,12 +25,15 @@ pub struct AccountDataAlt { } #[derive(Default, Debug, Deserialize, Serialize)] -pub struct UserLedger(HashMap, AccountDataAlt>); +pub struct UserLedger(pub HashMap, AccountDataAlt>); #[derive(Default, Debug, Deserialize, Serialize)] -pub struct BalanceTracker(HashMap); // Tracks data for each user +pub struct BalanceTracker(pub HashMap); // Tracks data for each user impl BalanceTracker { + pub fn new() -> Self { + BalanceTracker(HashMap::new()) + } /// Replay all transactions sequentially and return a balance tracker pub fn replay_transactions(&mut self, orders: &mut [ExchangeOrder]) -> Result<()> { orders.sort_by_key(|order| order.filled_at); @@ -40,7 +44,7 @@ impl BalanceTracker { } pub fn process_transaction_alt(&mut self, order: &ExchangeOrder) { - let date = order.created_at; + let date = order.filled_at; match order.order_type.as_str() { "Buy" => { // user offered to buy coins (Buyer) @@ -88,7 +92,10 @@ impl BalanceTracker { return; }; - let previous = ul.0.get(&most_recent_date).unwrap().clone(); + let previous = + ul.0.get(&most_recent_date) + .unwrap_or(&AccountDataAlt::default()) + .clone(); let today = ul.0.entry(date).or_default(); @@ -100,7 +107,8 @@ impl BalanceTracker { } else { today.daily_inflows = amount; } - // *daily_balance += amount + // no change from on totals + today.total_outflows = previous.total_outflows; } else { // debit today.current_balance = previous.current_balance - amount; @@ -111,6 +119,9 @@ impl BalanceTracker { } else { today.daily_outflows = amount; } + + // no change from on totals + today.total_inflows = previous.total_inflows; } // find out if the outflows created a funding requirement on the account @@ -177,6 +188,25 @@ impl BalanceTracker { list_literal.pop(); // need to drop last comma "," Ok(format!("[{}]", list_literal)) } + + /// submit to db + pub async fn submit_ledger(&self, pool: &Graph) -> Result { + let mut merged_relations = 0u64; + for id in self.0.keys() { + let data = self.to_cypher_map(*id)?; + let query_literal = generate_cypher_query(data); + let query = Query::new(query_literal); + let mut result = pool.execute(query).await?; + + while let Some(r) = result.next().await? { + if let Ok(i) = r.get::("merged_relations") { + trace!("merged ledger in tx: {i}"); + merged_relations += i; + }; + } + } + Ok(merged_relations) + } } /// Generate a Cypher query string to insert data into Neo4j @@ -202,259 +232,114 @@ pub fn generate_cypher_query(map: String) -> String { } /// Helper function to parse "YYYY-MM-DD" into `DateTime` -fn parse_date(date_str: &str) -> DateTime { +pub fn parse_date(date_str: &str) -> DateTime { let datetime_str = format!("{date_str}T00:00:00Z"); // Append time and UTC offset DateTime::parse_from_rfc3339(&datetime_str) .expect("Invalid date format; expected YYYY-MM-DD") .with_timezone(&Utc) } -// /// submit to db -// pub async fn submit_ledger(balances: &BalanceTracker, pool: &Graph) -> Result { -// let mut merged_relations = 0u64; -// for (id, acc) in balances.accounts.iter() { -// let data = acc.to_cypher_map(*id); -// let query_literal = balances.generate_cypher_query(data); -// let query = Query::new(query_literal); -// let mut result = pool.execute(query).await?; - -// while let Some(r) = result.next().await? { -// if let Ok(i) = r.get::("merged_relations") { -// trace!("merged ledger in tx: {i}"); -// merged_relations += i; -// }; -// } -// } -// Ok(merged_relations) -// } - -// /// Reusable function to print account data -// pub fn print_account_data(user_id: u32, data: &AccountData) { -// println!("User: {}", user_id); -// for (date, balance) in &data.daily_balances { -// println!(" Date: {}, Balance: {}", date, balance); -// } -// for (date, funding) in &data.daily_funding { -// println!(" Date: {}, Funding: {}", date, funding); -// } -// for (date, inflow) in &data.daily_inflows { -// println!(" Date: {}, Inflows: {}", date, inflow); -// } -// for (date, outflow) in &data.daily_outflows { -// println!(" Date: {}, Outflows: {}", date, outflow); -// } -// for (date, user_flow) in &data.daily_user_flows { -// println!(" Date: {}, User Flow: {}", date, user_flow); -// } -// for (date, accepter_flow) in &data.daily_accepter_flows { -// println!(" Date: {}, Accepter Flow: {}", date, accepter_flow); -// } -// } - -// /// Display statistics for a specific account within a date range -// pub fn display_account_statistics( -// user_id: u32, -// data: &AccountData, -// start_date: &str, -// end_date: &str, -// ) { -// let start = parse_date(start_date); -// let end = parse_date(end_date); - -// println!( -// "Statistics for User {} from {} to {}", -// user_id, start_date, end_date -// ); - -// let mut total_balance = 0.0; -// let mut total_funding = 0.0; -// let mut total_inflows = 0.0; -// let mut total_outflows = 0.0; - -// for (date, balance) in &data.daily_balances { -// if *date >= start && *date <= end { -// total_balance += balance; -// } -// } - -// for (date, funding) in &data.daily_funding { -// if *date >= start && *date <= end { -// total_funding += funding; -// } -// } - -// for (date, inflow) in &data.daily_inflows { -// if *date >= start && *date <= end { -// total_inflows += inflow; -// } -// } - -// for (date, outflow) in &data.daily_outflows { -// if *date >= start && *date <= end { -// total_outflows += outflow; -// } -// } - -// println!(" Total Balance: {:.2}", total_balance); -// println!(" Total Funding: {:.2}", total_funding); -// println!(" Total Inflows: {:.2}", total_inflows); -// println!(" Total Outflows: {:.2}", total_outflows); -// } - -// #[test] -// fn test_replay_transactions() { -// let mut orders = vec![ -// // user 1 creates an offer to BUY, user 2 accepts. -// // user 1 sends USD user 2 move amount of coins. -// ExchangeOrder { -// user: 1, -// order_type: "BUY".to_string(), -// amount: 10.0, -// price: 2.0, -// created_at: parse_date("2024-03-01"), -// filled_at: parse_date("2024-03-02"), -// accepter: 2, -// rms_hour: 0.0, -// rms_24hour: 0.0, -// price_vs_rms_hour: 0.0, -// price_vs_rms_24hour: 0.0, -// shill_bid: None, -// }, -// ExchangeOrder { -// // user 2 creates an offer to SELL, user 3 accepts. -// // user 3 sends USD user 2 moves amount of coins. -// user: 2, -// order_type: "SELL".to_string(), -// amount: 5.0, -// price: 3.0, -// created_at: parse_date("2024-03-05"), -// filled_at: parse_date("2024-03-06"), -// accepter: 3, -// rms_hour: 0.0, -// rms_24hour: 0.0, -// price_vs_rms_hour: 0.0, -// price_vs_rms_24hour: 0.0, -// shill_bid: None, -// }, -// // user 3 creates an offer to BUY, user 1 accepts. -// // user 3 sends USD user 1 moves amount of coins. -// ExchangeOrder { -// user: 3, -// order_type: "BUY".to_string(), -// amount: 15.0, -// price: 1.5, -// created_at: parse_date("2024-03-10"), -// filled_at: parse_date("2024-03-11"), -// accepter: 1, -// rms_hour: 0.0, -// rms_24hour: 0.0, -// price_vs_rms_hour: 0.0, -// price_vs_rms_24hour: 0.0, -// shill_bid: None, -// }, -// ]; - -// let tracker = replay_transactions(&mut orders); - -// // // Analyze results for March 2024 -// // for (user_id, data) in &tracker.accounts { -// // print_account_data(*user_id, data); -// // display_account_statistics(*user_id, data, "2024-03-01", "2024-03-31"); -// // } -// } - -// #[ignore] -// // TODO: check paths -// #[test] -// fn test_cache_mechanism() { -// let cache_file = "balance_tracker_cache.json".to_string(); -// let mut orders = vec![ -// ExchangeOrder { -// user: 1, -// order_type: "BUY".to_string(), -// amount: 10.0, -// price: 2.0, -// created_at: parse_date("2024-03-01"), -// filled_at: parse_date("2024-03-02"), -// accepter: 2, -// rms_hour: 0.0, -// rms_24hour: 0.0, -// price_vs_rms_hour: 0.0, -// price_vs_rms_24hour: 0.0, -// shill_bid: None, -// }, -// ExchangeOrder { -// user: 2, -// order_type: "SELL".to_string(), -// amount: 5.0, -// price: 3.0, -// created_at: parse_date("2024-03-05"), -// filled_at: parse_date("2024-03-06"), -// accepter: 3, -// rms_hour: 0.0, -// rms_24hour: 0.0, -// price_vs_rms_hour: 0.0, -// price_vs_rms_24hour: 0.0, -// shill_bid: None, -// }, -// ]; - -// let tracker = get_or_recalculate_balances(&mut orders, Some(cache_file.clone()), true); -// assert!(tracker.accounts.contains_key(&1)); -// assert!(tracker.accounts.contains_key(&2)); - -// // Test loading from cache -// let cached_tracker = get_or_recalculate_balances(&mut orders, Some(cache_file.clone()), false); -// assert!(cached_tracker.accounts.contains_key(&1)); -// assert!(cached_tracker.accounts.contains_key(&2)); - -// // Cleanup -// let _ = fs::remove_file(cache_file); -// } - -// // #[test] - -// // fn test_cypher_query() { -// // let tracker = BalanceTracker::new(); // Assume tracker is populated -// // // let params = tracker.generate_cypher_params(); -// // let query = tracker.generate_cypher_query(); -// // // dbg!(¶ms); -// // dbg!(&query); -// // } - -// // I'm coding some data analysis in rust. - -// // I have a vector structs that looks like this: - -// // pub struct ExchangeOrder { -// // pub user: u32, -// // #[serde(rename = "orderType")] -// // pub order_type: String, -// // #[serde(deserialize_with = "deserialize_amount")] -// // pub amount: f64, -// // #[serde(deserialize_with = "deserialize_amount")] -// // pub price: f64, -// // pub created_at: DateTime, -// // pub filled_at: DateTime, -// // pub accepter: u32, -// // #[serde(skip_deserializing)] -// // pub rms_hour: f64, -// // #[serde(skip_deserializing)] -// // pub rms_24hour: f64, -// // #[serde(skip_deserializing)] -// // pub price_vs_rms_hour: f64, -// // #[serde(skip_deserializing)] -// // pub price_vs_rms_24hour: f64, -// // #[serde(skip_deserializing)] -// // pub shill_bid: Option, // New field to indicate if it took the best price -// // } - -// // My goal is to determine the amount of funding ('amount') that each account required at a given time. We will need to replay all the transaction history sequentially. - -// // We need a new data structure to track account balances. Accepting a BUY transaction by another User, would decrease the total balance of the accepter, and increase of the User. Accepting a SELL transaction, would increase the balance of the accepter, and decrease that of the User. - -// // We also need a data structure to save when there were funding events to the account. We can assume all accounts start at 0 total_balance. This means that we need to also track a funded_event_amount, for whenever the account would have a negative balance. - -// // As for granularity of time we should just track daily balances, and daily funding. - -// // How would I do this in Rust? +#[test] +fn test_replay_transactions() { + let mut orders = vec![ + // user_1 creates an offer to BUY, user_2 accepts. + // user_1 sends USD, user_2 moves 10 coins. + ExchangeOrder { + user: 1, + order_type: "Buy".to_string(), + amount: 10.0, + price: 2.0, + created_at: parse_date("2024-03-01"), + filled_at: parse_date("2024-03-02"), + accepter: 2, + rms_hour: 0.0, + rms_24hour: 0.0, + price_vs_rms_hour: 0.0, + price_vs_rms_24hour: 0.0, + shill_bid: None, + }, + ExchangeOrder { + // user 2 creates an offer to SELL, user 3 accepts. + // user 3 sends USD user 2 moves amount of coins. + user: 2, + order_type: "Sell".to_string(), + amount: 5.0, + price: 3.0, + created_at: parse_date("2024-03-05"), + filled_at: parse_date("2024-03-06"), + accepter: 3, + rms_hour: 0.0, + rms_24hour: 0.0, + price_vs_rms_hour: 0.0, + price_vs_rms_24hour: 0.0, + shill_bid: None, + }, + // user 3 creates an offer to BUY, user 1 accepts. + // user 3 sends USD user 1 moves amount of coins. + ExchangeOrder { + user: 3, + order_type: "Buy".to_string(), + amount: 15.0, + price: 1.5, + created_at: parse_date("2024-03-10"), + filled_at: parse_date("2024-03-11"), + accepter: 1, + rms_hour: 0.0, + rms_24hour: 0.0, + price_vs_rms_hour: 0.0, + price_vs_rms_24hour: 0.0, + shill_bid: None, + }, + ]; + + let mut tracker = BalanceTracker::new(); + tracker.replay_transactions(&mut orders).unwrap(); + + let accs = tracker.0; + + let user_1 = accs.get(&1).unwrap(); + let (_, acc) = user_1.0.get_key_value(&parse_date("2024-03-02")).unwrap(); + + assert!(acc.current_balance == 10.0); + assert!(acc.total_funded == 0.0); + assert!(acc.total_outflows == 0.0); + assert!(acc.total_inflows == 10.0); + assert!(acc.daily_funding == 0.0); + assert!(acc.daily_inflows == 10.0); + assert!(acc.daily_outflows == 0.0); + + let (_, acc) = user_1.0.get_key_value(&parse_date("2024-03-11")).unwrap(); + + // balance got drawn to negative on sale of 15 coin + assert!(acc.current_balance == 0.0); + // implied he had to fund with at least 5 coins + assert!(acc.total_funded == 5.0); + assert!(acc.total_outflows == 15.0); + // the all-time inflows should not have changed from the previous period + assert!(acc.total_inflows == 10.0); + assert!(acc.daily_funding == 5.0); + assert!(acc.daily_inflows == 0.0); + assert!(acc.daily_outflows == 15.0); + + let user_1 = accs.get(&3).unwrap(); + let (_, acc) = user_1.0.get_key_value(&parse_date("2024-03-06")).unwrap(); + + assert!(acc.current_balance == 5.0); + assert!(acc.total_funded == 0.0); + assert!(acc.total_outflows == 0.0); + assert!(acc.total_inflows == 5.0); + assert!(acc.daily_funding == 0.0); + assert!(acc.daily_inflows == 5.0); + assert!(acc.daily_outflows == 0.0); + + let (_, acc) = user_1.0.get_key_value(&parse_date("2024-03-11")).unwrap(); + + // balance should increase again + assert!(acc.current_balance == 20.0); + assert!(acc.total_funded == 0.0); + assert!(acc.total_outflows == 0.0); + assert!(acc.total_inflows == 20.0); + assert!(acc.daily_funding == 0.0); + assert!(acc.daily_inflows == 15.0); + assert!(acc.daily_outflows == 0.0); +} diff --git a/src/load_exchange_orders.rs b/src/load_exchange_orders.rs index ce0b242..2292b50 100644 --- a/src/load_exchange_orders.rs +++ b/src/load_exchange_orders.rs @@ -5,7 +5,9 @@ use log::{error, info, warn}; use neo4rs::{query, Graph}; use crate::{ - analytics::enrich_rms, extract_exchange_orders, queue, schema_exchange_orders::ExchangeOrder, + analytics::{enrich_account_funding::BalanceTracker, enrich_rms}, + extract_exchange_orders, queue, + schema_exchange_orders::ExchangeOrder, }; pub async fn swap_batch( @@ -90,9 +92,10 @@ pub async fn load_from_json(path: &Path, pool: &Graph, batch_size: usize) -> Res enrich_rms::process_sell_order_shill(&mut orders); enrich_rms::process_buy_order_shill(&mut orders); - // let balances = enrich_account_funding::replay_transactions(&mut orders); - // let ledger_inserts = enrich_account_funding::submit_ledger(&balances, pool).await?; - // info!("exchange ledger relations inserted: {}", ledger_inserts); + let mut balances = BalanceTracker::new(); + balances.replay_transactions(&mut orders)?; + let ledger_inserts = balances.submit_ledger(pool).await?; + info!("exchange ledger relations inserted: {}", ledger_inserts); swap_batch(&orders, pool, batch_size).await } diff --git a/tests/test_enrich_exchange.rs b/tests/test_enrich_exchange.rs index acb8f45..22c17ec 100644 --- a/tests/test_enrich_exchange.rs +++ b/tests/test_enrich_exchange.rs @@ -6,7 +6,7 @@ use std::path::PathBuf; use anyhow::Result; use libra_forensic_db::{ - analytics::enrich_rms, + analytics::{enrich_account_funding::BalanceTracker, enrich_rms}, extract_exchange_orders, load_exchange_orders, neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes}, schema_exchange_orders::ExchangeOrder, @@ -70,11 +70,12 @@ fn test_sell_order_shill() { fn test_enrich_account_funding() { let path = env!("CARGO_MANIFEST_DIR"); let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); - let orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); + let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); - // let balance = enrich_account_funding::replay_transactions(&mut orders); + let mut balance = BalanceTracker::new(); + balance.replay_transactions(&mut orders).unwrap(); - // dbg!(balance.accounts.len()); + assert!(balance.0.len() == 3957); } #[test] From 18e0061a045b2f3bb19b0b231725adfadb41a6d3 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 6 Dec 2024 14:03:00 -0500 Subject: [PATCH 08/21] add entrypoint test for exchange --- src/analytics/enrich_account_funding.rs | 2 +- tests/test_enrich_exchange.rs | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index 5702908..d75522c 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -223,7 +223,7 @@ pub fn generate_cypher_query(map: String) -> String { ul.total_outflows = account.total_outflows, ul.daily_funding = account.daily_funding, ul.daily_inflows = account.daily_inflows, - ul.daily_outflows = account.daily_outflows, + ul.daily_outflows = account.daily_outflows MERGE (sa)-[r:DailyLedger]->(ul) SET r.date = datetime(account.date) RETURN COUNT(r) as merged_relations diff --git a/tests/test_enrich_exchange.rs b/tests/test_enrich_exchange.rs index 22c17ec..8772f19 100644 --- a/tests/test_enrich_exchange.rs +++ b/tests/test_enrich_exchange.rs @@ -174,3 +174,16 @@ async fn e2e_swap_data() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_entry_point_exchange_load() -> Result<()>{ + let c = start_neo4j_container(); + let port = c.get_host_port_ipv4(7687); + let graph = get_neo4j_localhost_pool(port).await?; + maybe_create_indexes(&graph).await?; + + let path = env!("CARGO_MANIFEST_DIR"); + let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); + load_exchange_orders::load_from_json(&buf, &graph, 250).await?; + Ok(()) +} From e795c9df22896aa6b6c9204c3d6814da54b6936e Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 6 Dec 2024 14:59:56 -0500 Subject: [PATCH 09/21] add index --- src/load_exchange_orders.rs | 5 +++++ src/neo4j_init.rs | 6 ++++++ tests/test_enrich_exchange.rs | 6 ++++-- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/load_exchange_orders.rs b/src/load_exchange_orders.rs index 2292b50..963a4c4 100644 --- a/src/load_exchange_orders.rs +++ b/src/load_exchange_orders.rs @@ -86,11 +86,16 @@ pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[ExchangeOrder]) -> pub async fn load_from_json(path: &Path, pool: &Graph, batch_size: usize) -> Result<(u64, u64)> { let mut orders = extract_exchange_orders::read_orders_from_file(path)?; + info!("completed parsing orders"); + // add RMS stats to each order enrich_rms::include_rms_stats(&mut orders); + info!("completed rms statistics"); + // find likely shill bids enrich_rms::process_sell_order_shill(&mut orders); enrich_rms::process_buy_order_shill(&mut orders); + info!("completed shill bid calculation"); let mut balances = BalanceTracker::new(); balances.replay_transactions(&mut orders)?; diff --git a/src/neo4j_init.rs b/src/neo4j_init.rs index 6d19eb8..f53f7c9 100644 --- a/src/neo4j_init.rs +++ b/src/neo4j_init.rs @@ -35,6 +35,11 @@ pub static INDEX_SWAP_ID: &str = pub static INDEX_SWAP_TIME: &str = "CREATE INDEX swap_time IF NOT EXISTS FOR ()-[r:Swap]-() ON (r.filled_at)"; +pub static INDEX_EXCHANGE_LEDGER: &str = r#" + CREATE INDEX user_ledger IF NOT EXISTS FOR (ul:UserExchangeLedger) ON (ul.date) + CREATE INDEX link_ledger IF NOT EXISTS FOR ()-[r:DailyLedger]->() ON (r.date) + "#; + /// get the testing neo4j connection pub async fn get_neo4j_localhost_pool(port: u16) -> Result { let uri = format!("127.0.0.1:{port}"); @@ -67,6 +72,7 @@ pub async fn maybe_create_indexes(graph: &Graph) -> Result<()> { INDEX_TX_HASH, INDEX_TX_FUNCTION, INDEX_SWAP_ID, + INDEX_EXCHANGE_LEDGER, ]) .await?; txn.commit().await?; diff --git a/tests/test_enrich_exchange.rs b/tests/test_enrich_exchange.rs index 8772f19..2ca6ac8 100644 --- a/tests/test_enrich_exchange.rs +++ b/tests/test_enrich_exchange.rs @@ -176,7 +176,9 @@ async fn e2e_swap_data() -> Result<()> { } #[tokio::test] -async fn test_entry_point_exchange_load() -> Result<()>{ +async fn test_entry_point_exchange_load() -> Result<()> { + libra_forensic_db::log_setup(); + let c = start_neo4j_container(); let port = c.get_host_port_ipv4(7687); let graph = get_neo4j_localhost_pool(port).await?; @@ -184,6 +186,6 @@ async fn test_entry_point_exchange_load() -> Result<()>{ let path = env!("CARGO_MANIFEST_DIR"); let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); - load_exchange_orders::load_from_json(&buf, &graph, 250).await?; + load_exchange_orders::load_from_json(&buf, &graph, 10).await?; Ok(()) } From e7d4e4e99907e6279bbc628f006fcaff0335b617 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 6 Dec 2024 15:01:41 -0500 Subject: [PATCH 10/21] ignore expensive test --- tests/test_enrich_exchange.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_enrich_exchange.rs b/tests/test_enrich_exchange.rs index 2ca6ac8..a6ca3d9 100644 --- a/tests/test_enrich_exchange.rs +++ b/tests/test_enrich_exchange.rs @@ -175,6 +175,7 @@ async fn e2e_swap_data() -> Result<()> { Ok(()) } +#[ignore] #[tokio::test] async fn test_entry_point_exchange_load() -> Result<()> { libra_forensic_db::log_setup(); From ef6e66b7f704d20316a3ed4f8a3cdccf555b8113 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 6 Dec 2024 15:06:40 -0500 Subject: [PATCH 11/21] patch --- src/neo4j_init.rs | 8 ++++++-- tests/test_neo4j_meta.rs | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/neo4j_init.rs b/src/neo4j_init.rs index f53f7c9..b9dde52 100644 --- a/src/neo4j_init.rs +++ b/src/neo4j_init.rs @@ -35,10 +35,13 @@ pub static INDEX_SWAP_ID: &str = pub static INDEX_SWAP_TIME: &str = "CREATE INDEX swap_time IF NOT EXISTS FOR ()-[r:Swap]-() ON (r.filled_at)"; -pub static INDEX_EXCHANGE_LEDGER: &str = r#" +pub static INDEX_EXCHANGE_LEDGER: &str = " CREATE INDEX user_ledger IF NOT EXISTS FOR (ul:UserExchangeLedger) ON (ul.date) + "; + +pub static INDEX_EXCHANGE_LINK_LEDGER: &str = " CREATE INDEX link_ledger IF NOT EXISTS FOR ()-[r:DailyLedger]->() ON (r.date) - "#; + "; /// get the testing neo4j connection pub async fn get_neo4j_localhost_pool(port: u16) -> Result { @@ -73,6 +76,7 @@ pub async fn maybe_create_indexes(graph: &Graph) -> Result<()> { INDEX_TX_FUNCTION, INDEX_SWAP_ID, INDEX_EXCHANGE_LEDGER, + INDEX_EXCHANGE_LINK_LEDGER, ]) .await?; txn.commit().await?; diff --git a/tests/test_neo4j_meta.rs b/tests/test_neo4j_meta.rs index a9c4fb1..c214e20 100644 --- a/tests/test_neo4j_meta.rs +++ b/tests/test_neo4j_meta.rs @@ -76,7 +76,7 @@ async fn test_tx_insert() -> Result<()> { } #[tokio::test] -async fn test_init_indices() { +async fn test_init_indexes() { let c = start_neo4j_container(); let port = c.get_host_port_ipv4(7687); let graph = get_neo4j_localhost_pool(port) From a3f8591cacf1a5cab15ea7c1f52ffa5bdbf2310a Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 6 Dec 2024 15:20:07 -0500 Subject: [PATCH 12/21] patch index --- src/neo4j_init.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/neo4j_init.rs b/src/neo4j_init.rs index b9dde52..f2eb51d 100644 --- a/src/neo4j_init.rs +++ b/src/neo4j_init.rs @@ -36,7 +36,7 @@ pub static INDEX_SWAP_TIME: &str = "CREATE INDEX swap_time IF NOT EXISTS FOR ()-[r:Swap]-() ON (r.filled_at)"; pub static INDEX_EXCHANGE_LEDGER: &str = " - CREATE INDEX user_ledger IF NOT EXISTS FOR (ul:UserExchangeLedger) ON (ul.date) + CREATE INDEX user_ledger IF NOT EXISTS FOR (ul:UserLedger) ON (ul.date) "; pub static INDEX_EXCHANGE_LINK_LEDGER: &str = " From 8fe98194d00aaa7f4092c7b7d417107c440ab9cb Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 6 Dec 2024 15:41:54 -0500 Subject: [PATCH 13/21] handle submit errors --- src/analytics/enrich_account_funding.rs | 26 ++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index d75522c..29bb10a 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -196,13 +196,25 @@ impl BalanceTracker { let data = self.to_cypher_map(*id)?; let query_literal = generate_cypher_query(data); let query = Query::new(query_literal); - let mut result = pool.execute(query).await?; - - while let Some(r) = result.next().await? { - if let Ok(i) = r.get::("merged_relations") { - trace!("merged ledger in tx: {i}"); - merged_relations += i; - }; + let result = pool.execute(query).await; + + match result { + Ok(mut d) => { + while let r = d.next().await { + match r { + Ok(row) => { + if let Some(r) = row { + if let Ok(i) = r.get::("merged_relations") { + trace!("merged ledger in tx: {i}"); + merged_relations += i; + }; + } + } + Err(e) => error!("could not parse row in cypher query response: {}", e), + } + } + } + Err(e) => error!("could not get response in cypher query response: {}", e), } } Ok(merged_relations) From 17b37ae0f5338e6cd7bc782446a39e6e64489435 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 6 Dec 2024 15:46:47 -0500 Subject: [PATCH 14/21] revert --- src/analytics/enrich_account_funding.rs | 26 +++++++------------------ 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index 29bb10a..d75522c 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -196,25 +196,13 @@ impl BalanceTracker { let data = self.to_cypher_map(*id)?; let query_literal = generate_cypher_query(data); let query = Query::new(query_literal); - let result = pool.execute(query).await; - - match result { - Ok(mut d) => { - while let r = d.next().await { - match r { - Ok(row) => { - if let Some(r) = row { - if let Ok(i) = r.get::("merged_relations") { - trace!("merged ledger in tx: {i}"); - merged_relations += i; - }; - } - } - Err(e) => error!("could not parse row in cypher query response: {}", e), - } - } - } - Err(e) => error!("could not get response in cypher query response: {}", e), + let mut result = pool.execute(query).await?; + + while let Some(r) = result.next().await? { + if let Ok(i) = r.get::("merged_relations") { + trace!("merged ledger in tx: {i}"); + merged_relations += i; + }; } } Ok(merged_relations) From 2a6bb24c9b6899b516db6fcb2dbab75188ccc81d Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 6 Dec 2024 16:08:00 -0500 Subject: [PATCH 15/21] better error handling --- src/analytics/enrich_account_funding.rs | 28 ++++++++++++++++--------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index d75522c..0775435 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -189,20 +189,28 @@ impl BalanceTracker { Ok(format!("[{}]", list_literal)) } + pub async fn submit_one_id(&self, id: u32, pool: &Graph) -> Result { + let data = self.to_cypher_map(id)?; + let query_literal = generate_cypher_query(data); + let query = Query::new(query_literal); + let mut result = pool.execute(query).await?; + + let row = result.next().await?.context("no row returned")?; + + let merged: u64 = row + .get("merged_relations") + .context("no unique_accounts field")?; + + trace!("merged ledger in tx: {merged}"); + Ok(merged) + } /// submit to db pub async fn submit_ledger(&self, pool: &Graph) -> Result { let mut merged_relations = 0u64; for id in self.0.keys() { - let data = self.to_cypher_map(*id)?; - let query_literal = generate_cypher_query(data); - let query = Query::new(query_literal); - let mut result = pool.execute(query).await?; - - while let Some(r) = result.next().await? { - if let Ok(i) = r.get::("merged_relations") { - trace!("merged ledger in tx: {i}"); - merged_relations += i; - }; + match self.submit_one_id(*id, pool).await { + Ok(m) => merged_relations += m, + Err(e) => error!("could not submit user ledger: {}", e), } } Ok(merged_relations) From ef7ef82f20542435dc36526290c3366fa3468280 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 6 Dec 2024 16:17:09 -0500 Subject: [PATCH 16/21] patch total funding --- src/analytics/enrich_account_funding.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index 0775435..f9f4d49 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -137,6 +137,9 @@ impl BalanceTracker { // reset to zero today.current_balance = 0.0; } + // no changes to funding + today.total_funded = previous.total_funded; + } /// Save the balance tracker to a JSON file From 79f719ca99f9cb7668aa2f00219f4466e1636c28 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 6 Dec 2024 17:23:38 -0500 Subject: [PATCH 17/21] patch replay --- src/analytics/enrich_account_funding.rs | 34 +++++++++++++++---------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index f9f4d49..5e278f9 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -75,6 +75,8 @@ impl BalanceTracker { ) { let ul = self.0.entry(user_id).or_default(); + let has_history = !ul.0.is_empty(); + let most_recent_date = *ul.0.keys().max_by(|x, y| x.cmp(y)).unwrap_or(&date); // NOTE the previous record may be today's record from a previous transaction. Need to take care in the aggregation below @@ -99,47 +101,51 @@ impl BalanceTracker { let today = ul.0.entry(date).or_default(); + // roll over from previous + if has_history { + today.current_balance = previous.current_balance; + today.total_funded = previous.total_funded; + today.total_inflows = previous.total_inflows; + today.total_outflows = previous.total_outflows; + } + if credit { - today.current_balance = previous.current_balance + amount; - today.total_inflows = previous.total_inflows + amount; + today.current_balance += amount; + today.total_inflows += amount; + // there are records from today if most_recent_date == date { today.daily_inflows = previous.daily_inflows + amount; } else { + // today's first record today.daily_inflows = amount; } - // no change from on totals - today.total_outflows = previous.total_outflows; } else { // debit - today.current_balance = previous.current_balance - amount; - today.total_outflows = previous.total_outflows + amount; + today.current_balance += -amount; + today.total_outflows += amount; if most_recent_date == date { today.daily_outflows = previous.daily_outflows + amount; } else { today.daily_outflows = amount; } - - // no change from on totals - today.total_inflows = previous.total_inflows; } // find out if the outflows created a funding requirement on the account if today.current_balance < 0.0 { let negative_balance = today.current_balance.abs(); // funding was needed - today.total_funded = previous.total_funded + negative_balance; + today.total_funded += negative_balance; + + // if the previous record is from today if most_recent_date == date { - today.daily_funding += negative_balance; + today.daily_funding = previous.daily_funding + negative_balance; } else { today.daily_funding = negative_balance; } // reset to zero today.current_balance = 0.0; } - // no changes to funding - today.total_funded = previous.total_funded; - } /// Save the balance tracker to a JSON file From 6364dbb7c253045d6c69e020ddcb438bea618b0f Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Sat, 7 Dec 2024 15:30:53 -0500 Subject: [PATCH 18/21] use btree instead of hash, add fixtures and test --- src/analytics/enrich_account_funding.rs | 139 +++++++++++++++++++++++- 1 file changed, 135 insertions(+), 4 deletions(-) diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index 5e278f9..0ba057d 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -6,7 +6,7 @@ use neo4rs::{Graph, Query}; // use neo4rs::{Graph, Query}; use serde::{Deserialize, Serialize}; use std::{ - collections::HashMap, + collections::BTreeMap, fs::{self, File}, io::Read, }; @@ -25,14 +25,14 @@ pub struct AccountDataAlt { } #[derive(Default, Debug, Deserialize, Serialize)] -pub struct UserLedger(pub HashMap, AccountDataAlt>); +pub struct UserLedger(pub BTreeMap, AccountDataAlt>); #[derive(Default, Debug, Deserialize, Serialize)] -pub struct BalanceTracker(pub HashMap); // Tracks data for each user +pub struct BalanceTracker(pub BTreeMap); // Tracks data for each user impl BalanceTracker { pub fn new() -> Self { - BalanceTracker(HashMap::new()) + BalanceTracker(BTreeMap::new()) } /// Replay all transactions sequentially and return a balance tracker pub fn replay_transactions(&mut self, orders: &mut [ExchangeOrder]) -> Result<()> { @@ -360,3 +360,134 @@ fn test_replay_transactions() { assert!(acc.daily_inflows == 15.0); assert!(acc.daily_outflows == 0.0); } + +#[test] +fn test_example_user() -> Result<()> { + use crate::extract_exchange_orders; + use std::path::PathBuf; + let path = env!("CARGO_MANIFEST_DIR"); + let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); + let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); + assert!(orders.len() == 25450); + + orders.retain(|el| { + if el.filled_at < parse_date("2024-01-16") { + if el.user == 123 { + return true; + }; + if el.accepter == 123 { + return true; + }; + } + false + }); + + assert!(orders.len() == 68); + + let mut tracker = BalanceTracker::new(); + tracker.replay_transactions(&mut orders)?; + + // check that running totals e.g. total_funded are always monotonically increasing. + + // Dump case + // This user only had outflows of coins, and thus an increasing funding requirement. + let user = tracker.0.get(&123).unwrap(); + assert!(user.0.len() == 68); + + // btree is already sorted + let mut prev_funding = 0.0; + let mut prev_inflows = 0.0; + let mut prev_outflows = 0.0; + for (_d, acc) in user.0.iter() { + assert!( + acc.total_funded >= prev_funding, + "total_funded is not monotonically increasing" + ); + assert!( + acc.total_inflows >= prev_inflows, + "total_inflows is not monotonically increasing" + ); + assert!( + acc.total_outflows >= prev_outflows, + "total_outflows is not monotonically increasing" + ); + prev_funding = acc.total_funded; + prev_inflows = acc.total_inflows; + prev_outflows = acc.total_outflows; + } + + Ok(()) +} + +#[test] +fn test_example_week() -> Result<()> { + // history for two users 123, and 336 + use crate::extract_exchange_orders; + use std::path::PathBuf; + let path = env!("CARGO_MANIFEST_DIR"); + let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); + let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); + assert!(orders.len() == 25450); + + orders.retain(|el| el.filled_at < parse_date("2024-01-16")); + assert!(orders.len() == 956); + + let mut tracker = BalanceTracker::new(); + tracker.replay_transactions(&mut orders)?; + + // // check that running totals e.g. total_funded are always monotonically increasing. + + // Dump case + // This user only had outflows of coins, and thus an increasing funding requirement. + let user = tracker.0.get(&123).unwrap(); + + // btree is already sorted + let mut prev_funding = 0.0; + let mut prev_inflows = 0.0; + let mut prev_outflows = 0.0; + for (_d, acc) in user.0.iter() { + assert!( + acc.total_funded >= prev_funding, + "total_funded is not monotonically increasing" + ); + assert!( + acc.total_inflows >= prev_inflows, + "total_inflows is not monotonically increasing" + ); + assert!( + acc.total_outflows >= prev_outflows, + "total_outflows is not monotonically increasing" + ); + prev_funding = acc.total_funded; + prev_inflows = acc.total_inflows; + prev_outflows = acc.total_outflows; + } + + // Active Trading case, 336 + // This user only had outflows of coins, and thus an increasing funding requirement. + let user = tracker.0.get(&336).unwrap(); + + // btree is already sorted + let mut prev_funding = 0.0; + let mut prev_inflows = 0.0; + let mut prev_outflows = 0.0; + for (_d, acc) in user.0.iter() { + assert!( + acc.total_funded >= prev_funding, + "total_funded is not monotonically increasing" + ); + assert!( + acc.total_inflows >= prev_inflows, + "total_inflows is not monotonically increasing" + ); + assert!( + acc.total_outflows >= prev_outflows, + "total_outflows is not monotonically increasing" + ); + prev_funding = acc.total_funded; + prev_inflows = acc.total_inflows; + prev_outflows = acc.total_outflows; + } + + Ok(()) +} From ae04672e736f403b27c9be211b8c4f999ef17cfa Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Sat, 7 Dec 2024 21:21:12 -0500 Subject: [PATCH 19/21] patch cypher query --- src/analytics/enrich_account_funding.rs | 2 +- tests/test_analytics.rs | 142 +++++++++++++++++++++++- 2 files changed, 142 insertions(+), 2 deletions(-) diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index 0ba057d..526d2d0 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -233,7 +233,7 @@ pub fn generate_cypher_query(map: String) -> String { r#" UNWIND {map} AS account MERGE (sa:SwapAccount {{swap_id: account.swap_id}}) - MERGE (ul:UserLedger {{date: datetime(account.date)}}) + MERGE (ul:UserLedger {{swap_id: account.swap_id, date: datetime(account.date)}}) SET ul.current_balance = account.current_balance, ul.total_funded = account.total_funded, ul.total_inflows = account.total_inflows, diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index e8ecd51..095f57d 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -3,7 +3,11 @@ use anyhow::Result; use std::path::PathBuf; use libra_forensic_db::{ - analytics, extract_exchange_orders, load_exchange_orders, + analytics::{ + self, + enrich_account_funding::{parse_date, BalanceTracker}, + }, + extract_exchange_orders, load_exchange_orders, neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes}, }; use support::neo4j_testcontainer::start_neo4j_container; @@ -96,3 +100,139 @@ async fn test_rms_batch() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_submit_exchange_ledger() -> Result<()> { + libra_forensic_db::log_setup(); + + let c = start_neo4j_container(); + let port = c.get_host_port_ipv4(7687); + let graph = get_neo4j_localhost_pool(port).await?; + maybe_create_indexes(&graph).await?; + + let path = env!("CARGO_MANIFEST_DIR"); + let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); + let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); + assert!(orders.len() == 25450); + + orders.retain(|el| { + if el.filled_at < parse_date("2024-01-16") { + if el.user == 123 { + return true; + }; + if el.accepter == 123 { + return true; + }; + } + false + }); + + assert!(orders.len() == 68); + + let mut tracker = BalanceTracker::new(); + tracker.replay_transactions(&mut orders)?; + dbg!(&tracker.0.len()); + let days_records = tracker.0.len(); + assert!(days_records == 47); + + let user = tracker.0.get(&123).unwrap(); + assert!(user.0.len() == 68); + + let res = tracker.submit_one_id(123, &graph).await?; + + // the number of transactions merged should equal the number of orders + assert!(res == orders.len() as u64); + + // check there are transaction records with function args. + let cypher_query = neo4rs::query( + "MATCH (s:SwapAccount)-[r:DailyLedger]->(ul:UserLedger) + WHERE s.swap_id = 123 + ORDER BY ul.date + RETURN s.swap_id AS id, ul.date AS date, ul.total_funded AS funded + ", + ); + + // Execute the query + let mut result = graph.execute(cypher_query).await?; + + let mut prev_funding = 0; + let mut i = 0; + + // Fetch the first row only + while let Some(r) = result.next().await? { + if let Ok(s) = r.get::("funded") { + i += 1; + assert!(s >= prev_funding, "funded totals should always increase"); + prev_funding = s; + } + } + + assert!(i == orders.len()); + + Ok(()) +} + +#[tokio::test] +async fn test_submit_exchange_ledger_all() -> Result<()> { + libra_forensic_db::log_setup(); + + let c = start_neo4j_container(); + let port = c.get_host_port_ipv4(7687); + let graph = get_neo4j_localhost_pool(port).await?; + maybe_create_indexes(&graph).await?; + + let path = env!("CARGO_MANIFEST_DIR"); + let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); + let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); + assert!(orders.len() == 25450); + + orders.retain(|el| el.filled_at < parse_date("2024-01-16")); + + dbg!(&orders.len()); + // assert!(orders.len() == 68); + + let mut tracker = BalanceTracker::new(); + tracker.replay_transactions(&mut orders)?; + dbg!(&tracker.0.len()); + let days_records = tracker.0.len(); + // assert!(days_records == 47); + + let user = tracker.0.get(&123).unwrap(); + // assert!(user.0.len() == 68); + + let res = tracker.submit_ledger(&graph).await?; + + // the number of transactions merged should equal the number of orders + // assert!(res == orders.len() as u64); + + // check there are transaction records with function args. + let cypher_query = neo4rs::query( + "MATCH (s:SwapAccount)-[r:DailyLedger]->(ul:UserLedger) + WHERE s.swap_id = 123 + ORDER BY ul.date + RETURN s.swap_id AS id, ul.date AS date, ul.total_funded AS funded + ", + ); + + // Execute the query + let mut result = graph.execute(cypher_query).await?; + + let mut prev_funding = 0; + let mut i = 0; + + // Fetch the first row only + while let Some(r) = result.next().await? { + if let Ok(s) = r.get::("funded") { + i += 1; + dbg!(&prev_funding); + + dbg!(&s); + assert!(s >= prev_funding, "funded totals should always increase"); + prev_funding = s; + } + } + + dbg!(&i); + + Ok(()) +} From c2db1e8f96d97d5ae350f056ee82369ff992f26b Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Sat, 7 Dec 2024 21:36:31 -0500 Subject: [PATCH 20/21] docs --- docs/local_testing.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/local_testing.md b/docs/local_testing.md index 79585a9..11cbed5 100644 --- a/docs/local_testing.md +++ b/docs/local_testing.md @@ -8,6 +8,9 @@ Start a Neo4j instance. Choose a password ``. Allow it to create the d export LIBRA_GRAPH_DB_URI='neo4j://localhost' export LIBRA_GRAPH_DB_USER='neo4j' export LIBRA_GRAPH_DB_PASS= + +# optionally export trace logging +export RUST_LOG=trace ``` Import the sample exchange orders From c2031a999f820ab1ad197ac3ad643f161bd6125d Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Sun, 8 Dec 2024 13:40:40 -0500 Subject: [PATCH 21/21] complete tests --- tests/test_analytics.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index 095f57d..48db7e8 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -188,22 +188,20 @@ async fn test_submit_exchange_ledger_all() -> Result<()> { orders.retain(|el| el.filled_at < parse_date("2024-01-16")); - dbg!(&orders.len()); - // assert!(orders.len() == 68); + assert!(orders.len() == 956); let mut tracker = BalanceTracker::new(); tracker.replay_transactions(&mut orders)?; - dbg!(&tracker.0.len()); let days_records = tracker.0.len(); - // assert!(days_records == 47); + assert!(days_records == 367); // each users * dates with txs let user = tracker.0.get(&123).unwrap(); - // assert!(user.0.len() == 68); + assert!(user.0.len() == 68); let res = tracker.submit_ledger(&graph).await?; - // the number of transactions merged should equal the number of orders - // assert!(res == orders.len() as u64); + // there should be double len of ledgers, since user and accepter will have a ledger + assert!(res == (orders.len() * 2) as u64); // check there are transaction records with function args. let cypher_query = neo4rs::query( @@ -224,15 +222,13 @@ async fn test_submit_exchange_ledger_all() -> Result<()> { while let Some(r) = result.next().await? { if let Ok(s) = r.get::("funded") { i += 1; - dbg!(&prev_funding); - dbg!(&s); assert!(s >= prev_funding, "funded totals should always increase"); prev_funding = s; } } - dbg!(&i); + assert!(i == user.0.len()); Ok(()) }