Skip to content

Commit

Permalink
add ownership links from whitepages
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Nov 21, 2024
1 parent 1a1ca41 commit ee34a82
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/cypher_templates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ RETURN
)
}


use anyhow::bail;
use serde::Serialize;
use serde_json::Value;
Expand Down
118 changes: 118 additions & 0 deletions src/enrich_whitepages.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use anyhow::{Context, Result};
use diem_types::account_address::AccountAddress;
use log::info;
use neo4rs::Graph;
use serde::{Deserialize, Deserializer, Serialize};
use std::path::Path;

/// One record of the whitepages JSON file: an account address paired with the
/// alias of its owner and an optional note.
///
/// Every field is optional so that sparse or partially filled records can
/// still be loaded.
#[derive(Debug, Serialize, Deserialize)]
pub struct Whitepages {
    /// Account address, parsed by `from_any_string`.
    ///
    /// `default` is required alongside `deserialize_with`: without it serde
    /// rejects a record that omits the `address` key with a "missing field"
    /// error, even though the field is an `Option`.
    #[serde(default, deserialize_with = "from_any_string")]
    address: Option<AccountAddress>,
    /// Alias of the owner of the address.
    owner: Option<String>,
    /// Free-form note about the address; deserialized but not used when
    /// building the Cypher owner map.
    address_note: Option<String>,
}

/// Serde helper that leniently parses an optional account address.
///
/// Accepts a JSON string or `null`. The text is trimmed and lower-cased, and a
/// `0x` prefix is added when it is missing, before the value is handed to
/// `AccountAddress::from_hex_literal`.
///
/// `null`, blank, or unparsable input yields `Ok(None)` instead of an error,
/// so a single bad record does not abort loading the whole file.
///
/// # Errors
/// Returns the deserializer's error when the value is neither a string nor
/// `null`.
fn from_any_string<'de, D>(deserializer: D) -> Result<Option<AccountAddress>, D::Error>
where
    D: Deserializer<'de>,
{
    // Deserialize into an owned, optional string: a borrowed `&str` fails for
    // `null` and for any input serde cannot borrow from (for example strings
    // containing escape sequences).
    let raw = Option::<String>::deserialize(deserializer)?;

    Ok(raw.and_then(|text| {
        let lower = text.trim().to_ascii_lowercase();
        if lower.is_empty() {
            return None;
        }
        // Only a leading "0x" counts as the prefix; a "0x" somewhere in the
        // middle of the text is not a hex-literal marker.
        let literal = if lower.starts_with("0x") {
            lower
        } else {
            format!("0x{}", lower)
        };
        AccountAddress::from_hex_literal(&literal).ok()
    }))
}

impl Whitepages {
pub fn parse_json_file(path: &Path) -> Result<Vec<Self>> {
let s = std::fs::read_to_string(path)?;
Ok(serde_json::from_str(&s)?)
}

pub fn to_cypher_object_template(&self) -> String {
format!(
r#"{{owner: "{}", address: "{}" }}"#,
self.owner.as_ref().unwrap(),
self.address.as_ref().unwrap().to_hex_literal(),
// self.address_note,
)
}

/// create a cypher query string for the map object
pub fn to_cypher_map(list: &[Self]) -> String {
let mut list_literal = "".to_owned();
for el in list {
// skip empty records
if el.owner.is_none() { continue };

let s = el.to_cypher_object_template();
list_literal.push_str(&s);
list_literal.push(',');
}
list_literal.pop(); // need to drop last comma ","
format!("[{}]", list_literal)
}

pub fn cypher_batch_link_owner(list_str: &str) -> String {
format!(
r#"
WITH {list_str} AS owner_data
UNWIND owner_data AS each_owner
MATCH (addr:Account {{address: each_owner.address}})
MERGE (own:Owner {{alias: each_owner.owner}})
MERGE (own)-[rel:Owns]->(addr)
WITH rel
RETURN
COUNT(rel) AS owners_merged
"#
)
}
}



/// Links the owners listed in `batch_txs` to their account nodes in the graph.
///
/// Builds one Cypher query from the whole slice (see
/// `Whitepages::cypher_batch_link_owner`), executes it on `pool`, and returns
/// the `owners_merged` count reported by the query.
///
/// # Errors
/// Returns an error when the query fails to execute, when no row comes back,
/// or when the row has no readable `owners_merged` field.
pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[Whitepages]) -> Result<u64> {
    // Count distinct owner aliases with a set rather than scanning a vector
    // once per record.
    let unique_owners: std::collections::HashSet<&str> = batch_txs
        .iter()
        .filter_map(|record| record.owner.as_deref())
        .collect();

    info!("unique owner links in batch: {}", unique_owners.len());

    // Inline the records into the query text as a Cypher list literal.
    let list_str = Whitepages::to_cypher_map(batch_txs);
    let cypher_string = Whitepages::cypher_batch_link_owner(&list_str);

    // Execute the query
    let cypher_query = neo4rs::query(&cypher_string);
    let mut res = pool
        .execute(cypher_query)
        .await
        .context("execute query error")?;

    // The query ends in an aggregation, so a single result row is expected.
    let row = res.next().await?.context("no row returned")?;

    let owners_merged: u64 = row
        .get("owners_merged")
        .context("no owners_merged field")?;

    info!("owners linked to addresses: {}", owners_merged);

    Ok(owners_merged)
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod cypher_templates;
pub mod enrich_whitepages;
pub mod exchange_orders;
pub mod extract_snapshot;
pub mod extract_transactions;
Expand Down
23 changes: 23 additions & 0 deletions src/warehouse_cli.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use anyhow::{bail, Result};
use clap::{Parser, Subcommand};
use log::info;
use neo4rs::Graph;
use std::path::PathBuf;

use crate::{
enrich_whitepages::{self, Whitepages},
load::{ingest_all, try_load_one_archive},
load_exchange_orders,
neo4j_init::{self, get_credentials_from_env, PASS_ENV, URI_ENV, USER_ENV},
Expand Down Expand Up @@ -72,6 +74,15 @@ pub enum Sub {
/// size of each batch to load
batch_size: Option<usize>,
},
/// map owners of accounts from json file
EnrichWhitepages {
#[clap(long)]
/// file with owner map
json_file: PathBuf,
#[clap(long)]
/// size of each batch to load
batch_size: Option<usize>,
},
}

impl WarehouseCli {
Expand Down Expand Up @@ -129,6 +140,18 @@ impl WarehouseCli {
)
.await?;
}
Sub::EnrichWhitepages {
json_file,
batch_size: _,
} => {
info!("whitepages");
let pool = try_db_connection_pool(self).await?;

let wp = Whitepages::parse_json_file(&json_file)?;
let owners_merged = enrich_whitepages::impl_batch_tx_insert(&pool, &wp).await?;

println!("SUCCESS: {} owner accounts linked", owners_merged);
}
};
Ok(())
}
Expand Down

0 comments on commit ee34a82

Please sign in to comment.