Commit
Merge pull request #709 from kbase/dev_taskfarmer_gen
RE2022-336: remove threads from tool execution
Tianhao-Gu authored Apr 4, 2024
2 parents 01c92e8 + 7bf911a commit da3d2b1
Showing 1 changed file with 62 additions and 98 deletions.
160 changes: 62 additions & 98 deletions src/loaders/compute_tools/tool_common.py
@@ -15,8 +15,6 @@
import datetime
import gzip
import json
import math
import multiprocessing
import os
import re
import shutil
@@ -25,7 +23,7 @@
import uuid
from collections import namedtuple
from pathlib import Path
from typing import Any, Callable, Dict, List, Tuple, Union
from typing import Callable, Dict, List, Tuple, Union

import pandas as pd

@@ -34,11 +32,6 @@
# TODO CODE add a common module for saving and loading the metadata shared between the compute
# and parser

# Fraction amount of system cores can be utilized
# (i.e. 0.5 - program will use 50% of total processors,
# 0.1 - program will use 10% of total processors)
_SYSTEM_UTILIZATION = 0.5 # This might need to be customizable per tool

# source genome files can be missing for those collections
# genomes with missing files will be skipped rather than raising an error
# TODO DOWNLOAD having download script not create empty directories for genomes with missing files
@@ -93,12 +86,8 @@ def __init__(
the source version)
--root_dir ROOT_DIR Root directory for the collections project.
(default: /global/cfs/cdirs/kbase/collections)
--threads THREADS Total number of threads used by the script. (default:
half of system cpu count)
--program_threads PROGRAM_THREADS
--threads_per_tool_run THREADS_PER_TOOL_RUN
Number of threads to execute a single tool command.
threads / program_threads determines the number of
batches. (default: 32)
--node_id NODE_ID node ID for running job
--debug Debug mode.
--data_id_file DATA_ID_FILE
@@ -138,18 +127,13 @@ def __init__(
env,
kbase_collection,
source_ver)
self._threads = args.threads
self._program_threads = args.program_threads
self._threads_per_tool_run = args.threads_per_tool_run
self._debug = args.debug
self._data_id_file = args.data_id_file
self._node_id = args.node_id
self._source_file_ext = args.source_file_ext
self._data_ids = self._get_data_ids()

if not self._threads:
self._threads = max(int(multiprocessing.cpu_count() * min(_SYSTEM_UTILIZATION, 1)), 1)
self._threads = max(1, self._threads)

self._work_dir = Path(
Path(root_dir),
loader_common_names.COLLECTION_DATA_DIR,
@@ -195,13 +179,8 @@ def _parse_args(self):
help=f'{loader_common_names.ROOT_DIR_DESCR} (default: {loader_common_names.ROOT_DIR})'
)
optional.add_argument(
'--threads', type=int,
help='Total number of threads used by the script. (default: half of system cpu count)'
)
optional.add_argument(
'--program_threads', type=int, default=32,
help='Number of threads to execute a single tool command. '
+ 'threads / program_threads determines the number of batches. (default: 32)'
'--threads_per_tool_run', type=int, default=32,
help='Number of threads to execute a single tool command.'
)
optional.add_argument(
'--node_id', type=str, default=str(uuid.uuid4()), help='node ID for running job'
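
Editor's note: as a point of reference, here is a minimal, self-contained sketch of how the reworked option parses. The standalone parser object and the example value 16 are illustrative and not part of this module; only the --threads_per_tool_run and --node_id definitions mirror the diff above.

    import argparse
    import uuid

    # Illustrative stand-alone parser; not the module's real _parse_args implementation.
    parser = argparse.ArgumentParser(description='illustrative parser')
    optional = parser.add_argument_group('optional arguments')
    optional.add_argument(
        '--threads_per_tool_run', type=int, default=32,
        help='Number of threads to execute a single tool command.')
    optional.add_argument(
        '--node_id', type=str, default=str(uuid.uuid4()), help='node ID for running job')

    args = parser.parse_args(['--threads_per_tool_run', '16'])
    print(args.threads_per_tool_run)  # 16 - handed unchanged to every tool invocation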
@@ -246,46 +225,63 @@ def _get_data_ids(self):
data_ids = all_data_ids
return list(set(data_ids))

def _prepare_execution(
self,
unzip: bool
) -> Tuple[Path, Dict[str, Dict[str, Union[str, Path]]], List[str]]:

batch_dir, genomes_meta = _prepare_tool(
self._work_dir,
loader_common_names.COMPUTE_OUTPUT_NO_BATCH,
self._node_id,
self._data_ids,
self._source_data_dir,
self._source_file_ext,
self._allow_missing_files,
self._tool_data_id_from_filename,
self._suffix_ids,
)

unzipped_files_to_delete = []
if unzip:
unzipped_files_to_delete = _unzip_files(genomes_meta)

return batch_dir, genomes_meta, unzipped_files_to_delete

def _finalize_execution(
self,
unzipped_files_to_delete: List[str]):

if unzipped_files_to_delete:
print(f"Deleting {len(unzipped_files_to_delete)} unzipped files: {unzipped_files_to_delete[:5]}...")
for file in unzipped_files_to_delete:
os.remove(file)
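
Editor's note: a rough, self-contained sketch of the prepare/run/finalize pattern these two helpers support in the methods below. The stand-in functions and the /tmp path are assumptions; only the try/finally shape mirrors the real code.

    import os
    from pathlib import Path
    from typing import Dict, List, Tuple

    # Hypothetical stand-ins for _prepare_execution / _finalize_execution.
    def prepare_execution(work_dir: Path, unzip: bool) -> Tuple[Path, Dict[str, dict], List[str]]:
        batch_dir = work_dir / 'batch_no_batch'
        batch_dir.mkdir(parents=True, exist_ok=True)
        genomes_meta: Dict[str, dict] = {}   # data ID -> file metadata
        unzipped: List[str] = []             # only populated when unzip is True
        return batch_dir, genomes_meta, unzipped

    def finalize_execution(unzipped_files_to_delete: List[str]) -> None:
        for file in unzipped_files_to_delete:  # runs even if the tool raised
            os.remove(file)

    batch_dir, genomes_meta, to_delete = prepare_execution(Path('/tmp/collections_demo'), unzip=False)
    try:
        pass  # tool execution would happen here
    finally:
        finalize_execution(to_delete)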

def parallel_single_execution(self, tool_callable: Callable[[str, str, Path, Path, int, bool], None], unzip=False):
"""
Run a tool by a single data file, storing the results in a single batch directory with
the individual runs stored in directories by the data ID.
One tool execution per data ID. Tool execution is parallelized by the number of threads
specified in the constructor.
One tool execution per data ID. Tool executions run serially, one after another.
Results from execution need to be processed/parsed individually.
Use case: microtrait - execute microtrait logic on each individual genome file. The result file is stored in
each individual genome directory. Parser program will parse the result file in each individual genome
directory.
tool_callable - the callable for the tool that takes 5 arguments:
tool_callable - the callable for the tool that takes 6 arguments:
* The tool safe data ID
* The data ID
* The input file
* The output directory
* The number of threads to use for the tool run
* A debug boolean
unzip - if True, unzip the input file before passing it to the tool callable. (unzipped file will be deleted)
"""
start = time.time()
batch_dir, genomes_meta = _prepare_tool(
self._work_dir,
loader_common_names.COMPUTE_OUTPUT_NO_BATCH,
self._node_id,
self._data_ids,
self._source_data_dir,
self._source_file_ext,
self._allow_missing_files,
self._tool_data_id_from_filename,
self._suffix_ids,
)

unzipped_files_to_delete = list()
if unzip:
unzipped_files_to_delete = _unzip_files(genomes_meta)
batch_dir, genomes_meta, unzipped_files_to_delete = self._prepare_execution(unzip)

# RUN tool in parallel with multiprocessing
args_list = []
for data_id, meta in genomes_meta.items():
output_dir = batch_dir / data_id
@@ -298,27 +294,22 @@ def parallel_single_execution(self, tool_callable: Callable[[str, str, Path, Pat
meta.get(loader_common_names.META_UNCOMPRESSED_FILE,
meta[loader_common_names.META_SOURCE_FILE]),
output_dir,
self._program_threads,
self._threads_per_tool_run,
self._debug))

try:
self._execute(self._threads, tool_callable, args_list, start, False)
self._execute(tool_callable, args_list, start, False)
finally:
if unzipped_files_to_delete:
print(f"Deleting {len(unzipped_files_to_delete)} unzipped files: {unzipped_files_to_delete[:5]}...")
for file in unzipped_files_to_delete:
os.remove(file)
self._finalize_execution(unzipped_files_to_delete)

create_metadata_file(genomes_meta, batch_dir)
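
Editor's note: a hedged sketch of a callable compatible with parallel_single_execution. The function name, body, and result file name are assumptions; only the six-argument shape comes from the docstring above.

    from pathlib import Path

    # Hypothetical callable matching Callable[[str, str, Path, Path, int, bool], None].
    def run_fake_tool(
            tool_safe_data_id: str,      # tool safe data ID
            data_id: str,                # original data ID
            source_file: Path,           # input genome file (possibly the unzipped copy)
            output_dir: Path,            # per-data-ID directory under the batch dir
            threads_per_tool_run: int,   # value of --threads_per_tool_run
            debug: bool) -> None:
        output_dir.mkdir(parents=True, exist_ok=True)
        # A real tool would shell out here; this stub only records what it was given.
        (output_dir / 'result.txt').write_text(
            f'{data_id} processed from {source_file} with {threads_per_tool_run} threads\n')

    # runner.parallel_single_execution(run_fake_tool, unzip=True)  # hypothetical runner instance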

def parallel_batch_execution(self, tool_callable: Callable[[Dict[str, GenomeTuple], Path, int, bool], None],
unzip=False):
"""
Run a tool in batched mode, where > 1 data file is processed by the tool in one
call. Each batch gets its own batch directory.
Run a tool in batched mode, where > 1 data file is processed by the tool in one call.
Data IDs are divided into batches, and each batch is processed in parallel. The tool execution results can
be consolidated into individual files for each batch
All data IDs are collectively processed in a single batch during tool execution.
Use case: gtdb-tk - concurrently execute gtdb_tk on a batch of genomes, and one result file
(gtdbtk.ar53.summary.tsv) is produced per batch.
@@ -327,66 +318,39 @@ def parallel_batch_execution(self, tool_callable: Callable[[Dict[str, GenomeTupl
tool_callable - the callable for the tool that takes 4 arguments:
* A dictionary of the tool safe data ID to the GenomeTuple
* The output directory for results
* The number of threads to use for the batch
* The number of threads to use for the tool run
* A debug boolean
"""
start = time.time()
num_batches = max(math.floor(self._threads / self._program_threads), 1)
# distribute genome ids evenly across batches
chunk_size = math.ceil(len(self._data_ids) / num_batches)
batch_input = []
metas = []
unzipped_files_to_delete = list()
for batch_number, i in enumerate(range(0, len(self._data_ids), chunk_size)):
data_ids = self._data_ids[i: i + chunk_size]
batch_dir, meta = _prepare_tool(
self._work_dir,
batch_number,
self._node_id,
data_ids,
self._source_data_dir,
self._source_file_ext,
self._allow_missing_files,
self._tool_data_id_from_filename,
self._suffix_ids,
)

if unzip:
unzipped_files_to_delete.extend(_unzip_files(meta))
batch_dir, genomes_meta, unzipped_files_to_delete = self._prepare_execution(unzip)

metas.append((meta, batch_dir))
ids_to_files = dict()
for data_id, m in meta.items():
# use the uncompressed file if it exists, otherwise use the source file
source_file = m.get(loader_common_names.META_UNCOMPRESSED_FILE,
m[loader_common_names.META_SOURCE_FILE])
ids_to_files[m[loader_common_names.META_TOOL_IDENTIFIER]] = GenomeTuple(source_file, data_id)
ids_to_files = dict()
for data_id, m in genomes_meta.items():
# use the uncompressed file if it exists, otherwise use the source file
source_file = m.get(loader_common_names.META_UNCOMPRESSED_FILE,
m[loader_common_names.META_SOURCE_FILE])
ids_to_files[m[loader_common_names.META_TOOL_IDENTIFIER]] = GenomeTuple(source_file, data_id)

batch_input.append((ids_to_files, batch_dir, self._program_threads, self._debug))
batch_input = [(ids_to_files, batch_dir, self._threads_per_tool_run, self._debug)]

try:
self._execute(num_batches, tool_callable, batch_input, start, True)
self._execute(tool_callable, batch_input, start, True)
finally:
if unzipped_files_to_delete:
print(f"Deleting {len(unzipped_files_to_delete)} unzipped files: {unzipped_files_to_delete[:5]}...")
for file in unzipped_files_to_delete:
os.remove(file)
self._finalize_execution(unzipped_files_to_delete)

for meta in metas:
create_metadata_file(*meta)
create_metadata_file(genomes_meta, batch_dir)
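
Editor's note: similarly, a hedged sketch of a batch-mode callable. The GenomeTuple field names, the function name, and the summary file are assumptions; the four-argument shape and the dict of tool safe data IDs to GenomeTuples come from the docstring above.

    from collections import namedtuple
    from pathlib import Path
    from typing import Dict

    # Stand-in with the same two positional fields the code above constructs
    # (source file, then data ID); the real definition lives in tool_common.py.
    GenomeTuple = namedtuple('GenomeTuple', ['source_file', 'data_id'])

    # Hypothetical callable matching Callable[[Dict[str, GenomeTuple], Path, int, bool], None].
    def run_fake_batch_tool(
            ids_to_files: Dict[str, GenomeTuple],  # tool safe data ID -> GenomeTuple
            output_dir: Path,                      # the single batch directory
            threads_per_tool_run: int,             # value of --threads_per_tool_run
            debug: bool) -> None:
        output_dir.mkdir(parents=True, exist_ok=True)
        # A real batch tool (e.g. GTDB-Tk) would consume every genome in one invocation;
        # this stub writes one summary line per genome instead.
        with open(output_dir / 'summary.tsv', 'w') as out:
            for safe_id, genome in ids_to_files.items():
                out.write(f'{safe_id}\t{genome.data_id}\t{genome.source_file}\n')

    # runner.parallel_batch_execution(run_fake_batch_tool, unzip=True)  # hypothetical runner instance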

def _execute(
self,
threads: int,
tool_callable: Callable[..., None],
args: List[Tuple[Dict[str, GenomeTuple], Path, int, bool]] | List[Tuple[str, str, Path, Path, int, bool]],
start: datetime.datetime,
total: bool,
):
pool = multiprocessing.Pool(processes=threads)
pool.starmap(tool_callable, args)
pool.close()
pool.join()

for arg in args:
tool_callable(*arg)

prefix = "In total used" if total else "Used"
print(f"{prefix} {round((time.time() - start) / 60, 2)} minutes to "
+ f"execute {self._tool} for {len(self._data_ids)} data units"
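
Editor's note: a self-contained sketch of the serial dispatch that replaced the multiprocessing pool in _execute. The function name, the trivial callable, and the "runs" wording are illustrative; only the loop-and-timing shape mirrors the diff.

    import time
    from typing import Callable, List, Tuple

    # Illustrative serial dispatcher: each argument tuple is handed to the callable in turn.
    def execute_serially(tool_callable: Callable[..., None], args: List[Tuple], start: float, total: bool) -> None:
        for arg in args:
            tool_callable(*arg)
        prefix = 'In total used' if total else 'Used'
        print(f'{prefix} {round((time.time() - start) / 60, 2)} minutes for {len(args)} runs')

    start = time.time()
    execute_serially(lambda name, n: print(f'{name}: {n}'), [('run', 1), ('run', 2)], start, True)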
