test concurrent
0o-de-lally committed Nov 27, 2024
1 parent dee32be commit 657f6ba
Showing 4 changed files with 105 additions and 33 deletions.
10 changes: 7 additions & 3 deletions src/json_rescue_v5_extract.rs
@@ -48,8 +48,7 @@ pub fn extract_v5_json_rescue(
     wtxs.function = make_function_name(script);
     trace!("function: {}", &wtxs.function);
     if !unique_functions.contains(&wtxs.function) {
         unique_functions.push(wtxs.function.clone());
-
     }

     decode_transaction_args(&mut wtxs, &t.bytes)?;
@@ -64,7 +63,7 @@ pub fn extract_v5_json_rescue(
         RelationLabel::Transfer(_) => tx_vec.push(wtxs),
         RelationLabel::Onboarding(_) => tx_vec.push(wtxs),
         RelationLabel::Vouch(_) => tx_vec.push(wtxs),
-        RelationLabel::Configuration => {},
+        RelationLabel::Configuration => {}
         RelationLabel::Miner => {}
     };
 }
@@ -97,6 +96,11 @@ pub fn decode_transaction_args(wtx: &mut WarehouseTxMaster, tx_bytes: &[u8]) ->

             wtx.entry_function = Some(EntryFunctionArgs::V5(sf.to_owned()));
         }
+        ScriptFunctionCallGenesis::AutopayCreateInstruction { payee, .. } => {
+            wtx.relation_label =
+                RelationLabel::Transfer(cast_legacy_account(payee)?);
+            wtx.entry_function = Some(EntryFunctionArgs::V5(sf.to_owned()));
+        }
         ScriptFunctionCallGenesis::CreateAccUser { .. } => {
             // onboards self
             wtx.relation_label = RelationLabel::Onboarding(wtx.sender);
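The first hunk above keeps the extractor's function-name dedup as a Vec scanned with contains() before each push. A self-contained sketch of that pattern, next to a HashSet variant that trades insertion-order bookkeeping for O(1) membership tests (an alternative, not what this commit does):

use std::collections::HashSet;

fn main() {
    let names = ["transfer", "vouch", "transfer", "miner"];

    // Pattern used in the diff: Vec + contains() keeps first-seen order,
    // at the cost of a linear scan per lookup.
    let mut unique_functions: Vec<String> = vec![];
    for n in &names {
        if !unique_functions.contains(&n.to_string()) {
            unique_functions.push(n.to_string());
        }
    }

    // HashSet::insert returns false for duplicates: O(1) average lookups.
    let mut seen: HashSet<&str> = HashSet::new();
    let deduped: Vec<&str> = names.iter().copied().filter(|n| seen.insert(*n)).collect();

    assert_eq!(unique_functions, ["transfer", "vouch", "miner"]);
    assert_eq!(deduped, ["transfer", "vouch", "miner"]);
}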
116 changes: 91 additions & 25 deletions src/json_rescue_v5_load.rs
@@ -13,51 +13,42 @@ use std::path::Path;
 use std::sync::Arc;
 use tokio::sync::{Mutex, Semaphore};
 use tokio::task;
+use sysinfo::{System, SystemExt};

 /// How many records to read from the archives before attempting insert
-static LOAD_QUEUE_SIZE: usize = 1000;
+// static LOAD_QUEUE_SIZE: usize = 1000;
 /// When we attempt insert, the chunks of txs that go in to each query
 static QUERY_BATCH_SIZE: usize = 250;

 /// from a tgz file decompress all the .json files in archive
 /// and then read into the warehouse record format
-pub async fn decompress_and_extract(tgz_file: &Path, pool: &Graph) -> Result<u64> {
+pub async fn single_thread_decompress_extract(tgz_file: &Path, pool: &Graph) -> Result<u64> {
     let temppath = decompress_to_temppath(tgz_file)?;
     let json_vec = list_all_json_files(temppath.path())?;

     let mut found_count = 0u64;
     let mut created_count = 0u64;

     let mut unique_functions: Vec<String> = vec![];
-    // fill to BATCH_SIZE before attempting insert.
-    // many files may only have a handful of user txs,
-    // so individual files may have far fewer than BATCH_SIZE.
-    let mut queue: Vec<WarehouseTxMaster> = vec![];

     for j in json_vec {
-        if let Ok((mut r, _e, _)) = extract_v5_json_rescue(&j) {
-            queue.append(&mut r);
-        }
-        queue.iter().for_each(|s| {
-            if !unique_functions.contains(&s.function) {
-                unique_functions.push(s.function.clone());
+        let (records, _, unique) = extract_v5_json_rescue(&j)?;
+
+        unique.iter().for_each(|f| {
+            if !unique_functions.contains(&f) {
+                unique_functions.push(f.clone());
             }
         });

-        if queue.len() >= LOAD_QUEUE_SIZE {
-            let drain: Vec<WarehouseTxMaster> = std::mem::take(&mut queue);
-
-            let res = tx_batch(
-                &drain,
-                pool,
-                QUERY_BATCH_SIZE,
-                j.file_name().unwrap().to_str().unwrap(),
-            )
-            .await?;
-            created_count += res.created_tx as u64;
-            found_count += drain.len() as u64;
-        }
+        let res = tx_batch(
+            &records,
+            pool,
+            QUERY_BATCH_SIZE,
+            j.file_name().unwrap().to_str().unwrap(),
+        )
+        .await?;
+        created_count += res.created_tx as u64;
+        found_count += records.len() as u64;
     }

     info!("V5 transactions found: {}", found_count);
@@ -317,3 +308,78 @@ pub async fn rip(start_dir: &Path, pool: &Graph) -> Result<u64> {
     }
     Ok(txs)
 }
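With LOAD_QUEUE_SIZE retired, single_thread_decompress_extract now calls tx_batch once per JSON file and leaves the QUERY_BATCH_SIZE (250) chunking to tx_batch itself. That chunking is not shown in this diff, so the sketch below is an assumption about its shape, with insert_chunk as a hypothetical stand-in for the real database query:

fn insert_all(records: &[u64], batch_size: usize) -> usize {
    let mut created = 0;
    // split the per-file records into fixed-size query batches
    for chunk in records.chunks(batch_size) {
        created += insert_chunk(chunk);
    }
    created
}

fn insert_chunk(chunk: &[u64]) -> usize {
    // a real implementation would issue one database query per chunk
    chunk.len()
}

fn main() {
    let records: Vec<u64> = (0..1000).collect();
    assert_eq!(insert_all(&records, 250), 1000);
}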

+pub async fn rip_concurrent(start_dir: &Path, pool: &Graph) -> Result<()> {
+    let tgz_list = list_all_tgz_archives(start_dir)?;
+    info!("tgz archives found: {}", &tgz_list.len());
+
+    let tasks: Vec<_> = tgz_list
+        .into_iter()
+        .map(|p| {
+            let pool = pool.clone(); // Clone pool for each task
+            tokio::spawn(async move {
+                single_thread_decompress_extract(&p, &pool).await // Call the async function
+            })
+        })
+        .collect();
+
+    // Await all tasks and handle results
+    let results = futures::future::join_all(tasks).await;
+    // Check for errors
+    for (i, result) in results.into_iter().enumerate() {
+        match result {
+            Ok(Ok(_)) => {
+                info!("Task {} completed successfully.", i);
+            }
+            Ok(Err(e)) => {
+                error!("Task {} failed: {:?}", i, e);
+            }
+            Err(e) => {
+                error!("Task {} panicked: {:?}", i, e);
+            }
+        }
+    }
+    Ok(())
+}
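rip_concurrent spawns one task per archive with no cap, so a large directory can start hundreds of decompression jobs and database writers at once; rip_concurrent_limited below bounds that with a semaphore. For comparison, a sketch of the same bound using a stream combinator, where work is a hypothetical stand-in for single_thread_decompress_extract:

use futures::StreamExt;
use std::path::{Path, PathBuf};

// `work` is a hypothetical stand-in for single_thread_decompress_extract.
async fn work(p: &Path) {
    println!("processing {}", p.display());
}

async fn run_bounded(paths: Vec<PathBuf>) {
    futures::stream::iter(paths)
        .map(|p| async move { work(&p).await })
        .buffer_unordered(4) // at most 4 archives in flight at once
        .collect::<Vec<_>>()
        .await;
}

#[tokio::main]
async fn main() {
    run_bounded(vec![PathBuf::from("a.tgz"), PathBuf::from("b.tgz")]).await;
}

Unlike tokio::spawn, buffer_unordered keeps every job on the calling task, which suits I/O-bound work but will not spread CPU-heavy decompression across cores; that is one reason the spawn-plus-semaphore shape in this commit is a reasonable choice.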


+const MAX_CONCURRENT_TASKS: usize = 4; // Define the limit for concurrent tasks
+
+pub async fn rip_concurrent_limited(start_dir: &Path, pool: &Graph) -> Result<()> {
+    let tgz_list = list_all_tgz_archives(start_dir)?;
+    info!("tgz archives found: {}", tgz_list.len());
+
+    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_TASKS)); // Semaphore to limit concurrency
+    let mut tasks = vec![];
+
+    for p in tgz_list.into_iter() {
+        let pool = pool.clone(); // Clone pool for each task
+        let semaphore = Arc::clone(&semaphore); // Clone semaphore for each task
+
+        let task = tokio::spawn(async move {
+            let _permit = semaphore.acquire().await; // Acquire semaphore permit
+            single_thread_decompress_extract(&p, &pool).await // Perform the task
+        });
+
+        tasks.push(task);
+    }
+
+    // Await all tasks and handle results
+    let results = futures::future::join_all(tasks).await;
+
+    for (i, result) in results.into_iter().enumerate() {
+        match result {
+            Ok(Ok(_)) => {
+                info!("Task {} completed successfully.", i);
+            }
+            Ok(Err(e)) => {
+                error!("Task {} failed: {:?}", i, e);
+            }
+            Err(e) => {
+                error!("Task {} panicked: {:?}", i, e);
+            }
+        }
+    }
+
+    Ok(())
+}
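A nuance in rip_concurrent_limited: acquire() returns a Result, and binding it to _permit keeps the Ok(permit) alive for the task's whole run, so the limit does hold, but a closed semaphore would be ignored silently. A minimal self-contained sketch of the more explicit form:

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(2)); // at most 2 permits outstanding
    let mut tasks = vec![];
    for i in 0..5 {
        let semaphore = Arc::clone(&semaphore);
        tasks.push(tokio::spawn(async move {
            // expect() surfaces a closed semaphore instead of hiding the Result
            let _permit = semaphore.acquire().await.expect("semaphore closed");
            println!("task {i} holds a permit");
            // _permit drops here, releasing the slot to the next waiter
        }));
    }
    for t in tasks {
        t.await.expect("task panicked");
    }
}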
2 changes: 1 addition & 1 deletion src/warehouse_cli.rs
@@ -174,7 +174,7 @@ impl WarehouseCli {
             Sub::VersionFiveTx { archive_dir } => {
                 let pool = try_db_connection_pool(self).await?;

-                json_rescue_v5_load::rip(archive_dir, &pool).await?;
+                json_rescue_v5_load::rip_concurrent_limited(archive_dir, &pool).await?;
             }
         };
         Ok(())
10 changes: 6 additions & 4 deletions tests/test_json_rescue_v5_load.rs
@@ -23,7 +23,7 @@ async fn test_load_all_tgz() -> anyhow::Result<()> {

     let path = fixtures::v5_json_tx_path().join("0-99900.tgz");

-    let tx_count = json_rescue_v5_load::decompress_and_extract(&path, &pool).await?;
+    let tx_count = json_rescue_v5_load::single_thread_decompress_extract(&path, &pool).await?;

     assert!(tx_count == 5244);

Expand Down Expand Up @@ -89,13 +89,15 @@ async fn test_load_entrypoint() -> anyhow::Result<()> {

     let path = fixtures::v5_json_tx_path();

-    let tx_count = json_rescue_v5_load::rip(&path, &pool).await?;
-    dbg!(&tx_count);
-    assert!(tx_count == 13);
+    json_rescue_v5_load::rip_concurrent_limited(&path, &pool).await?;
+    // dbg!(&tx_count);
+    // assert!(tx_count == 13);

     Ok(())
 }

+
+
 #[tokio::test]
 async fn test_rescue_v5_parse_set_wallet_tx() -> anyhow::Result<()> {
     libra_forensic_db::log_setup();
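Since rip_concurrent_limited returns Result<()>, test_load_entrypoint loses its transaction-count assertion (now commented out above). A hypothetical count-returning variant, not part of this commit and assuming the module's existing imports, that would let the test assert on totals again:

// Hypothetical variant (not in this commit): sum per-archive counts
// so callers and tests can assert on totals.
pub async fn rip_concurrent_limited_counted(start_dir: &Path, pool: &Graph) -> Result<u64> {
    let tgz_list = list_all_tgz_archives(start_dir)?;
    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_TASKS));
    let mut tasks = vec![];
    for p in tgz_list {
        let pool = pool.clone();
        let semaphore = Arc::clone(&semaphore);
        tasks.push(tokio::spawn(async move {
            let _permit = semaphore.acquire().await?; // hold a permit for the whole job
            single_thread_decompress_extract(&p, &pool).await
        }));
    }
    let mut total = 0u64;
    for t in futures::future::join_all(tasks).await {
        total += t??; // first ? surfaces join errors (panics), second the job's own error
    }
    Ok(total)
}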
