Skip to content

Commit

Permalink
count tokio tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Nov 28, 2024
1 parent 4cd266b commit 21483e9
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions src/json_rescue_v5_load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use log::{error, info, trace, warn};
use neo4rs::Graph;
use std::sync::Arc;
use std::{path::Path, thread::available_parallelism};
use tokio::sync::{Mutex, Semaphore};
use tokio::{
runtime::Handle,
sync::{Semaphore},
};

/// How many records to read from the archives before attempting insert
// static LOAD_QUEUE_SIZE: usize = 1000;
Expand Down Expand Up @@ -73,19 +76,17 @@ pub async fn rip_concurrent_limited(
info!("tgz archives found: {}", archives_count);

let semaphore = Arc::new(Semaphore::new(threads)); // Semaphore to limit concurrency
let progress = Arc::new(Mutex::new(0u64));
let mut tasks = vec![];

for p in tgz_list.into_iter() {
let pool = pool.clone(); // Clone pool for each task
let semaphore = Arc::clone(&semaphore); // Clone semaphore for each task
let progress = Arc::clone(&progress); // Clone semaphore for each task

let task = tokio::spawn(async move {
let _permit = semaphore.acquire().await; // Acquire semaphore permit
let mut n = progress.lock().await;
info!("PROGRESS: {n}/{archives_count}");
*n += 1;
let metrics = Handle::current().metrics();
let n = metrics.spawned_tasks_count();
println!("Runtime has had {} tasks spawned", n);
single_thread_decompress_extract(&p, &pool).await // Perform the task
});

Expand Down

0 comments on commit 21483e9

Please sign in to comment.