Skip to content

Commit

Permalink
Merge pull request #643 from broadinstitute/mw/drop_async_file_exists
Browse files Browse the repository at this point in the history
Drop async file exists function
  • Loading branch information
mike-w-wilson authored Oct 31, 2023
2 parents abf42b1 + 678d5f0 commit 188dd89
Showing 1 changed file with 1 addition and 87 deletions.
88 changes: 1 addition & 87 deletions gnomad/utils/file_utils.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
# noqa: D100

import asyncio
import base64
import gzip
import logging
import os
import subprocess
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Optional, Tuple, Union
from typing import List, Optional, Tuple, Union

import hail as hl
from hailtop.aiocloud.aiogoogle import GoogleStorageAsyncFS
from hailtop.aiotools import AsyncFS, LocalAsyncFS
from hailtop.aiotools.router_fs import RouterAsyncFS
from hailtop.utils import bounded_gather
from hailtop.utils.rich_progress_bar import SimpleCopyToolProgressBar

from gnomad.resources.resource_utils import DataException

Expand All @@ -24,85 +17,6 @@
logger.setLevel(logging.INFO)


async def parallel_file_exists_async(
    fpaths: List[str], parallelism: int = 750
) -> Dict[str, bool]:
    """
    Check whether a large number of files exist.

    Created for use with hail Batch jobs.
    Normal `file_exists` function is very slow when checking a large number of files.

    :param fpaths: List of file paths to check. Files can be in local or Google cloud storage.
    :param parallelism: Integer that sets parallelism of file existence checking task. Default is 750.
    :return: Dictionary of file paths (str) and whether the file exists (boolean).
    """
    # Nothing to check: return before creating a progress bar, thread pool, or
    # cloud filesystem client for an empty request.
    if not fpaths:
        return {}

    async def async_file_exists(fs: AsyncFS, fpath: str) -> bool:
        """
        Determine file existence.

        Paths ending in `.ht` or `.mt` are checked through their `_SUCCESS`
        marker file rather than the path itself.

        :param fs: AsyncFS object.
        :param fpath: Path to file to check.
        :return: Whether file exists.
        """
        # Ignore trailing slashes when reading the extension so that
        # "table.ht/" is treated the same as "table.ht".
        trimmed = fpath.rstrip("/")
        if os.path.splitext(trimmed)[1] in (".ht", ".mt"):
            fpath = f"{trimmed}/_SUCCESS"
        try:
            await fs.statfile(fpath)
        except FileNotFoundError:
            return False
        return True

    with SimpleCopyToolProgressBar(
        total=len(fpaths), description="check files for existence", disable=False
    ) as pbar:
        with ThreadPoolExecutor() as thread_pool:
            async with RouterAsyncFS(
                filesystems=[LocalAsyncFS(thread_pool), GoogleStorageAsyncFS()]
            ) as fs:

                def check_existence_and_update_pbar_thunk(fpath: str) -> Callable:
                    """
                    Create function to check if file exists and update progress bar in stdout.

                    Function delays coroutine creation to avoid creating too many live coroutines.

                    :param fpath: Path to file to check.
                    :return: Function that checks for file existence and updates progress bar.
                    """

                    async def unapplied_function():
                        x = await async_file_exists(fs, fpath)
                        pbar.update(1)
                        return x

                    return unapplied_function

                file_existence_checks = [
                    check_existence_and_update_pbar_thunk(fpath) for fpath in fpaths
                ]
                file_existence = await bounded_gather(
                    *file_existence_checks, parallelism=parallelism
                )

    # Results come back in the same order as the thunks, so pair them with
    # the input paths positionally.
    return dict(zip(fpaths, file_existence))


def parallel_file_exists(fpaths: List[str], parallelism: int = 750) -> Dict[str, bool]:
    """
    Call `parallel_file_exists_async` to check whether large number of files exist.

    Runs the coroutine to completion on the current event loop. If there is no
    current event loop, or the current one is already closed, a new loop is
    created and installed as the current loop first.

    :param fpaths: List of file paths to check. Files can be in local or Google cloud storage.
    :param parallelism: Integer that sets parallelism of file existence checking task. Default is 750.
    :return: Dictionary of file paths (str) and whether the file exists (boolean).
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current event loop (e.g. non-main thread, or after a previous
        # `asyncio.run()` call cleared it).
        loop = None

    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    return loop.run_until_complete(parallel_file_exists_async(fpaths, parallelism))


def file_exists(fname: str) -> bool:
"""
Check whether a file exists.
Expand Down

0 comments on commit 188dd89

Please sign in to comment.