
Commit

stream and concurrency examples
0o-de-lally committed Nov 27, 2024
1 parent 9c6bae5 commit bf5ca0c
Showing 1 changed file with 41 additions and 46 deletions.
87 changes: 41 additions & 46 deletions src/json_rescue_v5_load.rs
@@ -60,8 +60,8 @@ pub async fn decompress_and_extract(tgz_file: &Path, pool: &Graph) -> Result<u64
Ok(created_count)
}

const MAX_CONCURRENT_PARSE: usize = 4; // Number of concurrent parsing tasks
const MAX_CONCURRENT_INSERT: usize = 2; // Number of concurrent database insert tasks
const MAX_CONCURRENT_PARSE: usize = 50; // Number of concurrent parsing tasks
const MAX_CONCURRENT_INSERT: usize = 1; // Number of concurrent database insert tasks
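
Both limits are enforced further down with `tokio::sync::Semaphore` permits rather than by chunking the work. A minimal, self-contained sketch of that throttling pattern, assuming the constant above and a hypothetical `parse_one` worker in place of the real extraction and insert calls:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

const MAX_CONCURRENT_PARSE: usize = 50;

// Hypothetical stand-in for the real parse + insert work on one file.
async fn parse_one(id: usize) {
    println!("parsing item {id}");
}

#[tokio::main]
async fn main() {
    let parse_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PARSE));
    let mut handles = Vec::new();

    for id in 0..200 {
        let sem = Arc::clone(&parse_semaphore);
        handles.push(tokio::spawn(async move {
            // At most MAX_CONCURRENT_PARSE permits are held at once; the
            // permit is released when `_permit` drops at the end of the task.
            let _permit = sem.acquire_owned().await.expect("semaphore closed");
            parse_one(id).await;
        }));
    }

    for handle in handles {
        handle.await.expect("task panicked");
    }
}
```

The same idea extends to a second, smaller semaphore guarding the database writes, which is what the diff does with `MAX_CONCURRENT_INSERT` and `insert_semaphore`.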

pub async fn concurrent_decompress_and_extract(tgz_file: &Path, pool: &Graph) -> Result<u64> {
let temppath = decompress_to_temppath(tgz_file)?;
@@ -128,13 +128,6 @@ pub async fn concurrent_decompress_and_extract(tgz_file: &Path, pool: &Graph) ->
Ok(created_count)
}

use futures::{stream, StreamExt};
use tokio::sync::Semaphore;
use std::sync::Arc;

const MAX_CONCURRENT_PARSE: usize = 4; // Number of concurrent parsing tasks
const MAX_CONCURRENT_INSERT: usize = 2; // Number of concurrent database insert tasks

pub async fn stream_decompress_and_extract(tgz_file: &Path, pool: &Graph) -> Result<u64> {
let temppath = decompress_to_temppath(tgz_file)?;
let json_vec = list_all_json_files(temppath.path())?;
@@ -145,49 +138,52 @@ pub async fn stream_decompress_and_extract(tgz_file: &Path, pool: &Graph) -> Res
let parse_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PARSE));
let insert_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_INSERT));

// Stream for parsing JSON files
let parse_stream = stream::iter(json_vec).map(|j| {
let parse_semaphore = Arc::clone(&parse_semaphore);
let insert_semaphore = Arc::clone(&insert_semaphore);
let found_count = Arc::clone(&found_count);
let created_count = Arc::clone(&created_count);
let pool = pool.clone();

async move {
let _parse_permit = parse_semaphore.acquire().await.unwrap();
if let Ok((mut records, _e)) = extract_v5_json_rescue(&j) {
let batch = records.drain(..).collect::<Vec<_>>();

if !batch.is_empty() {
let _insert_permit = insert_semaphore.acquire().await.unwrap();
let res = tx_batch(
&batch,
&pool,
QUERY_BATCH_SIZE,
j.file_name().unwrap().to_str().unwrap(),
)
.await?;
// Stream for JSON file processing
let results: Vec<_> = stream::iter(json_vec)
.map(|j| {
let parse_semaphore = Arc::clone(&parse_semaphore);
let insert_semaphore = Arc::clone(&insert_semaphore);
let found_count = Arc::clone(&found_count);
let created_count = Arc::clone(&created_count);
let pool = pool.clone();

async move {
let _parse_permit = parse_semaphore.acquire().await.unwrap();

if let Ok((mut records, _e)) = extract_v5_json_rescue(&j) {
let batch = records.drain(..).collect::<Vec<_>>();

Check failure on line 154 in src/json_rescue_v5_load.rs (GitHub Actions / clippy): you seem to be trying to move all elements into a new `Vec`
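
An editorial note on that lint: `records.drain(..).collect::<Vec<_>>()` moves every element into a freshly allocated `Vec` even though `records` itself could simply be moved. A minimal sketch of the usual fix, with a hypothetical `into_batch` helper only to keep the snippet self-contained:

```rust
/// Hypothetical helper showing the clippy suggestion: move the Vec (or
/// `std::mem::take` it) instead of draining into a fresh allocation.
fn into_batch(mut records: Vec<String>) -> Vec<String> {
    // Flagged pattern:
    // let batch = records.drain(..).collect::<Vec<_>>();

    // Same result without the extra Vec; `records` is left empty but valid,
    // which only matters if it is used again afterwards.
    let batch = std::mem::take(&mut records);
    batch
}

fn main() {
    let batch = into_batch(vec!["tx1".to_string(), "tx2".to_string()]);
    assert_eq!(batch.len(), 2);
}
```

If `records` is never touched again, `let batch = records;` is enough; `std::mem::take` covers the case where the original binding must stay usable.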

if !batch.is_empty() {
let _insert_permit = insert_semaphore.acquire().await.unwrap();
let res = tx_batch(
&batch,
&pool,
QUERY_BATCH_SIZE,
j.file_name().unwrap().to_str().unwrap(),
)
.await?;

let mut fc = found_count.lock().await;
let mut cc = created_count.lock().await;
*fc += batch.len() as u64;
*cc += res.created_tx as u64;
let mut fc = found_count.lock().await;
let mut cc = created_count.lock().await;
*fc += batch.len() as u64;
*cc += res.created_tx as u64;
}
}
}
Ok::<(), anyhow::Error>(())
}
});

// Process the stream with controlled concurrency
parse_stream
.buffer_unordered(MAX_CONCURRENT_PARSE)
.for_each(|result| async {
if let Err(e) = result {
error!("Failed to process file: {:?}", e);
Ok::<(), anyhow::Error>(())
}
})
.buffer_unordered(MAX_CONCURRENT_PARSE) // Concurrency for parsing
.collect::<Vec<_>>() // Waits for all tasks to finish
.await;

// Check for errors in results
for result in results {
if let Err(e) = result {
error!("Task failed: {:?}", e);
}
}

// Gather final counts
let found_count = *found_count.lock().await;
let created_count = *created_count.lock().await;
@@ -201,7 +197,6 @@ pub async fn stream_decompress_and_extract(tgz_file: &Path, pool: &Graph) -> Res
Ok(created_count)
}
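
The net effect of the change above is that the pipeline now runs through `buffer_unordered` and `.collect::<Vec<_>>()` instead of `for_each`, so every per-file `Result` is kept and inspected after all tasks finish. A minimal sketch of that shape, with a hypothetical `process` function standing in for the parse-and-insert work:

```rust
use anyhow::{bail, Result};
use futures::{stream, StreamExt};

const MAX_CONCURRENT_PARSE: usize = 50;

// Hypothetical stand-in for extracting one JSON file and inserting its batch.
async fn process(name: String) -> Result<()> {
    if name.ends_with("bad.json") {
        bail!("could not parse {name}");
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    let files: Vec<String> = vec!["0-99.json".into(), "100-199_bad.json".into()];

    // Turn the file list into a stream of futures, run at most
    // MAX_CONCURRENT_PARSE of them concurrently, and keep every Result.
    let results: Vec<Result<()>> = stream::iter(files)
        .map(|name| async move { process(name).await })
        .buffer_unordered(MAX_CONCURRENT_PARSE)
        .collect()
        .await;

    // Errors are surfaced after all tasks finish instead of being dropped.
    for result in results {
        if let Err(e) = result {
            eprintln!("Task failed: {e:?}");
        }
    }
}
```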


pub async fn rip(start_dir: &Path, pool: &Graph) -> Result<u64> {
let tgz_list = list_all_tgz_archives(start_dir)?;
info!("tgz archives found: {}", tgz_list.len());
