Skip to content

Commit

Permalink
CLI entrypoint cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Nov 19, 2024
1 parent 07e871f commit 2d98996
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 48 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,5 @@ You can add off-chain data to the forensic db. Currently, exchange transactions
#### enrich data

```
libra-forensic-db enrich --swap-record-json <path to .json file>
libra-forensic-db enrich-exchange --exchange_json <path to .json file>
```
12 changes: 6 additions & 6 deletions src/supporting_data.rs → src/exchange_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::path::Path;

#[derive(Clone, Debug, Deserialize)]
#[allow(dead_code)]
pub struct SwapOrder {
pub struct ExchangeOrder {
pub user: u32,
#[serde(rename = "orderType")]
pub order_type: String,
Expand All @@ -21,7 +21,7 @@ pub struct SwapOrder {
pub accepter: u32,
}

impl Default for SwapOrder {
impl Default for ExchangeOrder {
fn default() -> Self {
Self {
user: 0,
Expand All @@ -35,7 +35,7 @@ impl Default for SwapOrder {
}
}

impl SwapOrder {
impl ExchangeOrder {
/// creates one transaction record in the cypher query map format
    /// Note: the original data used RFC 3339 timestamps with Z for UTC; Cypher seems to prefer explicit offsets (+00000)
pub fn to_cypher_object_template(&self) -> String {
Expand Down Expand Up @@ -102,12 +102,12 @@ where
s.parse::<f64>().map_err(serde::de::Error::custom)
}

fn deserialize_orders(json_data: &str) -> Result<Vec<SwapOrder>> {
let orders: Vec<SwapOrder> = serde_json::from_str(json_data)?;
fn deserialize_orders(json_data: &str) -> Result<Vec<ExchangeOrder>> {
let orders: Vec<ExchangeOrder> = serde_json::from_str(json_data)?;
Ok(orders)
}

pub fn read_orders_from_file<P: AsRef<Path>>(path: P) -> Result<Vec<SwapOrder>> {
pub fn read_orders_from_file<P: AsRef<Path>>(path: P) -> Result<Vec<ExchangeOrder>> {
let mut file = File::open(path)?;
let mut json_data = String::new();
file.read_to_string(&mut json_data)?;
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
pub mod cypher_templates;
pub mod exchange_orders;
pub mod extract_snapshot;
pub mod extract_transactions;
pub mod load;
pub mod load_supporting_data;
pub mod load_exchange_orders;
pub mod load_tx_cypher;
pub mod neo4j_init;
pub mod queue;
pub mod scan;
pub mod supporting_data;
pub mod table_structs;
pub mod unzip_temp;
pub mod warehouse_cli;
Expand Down
27 changes: 8 additions & 19 deletions src/load_supporting_data.rs → src/load_exchange_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,23 @@ use log::{error, info, warn};
use neo4rs::{query, Graph};

use crate::{
exchange_orders::{read_orders_from_file, ExchangeOrder},
queue,
supporting_data::{read_orders_from_file, SwapOrder},
};

pub async fn swap_batch(
txs: &[SwapOrder],
txs: &[ExchangeOrder],
pool: &Graph,
batch_len: usize,
skip_to_batch: Option<usize>,
) -> Result<(u64, u64)> {
let chunks: Vec<&[SwapOrder]> = txs.chunks(batch_len).collect();
let chunks: Vec<&[ExchangeOrder]> = txs.chunks(batch_len).collect();
let mut merged_count = 0u64;
let mut ignored_count = 0u64;

let archive_id = "swap_orders";
info!("archive: {}", archive_id);

for (i, c) in chunks.iter().enumerate() {
if let Some(skip) = skip_to_batch {
if i < skip {
continue;
};
};
info!("batch #{}", i);

match queue::is_complete(pool, archive_id, i).await {
Expand Down Expand Up @@ -70,9 +64,9 @@ pub async fn swap_batch(
Ok((merged_count, ignored_count))
}

pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[SwapOrder]) -> Result<(u64, u64)> {
let list_str = SwapOrder::to_cypher_map(batch_txs);
let cypher_string = SwapOrder::cypher_batch_insert_str(list_str);
pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[ExchangeOrder]) -> Result<(u64, u64)> {
let list_str = ExchangeOrder::to_cypher_map(batch_txs);
let cypher_string = ExchangeOrder::cypher_batch_insert_str(list_str);

// Execute the query
let cypher_query = query(&cypher_string);
Expand All @@ -88,12 +82,7 @@ pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[SwapOrder]) -> Resu
Ok((merged as u64, ignored as u64))
}

pub async fn load_from_json(
path: &Path,
pool: &Graph,
batch_len: usize,
skip_to_batch: Option<usize>,
) -> Result<(u64, u64)> {
pub async fn load_from_json(path: &Path, pool: &Graph, batch_len: usize) -> Result<(u64, u64)> {
let orders = read_orders_from_file(path)?;
swap_batch(&orders, pool, batch_len, skip_to_batch).await
swap_batch(&orders, pool, batch_len).await
}
27 changes: 14 additions & 13 deletions src/warehouse_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::path::PathBuf;

use crate::{
load::{ingest_all, try_load_one_archive},
load_supporting_data,
load_exchange_orders,
neo4j_init::{self, get_credentials_from_env, PASS_ENV, URI_ENV, USER_ENV},
scan::{scan_dir_archive, BundleContent},
};
Expand All @@ -15,11 +15,14 @@ use crate::{
#[clap(arg_required_else_help(true))]
/// Extract transform and load data into a graph datawarehouse
pub struct WarehouseCli {
#[clap(long, short('d'))]
#[clap(long, short('r'))]
/// URI of graphDB e.g. neo4j+s://localhost:port
db_uri: Option<String>,
#[clap(long, short('u'))]

/// username of db
db_username: Option<String>,
#[clap(long, short('p'))]
/// db password
db_password: Option<String>,

Expand Down Expand Up @@ -48,13 +51,13 @@ pub enum Sub {
archive_dir: PathBuf,
},
/// add supporting data in addition to chain records
Enrich {
EnrichExchange {
#[clap(long)]
/// file with swap records
swap_record_json: PathBuf,
exchange_json: PathBuf,
#[clap(long)]
/// skip ahead to a batch in case you got disconnected
skip_to_batch: Option<usize>,
/// size of each batch to load
batch_size: Option<usize>,
},
}

Expand Down Expand Up @@ -97,19 +100,17 @@ impl WarehouseCli {
}
}
}
Sub::Enrich {
swap_record_json,
skip_to_batch,
Sub::EnrichExchange {
exchange_json: swap_record_json,
batch_size,
} => {
let pool = try_db_connection_pool(self).await?;
neo4j_init::maybe_create_indexes(&pool).await?;

let batch_len = 1000; // TODO: make this a param
load_supporting_data::load_from_json(
load_exchange_orders::load_from_json(
swap_record_json,
&pool,
batch_len,
*skip_to_batch,
batch_size.unwrap_or(250),
)
.await?;
}
Expand Down
14 changes: 7 additions & 7 deletions tests/test_supporting_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use std::path::PathBuf;

use anyhow::Result;
use libra_forensic_db::{
load_supporting_data,
exchange_orders::{read_orders_from_file, ExchangeOrder},
load_exchange_orders,
neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes},
supporting_data::{read_orders_from_file, SwapOrder},
};
use neo4rs::query;

Expand All @@ -26,21 +26,21 @@ async fn test_swap_batch_cypher() -> Result<()> {
let port = c.get_host_port_ipv4(7687);
let graph = get_neo4j_localhost_pool(port).await?;
// Three user ids exist in these two transactions
let order1 = SwapOrder {
let order1 = ExchangeOrder {
user: 1234,
accepter: 666,
..Default::default()
};

let order2 = SwapOrder {
let order2 = ExchangeOrder {
user: 4567,
accepter: 666,
..Default::default()
};

let list = vec![order1.clone(), order2];
let cypher_map = SwapOrder::to_cypher_map(&list);
let insert_query = SwapOrder::cypher_batch_insert_str(cypher_map);
let cypher_map = ExchangeOrder::to_cypher_map(&list);
let insert_query = ExchangeOrder::cypher_batch_insert_str(cypher_map);

let mut res1 = graph.execute(query(&insert_query)).await?;

Expand Down Expand Up @@ -77,7 +77,7 @@ async fn e2e_swap_data() -> Result<()> {
assert!(orders.len() == 25450);

// load 1000 orders
load_supporting_data::swap_batch(&orders[..1000], &graph, 1000, None).await?;
load_exchange_orders::swap_batch(&orders[..1000], &graph, 1000).await?;

// now check data was loaded
let mut result = graph
Expand Down

0 comments on commit 2d98996

Please sign in to comment.