Skip to content

Commit

Permalink
NiFi: cohort mp fix.
Browse files Browse the repository at this point in the history
  • Loading branch information
vladd-bit committed Feb 26, 2024
1 parent c46b2a1 commit 877ee7f
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions nifi/user-scripts/cogstack_cohort_generate_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,14 @@ def process_annotation_records(annotation_records: list, _doc2ptt: dict):
with Pool(processes=CPU_THREADS) as patient_process_pool:
results = list()

rec_que = Queue()

record_chunks = list(chunk(input_patient_record_data, CPU_THREADS))

counter = 0
for record_chunk in record_chunks:
patient_process_pool_results.append(patient_process_pool.starmap_async(process_patient_records, [(record_chunk,)], chunksize=1, error_callback=logging.error))
rec_que.put(record_chunk)
patient_process_pool_results.append(patient_process_pool.starmap_async(process_patient_records, [(rec_que.get(),)], chunksize=1, error_callback=logging.error))
counter += 1

try:
Expand All @@ -207,11 +210,14 @@ def process_annotation_records(annotation_records: list, _doc2ptt: dict):
with Pool(processes=CPU_THREADS) as annotations_process_pool:
results = list()

rec_que = Queue()

record_chunks = list(chunk(input_annotations, CPU_THREADS))

counter = 0
for record_chunk in record_chunks:
annotation_process_pool_results.append(annotations_process_pool.starmap_async(process_annotation_records, [(record_chunk, doc2ptt )], chunksize=1, error_callback=logging.error))
rec_que.put(record_chunk)
annotation_process_pool_results.append(annotations_process_pool.starmap_async(process_annotation_records, [(rec_que.get(), doc2ptt )], chunksize=1, error_callback=logging.error))
counter += 1

try:
Expand Down

0 comments on commit 877ee7f

Please sign in to comment.