diff --git a/src/batch_tx_type.rs b/src/batch_tx_type.rs
new file mode 100644
index 0000000..ecb2a21
--- /dev/null
+++ b/src/batch_tx_type.rs
@@ -0,0 +1,48 @@
+use std::fmt::Display;
+
+/// response for the batch insert tx
+#[derive(Debug, Clone)]
+pub struct BatchTxReturn {
+    pub unique_accounts: u64,
+    pub created_accounts: u64,
+    pub modified_accounts: u64,
+    pub unchanged_accounts: u64,
+    pub created_tx: u64,
+}
+
+impl Display for BatchTxReturn {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "Total Transactions - unique accounts: {}, created accounts: {}, modified accounts: {}, unchanged accounts: {}, transactions created: {}",
+            self.unique_accounts,
+            self.created_accounts,
+            self.modified_accounts,
+            self.unchanged_accounts,
+            self.created_tx
+        )
+    }
+}
+
+impl Default for BatchTxReturn {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl BatchTxReturn {
+    pub fn new() -> Self {
+        Self {
+            unique_accounts: 0,
+            created_accounts: 0,
+            modified_accounts: 0,
+            unchanged_accounts: 0,
+            created_tx: 0,
+        }
+    }
+    pub fn increment(&mut self, new: &BatchTxReturn) {
+        self.unique_accounts += new.unique_accounts;
+        self.created_accounts += new.created_accounts;
+        self.modified_accounts += new.modified_accounts;
+        self.unchanged_accounts += new.unchanged_accounts;
+        self.created_tx += new.created_tx;
+    }
+}
diff --git a/src/extract_snapshot.rs b/src/extract_snapshot.rs
index a55341a..4f7a812 100644
--- a/src/extract_snapshot.rs
+++ b/src/extract_snapshot.rs
@@ -3,18 +3,36 @@
 use std::path::Path;
 
 use anyhow::Result;
 use diem_types::account_view::AccountView;
 use libra_backwards_compatibility::version_five::{
-    balance_v5::BalanceResourceV5, ol_wallet::SlowWalletResourceV5,
-    state_snapshot_v5::v5_accounts_from_manifest_path,
+    balance_v5::BalanceResourceV5,
+    ol_wallet::SlowWalletResourceV5,
+    state_snapshot_v5::{v5_accounts_from_manifest_path, v5_read_from_snapshot_manifest},
 };
 use libra_storage::read_snapshot::{accounts_from_snapshot_backup, load_snapshot_manifest};
-use libra_types::exports::AccountAddress;
+use libra_types::{
+    exports::AccountAddress,
+    move_resource::{libra_coin::LibraCoinStoreResource, wallet::SlowWalletResource},
+};
 use log::{error, info, warn};
 
-use crate::schema_account_state::WarehouseAccState;
+use crate::{
+    scan::FrameworkVersion,
+    schema_account_state::{WarehouseAccState, WarehouseTime},
+};
 
 // uses libra-compatibility to parse the v5 manifest files, and decode v5 format bytecode into current version data structures (v6+);
 pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result<Vec<WarehouseAccState>> {
+    // NOTE: this is duplicated with next step.
+    let manifest_data = v5_read_from_snapshot_manifest(v5_manifest_path)?;
     let account_blobs = v5_accounts_from_manifest_path(v5_manifest_path).await?;
+
+    // TODO: see below, massively inefficient
+    let time = WarehouseTime {
+        framework_version: FrameworkVersion::V5,
+        timestamp: 0,
+        version: manifest_data.version,
+        epoch: 0,
+    };
+
     info!("account records found: {}", &account_blobs.len());
     let mut warehouse_state = vec![];
     for el in account_blobs.iter() {
@@ -26,14 +44,16 @@ pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result<Vec<WarehouseAccState>> {
-        if let Some(b) = acc.get_resource::<BalanceResourceV5>().ok() {
+        if let Ok(b) = acc.get_resource::<BalanceResourceV5>() {
             s.balance = b.coin()
         }
-        if let Some(sw) = acc.get_resource::<SlowWalletResourceV5>().ok() {
+        if let Ok(sw) = acc.get_resource::<SlowWalletResourceV5>() {
             s.slow_wallet_locked = sw.unlocked;
             s.slow_wallet_transferred = sw.transferred;
         }
@@ -58,6 +78,15 @@ pub async fn extract_current_snapshot(archive_path: &Path) -> Result<Vec<WarehouseAccState>> {
+        if let Some(b) = el.get_resource::<LibraCoinStoreResource>()? {
+            s.balance = b.coin();
+        }
+
+        if let Some(sw) = el.get_resource::<SlowWalletResource>()? {
+            s.slow_wallet_locked = sw.unlocked;
+            s.slow_wallet_transferred = sw.transferred;
+        }
+
         warehouse_state.push(s);
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index ff5ed80..c1c0c99 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,7 +1,7 @@
+pub mod batch_tx_type;
 pub mod cypher_templates;
 pub mod enrich_exchange_onboarding;
 pub mod enrich_whitepages;
-pub mod schema_exchange_orders;
 pub mod extract_snapshot;
 pub mod extract_transactions;
 pub mod load;
@@ -12,6 +12,7 @@ pub mod neo4j_init;
 pub mod queue;
 pub mod scan;
 pub mod schema_account_state;
+pub mod schema_exchange_orders;
 pub mod schema_transaction;
 pub mod unzip_temp;
 pub mod warehouse_cli;
diff --git a/src/load.rs b/src/load.rs
index 7a67ee8..0323144 100644
--- a/src/load.rs
+++ b/src/load.rs
@@ -1,6 +1,9 @@
 use crate::{
+    batch_tx_type::BatchTxReturn,
+    extract_snapshot::{extract_current_snapshot, extract_v5_snapshot},
     extract_transactions::extract_current_transactions,
-    load_tx_cypher::{self, BatchTxReturn},
+    load_account_state::snapshot_batch,
+    load_tx_cypher,
     queue::{self, clear_queue, push_queue_from_archive_map},
     scan::{ArchiveMap, ManifestInfo},
 };
@@ -68,7 +71,19 @@ pub async fn try_load_one_archive(
     let mut all_results = BatchTxReturn::new();
     match man.contents {
         crate::scan::BundleContent::Unknown => todo!(),
-        crate::scan::BundleContent::StateSnapshot => todo!(),
+        crate::scan::BundleContent::StateSnapshot => {
+            let snaps = match man.version {
+                crate::scan::FrameworkVersion::Unknown => todo!(),
+                crate::scan::FrameworkVersion::V5 => extract_v5_snapshot(&man.archive_dir).await?,
+                crate::scan::FrameworkVersion::V6 => {
+                    extract_current_snapshot(&man.archive_dir).await?
+                }
+                crate::scan::FrameworkVersion::V7 => {
+                    extract_current_snapshot(&man.archive_dir).await?
+                }
+            };
+            snapshot_batch(&snaps, pool, batch_size, &man.archive_id).await?;
+        }
         crate::scan::BundleContent::Transaction => {
             let (txs, _) = extract_current_transactions(&man.archive_dir).await?;
             let batch_res =
diff --git a/src/load_account_state.rs b/src/load_account_state.rs
index e9423e9..361aeba 100644
--- a/src/load_account_state.rs
+++ b/src/load_account_state.rs
@@ -1,15 +1,72 @@
-use log::info;
-use neo4rs::Graph;
+use std::{thread, time::Duration};
+
+use crate::{batch_tx_type::BatchTxReturn, queue, schema_account_state::WarehouseAccState};
 use anyhow::{Context, Result};
-use crate::schema_account_state::WarehouseAccState;
+use log::{error, info, warn};
+use neo4rs::Graph;
+
+// TODO: code duplication
+pub async fn snapshot_batch(
+    txs: &[WarehouseAccState],
+    pool: &Graph,
+    batch_size: usize,
+    archive_id: &str,
+) -> Result<BatchTxReturn> {
+    let mut all_results = BatchTxReturn::new();
+
+    let chunks: Vec<&[WarehouseAccState]> = txs.chunks(batch_size).collect();
+    info!("archive: {}", archive_id);
+    for (i, c) in chunks.into_iter().enumerate() {
+        info!("batch #{}", i);
+        // double checking the status of the loading PER BATCH
+        // it could have been updated in the interim
+        // since the outer check in ingest_all, just checks
+        // all things completed prior to this run
+        // check if this is already completed, or should be inserted.
+        match queue::is_batch_complete(pool, archive_id, i).await {
+            Ok(Some(true)) => {
+                info!("...skipping, all batches loaded.");
+                // skip this one
+                continue;
+            }
+            Ok(Some(false)) => {
+                // keep going
+            }
+            _ => {
+                info!("...batch not found in queue, adding to queue.");
+
+                // no task found in db, add to queue
+                queue::update_task(pool, archive_id, false, i).await?;
+            }
+        }
+        info!("...loading to db");
+
+        match impl_batch_snapshot_insert(pool, c).await {
+            Ok(batch) => {
+                // dbg!(&batch);
+                all_results.increment(&batch);
+                // dbg!(&all_results);
+                queue::update_task(pool, archive_id, true, i).await?;
+                info!("...success");
+            }
+            Err(e) => {
+                let secs = 10;
+                error!("skipping batch, could not insert: {:?}", e);
+                warn!("waiting {} secs before retrying connection", secs);
+                thread::sleep(Duration::from_secs(secs));
+            }
+        };
+    }
+
+    Ok(all_results)
+}
 
 pub async fn impl_batch_snapshot_insert(
     pool: &Graph,
     batch_snapshots: &[WarehouseAccState],
-) -> Result<()> {
-
+) -> Result<BatchTxReturn> {
     let list_str = WarehouseAccState::to_cypher_map(batch_snapshots);
     let cypher_string = WarehouseAccState::cypher_batch_insert_str(&list_str);
@@ -28,5 +85,11 @@ pub async fn impl_batch_snapshot_insert(
     info!("merged snapshots: {}", merged_snapshots);
 
-    Ok(())
+    Ok(BatchTxReturn {
+        unique_accounts: 0,
+        created_accounts: 0,
+        modified_accounts: 0,
+        unchanged_accounts: 0,
+        created_tx: merged_snapshots,
+    })
 }
diff --git a/src/load_exchange_orders.rs b/src/load_exchange_orders.rs
index 8aa6045..48176b7 100644
--- a/src/load_exchange_orders.rs
+++ b/src/load_exchange_orders.rs
@@ -5,8 +5,8 @@ use log::{error, info, warn};
 use neo4rs::{query, Graph};
 
 use crate::{
-    schema_exchange_orders::{read_orders_from_file, ExchangeOrder},
     queue,
+    schema_exchange_orders::{read_orders_from_file, ExchangeOrder},
 };
 
 pub async fn swap_batch(
diff --git a/src/load_tx_cypher.rs b/src/load_tx_cypher.rs
index a23da51..dfa6747 100644
--- a/src/load_tx_cypher.rs
+++ b/src/load_tx_cypher.rs
@@ -1,61 +1,15 @@
 use anyhow::{Context, Result};
 use log::{error, info, warn};
 use neo4rs::{query, Graph};
-use std::{fmt::Display, thread, time::Duration};
+use std::{thread, time::Duration};
 
 use crate::{
+    batch_tx_type::BatchTxReturn,
     cypher_templates::{write_batch_tx_string, write_batch_user_create},
     queue,
     schema_transaction::WarehouseTxMaster,
 };
 
-/// response for the batch insert tx
-#[derive(Debug, Clone)]
-pub struct BatchTxReturn {
-    pub unique_accounts: u64,
-    pub created_accounts: u64,
-    pub modified_accounts: u64,
-    pub unchanged_accounts: u64,
-    pub created_tx: u64,
-}
-
-impl Display for BatchTxReturn {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "Total Transactions - unique accounts: {}, created accounts: {}, modified accounts: {}, unchanged accounts: {}, transactions created: {}",
-            self.unique_accounts,
-            self.created_accounts,
-            self.modified_accounts,
-            self.unchanged_accounts,
-            self.created_tx
-        )
-    }
-}
-
-impl Default for BatchTxReturn {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl BatchTxReturn {
-    pub fn new() -> Self {
-        Self {
-            unique_accounts: 0,
-            created_accounts: 0,
-            modified_accounts: 0,
-            unchanged_accounts: 0,
-            created_tx: 0,
-        }
-    }
-    pub fn increment(&mut self, new: &BatchTxReturn) {
-        self.unique_accounts += new.unique_accounts;
-        self.created_accounts += new.created_accounts;
-        self.modified_accounts += new.modified_accounts;
-        self.unchanged_accounts += new.unchanged_accounts;
-        self.created_tx += new.created_tx;
-    }
-}
-
 // TODO: code duplication with exchange order loading.
 pub async fn tx_batch(
     txs: &[WarehouseTxMaster],
diff --git a/src/scan.rs b/src/scan.rs
index 2c0281c..7f28bf0 100644
--- a/src/scan.rs
+++ b/src/scan.rs
@@ -23,15 +23,42 @@ pub struct ManifestInfo {
     /// the name of the directory, as a unique archive identifier
     pub archive_id: String,
     /// what libra version were these files encoded with (v5 etc)
-    pub version: EncodingVersion,
+    pub version: FrameworkVersion,
     /// contents of the manifest
    pub contents: BundleContent,
     /// processed
     pub processed: bool,
 }
 
-#[derive(Clone, Debug)]
-pub enum EncodingVersion {
+impl ManifestInfo {
+    pub fn try_set_framework_version(&mut self) -> FrameworkVersion {
+        match self.contents {
+            BundleContent::Unknown => return FrameworkVersion::Unknown,
+            BundleContent::StateSnapshot => {
+                // first check if the v7 manifest will parse
+                if load_snapshot_manifest(&self.archive_dir).is_ok() {
+                    self.version = FrameworkVersion::V7;
+                }
+
+                if v5_read_from_snapshot_manifest(&self.archive_dir).is_ok() {
+                    self.version = FrameworkVersion::V5;
+                }
+            }
+            BundleContent::Transaction => {
+                // TODO: v5 manifests appear to have the same format this is a noop
+                if v5_read_from_transaction_manifest(&self.archive_dir).is_ok() {
+                    self.version = FrameworkVersion::V5;
+                }
+            }
+            BundleContent::EpochEnding => {}
+        }
+
+        FrameworkVersion::Unknown
+    }
+}
+#[derive(Clone, Debug, Default)]
+pub enum FrameworkVersion {
+    #[default]
     Unknown,
     V5,
     V6,
@@ -74,19 +101,23 @@ pub fn scan_dir_archive(
     for entry in glob(&pattern)? {
         match entry {
-            Ok(path) => {
-                let dir = path.parent().context("no parent dir found")?.to_owned();
-                let contents = test_content(&path);
+            Ok(manifest_path) => {
+                let dir = manifest_path
+                    .parent()
+                    .context("no parent dir found")?
+                    .to_owned();
+                let contents = test_content(&manifest_path);
                 let archive_id = dir.file_name().unwrap().to_str().unwrap().to_owned();
-                let m = ManifestInfo {
+                let mut m = ManifestInfo {
                     archive_dir: dir.clone(),
                     archive_id,
-                    version: test_version(&contents, &path),
+                    version: FrameworkVersion::Unknown,
                     contents,
                     processed: false,
                 };
+                m.try_set_framework_version();
 
-                archive.insert(path.clone(), m);
+                archive.insert(manifest_path.clone(), m);
             }
             Err(e) => println!("{:?}", e),
         }
@@ -109,28 +140,3 @@ fn test_content(manifest_path: &Path) -> BundleContent {
 
     BundleContent::Unknown
 }
-
-fn test_version(content: &BundleContent, manifest_file: &Path) -> EncodingVersion {
-    match content {
-        BundleContent::Unknown => return EncodingVersion::Unknown,
-        BundleContent::StateSnapshot => {
-            // first check if the v7 manifest will parse
-            if load_snapshot_manifest(manifest_file).is_ok() {
-                return EncodingVersion::V7;
-            }
-
-            if v5_read_from_snapshot_manifest(manifest_file).is_ok() {
-                return EncodingVersion::V5;
-            }
-        }
-        BundleContent::Transaction => {
-            // TODO: v5 manifests appear to have the same format this is a noop
-            if v5_read_from_transaction_manifest(manifest_file).is_ok() {
-                return EncodingVersion::V5;
-            }
-        }
-        BundleContent::EpochEnding => {}
-    }
-
-    EncodingVersion::Unknown
-}
diff --git a/src/schema_account_state.rs b/src/schema_account_state.rs
index 050ca4f..b3f8465 100644
--- a/src/schema_account_state.rs
+++ b/src/schema_account_state.rs
@@ -1,5 +1,15 @@
 use libra_types::exports::AccountAddress;
+use crate::scan::FrameworkVersion;
+
+// holds timestamp, chain height, and epoch
+#[derive(Debug, Clone, Default)]
+pub struct WarehouseTime {
+    pub framework_version: FrameworkVersion,
+    pub timestamp: u64,
+    pub version: u64,
+    pub epoch: u64,
+}
 
 #[derive(Debug, Clone)]
 /// The basic information for an account
 pub struct WarehouseAccState {
@@ -11,6 +21,19 @@ pub struct WarehouseAccState {
     pub slow_wallet_transferred: u64,
 }
 
+impl Default for WarehouseAccState {
+    fn default() -> Self {
+        Self {
+            address: AccountAddress::ZERO,
+            time: Default::default(),
+            sequence_num: Default::default(),
+            balance: Default::default(),
+            slow_wallet_locked: Default::default(),
+            slow_wallet_transferred: Default::default(),
+        }
+    }
+}
+
 impl WarehouseAccState {
     pub fn new(address: AccountAddress) -> Self {
         Self {
@@ -28,29 +51,19 @@ impl WarehouseAccState {
         self.time.epoch = epoch;
     }
 }
-// holds timestamp, chain height, and epoch
-#[derive(Debug, Clone, Default)]
-pub struct WarehouseTime {
-    pub timestamp: u64,
-    pub version: u64,
-    pub epoch: u64,
-}
 
 impl WarehouseAccState {
     /// creates one transaction record in the cypher query map format
     /// Note original data was in an RFC rfc3339 with Z for UTC, Cypher seems to prefer with offsets +00000
     pub fn to_cypher_object_template(&self) -> String {
         format!(
-            r#"{{address: {}, balance: {} }}"#,
-            self.address,
+            r#"{{address: "{}", balance: {}, version: {}, sequence_num: {}, slow_locked: {}, slow_transfer: {} }}"#,
+            self.address.to_hex_literal(),
             self.balance,
-            // self.order_type,
-            // self.amount,
-            // self.price,
-            // self.created_at.to_rfc3339(),
-            // self.created_at.timestamp_micros(),
-            // self.filled_at.to_rfc3339(),
-            // self.filled_at.timestamp_micros()
+            self.time.version,
+            self.sequence_num,
+            self.slow_wallet_locked,
+            self.slow_wallet_transferred,
         )
     }
 
@@ -72,8 +85,7 @@ impl WarehouseAccState {
         WITH {list_str} AS tx_data
         UNWIND tx_data AS tx
-        MATCH (addr:Account {{address: tx.address}})
-
+        MERGE (addr:Account {{address: tx.address}})
         MERGE (snap:Snapshot {{address: tx.address, balance: tx.balance }})
 
         MERGE (addr)-[rel:State]->(snap)
diff --git a/tests/test_load_state.rs b/tests/test_load_state.rs
index 3e0d6f9..25ff746 100644
--- a/tests/test_load_state.rs
+++ b/tests/test_load_state.rs
@@ -1,18 +1,85 @@
-
 mod support;
 
 use libra_forensic_db::{
-    extract_snapshot::extract_v5_snapshot, load_account_state::impl_batch_snapshot_insert, neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes}
+    extract_snapshot::extract_v5_snapshot,
+    load_account_state::{impl_batch_snapshot_insert, snapshot_batch},
+    neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes},
+    schema_account_state::WarehouseAccState,
+};
+use support::{
+    fixtures::v5_state_manifest_fixtures_path, neo4j_testcontainer::start_neo4j_container,
 };
-use support::{fixtures::v5_state_manifest_fixtures_path, neo4j_testcontainer::start_neo4j_container};
+
+#[tokio::test]
+async fn test_snapshot_unit() -> anyhow::Result<()> {
+    libra_forensic_db::log_setup();
+    // let manifest_file = v5_state_manifest_fixtures_path().join("state.manifest");
+    // assert!(manifest_file.exists());
+    // let s = extract_v5_snapshot(&manifest_file).await?;
+
+    let snap1 = WarehouseAccState::default();
+    let snap2 = WarehouseAccState::default();
+    let snap3 = WarehouseAccState::default();
+    let vec_snap = vec![snap1, snap2, snap3];
+
+    let c = start_neo4j_container();
+    let port = c.get_host_port_ipv4(7687);
+    let graph = get_neo4j_localhost_pool(port)
+        .await
+        .expect("could not get neo4j connection pool");
+    maybe_create_indexes(&graph)
+        .await
+        .expect("could start index");
+
+    let merged_snapshots = impl_batch_snapshot_insert(&graph, &vec_snap).await?;
+    assert!(merged_snapshots.created_tx == 3);
+
+    Ok(())
+}
 
 #[tokio::test]
 async fn test_snapshot_batch() -> anyhow::Result<()> {
     libra_forensic_db::log_setup();
     let manifest_file = v5_state_manifest_fixtures_path().join("state.manifest");
     assert!(manifest_file.exists());
-    let s = extract_v5_snapshot(&manifest_file).await?;
+    let vec_snap = extract_v5_snapshot(&manifest_file).await?;
+
+    let c = start_neo4j_container();
+    let port = c.get_host_port_ipv4(7687);
+    let graph = get_neo4j_localhost_pool(port)
+        .await
+        .expect("could not get neo4j connection pool");
+    maybe_create_indexes(&graph)
+        .await
+        .expect("could start index");
+
+    let merged_snapshots = impl_batch_snapshot_insert(&graph, &vec_snap[..100]).await?;
+    assert!(merged_snapshots.created_tx == 100);
+
+    // check DB to see what is persisted
+    let cypher_query = neo4rs::query(
+        "MATCH ()-[r:State]->()
+         RETURN count(r) AS count_state_edges",
+    );
+    // Execute the query
+    let mut result = graph.execute(cypher_query).await?;
+
+    // Fetch the first row only
+    let row = result.next().await?.unwrap();
+    let count: i64 = row.get("count_state_edges").unwrap();
+    assert!(count == 100i64);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_snapshot_entrypoint() -> anyhow::Result<()> {
+    libra_forensic_db::log_setup();
+    let manifest_file = v5_state_manifest_fixtures_path().join("state.manifest");
+    assert!(manifest_file.exists());
+    let vec_snap = extract_v5_snapshot(&manifest_file).await?;
+    assert!(vec_snap.len() == 17338);
 
     let c = start_neo4j_container();
     let port = c.get_host_port_ipv4(7687);
@@ -23,8 +90,24 @@ async fn test_snapshot_batch() -> anyhow::Result<()> {
         .await
         .expect("could start index");
 
-    impl_batch_snapshot_insert(&graph, &s).await?;
+    let merged_snapshots = snapshot_batch(&vec_snap, &graph, 1000, "test_v5_manifest").await?;
+
+    assert!(merged_snapshots.created_tx == 17338);
+
+    // check DB to see what is persisted
+    let cypher_query = neo4rs::query(
+        "MATCH ()-[r:State]->()
+         RETURN count(r) AS count_state_edges",
+    );
+
+    // Execute the query
+    let mut result = graph.execute(cypher_query).await?;
+    // Fetch the first row only
+    let row = result.next().await?.unwrap();
+    let count: i64 = row.get("count_state_edges").unwrap();
+    dbg!(&count);
+    assert!(count == 17338i64);
 
     Ok(())
 }
diff --git a/tests/test_supporting_data.rs b/tests/test_supporting_data.rs
index 58e4c2c..80b459b 100644
--- a/tests/test_supporting_data.rs
+++ b/tests/test_supporting_data.rs
@@ -6,9 +6,9 @@ use std::path::PathBuf;
 
 use anyhow::Result;
 use libra_forensic_db::{
-    schema_exchange_orders::{read_orders_from_file, ExchangeOrder},
     load_exchange_orders,
     neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes},
+    schema_exchange_orders::{read_orders_from_file, ExchangeOrder},
 };
 use neo4rs::query;
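
Reviewer note: a minimal sketch of how the new snapshot loading path fits together end to end, mirroring the tests above. The manifest path, port, and archive id below are illustrative assumptions, not values from this change.

    // Illustrative sketch only: extract a v5 state snapshot and load it in batches.
    // Assumes a Neo4j instance reachable on localhost and a local state.manifest fixture.
    use std::path::Path;

    use libra_forensic_db::{
        extract_snapshot::extract_v5_snapshot,
        load_account_state::snapshot_batch,
        neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes},
    };

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // Parse the v5 manifest into WarehouseAccState records (address, balance, slow wallet fields).
        let manifest = Path::new("path/to/state.manifest"); // hypothetical path
        let accounts = extract_v5_snapshot(manifest).await?;

        // Each batch is tracked in the queue and inserted with impl_batch_snapshot_insert;
        // per-batch BatchTxReturn values are summed via BatchTxReturn::increment.
        let graph = get_neo4j_localhost_pool(7687) // assumed local bolt port
            .await
            .expect("could not get neo4j connection pool");
        maybe_create_indexes(&graph)
            .await
            .expect("could not create indexes");

        let totals = snapshot_batch(&accounts, &graph, 1000, "v5_state_example").await?;
        println!("{}", totals); // Display impl from src/batch_tx_type.rs
        Ok(())
    }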