Skip to content

Commit

Permalink
wip unique account deduplication
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Nov 21, 2024
1 parent f28e943 commit 8d23fcf
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 41 deletions.
54 changes: 32 additions & 22 deletions src/cypher_templates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,13 @@
use anyhow::Result;

// TODO move this to a .CQL file so we can lint and debug
pub fn write_batch_tx_string(list_str: String) -> String {
pub fn write_batch_tx_string(list_str: &str) -> String {
format!(
r#"
WITH {list_str} AS tx_data
UNWIND tx_data AS tx
// Deduplicate sender and recipient accounts
WITH COLLECT(DISTINCT [tx.sender, tx.recipient]) AS unique_address, tx
UNWIND unique_address AS address
// Merge unique Accounts
MERGE (account:Account {{address: address}})
ON CREATE SET
account.created_at = timestamp(),
account.modified_at = null
ON MATCH SET
account.modified_at = timestamp()
// CREATE Transaction Relationship and set creation flag
// NOTE: users should have already been merged in a previous call
MERGE (from:Account {{address: tx.sender}})
MERGE (to:Account {{address: tx.recipient}})
MERGE (from)-[rel:Tx {{tx_hash: tx.tx_hash}}]->(to)
Expand All @@ -33,18 +22,39 @@ SET
rel.relation = tx.relation,
rel.function = tx.function
// Count created, modified, and unchanged Account nodes and Tx relationships based on current timestamp
WITH from, to, rel
UNWIND [from, to] AS all_nodes
WITH DISTINCT all_nodes AS node, rel
WITH rel
RETURN
// COUNT(CASE WHEN node.created_at = timestamp() THEN 1 END) AS created_accounts,
COUNT(node) AS created_accounts,
COUNT(CASE WHEN rel.created_at = timestamp() THEN 1 END) AS created_tx,
COUNT(CASE WHEN rel.modified_at = timestamp() AND rel.created_at < timestamp() THEN 1 END) AS modified_tx
"#
)
}

pub fn write_batch_user_create(list_str: &str) -> String {
format!(
r#"
WITH {list_str} AS tx_data
UNWIND tx_data AS tx
WITH COLLECT(DISTINCT tx.sender) + COLLECT(DISTINCT tx.recipient) AS unique_addresses
// Deduplicate the combined list to ensure only unique addresses
UNWIND unique_addresses AS each_addr
WITH COLLECT(DISTINCT each_addr) as unique_array
UNWIND unique_array AS addr
// Merge unique Accounts
MERGE (node:Account {{address: addr}})
ON CREATE SET
node.created_at = timestamp(),
node.modified_at = null
ON MATCH SET
node.modified_at = timestamp()
RETURN
COUNT(node) AS unique_count,
COUNT(CASE WHEN node.created_at = timestamp() THEN 1 END) AS created_accounts,
COUNT(CASE WHEN node.modified_at = timestamp() AND node.created_at < timestamp() THEN 1 END) AS modified_accounts,
COUNT(CASE WHEN node.modified_at < timestamp() THEN 1 END) AS unchanged_accounts,
COUNT(CASE WHEN rel.created_at = timestamp() THEN 1 END) AS created_tx
COUNT(CASE WHEN node.modified_at < timestamp() THEN 1 END) AS unchanged_accounts
"#
)
}
Expand All @@ -66,7 +76,7 @@ pub fn to_cypher_object<T: Serialize>(object: &T, prefix: Option<&str>) -> Resul
// Serialize the struct to a JSON value

let serialized_value = serde_json::to_value(object).expect("Failed to serialize");
dbg!(&serialized_value);
// dbg!(&serialized_value);

// Convert the JSON value into a map for easy processing
let map = if let Value::Object(obj) = serialized_value {
Expand Down
1 change: 1 addition & 0 deletions src/extract_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ pub fn make_master_tx(
tx_hash,
expiration_timestamp: user_tx.expiration_timestamp_secs(),
sender: user_tx.sender(),
recipient: relation_label.get_recipient(),
epoch,
round,
block_timestamp,
Expand Down
58 changes: 53 additions & 5 deletions src/load_tx_cypher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@ use log::{error, info, warn};
use neo4rs::{query, Graph};
use std::{fmt::Display, thread, time::Duration};

use crate::{cypher_templates::write_batch_tx_string, queue, table_structs::WarehouseTxMaster};
use crate::{
cypher_templates::{write_batch_tx_string, write_batch_user_create},
queue,
table_structs::WarehouseTxMaster,
};

/// response for the batch insert tx
#[derive(Debug, Clone)]
pub struct BatchTxReturn {
pub unique_accounts: u64,
pub created_accounts: u64,
pub modified_accounts: u64,
pub unchanged_accounts: u64,
Expand All @@ -16,7 +21,8 @@ pub struct BatchTxReturn {

impl Display for BatchTxReturn {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Total Transactions - created accounts: {}, modified accounts: {}, unchanged accounts: {}, transactions created: {}",
write!(f, "Total Transactions - unique accounts: {}, created accounts: {}, modified accounts: {}, unchanged accounts: {}, transactions created: {}",
self.unique_accounts,
self.created_accounts,
self.modified_accounts,
self.unchanged_accounts,
Expand All @@ -34,13 +40,15 @@ impl Default for BatchTxReturn {
impl BatchTxReturn {
pub fn new() -> Self {
Self {
unique_accounts: 0,
created_accounts: 0,
modified_accounts: 0,
unchanged_accounts: 0,
created_tx: 0,
}
}
pub fn increment(&mut self, new: &BatchTxReturn) {
self.unique_accounts += new.unique_accounts;
self.created_accounts += new.created_accounts;
self.modified_accounts += new.modified_accounts;
self.unchanged_accounts += new.unchanged_accounts;
Expand All @@ -55,6 +63,19 @@ pub async fn tx_batch(
batch_size: usize,
archive_id: &str,
) -> Result<BatchTxReturn> {
let mut unique_addrs = vec![];
txs.iter().for_each(|t| {
if !unique_addrs.contains(&t.sender) {
unique_addrs.push(t.sender.clone());
}
if let Some(r) = t.recipient {
if !unique_addrs.contains(&r) {
unique_addrs.push(t.sender.clone());
}
}
});
info!("unique accounts in batch: {}", unique_addrs.len());

let chunks: Vec<&[WarehouseTxMaster]> = txs.chunks(batch_size).collect();
let mut all_results = BatchTxReturn::new();
info!("archive: {}", archive_id);
Expand Down Expand Up @@ -86,9 +107,9 @@ pub async fn tx_batch(

match impl_batch_tx_insert(pool, c).await {
Ok(batch) => {
dbg!(&batch);
// dbg!(&batch);
all_results.increment(&batch);
dbg!(&all_results);
// dbg!(&all_results);
queue::update_task(pool, archive_id, true, i).await?;
info!("...success");
}
Expand All @@ -101,6 +122,15 @@ pub async fn tx_batch(
};
}

if all_results.unique_accounts != unique_addrs.len() as u64 {
error!(
"number of accounts in batch {} is not equal to unique accounts: {}",
all_results.unique_accounts,
unique_addrs.len()
);
}

dbg!(&all_results);
Ok(all_results)
}

Expand All @@ -109,7 +139,11 @@ pub async fn impl_batch_tx_insert(
batch_txs: &[WarehouseTxMaster],
) -> Result<BatchTxReturn> {
let list_str = WarehouseTxMaster::to_cypher_map(batch_txs);
let cypher_string = write_batch_tx_string(list_str);

// first insert the users
// cypher queries makes it annoying to do a single insert of users and
// txs
let cypher_string = write_batch_user_create(&list_str);

// Execute the query
let cypher_query = query(&cypher_string);
Expand All @@ -119,6 +153,10 @@ pub async fn impl_batch_tx_insert(
.context("execute query error")?;

let row = res.next().await?.context("no row returned")?;
dbg!(&row);
let unique_accounts: u64 = row
.get("unique_accounts")
.context("no created_accounts field")?;
let created_accounts: u64 = row
.get("created_accounts")
.context("no created_accounts field")?;
Expand All @@ -128,9 +166,19 @@ pub async fn impl_batch_tx_insert(
let unchanged_accounts: u64 = row
.get("unchanged_accounts")
.context("no unchanged_accounts field")?;

let cypher_string = write_batch_tx_string(&list_str);
// Execute the query
let cypher_query = query(&cypher_string);
let mut res = pool
.execute(cypher_query)
.await
.context("execute query error")?;
let row = res.next().await?.context("no row returned")?;
let created_tx: u64 = row.get("created_tx").context("no created_tx field")?;

Ok(BatchTxReturn {
unique_accounts,
created_accounts,
modified_accounts,
unchanged_accounts,
Expand Down
5 changes: 3 additions & 2 deletions src/table_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub struct WarehouseTxMaster {
pub tx_hash: HashValue, // primary key
pub relation_label: RelationLabel,
pub sender: AccountAddress,
pub recipient: Option<AccountAddress>,
pub function: String,
pub epoch: u64,
pub round: u64,
Expand All @@ -105,6 +106,7 @@ impl Default for WarehouseTxMaster {
tx_hash: HashValue::zero(),
relation_label: RelationLabel::Configuration,
sender: AccountAddress::ZERO,
recipient: Some(AccountAddress::ZERO),
function: "none".to_owned(),
epoch: 0,
round: 0,
Expand Down Expand Up @@ -139,8 +141,7 @@ impl WarehouseTxMaster {
self.function,
self.sender.to_hex_literal(),
tx_args,
self.relation_label
.get_recipient()
self.recipient
.unwrap_or(self.sender)
.to_hex_literal(),
)
Expand Down
84 changes: 72 additions & 12 deletions tests/test_e2e_tx_neo4j.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod support;
use anyhow::Result;
use diem_crypto::HashValue;
use libra_forensic_db::cypher_templates::write_batch_tx_string;
use libra_forensic_db::cypher_templates::{write_batch_tx_string, write_batch_user_create};
use libra_forensic_db::load::try_load_one_archive;
use libra_forensic_db::load_tx_cypher::tx_batch;
use libra_forensic_db::scan::scan_dir_archive;
Expand All @@ -14,7 +14,8 @@ use neo4rs::query;
use support::neo4j_testcontainer::start_neo4j_container;

#[tokio::test]
async fn test_parse_archive_into_neo4j() -> anyhow::Result<()> {
async fn test_tx_batch() -> anyhow::Result<()> {
libra_forensic_db::log_setup();
let archive_path = support::fixtures::v6_tx_manifest_fixtures_path();
let (txs, events) = extract_current_transactions(&archive_path).await?;
assert!(txs.len() == 705);
Expand Down Expand Up @@ -70,7 +71,7 @@ async fn test_load_entry_point_tx() -> anyhow::Result<()> {
.expect("could start index");

let res = try_load_one_archive(man, &graph, 10).await?;
dbg!(&res);

assert!(res.created_accounts == 135);
assert!(res.modified_accounts == 590);
assert!(res.unchanged_accounts == 0);
Expand Down Expand Up @@ -99,8 +100,68 @@ async fn insert_with_cypher_string() -> Result<()> {
let list = vec![tx1, tx2, tx3];

let list_str = WarehouseTxMaster::to_cypher_map(&list);
dbg!(&list_str);
let cypher_string = write_batch_tx_string(list_str);

let cypher_string = write_batch_tx_string(&list_str);

let c = start_neo4j_container();
let port = c.get_host_port_ipv4(7687);
let graph = get_neo4j_localhost_pool(port)
.await
.expect("could not get neo4j connection pool");
maybe_create_indexes(&graph).await?;

// Execute the query
let cypher_query = query(&cypher_string);
let mut res = graph.execute(cypher_query).await?;

let row = res.next().await?.unwrap();
// let created_accounts: i64 = row.get("created_accounts").unwrap();
// dbg!(&created_accounts);
// assert!(created_accounts == 1);
// let modified_accounts: i64 = row.get("modified_accounts").unwrap();
// assert!(modified_accounts == 0);
// let unchanged_accounts: i64 = row.get("unchanged_accounts").unwrap();
// assert!(unchanged_accounts == 0);
let created_tx: i64 = row.get("created_tx").unwrap();
assert!(created_tx == 3);

// get the sum of all transactions in db
let cypher_query = query(
"MATCH ()-[r:Tx]->()
RETURN count(r) AS total_tx_count",
);

// Execute the query
let mut result = graph.execute(cypher_query).await?;
let row = result.next().await?.unwrap();
let total_tx_count: i64 = row.get("total_tx_count").unwrap();
assert!(total_tx_count == 3);
Ok(())
}

#[tokio::test]
async fn batch_users_create_unit() -> Result<()> {
let tx1 = WarehouseTxMaster {
tx_hash: HashValue::random(),
..Default::default()
};

let tx2 = WarehouseTxMaster {
tx_hash: HashValue::random(),
..Default::default()
};

let tx3 = WarehouseTxMaster {
tx_hash: HashValue::random(),
..Default::default()
};

// two tx records
let list = vec![tx1, tx2, tx3];

let list_str = WarehouseTxMaster::to_cypher_map(&list);

let cypher_string = write_batch_user_create(&list_str);

let c = start_neo4j_container();
let port = c.get_host_port_ipv4(7687);
Expand All @@ -115,26 +176,25 @@ async fn insert_with_cypher_string() -> Result<()> {

let row = res.next().await?.unwrap();
let created_accounts: i64 = row.get("created_accounts").unwrap();
dbg!(&created_accounts);

assert!(created_accounts == 1);
let modified_accounts: i64 = row.get("modified_accounts").unwrap();
assert!(modified_accounts == 0);
let unchanged_accounts: i64 = row.get("unchanged_accounts").unwrap();
assert!(unchanged_accounts == 0);
let created_tx: i64 = row.get("created_tx").unwrap();
assert!(created_tx == 2);

// get the sum of all transactions in db
let cypher_query = query(
"MATCH ()-[r:Tx]->()
RETURN count(r) AS total_tx_count",
"MATCH (a:Account)
RETURN count(a) AS total_users",
);

// Execute the query
let mut result = graph.execute(cypher_query).await?;
let row = result.next().await?.unwrap();
let total_tx_count: i64 = row.get("total_tx_count").unwrap();
assert!(total_tx_count == 2);
let total_tx_count: i64 = row.get("total_users").unwrap();
assert!(total_tx_count == 1);

Ok(())
}

Expand Down

0 comments on commit 8d23fcf

Please sign in to comment.