Skip to content

Commit

Permalink
wip deduplicating account creation
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Nov 20, 2024
1 parent 2f9e1ce commit f28e943
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 29 deletions.
38 changes: 24 additions & 14 deletions src/cypher_templates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,23 @@ pub fn write_batch_tx_string(list_str: String) -> String {
r#"
WITH {list_str} AS tx_data
UNWIND tx_data AS tx
// Merge Accounts and set creation or modification flags
MERGE (from:Account {{address: tx.sender}})
ON CREATE SET from.created_at = timestamp(), from.modified_at = null
ON MATCH SET from.modified_at = timestamp()
MERGE (to:Account {{address: tx.recipient}})
ON CREATE SET to.created_at = timestamp(), to.modified_at = null
ON MATCH SET to.modified_at = timestamp()
// Deduplicate sender and recipient accounts
WITH COLLECT(DISTINCT [tx.sender, tx.recipient]) AS unique_address, tx
UNWIND unique_address AS address
// Merge unique Accounts
MERGE (account:Account {{address: address}})
ON CREATE SET
account.created_at = timestamp(),
account.modified_at = null
ON MATCH SET
account.modified_at = timestamp()
// CREATE Transaction Relationship and set creation flag
MERGE (from:Account {{address: tx.sender}})
MERGE (to:Account {{address: tx.recipient}})
MERGE (from)-[rel:Tx {{tx_hash: tx.tx_hash}}]->(to)
ON CREATE SET rel.created_at = timestamp(), rel.modified_at = null
ON MATCH SET rel.modified_at = timestamp()
SET
Expand All @@ -29,12 +34,17 @@ SET
rel.function = tx.function
// Count created, modified, and unchanged Account nodes and Tx relationships based on current timestamp
WITH
COUNT(CASE WHEN from.created_at = timestamp() THEN 1 END) AS created_accounts,
COUNT(CASE WHEN from.modified_at = timestamp() AND from.created_at IS NULL THEN 1 END) AS modified_accounts,
COUNT(CASE WHEN from.modified_at < timestamp() THEN 1 END) AS unchanged_accounts,
WITH from, to, rel
UNWIND [from, to] AS all_nodes
WITH DISTINCT all_nodes AS node, rel
RETURN
// COUNT(CASE WHEN node.created_at = timestamp() THEN 1 END) AS created_accounts,
COUNT(node) AS created_accounts,
COUNT(CASE WHEN node.modified_at = timestamp() AND node.created_at < timestamp() THEN 1 END) AS modified_accounts,
COUNT(CASE WHEN node.modified_at < timestamp() THEN 1 END) AS unchanged_accounts,
COUNT(CASE WHEN rel.created_at = timestamp() THEN 1 END) AS created_tx
RETURN created_accounts, modified_accounts, unchanged_accounts, created_tx
"#
)
}
Expand Down
2 changes: 2 additions & 0 deletions src/load_tx_cypher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ pub async fn tx_batch(

match impl_batch_tx_insert(pool, c).await {
Ok(batch) => {
dbg!(&batch);
all_results.increment(&batch);
dbg!(&all_results);
queue::update_task(pool, archive_id, true, i).await?;
info!("...success");
}
Expand Down
6 changes: 0 additions & 6 deletions tests/support/fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ pub fn v7_state_manifest_fixtures_path() -> PathBuf {
let p = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.canonicalize()
.unwrap();
assert!(&p.exists(), "not at the cargo manifest dir");

assert!(&p.exists(), "cannot find project root dir");
let dir = p.join("tests/fixtures/v7/state_epoch_116_ver_38180075.05af");
assert!(
Expand All @@ -47,8 +45,6 @@ pub fn v7_tx_manifest_fixtures_path() -> PathBuf {
let p = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.canonicalize()
.unwrap();
assert!(&p.exists(), "not at the cargo manifest dir");

assert!(&p.exists(), "cannot find project root dir");
let dir = p.join("tests/fixtures/v7/transaction_38100001-.541f");
assert!(
Expand All @@ -64,8 +60,6 @@ pub fn v6_tx_manifest_fixtures_path() -> PathBuf {
.canonicalize()
.unwrap();
assert!(&p.exists(), "not at the cargo manifest dir");
let p = p.parent().unwrap();
assert!(&p.exists(), "cannot find project root dir");
let dir = p.join("tests/fixtures/v6/transaction_9900001-.e469");
assert!(
&dir.exists(),
Expand Down
26 changes: 17 additions & 9 deletions tests/test_e2e_tx_neo4j.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ async fn test_parse_archive_into_neo4j() -> anyhow::Result<()> {
// load in batches
let archive_id = archive_path.file_name().unwrap().to_str().unwrap();
let res = tx_batch(&txs, &graph, 100, archive_id).await?;
assert!(res.created_accounts == 118);
assert!(res.modified_accounts == 0);
assert!(res.created_accounts == 135);
assert!(res.modified_accounts == 590);
assert!(res.unchanged_accounts == 0);
assert!(res.created_tx == 705);
assert!(res.created_tx == 725);
// CHECK
// get the sum of all transactions in db
let cypher_query = query(
Expand Down Expand Up @@ -69,11 +69,12 @@ async fn test_load_entry_point_tx() -> anyhow::Result<()> {
.await
.expect("could start index");

let res = try_load_one_archive(man, &graph, 1000).await?;
assert!(res.created_accounts == 118);
assert!(res.modified_accounts == 0);
let res = try_load_one_archive(man, &graph, 10).await?;
dbg!(&res);
assert!(res.created_accounts == 135);
assert!(res.modified_accounts == 590);
assert!(res.unchanged_accounts == 0);
assert!(res.created_tx == 705);
assert!(res.created_tx == 725);
Ok(())
}

Expand All @@ -89,10 +90,16 @@ async fn insert_with_cypher_string() -> Result<()> {
..Default::default()
};

let tx3 = WarehouseTxMaster {
tx_hash: HashValue::random(),
..Default::default()
};

// two tx records
let list = vec![tx1, tx2];
let list = vec![tx1, tx2, tx3];

let list_str = WarehouseTxMaster::to_cypher_map(&list);
dbg!(&list_str);
let cypher_string = write_batch_tx_string(list_str);

let c = start_neo4j_container();
Expand All @@ -108,7 +115,8 @@ async fn insert_with_cypher_string() -> Result<()> {

let row = res.next().await?.unwrap();
let created_accounts: i64 = row.get("created_accounts").unwrap();
assert!(created_accounts == 2);
dbg!(&created_accounts);
assert!(created_accounts == 1);
let modified_accounts: i64 = row.get("modified_accounts").unwrap();
assert!(modified_accounts == 0);
let unchanged_accounts: i64 = row.get("unchanged_accounts").unwrap();
Expand Down

0 comments on commit f28e943

Please sign in to comment.