diff --git a/src/cypher_templates.rs b/src/cypher_templates.rs
index b58fdfd..a2aaf68 100644
--- a/src/cypher_templates.rs
+++ b/src/cypher_templates.rs
@@ -2,24 +2,13 @@ use anyhow::Result;
 
 
 // TODO move this to a .CQL file so we can lint and debug
-pub fn write_batch_tx_string(list_str: String) -> String {
+pub fn write_batch_tx_string(list_str: &str) -> String {
     format!(
         r#"
 WITH {list_str} AS tx_data
 UNWIND tx_data AS tx
 
-// Deduplicate sender and recipient accounts
-WITH COLLECT(DISTINCT [tx.sender, tx.recipient]) AS unique_address, tx
-UNWIND unique_address AS address
-// Merge unique Accounts
-MERGE (account:Account {{address: address}})
-ON CREATE SET
-  account.created_at = timestamp(),
-  account.modified_at = null
-ON MATCH SET
-  account.modified_at = timestamp()
-
-// CREATE Transaction Relationship and set creation flag
+// NOTE: Account nodes are expected to exist already (see write_batch_user_create); these MERGEs only look them up
 MERGE (from:Account {{address: tx.sender}})
 MERGE (to:Account {{address: tx.recipient}})
 MERGE (from)-[rel:Tx {{tx_hash: tx.tx_hash}}]->(to)
@@ -33,18 +22,39 @@ SET
   rel.relation = tx.relation,
   rel.function = tx.function
 
 
-// Count created, modified, and unchanged Account nodes and Tx relationships based on current timestamp
-WITH from, to, rel
-UNWIND [from, to] AS all_nodes
-WITH DISTINCT all_nodes AS node, rel
+WITH rel
 
 RETURN
-  // COUNT(CASE WHEN node.created_at = timestamp() THEN 1 END) AS created_accounts,
-  COUNT(node) AS created_accounts,
+  COUNT(CASE WHEN rel.created_at = timestamp() THEN 1 END) AS created_tx,
+  COUNT(CASE WHEN rel.modified_at = timestamp() AND rel.created_at < timestamp() THEN 1 END) AS modified_tx
+"#
+    )
+}
+pub fn write_batch_user_create(list_str: &str) -> String {
+    format!(
+        r#"
+WITH {list_str} AS tx_data
+UNWIND tx_data AS tx
+WITH COLLECT(DISTINCT tx.sender) + COLLECT(DISTINCT tx.recipient) AS unique_addresses
+// Deduplicate the combined list to ensure only unique addresses
+UNWIND unique_addresses AS each_addr
+WITH COLLECT(DISTINCT each_addr) as unique_array
+
+UNWIND unique_array AS addr
+// Merge unique Accounts
+MERGE (node:Account {{address: addr}})
+ON CREATE SET
+  node.created_at = timestamp(),
+  node.modified_at = null
+ON MATCH SET
+  node.modified_at = timestamp()
+
+RETURN
+  COUNT(node) AS unique_accounts,
+  COUNT(CASE WHEN node.created_at = timestamp() THEN 1 END) AS created_accounts,
   COUNT(CASE WHEN node.modified_at = timestamp() AND node.created_at < timestamp() THEN 1 END) AS modified_accounts,
-  COUNT(CASE WHEN node.modified_at < timestamp() THEN 1 END) AS unchanged_accounts,
-  COUNT(CASE WHEN rel.created_at = timestamp() THEN 1 END) AS created_tx
+  COUNT(CASE WHEN node.modified_at < timestamp() THEN 1 END) AS unchanged_accounts
 "#
     )
 }
@@ -66,7 +76,7 @@ pub fn to_cypher_object<T: Serialize>(object: &T, prefix: Option<&str>) -> Resul
 
     // Serialize the struct to a JSON value
     let serialized_value = serde_json::to_value(object).expect("Failed to serialize");
-    dbg!(&serialized_value);
+    // dbg!(&serialized_value);
 
     // Convert the JSON value into a map for easy processing
     let map = if let Value::Object(obj) = serialized_value {
diff --git a/src/extract_transactions.rs b/src/extract_transactions.rs
index 772a385..2fc2d27 100644
--- a/src/extract_transactions.rs
+++ b/src/extract_transactions.rs
@@ -125,6 +125,7 @@ pub fn make_master_tx(
         tx_hash,
         expiration_timestamp: user_tx.expiration_timestamp_secs(),
         sender: user_tx.sender(),
+        recipient: relation_label.get_recipient(),
         epoch,
         round,
         block_timestamp,
diff --git a/src/load_tx_cypher.rs b/src/load_tx_cypher.rs
index cbb69cf..71974c1 100644
--- a/src/load_tx_cypher.rs
+++ b/src/load_tx_cypher.rs
@@ -3,11 +3,16 @@ use log::{error, info, warn};
 use neo4rs::{query, Graph};
 use std::{fmt::Display, thread, time::Duration};
 
-use crate::{cypher_templates::write_batch_tx_string, queue, table_structs::WarehouseTxMaster};
+use crate::{
+    cypher_templates::{write_batch_tx_string, write_batch_user_create},
+    queue,
+    table_structs::WarehouseTxMaster,
+};
 
 /// response for the batch insert tx
 #[derive(Debug, Clone)]
 pub struct BatchTxReturn {
+    pub unique_accounts: u64,
     pub created_accounts: u64,
     pub modified_accounts: u64,
     pub unchanged_accounts: u64,
@@ -16,7 +21,8 @@ pub struct BatchTxReturn {
 
 impl Display for BatchTxReturn {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "Total Transactions - created accounts: {}, modified accounts: {}, unchanged accounts: {}, transactions created: {}",
+        write!(f, "Total Transactions - unique accounts: {}, created accounts: {}, modified accounts: {}, unchanged accounts: {}, transactions created: {}",
+          self.unique_accounts,
           self.created_accounts,
           self.modified_accounts,
           self.unchanged_accounts,
@@ -34,6 +40,7 @@ impl Default for BatchTxReturn {
 impl BatchTxReturn {
     pub fn new() -> Self {
         Self {
+            unique_accounts: 0,
             created_accounts: 0,
             modified_accounts: 0,
             unchanged_accounts: 0,
@@ -41,6 +48,7 @@ impl BatchTxReturn {
         }
     }
     pub fn increment(&mut self, new: &BatchTxReturn) {
+        self.unique_accounts += new.unique_accounts;
         self.created_accounts += new.created_accounts;
         self.modified_accounts += new.modified_accounts;
         self.unchanged_accounts += new.unchanged_accounts;
@@ -55,6 +63,19 @@ pub async fn tx_batch(
     batch_size: usize,
     archive_id: &str,
 ) -> Result<BatchTxReturn> {
+    let mut unique_addrs = vec![];
+    txs.iter().for_each(|t| {
+        if !unique_addrs.contains(&t.sender) {
+            unique_addrs.push(t.sender.clone());
+        }
+        if let Some(r) = t.recipient {
+            if !unique_addrs.contains(&r) {
+                unique_addrs.push(r);
+            }
+        }
+    });
+    info!("unique accounts in batch: {}", unique_addrs.len());
+
     let chunks: Vec<&[WarehouseTxMaster]> = txs.chunks(batch_size).collect();
     let mut all_results = BatchTxReturn::new();
     info!("archive: {}", archive_id);
@@ -86,9 +107,9 @@ pub async fn tx_batch(
 
         match impl_batch_tx_insert(pool, c).await {
             Ok(batch) => {
-                dbg!(&batch);
+                // dbg!(&batch);
                 all_results.increment(&batch);
-                dbg!(&all_results);
+                // dbg!(&all_results);
                 queue::update_task(pool, archive_id, true, i).await?;
                 info!("...success");
             }
@@ -101,6 +122,15 @@ pub async fn tx_batch(
         };
     }
 
+    if all_results.unique_accounts != unique_addrs.len() as u64 {
+        error!(
+            "number of accounts in batch {} is not equal to unique accounts: {}",
+            all_results.unique_accounts,
+            unique_addrs.len()
+        );
+    }
+
+    info!("{}", all_results);
     Ok(all_results)
 }
 
@@ -109,7 +139,11 @@ pub async fn impl_batch_tx_insert(
     batch_txs: &[WarehouseTxMaster],
 ) -> Result<BatchTxReturn> {
     let list_str = WarehouseTxMaster::to_cypher_map(batch_txs);
-    let cypher_string = write_batch_tx_string(list_str);
+
+    // first insert the users
+    // cypher queries makes it annoying to do a single insert of users and
+    // txs
+    let cypher_string = write_batch_user_create(&list_str);
 
     // Execute the query
     let cypher_query = query(&cypher_string);
@@ -119,6 +153,10 @@ pub async fn impl_batch_tx_insert(
         .context("execute query error")?;
 
     let row = res.next().await?.context("no row returned")?;
+    // the column names read below must match the aliases in write_batch_user_create's RETURN
+    let unique_accounts: u64 = row
+        .get("unique_accounts")
+        .context("no unique_accounts field")?;
     let created_accounts: u64 = row
         .get("created_accounts")
         .context("no created_accounts field")?;
@@ -128,9 +166,19 @@ pub async fn impl_batch_tx_insert(
     let unchanged_accounts: u64 = row
         .get("unchanged_accounts")
         .context("no unchanged_accounts field")?;
+
+    let cypher_string = write_batch_tx_string(&list_str);
+    // Execute the query
+    let cypher_query = query(&cypher_string);
+    let mut res = pool
+        .execute(cypher_query)
+        .await
+        .context("execute query error")?;
+    let row = res.next().await?.context("no row returned")?;
     let created_tx: u64 = row.get("created_tx").context("no created_tx field")?;
 
     Ok(BatchTxReturn {
+        unique_accounts,
         created_accounts,
         modified_accounts,
         unchanged_accounts,
diff --git a/src/table_structs.rs b/src/table_structs.rs
index d513cf4..8dfd024 100644
--- a/src/table_structs.rs
+++ b/src/table_structs.rs
@@ -89,6 +89,7 @@ pub struct WarehouseTxMaster {
     pub tx_hash: HashValue, // primary key
     pub relation_label: RelationLabel,
     pub sender: AccountAddress,
+    pub recipient: Option<AccountAddress>,
     pub function: String,
     pub epoch: u64,
     pub round: u64,
@@ -105,6 +106,7 @@ impl Default for WarehouseTxMaster {
             tx_hash: HashValue::zero(),
             relation_label: RelationLabel::Configuration,
             sender: AccountAddress::ZERO,
+            recipient: Some(AccountAddress::ZERO),
             function: "none".to_owned(),
             epoch: 0,
             round: 0,
@@ -139,8 +141,7 @@ impl WarehouseTxMaster {
             self.function,
             self.sender.to_hex_literal(),
             tx_args,
-            self.relation_label
-                .get_recipient()
+            self.recipient
                 .unwrap_or(self.sender)
                 .to_hex_literal(),
         )
diff --git a/tests/test_e2e_tx_neo4j.rs b/tests/test_e2e_tx_neo4j.rs
index 48362f5..28ff5fa 100644
--- a/tests/test_e2e_tx_neo4j.rs
+++ b/tests/test_e2e_tx_neo4j.rs
@@ -1,7 +1,7 @@
 mod support;
 use anyhow::Result;
 use diem_crypto::HashValue;
-use libra_forensic_db::cypher_templates::write_batch_tx_string;
+use libra_forensic_db::cypher_templates::{write_batch_tx_string, write_batch_user_create};
 use libra_forensic_db::load::try_load_one_archive;
 use libra_forensic_db::load_tx_cypher::tx_batch;
 use libra_forensic_db::scan::scan_dir_archive;
@@ -14,7 +14,8 @@ use neo4rs::query;
 use support::neo4j_testcontainer::start_neo4j_container;
 
 #[tokio::test]
-async fn test_parse_archive_into_neo4j() -> anyhow::Result<()> {
+async fn test_tx_batch() -> anyhow::Result<()> {
+    libra_forensic_db::log_setup();
     let archive_path = support::fixtures::v6_tx_manifest_fixtures_path();
     let (txs, events) = extract_current_transactions(&archive_path).await?;
     assert!(txs.len() == 705);
@@ -70,7 +71,7 @@ async fn test_load_entry_point_tx() -> anyhow::Result<()> {
         .expect("could start index");
 
     let res = try_load_one_archive(man, &graph, 10).await?;
-    dbg!(&res);
+
     assert!(res.created_accounts == 135);
     assert!(res.modified_accounts == 590);
     assert!(res.unchanged_accounts == 0);
@@ -99,8 +100,68 @@ async fn insert_with_cypher_string() -> Result<()> {
     let list = vec![tx1, tx2, tx3];
 
     let list_str = WarehouseTxMaster::to_cypher_map(&list);
-    dbg!(&list_str);
-    let cypher_string = write_batch_tx_string(list_str);
+
+    let cypher_string = write_batch_tx_string(&list_str);
+
+    let c = start_neo4j_container();
+    let port = c.get_host_port_ipv4(7687);
+    let graph = get_neo4j_localhost_pool(port)
+        .await
+        .expect("could not get neo4j connection pool");
+    maybe_create_indexes(&graph).await?;
+
+    // Execute the query
+    let cypher_query = query(&cypher_string);
+    let mut res = graph.execute(cypher_query).await?;
+
+    let row = res.next().await?.unwrap();
+    // let created_accounts: i64 = row.get("created_accounts").unwrap();
+    // dbg!(&created_accounts);
+    // assert!(created_accounts == 1);
+    // let modified_accounts: i64 = row.get("modified_accounts").unwrap();
+    // assert!(modified_accounts == 0);
+    // let unchanged_accounts: i64 = row.get("unchanged_accounts").unwrap();
+    // assert!(unchanged_accounts == 0);
+    let created_tx: i64 = row.get("created_tx").unwrap();
+    assert!(created_tx == 3);
+
+    // get the sum of all transactions in db
+    let cypher_query = query(
+        "MATCH ()-[r:Tx]->()
+         RETURN count(r) AS total_tx_count",
+    );
+
+    // Execute the query
+    let mut result = graph.execute(cypher_query).await?;
+    let row = result.next().await?.unwrap();
+    let total_tx_count: i64 = row.get("total_tx_count").unwrap();
+    assert!(total_tx_count == 3);
+    Ok(())
+}
+
+#[tokio::test]
+async fn batch_users_create_unit() -> Result<()> {
+    let tx1 = WarehouseTxMaster {
+        tx_hash: HashValue::random(),
+        ..Default::default()
+    };
+
+    let tx2 = WarehouseTxMaster {
+        tx_hash: HashValue::random(),
+        ..Default::default()
+    };
+
+    let tx3 = WarehouseTxMaster {
+        tx_hash: HashValue::random(),
+        ..Default::default()
+    };
+
+    // three tx records, all using the default (zero) sender and recipient
+    let list = vec![tx1, tx2, tx3];
+
+    let list_str = WarehouseTxMaster::to_cypher_map(&list);
+
+    let cypher_string = write_batch_user_create(&list_str);
 
     let c = start_neo4j_container();
     let port = c.get_host_port_ipv4(7687);
@@ -115,26 +176,25 @@ async fn insert_with_cypher_string() -> Result<()> {
 
     let row = res.next().await?.unwrap();
     let created_accounts: i64 = row.get("created_accounts").unwrap();
-    dbg!(&created_accounts);
+
     assert!(created_accounts == 1);
     let modified_accounts: i64 = row.get("modified_accounts").unwrap();
     assert!(modified_accounts == 0);
     let unchanged_accounts: i64 = row.get("unchanged_accounts").unwrap();
     assert!(unchanged_accounts == 0);
-    let created_tx: i64 = row.get("created_tx").unwrap();
-    assert!(created_tx == 2);
 
     // get the sum of all transactions in db
     let cypher_query = query(
-        "MATCH ()-[r:Tx]->()
-         RETURN count(r) AS total_tx_count",
+        "MATCH (a:Account)
+         RETURN count(a) AS total_users",
     );
 
     // Execute the query
     let mut result = graph.execute(cypher_query).await?;
     let row = result.next().await?.unwrap();
-    let total_tx_count: i64 = row.get("total_tx_count").unwrap();
-    assert!(total_tx_count == 2);
+    let total_users: i64 = row.get("total_users").unwrap();
+    assert!(total_users == 1);
+
     Ok(())
 }
 