From 657f6bacb10586b3b0f80ab054404a64f20b3a9a Mon Sep 17 00:00:00 2001
From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com>
Date: Wed, 27 Nov 2024 16:40:45 -0500
Subject: [PATCH] test concurrent

---
 src/json_rescue_v5_extract.rs     |  10 ++-
 src/json_rescue_v5_load.rs        | 116 +++++++++++++++++++++++-------
 src/warehouse_cli.rs              |   2 +-
 tests/test_json_rescue_v5_load.rs |  10 +--
 4 files changed, 105 insertions(+), 33 deletions(-)

diff --git a/src/json_rescue_v5_extract.rs b/src/json_rescue_v5_extract.rs
index 4fde4ff..12f72c2 100644
--- a/src/json_rescue_v5_extract.rs
+++ b/src/json_rescue_v5_extract.rs
@@ -48,8 +48,7 @@ pub fn extract_v5_json_rescue(
         wtxs.function = make_function_name(script);
         trace!("function: {}", &wtxs.function);
         if !unique_functions.contains(&wtxs.function) {
-            unique_functions.push(wtxs.function.clone());
-
+            unique_functions.push(wtxs.function.clone());
         }
 
         decode_transaction_args(&mut wtxs, &t.bytes)?;
@@ -64,7 +63,7 @@ pub fn extract_v5_json_rescue(
             RelationLabel::Transfer(_) => tx_vec.push(wtxs),
             RelationLabel::Onboarding(_) => tx_vec.push(wtxs),
             RelationLabel::Vouch(_) => tx_vec.push(wtxs),
-            RelationLabel::Configuration => {},
+            RelationLabel::Configuration => {}
             RelationLabel::Miner => {}
         };
     }
@@ -97,6 +96,11 @@ pub fn decode_transaction_args(wtx: &mut WarehouseTxMaster, tx_bytes: &[u8]) -> Result<()> {
             wtx.entry_function = Some(EntryFunctionArgs::V5(sf.to_owned()));
         }
 
+        ScriptFunctionCallGenesis::AutopayCreateInstruction { payee, .. } => {
+            wtx.relation_label =
+                RelationLabel::Transfer(cast_legacy_account(payee)?);
+            wtx.entry_function = Some(EntryFunctionArgs::V5(sf.to_owned()));
+        }
         ScriptFunctionCallGenesis::CreateAccUser { .. } => {
             // onboards self
             wtx.relation_label = RelationLabel::Onboarding(wtx.sender);
diff --git a/src/json_rescue_v5_load.rs b/src/json_rescue_v5_load.rs
index c7c19f7..179b41d 100644
--- a/src/json_rescue_v5_load.rs
+++ b/src/json_rescue_v5_load.rs
@@ -13,15 +13,16 @@ use std::path::Path;
 use std::sync::Arc;
 use tokio::sync::{Mutex, Semaphore};
 use tokio::task;
+use sysinfo::{System, SystemExt};
 
 /// How many records to read from the archives before attempting insert
-static LOAD_QUEUE_SIZE: usize = 1000;
+// static LOAD_QUEUE_SIZE: usize = 1000;
 /// When we attempt insert, the chunks of txs that go in to each query
 static QUERY_BATCH_SIZE: usize = 250;
 
 /// from a tgz file decompress all the .json files in archive
 /// and then read into the warehouse record format
-pub async fn decompress_and_extract(tgz_file: &Path, pool: &Graph) -> Result<u64> {
+pub async fn single_thread_decompress_extract(tgz_file: &Path, pool: &Graph) -> Result<u64> {
     let temppath = decompress_to_temppath(tgz_file)?;
     let json_vec = list_all_json_files(temppath.path())?;
 
@@ -29,35 +30,25 @@ pub async fn decompress_and_extract(tgz_file: &Path, pool: &Graph) -> Result<u64> {
     let mut unique_functions: Vec<String> = vec![];
 
-    // fill to BATCH_SIZE before attempting insert.
-    // many files may only have a handful of user txs,
-    // so individual files may have far fewer than BATCH_SIZE.
-    let mut queue: Vec<WarehouseTxMaster> = vec![];
 
     for j in json_vec {
-        if let Ok((mut r, _e, _)) = extract_v5_json_rescue(&j) {
-            queue.append(&mut r);
-        }
+        let (records, _, unique) = extract_v5_json_rescue(&j)?;
 
-        queue.iter().for_each(|s| {
-            if !unique_functions.contains(&s.function) {
-                unique_functions.push(s.function.clone());
+        unique.iter().for_each(|f| {
+            if !unique_functions.contains(f) {
+                unique_functions.push(f.clone());
             }
         });
 
-        if queue.len() >= LOAD_QUEUE_SIZE {
-            let drain: Vec<WarehouseTxMaster> = std::mem::take(&mut queue);
-
-            let res = tx_batch(
-                &drain,
-                pool,
-                QUERY_BATCH_SIZE,
-                j.file_name().unwrap().to_str().unwrap(),
-            )
-            .await?;
-            created_count += res.created_tx as u64;
-            found_count += drain.len() as u64;
-        }
+        let res = tx_batch(
+            &records,
+            pool,
+            QUERY_BATCH_SIZE,
+            j.file_name().unwrap().to_str().unwrap(),
+        )
+        .await?;
+        created_count += res.created_tx as u64;
+        found_count += records.len() as u64;
     }
 
     info!("V5 transactions found: {}", found_count);
@@ -317,3 +308,78 @@ pub async fn rip(start_dir: &Path, pool: &Graph) -> Result<u64> {
     }
     Ok(txs)
 }
+
+pub async fn rip_concurrent(start_dir: &Path, pool: &Graph) -> Result<()> {
+    let tgz_list = list_all_tgz_archives(start_dir)?;
+    info!("tgz archives found: {}", &tgz_list.len());
+
+    let tasks: Vec<_> = tgz_list
+        .into_iter()
+        .map(|p| {
+            let pool = pool.clone(); // Clone pool for each task
+            tokio::spawn(async move {
+                single_thread_decompress_extract(&p, &pool).await // Call the async function
+            })
+        })
+        .collect();
+
+    // Await all tasks and handle results
+    let results = futures::future::join_all(tasks).await;
+    // Check for errors
+    for (i, result) in results.into_iter().enumerate() {
+        match result {
+            Ok(Ok(_)) => {
+                info!("Task {} completed successfully.", i);
+            }
+            Ok(Err(e)) => {
+                error!("Task {} failed: {:?}", i, e);
+            }
+            Err(e) => {
+                error!("Task {} panicked: {:?}", i, e);
+            }
+        }
+    }
+    Ok(())
+}
+
+
+const MAX_CONCURRENT_TASKS: usize = 4; // Define the limit for concurrent tasks
+
+pub async fn rip_concurrent_limited(start_dir: &Path, pool: &Graph) -> Result<()> {
+    let tgz_list = list_all_tgz_archives(start_dir)?;
+    info!("tgz archives found: {}", tgz_list.len());
+
+    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_TASKS)); // Semaphore to limit concurrency
+    let mut tasks = vec![];
+
+    for p in tgz_list.into_iter() {
+        let pool = pool.clone(); // Clone pool for each task
+        let semaphore = Arc::clone(&semaphore); // Clone semaphore for each task
+
+        let task = tokio::spawn(async move {
+            let _permit = semaphore.acquire().await; // Acquire semaphore permit
+            single_thread_decompress_extract(&p, &pool).await // Perform the task
+        });
+
+        tasks.push(task);
+    }
+
+    // Await all tasks and handle results
+    let results = futures::future::join_all(tasks).await;
+
+    for (i, result) in results.into_iter().enumerate() {
+        match result {
+            Ok(Ok(_)) => {
+                info!("Task {} completed successfully.", i);
+            }
+            Ok(Err(e)) => {
+                error!("Task {} failed: {:?}", i, e);
+            }
+            Err(e) => {
+                error!("Task {} panicked: {:?}", i, e);
+            }
+        }
+    }
+
+    Ok(())
+}
diff --git a/src/warehouse_cli.rs b/src/warehouse_cli.rs
index 5cea9a8..7868c0b 100644
--- a/src/warehouse_cli.rs
+++ b/src/warehouse_cli.rs
@@ -174,7 +174,7 @@ impl WarehouseCli {
             Sub::VersionFiveTx { archive_dir } => {
                 let pool = try_db_connection_pool(self).await?;
 
-                json_rescue_v5_load::rip(archive_dir, &pool).await?;
+                json_rescue_v5_load::rip_concurrent_limited(archive_dir, &pool).await?;
             }
         };
         Ok(())
diff --git a/tests/test_json_rescue_v5_load.rs b/tests/test_json_rescue_v5_load.rs
index 74b7bd8..61ba691 100644
--- a/tests/test_json_rescue_v5_load.rs
+++ b/tests/test_json_rescue_v5_load.rs
@@ -23,7 +23,7 @@ async fn test_load_all_tgz() -> anyhow::Result<()> {
 
     let path = fixtures::v5_json_tx_path().join("0-99900.tgz");
 
-    let tx_count = json_rescue_v5_load::decompress_and_extract(&path, &pool).await?;
+    let tx_count = json_rescue_v5_load::single_thread_decompress_extract(&path, &pool).await?;
 
     assert!(tx_count == 5244);
 
@@ -89,13 +89,15 @@ async fn test_load_entrypoint() -> anyhow::Result<()> {
 
     let path = fixtures::v5_json_tx_path();
 
-    let tx_count = json_rescue_v5_load::rip(&path, &pool).await?;
-    dbg!(&tx_count);
-    assert!(tx_count == 13);
+    json_rescue_v5_load::rip_concurrent_limited(&path, &pool).await?;
+    // dbg!(&tx_count);
+    // assert!(tx_count == 13);
 
     Ok(())
 }
 
+
 #[tokio::test]
 async fn test_rescue_v5_parse_set_wallet_tx() -> anyhow::Result<()> {
     libra_forensic_db::log_setup();
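
Note on the pattern this patch introduces: rip_concurrent_limited spawns one Tokio task per .tgz archive but gates every task on an Arc<Semaphore>, so at most MAX_CONCURRENT_TASKS archives are decompressed and loaded at once, and join_all distinguishes a task's own failure (Ok(Err(e))) from a panic or cancellation (Err(JoinError)). What follows is a minimal, self-contained sketch of that bounded-concurrency pattern, assuming the tokio, futures, and anyhow crates this repository already depends on; process_archive, the task count of 16, and the sleep are hypothetical stand-ins for single_thread_decompress_extract and the real archive list.

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

const MAX_CONCURRENT_TASKS: usize = 4;

// Hypothetical stand-in for single_thread_decompress_extract (the real
// function also needs a Graph connection pool).
async fn process_archive(id: usize) -> anyhow::Result<usize> {
    tokio::time::sleep(Duration::from_millis(10)).await;
    Ok(id)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_TASKS));

    // Spawn one task per unit of work; each task waits for a permit before
    // doing anything, so at most MAX_CONCURRENT_TASKS bodies run at a time.
    let tasks: Vec<_> = (0..16)
        .map(|id| {
            let semaphore = Arc::clone(&semaphore);
            tokio::spawn(async move {
                // The permit is released when `_permit` drops at the end of
                // the task; holding it is what enforces the limit.
                let _permit = semaphore.acquire().await?;
                process_archive(id).await
            })
        })
        .collect();

    // join_all yields nested results: the outer Err is a JoinError (panic
    // or cancellation), the inner Err is the task's own failure.
    for (i, result) in futures::future::join_all(tasks).await.into_iter().enumerate() {
        match result {
            Ok(Ok(n)) => println!("task {i} processed archive {n}"),
            Ok(Err(e)) => eprintln!("task {i} failed: {e:?}"),
            Err(e) => eprintln!("task {i} panicked: {e:?}"),
        }
    }
    Ok(())
}

Unlike rip_concurrent, which launches every archive at once, the permit gate keeps memory and database-connection pressure roughly constant regardless of how many archives list_all_tgz_archives finds.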