From ee2b266df7149ce97e32afc1fc0ab05f9d3efb67 Mon Sep 17 00:00:00 2001
From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com>
Date: Wed, 27 Nov 2024 20:15:20 -0500
Subject: [PATCH] smarter queue

---
 src/json_rescue_v5_load.rs        | 329 +++++---------------------
 src/queue.rs                      |  28 +++
 src/warehouse_cli.rs              |  11 +-
 tests/test_json_rescue_v5_load.rs |  46 +----
 4 files changed, 65 insertions(+), 349 deletions(-)

diff --git a/src/json_rescue_v5_load.rs b/src/json_rescue_v5_load.rs
index 6be78db..998cec7 100644
--- a/src/json_rescue_v5_load.rs
+++ b/src/json_rescue_v5_load.rs
@@ -3,16 +3,14 @@ use crate::{
         decompress_to_temppath, extract_v5_json_rescue, list_all_json_files, list_all_tgz_archives,
     },
     load_tx_cypher::tx_batch,
-    schema_transaction::WarehouseTxMaster,
+    queue::{self},
 };
 use anyhow::Result;
-use futures::{stream, StreamExt};
-use log::{error, info};
+use log::{error, info, warn};
 use neo4rs::Graph;
-use std::path::Path;
 use std::sync::Arc;
-use tokio::sync::{Mutex, Semaphore};
-use tokio::task;
+use std::{path::Path, thread::available_parallelism};
+use tokio::sync::Semaphore;
 
 /// How many records to read from the archives before attempting insert
 // static LOAD_QUEUE_SIZE: usize = 1000;
@@ -31,331 +29,56 @@ pub async fn single_thread_decompress_extract(tgz_file: &Path, pool: &Graph) -> Result<u64> {
     let mut unique_functions: Vec<String> = vec![];
 
     for j in json_vec {
+        let archive_id = j.file_name().unwrap().to_str().unwrap();
+        let complete = queue::are_all_completed(pool, archive_id).await?;
+        if complete {
+            info!("skip parsing, this file was loaded successfully");
+            continue;
+        }
+
         let (records, _, unique) = extract_v5_json_rescue(&j)?;
 
         unique.iter().for_each(|f| {
-            if !unique_functions.contains(&f) {
+            if !unique_functions.contains(f) {
                 unique_functions.push(f.clone());
             }
         });
 
-        let res = tx_batch(
-            &records,
-            pool,
-            QUERY_BATCH_SIZE,
-            j.file_name().unwrap().to_str().unwrap(),
-        )
-        .await?;
+        let res = tx_batch(&records, pool, QUERY_BATCH_SIZE, archive_id).await?;
         created_count += res.created_tx as u64;
         found_count += records.len() as u64;
     }
 
     info!("V5 transactions found: {}", found_count);
-    info!("V5 transactions processed: {}", created_count);
+    info!("V5 transactions inserted: {}", created_count);
+    if found_count != created_count {
+        warn!("transactions loaded don't match transactions extracted, perhaps previously loaded?");
+    }
 
     Ok(created_count)
 }
 
-const MAX_CONCURRENT_PARSE: usize = 24; // Number of concurrent parsing tasks
-const MAX_CONCURRENT_INSERT: usize = 4; // Number of concurrent database insert tasks
-
-pub async fn concurrent_decompress_and_extract(tgz_file: &Path, pool: &Graph) -> Result<u64> {
-    let temppath = decompress_to_temppath(tgz_file)?;
-    let json_vec = list_all_json_files(temppath.path())?;
-
-    let found_count = Arc::new(tokio::sync::Mutex::new(0u64));
-    let created_count = Arc::new(tokio::sync::Mutex::new(0u64));
-
-    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_INSERT));
-    let parse_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PARSE));
-
-    let tasks = json_vec.into_iter().map(|j| {
-        let semaphore = Arc::clone(&semaphore);
-        let parse_semaphore = Arc::clone(&parse_semaphore);
-        let found_count = Arc::clone(&found_count);
-        let created_count = Arc::clone(&created_count);
-        let pool = pool.clone();
-
-        task::spawn(async move {
-            let _permit = parse_semaphore.acquire().await.unwrap(); // Control parsing concurrency
-            if let Ok((mut r, _e, _)) = extract_v5_json_rescue(&j) {
-                let drain: Vec<WarehouseTxMaster> = std::mem::take(&mut r);
-
-                if !drain.is_empty() {
-                    let _db_permit = semaphore.acquire().await.unwrap(); // Control DB insert concurrency
-                    let res = tx_batch(
-                        &drain,
-                        &pool,
-                        QUERY_BATCH_SIZE,
-                        j.file_name().unwrap().to_str().unwrap(),
-                    )
-                    .await?;
-                    {
-                        let mut fc = found_count.lock().await;
-                        let mut cc = created_count.lock().await;
-                        *fc += drain.len() as u64;
-                        *cc += res.created_tx as u64;
-                    }
-                }
-            }
-            Ok::<(), anyhow::Error>(())
-        })
-    });
-
-    // Collect all results
-    let results: Vec<_> = futures::future::join_all(tasks).await;
-
-    // Check for errors in tasks
-    for result in results {
-        if let Err(e) = result {
-            error!("Task failed: {:?}", e);
-        }
-    }
-
-    let found_count = *found_count.lock().await;
-    let created_count = *created_count.lock().await;
-
-    info!("V5 transactions found: {}", found_count);
-    info!("V5 transactions processed: {}", created_count);
-    if found_count != created_count {
-        error!("transactions loaded don't match transactions extracted");
-    }
-
-    Ok(created_count)
-}
-
-pub async fn stream_decompress_and_extract(tgz_file: &Path, pool: &Graph) -> Result<u64> {
-    let temppath = decompress_to_temppath(tgz_file)?;
-    let json_vec = list_all_json_files(temppath.path())?;
-
-    let found_count = Arc::new(Mutex::new(0u64));
-    let created_count = Arc::new(Mutex::new(0u64));
-
-    let v: Vec<String> = vec![];
-    let unique_functions = Arc::new(Mutex::new(v));
-
-    let parse_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PARSE));
-    let insert_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_INSERT));
-
-    // Stream for JSON file processing
-    let _results: Vec<_> = stream::iter(json_vec)
-        .map(|j| {
-            let parse_semaphore = Arc::clone(&parse_semaphore);
-            let insert_semaphore = Arc::clone(&insert_semaphore);
-            let found_count = Arc::clone(&found_count);
-            let created_count = Arc::clone(&created_count);
-            let unique_functions = Arc::clone(&unique_functions);
-
-            let pool = pool.clone();
-
-            async move {
-                let _parse_permit = parse_semaphore.acquire().await.unwrap();
-
-                if let Ok((records, _e, unique_fun)) = extract_v5_json_rescue(&j) {
-                    // let batch = std::mem::take(&mut records);
-
-                    if !records.is_empty() {
-                        let _insert_permit = insert_semaphore.acquire().await.unwrap();
-                        let res = tx_batch(
-                            &records,
-                            &pool,
-                            QUERY_BATCH_SIZE,
-                            j.file_name().unwrap().to_str().unwrap(),
-                        )
-                        .await?;
-
-                        let mut uf = unique_functions.lock().await;
-                        for f in &unique_fun {
-                            if !uf.contains(&f) {
-                                uf.push(f.to_owned());
-                            }
-                        }
-
-                        let mut fc = found_count.lock().await;
-                        let mut cc = created_count.lock().await;
-
-                        *fc += records.len() as u64;
-                        *cc += res.created_tx;
-                    }
-                }
-
-                Ok::<(), anyhow::Error>(())
-            }
-        })
-        .buffer_unordered(MAX_CONCURRENT_PARSE) // Concurrency for parsing
-        .collect() // Waits for all tasks to finish
-        .await;
-
-    // // Check for errors in results
-    // for result in results {
-    //     if let Err(e) = result {
-    //         error!("Task failed: {:?}", e);
-    //     }
-    // }
-
-    // Gather final counts
-    let found_count = *found_count.lock().await;
-    let created_count = *created_count.lock().await;
-    let unique_functions = unique_functions.lock().await;
-
-    info!("unique functions found: {:?}", unique_functions);
-
-    info!("V5 transactions found: {}", found_count);
-    info!("V5 transactions processed: {}", created_count);
-    if found_count != created_count {
-        error!("transactions loaded don't match transactions extracted");
-    }
-
-    Ok(created_count)
-}
-
-pub async fn alt_stream_decompress_and_extract(tgz_file: &Path, pool: &Graph) -> Result<u64> {
-    let temppath = decompress_to_temppath(tgz_file)?;
-    let json_vec = list_all_json_files(temppath.path())?;
-
-    let found_count = Arc::new(tokio::sync::Mutex::new(0u64));
-    let created_count = Arc::new(tokio::sync::Mutex::new(0u64));
-
-    let parse_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PARSE));
-    let insert_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_INSERT));
-
-    // Parsing Stream
-    let parsed_stream = stream::iter(json_vec).map({
-        let parse_semaphore = Arc::clone(&parse_semaphore);
-
-        move |j| {
-            let parse_semaphore = Arc::clone(&parse_semaphore);
-            async move {
-                let _parse_permit = parse_semaphore.acquire().await;
-                extract_v5_json_rescue(&j)
-                    .map(|(records, _, _)| records)
-                    .ok()
-            }
-        }
-    });
-
-    // Insertion Stream
-    let results = parsed_stream
-        .buffer_unordered(MAX_CONCURRENT_PARSE)
-        .filter_map(|opt_records| async move { opt_records })
-        .flat_map(|records| stream::iter(records.into_iter().collect::<Vec<_>>()))
-        .chunks(QUERY_BATCH_SIZE) // Batch insert chunks
-        .map({
-            let insert_semaphore = Arc::clone(&insert_semaphore);
-            let found_count = Arc::clone(&found_count);
-            let created_count = Arc::clone(&created_count);
-            let pool = pool.clone();
-
-            move |batch| {
-                let insert_semaphore = Arc::clone(&insert_semaphore);
-                let found_count = Arc::clone(&found_count);
-                let created_count = Arc::clone(&created_count);
-                let pool = pool.clone();
-
-                async move {
-                    let _insert_permit = insert_semaphore.acquire().await;
-                    let res = tx_batch(&batch, &pool, QUERY_BATCH_SIZE, "batch").await?;
-                    *found_count.lock().await += batch.len() as u64;
-                    *created_count.lock().await += res.created_tx as u64;
-                    Ok::<(), anyhow::Error>(())
-                }
-            }
-        })
-        .buffer_unordered(MAX_CONCURRENT_INSERT)
-        .collect::<Vec<_>>() // Collect all results
-        .await;
-
-    // Log Errors
-    for result in results {
-        if let Err(e) = result {
-            error!("Failed batch insert: {:?}", e);
-        }
-    }
-
-    // Final Logging
-    let found = found_count.lock().await;
-    let created = created_count.lock().await;
-    info!("V5 transactions found: {}", found);
-    info!("V5 transactions processed: {}", created);
-    if *found != *created {
-        error!(
-            "Mismatch: transactions loaded ({}) vs extracted ({})",
-            created, found
-        );
-    }
-
-    Ok(*created)
-}
-
-pub async fn rip(start_dir: &Path, pool: &Graph) -> Result<u64> {
-    let tgz_list = list_all_tgz_archives(start_dir)?;
-    info!("tgz archives found: {}", tgz_list.len());
-    let mut txs = 0u64;
-    for p in tgz_list.iter() {
-        match alt_stream_decompress_and_extract(p, pool).await {
-            Ok(t) => txs += t,
-            Err(e) => {
-                error!(
-                    "could not load archive: {}, error: {}",
-                    p.display(),
-                    e.to_string()
-                );
-            }
-        }
-    }
-    Ok(txs)
-}
-
-pub async fn rip_concurrent(start_dir: &Path, pool: &Graph) -> Result<()> {
-    let tgz_list = list_all_tgz_archives(start_dir)?;
-    info!("tgz archives found: {}", &tgz_list.len());
-
-    let tasks: Vec<_> = tgz_list
-        .into_iter()
-        .map(|p| {
-            let pool = pool.clone(); // Clone pool for each task
-            tokio::spawn(async move {
-                single_thread_decompress_extract(&p, &pool).await // Call the async function
-            })
-        })
-        .collect();
-
-    // Await all tasks and handle results
-    let results = futures::future::join_all(tasks).await;
-    // Check for errors
-    for (i, result) in results.into_iter().enumerate() {
-        match result {
-            Ok(Ok(_)) => {
-                info!("Task {} completed successfully.", i);
-            }
-            Ok(Err(e)) => {
-                error!("Task {} failed: {:?}", i, e);
-            }
-            Err(e) => {
-                error!("Task {} panicked: {:?}", i, e);
-            }
-        }
-    }
-    Ok(())
-}
-
-const MAX_CONCURRENT_TASKS: usize = 4; // Define the limit for concurrent tasks
+pub async fn rip_concurrent_limited(
+    start_dir: &Path,
+    pool: &Graph,
+    threads: Option<usize>,
+) -> Result<()> {
+    let threads = threads.unwrap_or(available_parallelism().unwrap().get());
+    info!("concurrent threads used: {}", threads);
 
-pub async fn rip_concurrent_limited(start_dir: &Path, pool: &Graph) -> Result<()> {
     let tgz_list = list_all_tgz_archives(start_dir)?;
-    info!("tgz archives found: {}", tgz_list.len());
+    let archives_count = tgz_list.len();
+    info!("tgz archives found: {}", archives_count);
 
-    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_TASKS)); // Semaphore to limit concurrency
+    let semaphore = Arc::new(Semaphore::new(threads)); // Semaphore to limit concurrency
     let mut tasks = vec![];
 
-    for p in tgz_list.into_iter() {
+    for (n, p) in tgz_list.into_iter().enumerate() {
         let pool = pool.clone(); // Clone pool for each task
         let semaphore = Arc::clone(&semaphore); // Clone semaphore for each task
 
         let task = tokio::spawn(async move {
             let _permit = semaphore.acquire().await; // Acquire semaphore permit
+            info!("PROGRESS: {n}/{archives_count}");
             single_thread_decompress_extract(&p, &pool).await // Perform the task
         });
 
diff --git a/src/queue.rs b/src/queue.rs
index cabeb67..846940e 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -86,6 +86,34 @@ pub async fn is_batch_complete(
     }
 }
 
+// Three options: Not found in DB, found and complete, found and incomplete
+pub async fn are_all_completed(pool: &Graph, archive_id: &str) -> Result<bool> {
+    let cypher_string = format!(
+        r#"
+        MATCH (a:Queue {{archive_id: "{}"}})
+        RETURN CASE
+            WHEN COUNT(a) = 0 THEN false
+            ELSE ALL(c IN COLLECT(a.completed) WHERE c = true)
+        END AS allCompleted;
+        "#,
+        archive_id,
+    );
+
+    let cypher_query = neo4rs::query(&cypher_string);
+
+    let mut res = pool
+        .execute(cypher_query)
+        .await
+        .context("execute query error")?;
+
+    if let Some(row) = res.next().await? {
+        // Extract `allCompleted` as a bool
+        Ok(row.get::<bool>("allCompleted")?)
+    } else {
+        bail!("not found")
+    }
+}
+
 // clear queue
 pub async fn clear_queue(pool: &Graph) -> Result<()> {
     let cypher_string = r#"
diff --git a/src/warehouse_cli.rs b/src/warehouse_cli.rs
index 7868c0b..fdcf321 100644
--- a/src/warehouse_cli.rs
+++ b/src/warehouse_cli.rs
@@ -33,6 +33,10 @@ pub struct WarehouseCli {
     /// force clear queue
     clear_queue: bool,
 
+    #[clap(long, short('t'))]
+    /// max tasks to run in parallel
+    threads: Option<usize>,
+
     #[clap(subcommand)]
     command: Sub,
 }
@@ -174,7 +178,12 @@ impl WarehouseCli {
             Sub::VersionFiveTx { archive_dir } => {
                 let pool = try_db_connection_pool(self).await?;
 
-                json_rescue_v5_load::rip_concurrent_limited(archive_dir, &pool).await?;
+                json_rescue_v5_load::rip_concurrent_limited(
+                    archive_dir,
+                    &pool,
+                    self.threads.to_owned(),
+                )
+                .await?;
             }
         };
         Ok(())
diff --git a/tests/test_json_rescue_v5_load.rs b/tests/test_json_rescue_v5_load.rs
index f5cbf3a..b506734 100644
--- a/tests/test_json_rescue_v5_load.rs
+++ b/tests/test_json_rescue_v5_load.rs
@@ -30,50 +30,6 @@ async fn test_load_all_tgz() -> anyhow::Result<()> {
     Ok(())
 }
 
-#[tokio::test]
-async fn test_concurrent_load_all_tgz() -> anyhow::Result<()> {
-    libra_forensic_db::log_setup();
-
-    let c = start_neo4j_container();
-    let port = c.get_host_port_ipv4(7687);
-    let pool = get_neo4j_localhost_pool(port)
-        .await
-        .expect("could not get neo4j connection pool");
-    maybe_create_indexes(&pool)
-        .await
-        .expect("could start index");
-
-    let path = fixtures::v5_json_tx_path().join("0-99900.tgz");
-
-    let tx_count = json_rescue_v5_load::concurrent_decompress_and_extract(&path, &pool).await?;
-
-    assert!(tx_count == 5244);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_stream_load_all_tgz() -> anyhow::Result<()> {
-    libra_forensic_db::log_setup();
-
-    let c = start_neo4j_container();
-    let port = c.get_host_port_ipv4(7687);
-    let pool = get_neo4j_localhost_pool(port)
-        .await
-        .expect("could not get neo4j connection pool");
-    maybe_create_indexes(&pool)
-        .await
-        .expect("could start index");
-
-    let path = fixtures::v5_json_tx_path().join("0-99900.tgz");
-
-    let tx_count = json_rescue_v5_load::alt_stream_decompress_and_extract(&path, &pool).await?;
-
-    assert!(tx_count == 13);
-
-    Ok(())
-}
-
 #[tokio::test]
 async fn test_load_entrypoint() -> anyhow::Result<()> {
     libra_forensic_db::log_setup();
@@ -89,7 +45,7 @@ async fn test_load_entrypoint() -> anyhow::Result<()> {
 
     let path = fixtures::v5_json_tx_path();
 
-    json_rescue_v5_load::rip_concurrent_limited(&path, &pool).await?;
+    json_rescue_v5_load::rip_concurrent_limited(&path, &pool, None).await?;
 
     // dbg!(&tx_count);
     // assert!(tx_count == 13);
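
The concurrency cap in `rip_concurrent_limited` relies on the semaphore permit staying alive for the whole task: `_permit` is a named binding, so it is dropped only when the spawned future finishes (binding the un-unwrapped `Result` works too, since its `Ok` variant owns the permit). A minimal, self-contained sketch of the same bounded fan-out pattern; `bounded_fan_out` and its inputs are illustrative, not part of this repo:

    use std::{sync::Arc, thread::available_parallelism};
    use tokio::sync::Semaphore;

    // Run one async job per input, but never more than `threads` at once.
    async fn bounded_fan_out(inputs: Vec<u64>, threads: Option<usize>) -> Vec<u64> {
        let threads =
            threads.unwrap_or_else(|| available_parallelism().map(|n| n.get()).unwrap_or(1));
        let semaphore = Arc::new(Semaphore::new(threads));

        let mut tasks = vec![];
        for i in inputs {
            let semaphore = Arc::clone(&semaphore);
            tasks.push(tokio::spawn(async move {
                // Holding the permit for the whole task body is what caps concurrency.
                let _permit = semaphore.acquire().await.expect("semaphore closed");
                i * 2 // stand-in for the per-archive work
            }));
        }

        let mut out = vec![];
        for t in tasks {
            out.push(t.await.expect("task panicked"));
        }
        out
    }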
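
`are_all_completed` builds its Cypher with `format!`, which is why `archive_id` must be quoted into the string. neo4rs also accepts bound parameters, which sidesteps quoting and injection concerns entirely. A sketch of the same check written that way; `are_all_completed_param` is a hypothetical name, and it assumes the `query(..).param(..)` builder of current neo4rs releases, with `row.get` used exactly as in the patch:

    use anyhow::{bail, Context, Result};
    use neo4rs::Graph;

    async fn are_all_completed_param(pool: &Graph, archive_id: &str) -> Result<bool> {
        // `$archive_id` is bound by the driver; no string interpolation needed.
        let q = neo4rs::query(
            r#"
            MATCH (a:Queue {archive_id: $archive_id})
            RETURN CASE
                WHEN COUNT(a) = 0 THEN false
                ELSE ALL(c IN COLLECT(a.completed) WHERE c = true)
            END AS allCompleted
            "#,
        )
        .param("archive_id", archive_id);

        let mut res = pool.execute(q).await.context("execute query error")?;
        match res.next().await? {
            Some(row) => Ok(row.get::<bool>("allCompleted")?),
            None => bail!("not found"),
        }
    }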
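
Call sites either pick the parallelism explicitly or pass `None` to defer to `available_parallelism`; on the CLI the same knob is the new global `--threads`/`-t` flag. For example, against the signature added above (the surrounding `archive_dir` and `pool` variables are hypothetical):

    // Cap at 8 concurrent archive loads:
    json_rescue_v5_load::rip_concurrent_limited(&archive_dir, &pool, Some(8)).await?;

    // Or let the machine decide, as the updated test does:
    json_rescue_v5_load::rip_concurrent_limited(&archive_dir, &pool, None).await?;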