diff --git a/eva_cttv_pipeline/trait_mapping/main.py b/eva_cttv_pipeline/trait_mapping/main.py index 65a9cdb4..454a4c07 100644 --- a/eva_cttv_pipeline/trait_mapping/main.py +++ b/eva_cttv_pipeline/trait_mapping/main.py @@ -1,7 +1,7 @@ from collections import Counter import csv import logging -import progressbar +import multiprocessing from eva_cttv_pipeline.trait_mapping.output import output_trait from eva_cttv_pipeline.trait_mapping.oxo import get_oxo_results @@ -30,8 +30,7 @@ def get_uris_for_oxo(zooma_result_list: list) -> set: return uri_set -def process_trait(trait: Trait, filters: dict, zooma_host: str, oxo_target_list: list, - oxo_distance: int) -> Trait: +def process_trait(trait: Trait, filters: dict, zooma_host: str, oxo_target_list: list, oxo_distance: int) -> Trait: """ Process a single trait. Find any mappings in Zooma. If there are no high confidence Zooma mappings that are in EFO then query OxO with any high confidence mappings not in EFO. @@ -68,11 +67,12 @@ def process_trait(trait: Trait, filters: dict, zooma_host: str, oxo_target_list: return trait -def main(input_filepath, output_mappings_filepath, output_curation_filepath, filters, zooma_host, - oxo_target_list, oxo_distance, unattended): +def main(input_filepath, output_mappings_filepath, output_curation_filepath, filters, zooma_host, oxo_target_list, + oxo_distance): logger.info('Started parsing trait names') trait_names_list = parse_trait_names(input_filepath) trait_names_counter = Counter(trait_names_list) + logger.info("Loaded {} trait names".format(len(trait_names_counter))) with open(output_mappings_filepath, "w", newline='') as mapping_file, \ open(output_curation_filepath, "wt") as curation_file: @@ -80,19 +80,19 @@ def main(input_filepath, output_mappings_filepath, output_curation_filepath, fil mapping_writer.writerow(["#clinvar_trait_name", "uri", "label"]) curation_writer = csv.writer(curation_file, delimiter="\t") - trait_names_iterator = trait_names_counter.items() - if not unattended: - progress = progressbar.ProgressBar(max_value=len(trait_names_counter), - widgets=[progressbar.AdaptiveETA(samples=1000)]) - trait_names_iterator = progress(trait_names_iterator) - - logger.info("Loaded {} trait names".format(len(trait_names_counter))) - for i, (trait_name, freq) in enumerate(trait_names_iterator): - trait = Trait(trait_name, freq) - trait = process_trait(trait, filters, zooma_host, oxo_target_list, - oxo_distance) + logger.info('Processing trait names in parallel') + trait_list = [Trait(trait_name, freq) for trait_name, freq in trait_names_counter.items()] + trait_process_pool = multiprocessing.Pool(processes=12) + + processed_trait_list = [ + trait_process_pool.apply( + process_trait, + args=(trait, filters, zooma_host, oxo_target_list, oxo_distance) + ) + for trait in trait_list + ] + + for trait in processed_trait_list: output_trait(trait, mapping_writer, curation_writer) - if unattended and i % 100 == 0: - logger.info("Processed {} records".format(i)) logger.info('Finished processing trait names')