From 5340ec40416d951233516b4581893004c4488136 Mon Sep 17 00:00:00 2001 From: Elad Date: Sun, 12 Mar 2023 20:08:18 +0200 Subject: [PATCH] fixed library & taxonomy download function and added multiprocessing --- scripts/k2 | 129 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 74 insertions(+), 55 deletions(-) diff --git a/scripts/k2 b/scripts/k2 index 92f27cd..fa46d98 100755 --- a/scripts/k2 +++ b/scripts/k2 @@ -1,5 +1,6 @@ #!/usr/bin/env python3 + import argparse import bz2 import ftplib @@ -23,6 +24,7 @@ import urllib import urllib.error import urllib.parse import urllib.request +import multiprocessing LOG = None SCRIPT_PATHNAME = None @@ -297,12 +299,15 @@ def find_kraken2_binary(name): if os.path.exists(os.path.join(script_parent_directory, name)): binary = os.path.join(script_parent_directory, name) if binary is None: - project_root = get_parent_directory(script_parent_directory) - if "src" in os.listdir(project_root) and name in os.listdir( - os.path.join(project_root, "src") - ): - binary = os.path.join(project_root, os.path.join("src", name)) - if not binary: + try: + binary = str(shutil.which('kraken2').removesuffix('/bin/kraken2'))+ "/libexec/"+name + except FileNotFoundError : + project_root = get_parent_directory(script_parent_directory) + if "src" in os.listdir(project_root) and name in os.listdir( + os.path.join(project_root, "src") + ): + binary = os.path.join(project_root, os.path.join("src", name)) + else: LOG.error("Unable to find {:s}, exiting\n".format(name)) sys.exit(1) return binary @@ -518,11 +523,11 @@ def add_to_library(args): with open(filename, "r") as in_file: scan_fasta_file(in_file, out_file, lenient=True) shutil.copyfile(filename, destination) - if not args.no_masking: - mask_files( - [destination], destination + ".masked", protein=args.protein - ) - shutil.move(destination + ".masked", destination) + #if not args.no_masking: + # mask_files( + # [destination], destination + ".masked", protein=args.protein + # ) + # shutil.move(destination + ".masked", destination) with open("added.md5", "a") as out_file: out_file.write(filehash + "\t" + destination + "\n") LOG.info("Added " + filename + " to library " + args.db + "\n") @@ -564,7 +569,8 @@ def assign_taxid_to_sequences(manifest_to_taxid, is_protein): max_out_chars = 0 for filepath in sorted(manifest_to_taxid): taxid = manifest_to_taxid[filepath] - with gzip.open(filepath) as in_file: + filepath = os.path.basename(filepath.removesuffix('.gz')) + with open(filepath, 'rb+') as in_file: while True: line = in_file.readline() if line == b"": @@ -674,6 +680,11 @@ def download_file(url, local_name=None): clear_console_line() LOG.info("Saved {:s} to {:s}\n".format(local_name, os.getcwd())) +def download_multiple_files(url): + ## download ~31654 files of the bacteria library in parallel, depends on the amount of allocated cpus + filename = os.path.basename(url).removesuffix('.gz') + if not os.path.isfile(filename): + subprocess.call(['wget', '-q', url]) def make_manifest_filter(file, regex): def inner(listing): @@ -701,6 +712,27 @@ def get_manifest(server, remote_path, regex): ftp.retrlines("LIST", callback=make_manifest_filter(f, regex)) ftp.close() +def purge(pattern): + dir = os.getcwd() + for f in os.listdir(dir): + if re.search(pattern, f): + os.remove(os.path.join(dir, f)) + + +def multiprocess_urls(remote_dir, filepaths): + server = 'https://ftp.ncbi.nih.gov' + pool = multiprocessing.Pool(processes=50) + url_list = [] + for f in filepaths: + ftp_url = server+remote_dir+f + url_list.append(ftp_url) + ## Download the files in the fna format + print("Downloading "+ str(len(url_list))+ " files.") + pool.map(download_multiple_files, url_list) + ## unzip the files + pool.close() + pool.join() + def download_files_from_manifest( server, @@ -717,26 +749,16 @@ def download_files_from_manifest( i = 0 for filepath in f: LOG.info( - "Checking if manifest files exist on server {:s}\r".format( + "Generating file list from manifest {:s}\r".format( spinner[i % 4] ) ) i += 1 filepath = filepath.strip() - if not ftp.exists(urllib.parse.urljoin(remote_dir, filepath)): - if filepath_to_taxid_table: - del filepath_to_taxid_table[filepath] - LOG.warning( - "{:s} does not exist on server, skipping\n".format( - remote_dir + filepath - ) - ) - continue filepaths.append(filepath) - ftp.download(remote_dir, filepaths) - ftp.close() + multiprocess_urls(remote_dir, filepaths) if decompress: - decompress_files(filepaths, out_filename) + subprocess.call("gunzip -f *.gz",shell=True) def download_taxonomy(args): @@ -747,29 +769,23 @@ def download_taxonomy(args): if not args.skip_maps: if not args.protein: for subsection in ["gb", "wgs"]: - LOG.info( - "Downloading nucleotide {:s} accession to taxon map\n".format( - subsection - ) - ) filename = "nucl_" + subsection + ".accession2taxid.gz" - ftp.download("/pub/taxonomy/accession2taxid/", filename) + uncompressed_file = "nucl_" + subsection + ".accession2taxid" + if not os.path.exists(uncompressed_file) and not os.path.exists(filename): + LOG.info("Downloading nucleotide {:s} accession to taxon map\n".format(subsection)) + + subprocess.call(f"wget -q https://{NCBI_SERVER}/pub/taxonomy/accession2taxid/{filename}", shell=True) else: LOG.info("Downloading protein accession to taxon map\n") - ftp.download( - "/pub/taxonomy/accession2taxid", "prot.accession2taxid.gz" - ) + subprocess.call(f"wget -q https://{NCBI_SERVER}/pub/taxonomy/accession2taxid/prot.accession2taxid.gz", shell=True) LOG.info("Downloaded accession to taxon map(s)\n") LOG.info("Downloading taxonomy tree data\n") - ftp.download("/pub/taxonomy", "taxdump.tar.gz") - ftp.close() + subprocess.call(f"wget -q https://{NCBI_SERVER}/pub/taxonomy/taxdump.tar.gz", shell=True) LOG.info("Decompressing taxonomy data\n") - decompress_files(glob.glob("*accession2taxid.gz")) - LOG.info("Untarring taxonomy tree data\n") - with tarfile.open("taxdump.tar.gz", "r:gz") as tar: - tar.extractall() - remove_files(glob.glob("*.gz")) - + subprocess.call("tar xzf taxdump.tar.gz", shell=True) + LOG.info("Decompressing nucleotides\n") + subprocess.call("gunzip -f *.gz",shell=True) + purge('taxdump.tar') def download_genomic_library(args): library_filename = "library.faa" if args.protein else "library.fna" @@ -796,10 +812,10 @@ def download_genomic_library(args): if args.library == "human": remote_dir_name = "vertebrate_mammalian/Homo_sapiens" try: - url = "ftp://{:s}/genomes/refseq/{:s}/assembly_summary.txt".format( + url = "https://{:s}/genomes/refseq/{:s}/assembly_summary.txt".format( NCBI_SERVER, remote_dir_name ) - download_file(url) + subprocess.call(f"wget -q {url}", shell=True) except urllib.error.URLError: LOG.error( "Error downloading assembly summary file for {:s}, exiting\n".format( @@ -821,6 +837,7 @@ def download_genomic_library(args): download_files_from_manifest( NCBI_SERVER, "/genomes/", + decompress=True, filepath_to_taxid_table=filepath_to_taxid_table, ) assign_taxid_to_sequences(filepath_to_taxid_table, args.protein) @@ -872,8 +889,9 @@ def download_genomic_library(args): library_pathname = os.path.join(library_pathname, args.library) os.makedirs(library_pathname, exist_ok=True) os.chdir(library_pathname) - ftp = FTP(NCBI_SERVER) - ftp.download("pub/UniVec", args.library) + ## UniVec_Core + print('Downloading UniVec_Core'+'\n') + subprocess.call('wget https://ftp.ncbi.nlm.nih.gov/pub/UniVec/UniVec_Core', shell=True) special_taxid = 28384 LOG.info( "Adding taxonomy ID of {:d} to all sequences\n".format( @@ -894,11 +912,11 @@ def download_genomic_library(args): with open("prelim_map.txt", "w") as out_file: scan_fasta_file(in_file, out_file) files_to_remove = [args.library] - if not args.no_masking: - mask_files( - [library_filename], library_filename + ".masked", args.protein - ) - shutil.move(library_filename + ".masked", library_filename) + #if not args.no_masking: + # mask_files( + # [library_filename], library_filename + ".masked", args.protein + # ) + # shutil.move(library_filename + ".masked", library_filename) LOG.info("Added {:s} to {:s}\n".format(args.library, args.db)) if files_to_remove: clean_up(files_to_remove) @@ -987,7 +1005,8 @@ def build_kraken2_db(args): os.chdir(args.db) if not os.path.isdir("taxonomy"): LOG.error("Cannot find taxonomy subdirectory in database\n") - sys.exit(1) + LOG.info("Downloading taxonomy...\n") + download_taxonomy(args) if not os.path.isdir("library"): LOG.error("Cannot find library subdirectory in database\n") sys.exit(1) @@ -1042,7 +1061,7 @@ def build_kraken2_db(args): ) else: LOG.error( - "Accession to taxid map files are required to build this database.\n" + "Accession to taxid map files are required to build th database.\n" ) LOG.error( "Run k2 download-taxonomy --db {:s} again".format(args.db) @@ -1055,7 +1074,7 @@ def build_kraken2_db(args): argv = [estimate_capacity_binary, "-S", construct_seed_template(args)] if args.protein: argv.append("-X") - wrapper_args_to_binary_args( + wrapper_args_to_binary_args( args, argv, get_binary_options(estimate_capacity_binary) ) fasta_filenames = glob.glob( @@ -1371,7 +1390,7 @@ def build_gg_taxonomy(in_file): display_name = genus + " " + species else: match = re.search("([a-z])__([^;]+)$", node) - if match: + if match: rank = rank_codes[match.group(1)] display_name = match.group(2) rank = rank or "no rank"