Skip to content

Commit

Permalink
better progress counter
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Nov 28, 2024
1 parent 9714bce commit 4cd266b
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions src/json_rescue_v5_load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use log::{error, info, trace, warn};
use neo4rs::Graph;
use std::sync::Arc;
use std::{path::Path, thread::available_parallelism};
use tokio::sync::Semaphore;
use tokio::sync::{Mutex, Semaphore};

/// How many records to read from the archives before attempting insert
// static LOAD_QUEUE_SIZE: usize = 1000;
Expand All @@ -32,7 +32,10 @@ pub async fn single_thread_decompress_extract(tgz_file: &Path, pool: &Graph) ->
let archive_id = j.file_name().unwrap().to_str().unwrap();
let complete = queue::are_all_completed(pool, archive_id).await?;
if complete {
trace!("skip parsing {}, this file was loaded successfully", archive_id);
trace!(
"skip parsing {}, this file was loaded successfully",
archive_id
);
continue;
}

Expand Down Expand Up @@ -70,15 +73,19 @@ pub async fn rip_concurrent_limited(
info!("tgz archives found: {}", archives_count);

let semaphore = Arc::new(Semaphore::new(threads)); // Semaphore to limit concurrency
let progress = Arc::new(Mutex::new(0u64));
let mut tasks = vec![];

for (n, p) in tgz_list.into_iter().enumerate() {
for p in tgz_list.into_iter() {
let pool = pool.clone(); // Clone pool for each task
let semaphore = Arc::clone(&semaphore); // Clone semaphore for each task
let progress = Arc::clone(&progress); // Clone semaphore for each task

let task = tokio::spawn(async move {
let _permit = semaphore.acquire().await; // Acquire semaphore permit
let mut n = progress.lock().await;
info!("PROGRESS: {n}/{archives_count}");
*n += 1;
single_thread_decompress_extract(&p, &pool).await // Perform the task
});

Expand Down

0 comments on commit 4cd266b

Please sign in to comment.