diff --git a/gnomad/utils/file_utils.py b/gnomad/utils/file_utils.py index c026b66bd..8ddf1425e 100644 --- a/gnomad/utils/file_utils.py +++ b/gnomad/utils/file_utils.py @@ -1,21 +1,14 @@ # noqa: D100 -import asyncio import base64 import gzip import logging import os import subprocess import uuid -from concurrent.futures import ThreadPoolExecutor -from typing import Callable, Dict, List, Optional, Tuple, Union +from typing import List, Optional, Tuple, Union import hail as hl -from hailtop.aiocloud.aiogoogle import GoogleStorageAsyncFS -from hailtop.aiotools import AsyncFS, LocalAsyncFS -from hailtop.aiotools.router_fs import RouterAsyncFS -from hailtop.utils import bounded_gather -from hailtop.utils.rich_progress_bar import SimpleCopyToolProgressBar from gnomad.resources.resource_utils import DataException @@ -24,85 +17,6 @@ logger.setLevel(logging.INFO) -async def parallel_file_exists_async( - fpaths: List[str], parallelism: int = 750 -) -> Dict[str, bool]: - """ - Check whether a large number of files exist. - - Created for use with hail Batch jobs. - Normal `file_exists` function is very slow when checking a large number of files. - - :param fpaths: List of file paths to check. Files can be in local or Google cloud storage. - :param parallelism: Integer that sets parallelism of file existence checking task. Default is 750. - :return: Dictionary of file paths (str) and whether the file exists (boolean). - """ - - async def async_file_exists(fs: AsyncFS, fpath: str) -> bool: - """ - Determine file existence. - - :param fs: AsyncFS object. - :param fpath: Path to file to check. - :return: Whether file exists. - """ - fext = os.path.splitext(fpath)[1] - if fext in [".ht", ".mt"]: - fpath += "/_SUCCESS" - try: - await fs.statfile(fpath) - except FileNotFoundError: - return False - else: - return True - - with SimpleCopyToolProgressBar( - total=len(fpaths), description="check files for existence", disable=False - ) as pbar: - with ThreadPoolExecutor() as thread_pool: - async with RouterAsyncFS( - filesystems=[LocalAsyncFS(thread_pool), GoogleStorageAsyncFS()] - ) as fs: - - def check_existence_and_update_pbar_thunk(fpath: str) -> Callable: - """ - Create function to check if file exists and update progress bar in stdout. - - Function delays coroutine creation to avoid creating too many live coroutines. - - :param fpath: Path to file to check. - :return: Function that checks for file existence and updates progress bar. - """ - - async def unapplied_function(): - x = await async_file_exists(fs, fpath) - pbar.update(1) - return x - - return unapplied_function - - file_existence_checks = [ - check_existence_and_update_pbar_thunk(fpath) for fpath in fpaths - ] - file_existence = await bounded_gather( - *file_existence_checks, parallelism=parallelism - ) - return dict(zip(fpaths, file_existence)) - - -def parallel_file_exists(fpaths: List[str], parallelism: int = 750) -> Dict[str, bool]: - """ - Call `parallel_file_exists_async` to check whether large number of files exist. - - :param fpaths: List of file paths to check. Files can be in local or Google cloud storage. - :param parallelism: Integer that sets parallelism of file existence checking task. Default is 750. - :return: Dictionary of file paths (str) and whether the file exists (boolean). - """ - return asyncio.get_event_loop().run_until_complete( - parallel_file_exists_async(fpaths, parallelism) - ) - - def file_exists(fname: str) -> bool: """ Check whether a file exists.