diff --git a/nifi/user-scripts/cogstack_cohort_generate_data.py b/nifi/user-scripts/cogstack_cohort_generate_data.py index 22612219..c0100427 100644 --- a/nifi/user-scripts/cogstack_cohort_generate_data.py +++ b/nifi/user-scripts/cogstack_cohort_generate_data.py @@ -181,11 +181,14 @@ def process_annotation_records(annotation_records: list, _doc2ptt: dict): with Pool(processes=CPU_THREADS) as patient_process_pool: results = list() + rec_que = Queue() + record_chunks = list(chunk(input_patient_record_data, CPU_THREADS)) counter = 0 for record_chunk in record_chunks: - patient_process_pool_results.append(patient_process_pool.starmap_async(process_patient_records, [(record_chunk,)], chunksize=1, error_callback=logging.error)) + rec_que.put(record_chunk) + patient_process_pool_results.append(patient_process_pool.starmap_async(process_patient_records, [(rec_que.get(),)], chunksize=1, error_callback=logging.error)) counter += 1 try: @@ -207,11 +210,14 @@ def process_annotation_records(annotation_records: list, _doc2ptt: dict): with Pool(processes=CPU_THREADS) as annotations_process_pool: results = list() + rec_que = Queue() + record_chunks = list(chunk(input_annotations, CPU_THREADS)) counter = 0 for record_chunk in record_chunks: - annotation_process_pool_results.append(annotations_process_pool.starmap_async(process_annotation_records, [(record_chunk, doc2ptt )], chunksize=1, error_callback=logging.error)) + rec_que.put(record_chunk) + annotation_process_pool_results.append(annotations_process_pool.starmap_async(process_annotation_records, [(rec_que.get(), doc2ptt )], chunksize=1, error_callback=logging.error)) counter += 1 try: