
Commit

try alternative two stream
0o-de-lally committed Nov 27, 2024
1 parent e21ebf4 commit dee32be
Showing 2 changed files with 80 additions and 2 deletions.
80 changes: 79 additions & 1 deletion src/json_rescue_v5_load.rs
@@ -221,12 +221,90 @@ pub async fn stream_decompress_and_extract(tgz_file: &Path, pool: &Graph) -> Res
     Ok(created_count)
 }

+pub async fn alt_stream_decompress_and_extract(tgz_file: &Path, pool: &Graph) -> Result<u64> {
+    let temppath = decompress_to_temppath(tgz_file)?;
+    let json_vec = list_all_json_files(temppath.path())?;
+
+    let found_count = Arc::new(tokio::sync::Mutex::new(0u64));
+    let created_count = Arc::new(tokio::sync::Mutex::new(0u64));
+
+    let parse_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PARSE));
+    let insert_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_INSERT));
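+    // Two separate semaphores above, so the parse and insert stages can be
+    // throttled independently of each other.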

+    // Parsing Stream
+    let parsed_stream = stream::iter(json_vec).map({
+        let parse_semaphore = Arc::clone(&parse_semaphore);
+
+        move |j| {
+            let parse_semaphore = Arc::clone(&parse_semaphore);
+            async move {
+                let _parse_permit = parse_semaphore.acquire().await;
+                extract_v5_json_rescue(&j)
+                    .map(|(records, _, _)| records)
+                    .ok()
+            }
+        }
+    });
+
+    // Insertion Stream
+    let results = parsed_stream
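+        // Run up to MAX_CONCURRENT_PARSE extraction futures at once and
+        // drop files whose extraction failed.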
+        .buffer_unordered(MAX_CONCURRENT_PARSE)
+        .filter_map(|opt_records| async move { opt_records })
+        .flat_map(|records| stream::iter(records.into_iter().collect::<Vec<_>>()))
+        .chunks(QUERY_BATCH_SIZE) // Batch insert chunks
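+        // Per-file record vectors are now flattened and regrouped into
+        // QUERY_BATCH_SIZE chunks, each inserted as one batch below.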
+        .map({
+            let insert_semaphore = Arc::clone(&insert_semaphore);
+            let found_count = Arc::clone(&found_count);
+            let created_count = Arc::clone(&created_count);
+            let pool = pool.clone();
+
+            move |batch| {
+                let insert_semaphore = Arc::clone(&insert_semaphore);
+                let found_count = Arc::clone(&found_count);
+                let created_count = Arc::clone(&created_count);
+                let pool = pool.clone();
+
+                async move {
+                    let _insert_permit = insert_semaphore.acquire().await;
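+                    // The permit is held until the batch write completes, so
+                    // at most MAX_CONCURRENT_INSERT tx_batch calls run at once.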
+                    let res = tx_batch(&batch, &pool, QUERY_BATCH_SIZE, "batch").await?;
+                    *found_count.lock().await += batch.len() as u64;
+                    *created_count.lock().await += res.created_tx as u64;
+                    Ok::<(), anyhow::Error>(())
+                }
+            }
+        })
+        .buffer_unordered(MAX_CONCURRENT_INSERT)
+        .collect::<Vec<_>>() // Collect all results
+        .await;
+
+    // Log Errors
+    for result in results {
+        if let Err(e) = result {
+            error!("Failed batch insert: {:?}", e);
+        }
+    }
+
+    // Final Logging
+    let found = found_count.lock().await;
+    let created = created_count.lock().await;
+    info!("V5 transactions found: {}", found);
+    info!("V5 transactions processed: {}", created);
+    if *found != *created {
+        error!(
+            "Mismatch: transactions loaded ({}) vs extracted ({})",
+            created, found
+        );
+    }
+
+    Ok(*created)
+}
+
 pub async fn rip(start_dir: &Path, pool: &Graph) -> Result<u64> {
     let tgz_list = list_all_tgz_archives(start_dir)?;
     info!("tgz archives found: {}", tgz_list.len());
     let mut txs = 0u64;
     for p in tgz_list.iter() {
-        match stream_decompress_and_extract(p, pool).await {
+        match alt_stream_decompress_and_extract(p, pool).await {
             Ok(t) => txs += t,
             Err(e) => {
                 error!(
2 changes: 1 addition & 1 deletion tests/test_json_rescue_v5_load.rs
@@ -67,7 +67,7 @@ async fn test_stream_load_all_tgz() -> anyhow::Result<()> {

     let path = fixtures::v5_json_tx_path().join("0-99900.tgz");

-    let tx_count = json_rescue_v5_load::stream_decompress_and_extract(&path, &pool).await?;
+    let tx_count = json_rescue_v5_load::alt_stream_decompress_and_extract(&path, &pool).await?;

     assert!(tx_count == 13);

