diff --git a/src/cypher_templates.rs b/src/cypher_templates.rs
index 1938c39..ffc5832 100644
--- a/src/cypher_templates.rs
+++ b/src/cypher_templates.rs
@@ -59,6 +59,7 @@ RETURN
     )
 }
 
+
 use anyhow::bail;
 use serde::Serialize;
 use serde_json::Value;
diff --git a/src/enrich_whitepages.rs b/src/enrich_whitepages.rs
new file mode 100644
index 0000000..544a892
--- /dev/null
+++ b/src/enrich_whitepages.rs
@@ -0,0 +1,138 @@
+use anyhow::{Context, Result};
+use diem_types::account_address::AccountAddress;
+use log::info;
+use neo4rs::Graph;
+use serde::{Deserialize, Deserializer, Serialize};
+use std::collections::HashSet;
+use std::path::Path;
+
+/// One whitepages record: an owner alias attached to an account address.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Whitepages {
+    /// Account address; `None` when absent, null, or not valid hex.
+    #[serde(default, deserialize_with = "from_any_string")]
+    address: Option<AccountAddress>,
+    /// Owner alias.
+    owner: Option<String>,
+    /// Free-form note; not written to the graph yet.
+    address_note: Option<String>,
+}
+
+/// Leniently deserializes an address string, with or without a `0x` prefix.
+///
+/// Null or malformed values become `None` instead of failing the whole file.
+fn from_any_string<'de, D>(deserializer: D) -> Result<Option<AccountAddress>, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    // An owned `Option<String>` (rather than `&str`) also accepts `null` and
+    // strings with JSON escape sequences, which cannot be borrowed from the input.
+    let raw = Option::<String>::deserialize(deserializer)?;
+    Ok(raw.as_deref().and_then(parse_address))
+}
+
+/// Parses a hex address, adding the `0x` prefix when it is missing.
+fn parse_address(raw: &str) -> Option<AccountAddress> {
+    let lower = raw.trim().to_ascii_lowercase();
+    // Only a leading "0x" is a prefix, so test the start rather than `contains`.
+    let literal = if lower.starts_with("0x") {
+        lower
+    } else {
+        format!("0x{}", lower)
+    };
+    AccountAddress::from_hex_literal(&literal).ok()
+}
+
+/// Escapes backslashes and double quotes for a double-quoted Cypher string literal.
+fn escape_cypher_string(raw: &str) -> String {
+    raw.replace('\\', "\\\\").replace('"', "\\\"")
+}
+
+impl Whitepages {
+    /// Reads a JSON array of whitepages records from `path`.
+    pub fn parse_json_file(path: &Path) -> Result<Vec<Self>> {
+        let s = std::fs::read_to_string(path)
+            .with_context(|| format!("cannot read whitepages file {}", path.display()))?;
+        serde_json::from_str(&s)
+            .with_context(|| format!("cannot parse whitepages file {}", path.display()))
+    }
+
+    /// Renders the record as a Cypher map literal, or `None` when the owner or
+    /// the address is missing.
+    fn try_cypher_object(&self) -> Option<String> {
+        let owner = self.owner.as_deref()?;
+        let address = self.address.as_ref()?;
+        Some(format!(
+            r#"{{owner: "{}", address: "{}" }}"#,
+            escape_cypher_string(owner),
+            address.to_hex_literal(),
+        ))
+    }
+
+    /// Renders the record as a Cypher map literal.
+    ///
+    /// # Panics
+    /// Panics when the owner or the address is missing.
+    pub fn to_cypher_object_template(&self) -> String {
+        self.try_cypher_object()
+            .expect("whitepages record needs both an owner and an address")
+    }
+
+    /// create a cypher query string for the map object
+    ///
+    /// Records without an owner or without a valid address are skipped.
+    pub fn to_cypher_map(list: &[Self]) -> String {
+        let objects: Vec<String> = list.iter().filter_map(Self::try_cypher_object).collect();
+        format!("[{}]", objects.join(","))
+    }
+
+    /// Builds the query that links each owner in `list_str` to an existing account.
+    pub fn cypher_batch_link_owner(list_str: &str) -> String {
+        format!(
+            r#"
+  WITH {list_str} AS owner_data
+  UNWIND owner_data AS each_owner
+
+  MATCH (addr:Account {{address: each_owner.address}})
+
+  MERGE (own:Owner {{alias: each_owner.owner}})
+  MERGE (own)-[rel:Owns]->(addr)
+
+  WITH rel
+  RETURN
+      COUNT(rel) AS owners_merged
+"#
+        )
+    }
+}
+
+/// Links every owner in `batch_txs` to its account node and returns the
+/// relation count reported by the database.
+pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[Whitepages]) -> Result<u64> {
+    let unique_owners: HashSet<&str> = batch_txs
+        .iter()
+        .filter_map(|t| t.owner.as_deref())
+        .collect();
+
+    info!("unique owner links in batch: {}", unique_owners.len());
+
+    let list_str = Whitepages::to_cypher_map(batch_txs);
+    let cypher_string = Whitepages::cypher_batch_link_owner(&list_str);
+
+    // Execute the query
+    let cypher_query = neo4rs::query(&cypher_string);
+    let mut res = pool
+        .execute(cypher_query)
+        .await
+        .context("execute query error")?;
+
+    let row = res.next().await?.context("no row returned")?;
+
+    let owners_merged: u64 = row
+        .get("owners_merged")
+        .context("no owners_merged field")?;
+
+    info!("owners linked to addresses: {}", owners_merged);
+
+    Ok(owners_merged)
+}
diff --git a/src/lib.rs b/src/lib.rs
index 721d113..2f08e4e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod cypher_templates;
+pub mod enrich_whitepages;
 pub mod exchange_orders;
 pub mod extract_snapshot;
 pub mod extract_transactions;
diff --git a/src/warehouse_cli.rs b/src/warehouse_cli.rs
index e3dc3fd..4c9f0d5 100644
--- a/src/warehouse_cli.rs
+++ b/src/warehouse_cli.rs
@@ -1,9 +1,11 @@
 use anyhow::{bail, Result};
 use clap::{Parser, Subcommand};
+use log::info;
 use neo4rs::Graph;
 use std::path::PathBuf;
 
 use crate::{
+    enrich_whitepages::{self, Whitepages},
     load::{ingest_all, try_load_one_archive},
     load_exchange_orders,
     neo4j_init::{self, get_credentials_from_env, PASS_ENV, URI_ENV, USER_ENV},
@@ -72,6 +74,15 @@ pub enum Sub {
         /// size of each batch to load
         batch_size: Option<usize>,
     },
+    /// map owners of accounts from json file
+    EnrichWhitepages {
+        #[clap(long)]
+        /// file with owner map
+        json_file: PathBuf,
+        #[clap(long)]
+        /// size of each batch to load
+        batch_size: Option<usize>,
+    },
 }
 
 impl WarehouseCli {
@@ -129,6 +140,18 @@ impl WarehouseCli {
                 )
                 .await?;
             }
+            Sub::EnrichWhitepages {
+                json_file,
+                batch_size: _,
+            } => {
+                info!("whitepages");
+                let pool = try_db_connection_pool(self).await?;
+
+                let wp = Whitepages::parse_json_file(&json_file)?;
+                let owners_merged = enrich_whitepages::impl_batch_tx_insert(&pool, &wp).await?;
+
+                println!("SUCCESS: {} owner accounts linked", owners_merged);
+            }
         };
         Ok(())
     }