From 2d98996e2b0d6efb437b05b19feec513d88e2dac Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Tue, 19 Nov 2024 11:09:40 -0500 Subject: [PATCH] CLI entrypoint cleanup --- README.md | 2 +- ...{supporting_data.rs => exchange_orders.rs} | 12 ++++----- src/lib.rs | 4 +-- ...orting_data.rs => load_exchange_orders.rs} | 27 ++++++------------- src/warehouse_cli.rs | 27 ++++++++++--------- tests/test_supporting_data.rs | 14 +++++----- 6 files changed, 38 insertions(+), 48 deletions(-) rename src/{supporting_data.rs => exchange_orders.rs} (95%) rename src/{load_supporting_data.rs => load_exchange_orders.rs} (74%) diff --git a/README.md b/README.md index e359b44..437f466 100644 --- a/README.md +++ b/README.md @@ -87,5 +87,5 @@ You can add off-chain data to the forensic db. Currently, exchange transactions #### enrich data ``` -libra-forensic-db enrich --swap-record-json +libra-forensic-db enrich-exchange --exchange_json ``` diff --git a/src/supporting_data.rs b/src/exchange_orders.rs similarity index 95% rename from src/supporting_data.rs rename to src/exchange_orders.rs index 95605ef..923c42c 100644 --- a/src/supporting_data.rs +++ b/src/exchange_orders.rs @@ -8,7 +8,7 @@ use std::path::Path; #[derive(Clone, Debug, Deserialize)] #[allow(dead_code)] -pub struct SwapOrder { +pub struct ExchangeOrder { pub user: u32, #[serde(rename = "orderType")] pub order_type: String, @@ -21,7 +21,7 @@ pub struct SwapOrder { pub accepter: u32, } -impl Default for SwapOrder { +impl Default for ExchangeOrder { fn default() -> Self { Self { user: 0, @@ -35,7 +35,7 @@ impl Default for SwapOrder { } } -impl SwapOrder { +impl ExchangeOrder { /// creates one transaction record in the cypher query map format /// Note original data was in an RFC rfc3339 with Z for UTC, Cypher seems to prefer with offsets +00000 pub fn to_cypher_object_template(&self) -> String { @@ -102,12 +102,12 @@ where s.parse::().map_err(serde::de::Error::custom) } -fn deserialize_orders(json_data: &str) -> Result> { - let orders: Vec = serde_json::from_str(json_data)?; +fn deserialize_orders(json_data: &str) -> Result> { + let orders: Vec = serde_json::from_str(json_data)?; Ok(orders) } -pub fn read_orders_from_file>(path: P) -> Result> { +pub fn read_orders_from_file>(path: P) -> Result> { let mut file = File::open(path)?; let mut json_data = String::new(); file.read_to_string(&mut json_data)?; diff --git a/src/lib.rs b/src/lib.rs index f2081cf..721d113 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,13 +1,13 @@ pub mod cypher_templates; +pub mod exchange_orders; pub mod extract_snapshot; pub mod extract_transactions; pub mod load; -pub mod load_supporting_data; +pub mod load_exchange_orders; pub mod load_tx_cypher; pub mod neo4j_init; pub mod queue; pub mod scan; -pub mod supporting_data; pub mod table_structs; pub mod unzip_temp; pub mod warehouse_cli; diff --git a/src/load_supporting_data.rs b/src/load_exchange_orders.rs similarity index 74% rename from src/load_supporting_data.rs rename to src/load_exchange_orders.rs index 5fea32e..1a4e1dd 100644 --- a/src/load_supporting_data.rs +++ b/src/load_exchange_orders.rs @@ -5,17 +5,16 @@ use log::{error, info, warn}; use neo4rs::{query, Graph}; use crate::{ + exchange_orders::{read_orders_from_file, ExchangeOrder}, queue, - supporting_data::{read_orders_from_file, SwapOrder}, }; pub async fn swap_batch( - txs: &[SwapOrder], + txs: &[ExchangeOrder], pool: &Graph, batch_len: usize, - skip_to_batch: Option, ) -> Result<(u64, u64)> { - let chunks: Vec<&[SwapOrder]> = txs.chunks(batch_len).collect(); + let chunks: Vec<&[ExchangeOrder]> = txs.chunks(batch_len).collect(); let mut merged_count = 0u64; let mut ignored_count = 0u64; @@ -23,11 +22,6 @@ pub async fn swap_batch( info!("archive: {}", archive_id); for (i, c) in chunks.iter().enumerate() { - if let Some(skip) = skip_to_batch { - if i < skip { - continue; - }; - }; info!("batch #{}", i); match queue::is_complete(pool, archive_id, i).await { @@ -70,9 +64,9 @@ pub async fn swap_batch( Ok((merged_count, ignored_count)) } -pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[SwapOrder]) -> Result<(u64, u64)> { - let list_str = SwapOrder::to_cypher_map(batch_txs); - let cypher_string = SwapOrder::cypher_batch_insert_str(list_str); +pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[ExchangeOrder]) -> Result<(u64, u64)> { + let list_str = ExchangeOrder::to_cypher_map(batch_txs); + let cypher_string = ExchangeOrder::cypher_batch_insert_str(list_str); // Execute the query let cypher_query = query(&cypher_string); @@ -88,12 +82,7 @@ pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[SwapOrder]) -> Resu Ok((merged as u64, ignored as u64)) } -pub async fn load_from_json( - path: &Path, - pool: &Graph, - batch_len: usize, - skip_to_batch: Option, -) -> Result<(u64, u64)> { +pub async fn load_from_json(path: &Path, pool: &Graph, batch_len: usize) -> Result<(u64, u64)> { let orders = read_orders_from_file(path)?; - swap_batch(&orders, pool, batch_len, skip_to_batch).await + swap_batch(&orders, pool, batch_len).await } diff --git a/src/warehouse_cli.rs b/src/warehouse_cli.rs index d459593..1e2ab0f 100644 --- a/src/warehouse_cli.rs +++ b/src/warehouse_cli.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; use crate::{ load::{ingest_all, try_load_one_archive}, - load_supporting_data, + load_exchange_orders, neo4j_init::{self, get_credentials_from_env, PASS_ENV, URI_ENV, USER_ENV}, scan::{scan_dir_archive, BundleContent}, }; @@ -15,11 +15,14 @@ use crate::{ #[clap(arg_required_else_help(true))] /// Extract transform and load data into a graph datawarehouse pub struct WarehouseCli { - #[clap(long, short('d'))] + #[clap(long, short('r'))] /// URI of graphDB e.g. neo4j+s://localhost:port db_uri: Option, + #[clap(long, short('u'))] + /// username of db db_username: Option, + #[clap(long, short('p'))] /// db password db_password: Option, @@ -48,13 +51,13 @@ pub enum Sub { archive_dir: PathBuf, }, /// add supporting data in addition to chain records - Enrich { + EnrichExchange { #[clap(long)] /// file with swap records - swap_record_json: PathBuf, + exchange_json: PathBuf, #[clap(long)] - /// skip ahead to a batch in case you got disconnected - skip_to_batch: Option, + /// size of each batch to load + batch_size: Option, }, } @@ -97,19 +100,17 @@ impl WarehouseCli { } } } - Sub::Enrich { - swap_record_json, - skip_to_batch, + Sub::EnrichExchange { + exchange_json: swap_record_json, + batch_size, } => { let pool = try_db_connection_pool(self).await?; neo4j_init::maybe_create_indexes(&pool).await?; - let batch_len = 1000; // TODO: make this a param - load_supporting_data::load_from_json( + load_exchange_orders::load_from_json( swap_record_json, &pool, - batch_len, - *skip_to_batch, + batch_size.unwrap_or(250), ) .await?; } diff --git a/tests/test_supporting_data.rs b/tests/test_supporting_data.rs index ac3a59d..894ddd6 100644 --- a/tests/test_supporting_data.rs +++ b/tests/test_supporting_data.rs @@ -6,9 +6,9 @@ use std::path::PathBuf; use anyhow::Result; use libra_forensic_db::{ - load_supporting_data, + exchange_orders::{read_orders_from_file, ExchangeOrder}, + load_exchange_orders, neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes}, - supporting_data::{read_orders_from_file, SwapOrder}, }; use neo4rs::query; @@ -26,21 +26,21 @@ async fn test_swap_batch_cypher() -> Result<()> { let port = c.get_host_port_ipv4(7687); let graph = get_neo4j_localhost_pool(port).await?; // Three user ids exist in these two transactions - let order1 = SwapOrder { + let order1 = ExchangeOrder { user: 1234, accepter: 666, ..Default::default() }; - let order2 = SwapOrder { + let order2 = ExchangeOrder { user: 4567, accepter: 666, ..Default::default() }; let list = vec![order1.clone(), order2]; - let cypher_map = SwapOrder::to_cypher_map(&list); - let insert_query = SwapOrder::cypher_batch_insert_str(cypher_map); + let cypher_map = ExchangeOrder::to_cypher_map(&list); + let insert_query = ExchangeOrder::cypher_batch_insert_str(cypher_map); let mut res1 = graph.execute(query(&insert_query)).await?; @@ -77,7 +77,7 @@ async fn e2e_swap_data() -> Result<()> { assert!(orders.len() == 25450); // load 1000 orders - load_supporting_data::swap_batch(&orders[..1000], &graph, 1000, None).await?; + load_exchange_orders::swap_batch(&orders[..1000], &graph, 1000).await?; // now check data was loaded let mut result = graph