Skip to content

Commit

Permalink
RE2022-209: workspace uploader (working script) (#381)
Browse files Browse the repository at this point in the history
  • Loading branch information
Xiangs18 authored Aug 17, 2023
1 parent 5a74792 commit a8b506d
Show file tree
Hide file tree
Showing 13 changed files with 1,089 additions and 322 deletions.
271 changes: 177 additions & 94 deletions src/clients/AssemblyUtilClient.py

Large diffs are not rendered by default.

13 changes: 12 additions & 1 deletion src/loaders/common/loader_common_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@

# Arguments Descriptions

# Name for root directory argument
ROOT_DIR_ARG_NAME = "root_dir"
# Description of the --root_dir argument in various loaders programs.
ROOT_DIR_DESCR = "Root directory for the collections project."

# Name for load version argument
LOAD_VER_ARG_NAME = "load_ver"
# Description of the --load_ver argument in various loaders programs.
Expand Down Expand Up @@ -33,6 +38,7 @@
"""
File structure at NERSC for loader programs
"""
WS = "WS" # workspace

ROOT_DIR = (
"/global/cfs/cdirs/kbase/collections" # root directory for the collections project
Expand Down Expand Up @@ -71,7 +77,7 @@
# JSON keys in the download metadata file in a download directory
SOURCE_METADATA_FILE_KEYS = ["upa", "name", "type", "timestamp"]
# callback server docker image name
CALLBACK_IMAGE_NAME = "scanon/callback"
CALLBACK_IMAGE_NAME = "kbase/callback:test" #TODO switch to kbase/callback:latest

# a list of IDs provided to the computation script
DATA_ID_COLUMN_HEADER = "genome_id" # TODO DATA_ID change to data ID for generality
Expand Down Expand Up @@ -120,3 +126,8 @@
# TODO DOWNLOAD if we settle on a standard file name scheme for downloaders we can get
# rid of this
STANDARD_FILE_EXCLUDE_SUBSTRINGS = ['cds_from', 'rna_from', 'ERR']

KB_BASE_URL_MAP = {'CI': 'https://ci.kbase.us/services/',
'NEXT': 'https://next.kbase.us/services/',
'APPDEV': 'https://appdev.kbase.us/services/',
'PROD': 'https://kbase.us/services/'}
155 changes: 121 additions & 34 deletions src/loaders/common/loader_helper.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import argparse
import itertools
import json
import os
import socket
import stat
import subprocess
import time
import uuid
from collections import defaultdict
from contextlib import closing
from pathlib import Path
Expand All @@ -13,15 +16,18 @@

import src.common.storage.collection_and_field_names as names
from src.common.storage.db_doc_conversions import collection_data_id_key
from src.loaders.common import loader_common_names
from src.loaders.common.loader_common_names import (
COLLECTION_SOURCE_DIR,
DOCKER_HOST,
FATAL_ERROR,
FATAL_STACKTRACE,
FATAL_TOOL,
IMPORT_DIR,
KB_AUTH_TOKEN,
SDK_JOB_DIR,
SOURCE_DATA_DIR,
SOURCE_METADATA_FILE_KEYS,
WS,
)

"""
Expand Down Expand Up @@ -144,7 +150,7 @@ def get_token(token_filepath):

def start_podman_service(uid: int):
"""
Start podman service. Used by workspace_downloader.py script.
Start podman service. Used by workspace_downloader.py and workspace_uploader.py scripts.
uid - the integer unix user ID of the user running the service.
"""
Expand All @@ -154,7 +160,8 @@ def start_podman_service(uid: int):
time.sleep(1)
return_code = proc.poll()
if return_code:
raise ValueError(f"The command {command} failed with return code {return_code}")
raise ValueError(f"The command {command} failed with return code {return_code}. "
f"Podman service failed to start")
os.environ["DOCKER_HOST"] = DOCKER_HOST.format(uid)
return proc

Expand All @@ -179,69 +186,149 @@ def is_upa_info_complete(upa_dir: str):
return True


def make_collection_source_dir(
root_dir: str,
env: str,
collection: str,
source_ver: str
) -> str:
def make_job_dir(root_dir, username):
"""Helper function that creates a job_dir for a user under root directory."""
job_dir = os.path.join(root_dir, SDK_JOB_DIR, username, uuid.uuid4().hex)
os.makedirs(job_dir, exist_ok=True)
# only user can read, write, or execute
os.chmod(job_dir, stat.S_IRWXU)
return job_dir


def make_job_data_dir(job_dir):
"""
Helper function that creates a temporary directory for sharing files between the host, callback server, and container.
SDK modules (like AssemblyUtil) have the shared directory mounted in the container at `/kb/module/work`. The
scratch directory provided to the SDK module `*Impl.py` code is `/kb/module/work/tmp`. The SDK code is expected
to read and write shared files there.
The callback server mounts `<job_dir>/workdir` as the host shared directory into the SDK module.
`<job_dir>` is also mounted into the callback server and it writes job information (e.g. the token and job configuration)
into `<job_dir>/workdir`
"""
data_dir = os.path.join(job_dir, "workdir/tmp")
os.makedirs(data_dir)
return data_dir


def make_sourcedata_ws_dir(root_dir, env, workspace_id):
"""Helper function that creates a output directory for a specific workspace id under root directory."""
output_dir = os.path.join(root_dir, SOURCE_DATA_DIR, WS, env, str(workspace_id))
os.makedirs(output_dir, exist_ok=True)
return output_dir


def make_collection_source_dir(root_dir: str, env: str, collection: str, source_ver: str) -> str:
"""
Helper function that creates a collection & source_version and link in data
to that collection from the overall source data dir.
"""
csd = os.path.join(root_dir, loader_common_names.COLLECTION_SOURCE_DIR, env, collection, source_ver)
os.makedirs(csd, exist_ok=True)
return csd
collection_source_dir = os.path.join(root_dir, COLLECTION_SOURCE_DIR, env, collection, source_ver)
os.makedirs(collection_source_dir, exist_ok=True)
return collection_source_dir


def create_softlinks_in_csd(csd: str, work_dir: str, genome_ids: list[str], taxonomy_files: list[str] = None) -> None:
def create_softlinks_in_collection_source_dir(
collection_source_dir: str,
work_dir: str,
genome_ids: list[str],
taxonomy_files:list[str] = None
) -> None:
"""
Create softlinks in the collection source dir to the genome files in the work dir.
"""
if not taxonomy_files:
taxonomy_files = []

for genome_id in genome_ids:
genome_dir = os.path.join(work_dir, genome_id)
csd_genome_dir = os.path.join(csd, genome_id)
create_softlink_between_dirs(csd_genome_dir, genome_dir)
target_dir = os.path.join(work_dir, genome_id)
new_dir = os.path.join(collection_source_dir, genome_id)
create_softlink_between_dirs(new_dir, target_dir)

for taxonomy_file in taxonomy_files:
csd_file = os.path.join(csd, taxonomy_file)
sd_file = os.path.join(work_dir, taxonomy_file)
create_softlink_between_files(csd_file, sd_file)
new_file = os.path.join(collection_source_dir, taxonomy_file)
target_file = os.path.join(work_dir, taxonomy_file)
create_softlink_between_files(new_file, target_file)

print(f"Genome files in {csd} \nnow link to {work_dir}")
print(f"Genome files in {collection_source_dir} \nnow link to {work_dir}")


def create_softlink_between_dirs(csd_dir, sd_dir):
def create_softlink_between_dirs(new_dir, target_dir):
"""
Creates a softlink between two directories.
Creates a softlink from new_dir to the contents of target_dir.
"""
if os.path.exists(csd_dir):
if os.path.exists(new_dir):
if (
os.path.isdir(csd_dir)
and os.path.islink(csd_dir)
and os.readlink(csd_dir) == sd_dir
os.path.isdir(new_dir)
and os.path.islink(new_dir)
and os.readlink(new_dir) == target_dir
):
return
raise ValueError(
f"{csd_dir} already exists and does not link to {sd_dir} as expected"
f"{new_dir} already exists and does not link to {target_dir} as expected"
)
os.symlink(target_dir, new_dir, target_is_directory=True)


def create_softlink_between_files(new_file, target_file):
"""
Creates a softlink from new_file to the contents of target_file.
"""
if os.path.exists(new_file):
if (os.path.islink(new_file) and os.readlink(new_file) == target_file):
return
raise ValueError(
f"{new_file} already exists and does not link to {target_file} as expected"
)
os.symlink(sd_dir, csd_dir, target_is_directory=True)
os.symlink(target_file, new_file)


def create_softlink_between_files(csd_file, sd_file):
def create_hardlink_between_files(new_file, target_file):
"""
Creates a softlink between two files.
Creates a hardlink from new_file to the contents of target_file.
"""
if os.path.exists(csd_file):
if (os.path.islink(csd_file) and os.readlink(csd_file) == sd_file):
if os.path.exists(new_file):
if os.path.samefile(target_file, new_file):
return
raise ValueError(
f"{csd_file} already exists and does not link to {sd_file} as expected"
f"{new_file} already exists and does not link to {target_file} as expected"
)
os.symlink(sd_file, csd_file)
os.link(target_file, new_file)


def list_objects(wsid, conf, object_type, include_metadata=False, batch_size=10000):
"""
List all objects information given a workspace ID.
"""
if batch_size > 10000:
raise ValueError("Maximum value for listing workspace objects is 10000")

maxObjectID = conf.ws.get_workspace_info({"id": wsid})[4]
batch_input = [
[idx + 1, idx + batch_size] for idx in range(0, maxObjectID, batch_size)
]
objs = [
conf.ws.list_objects(
_list_objects_params(wsid, min_id, max_id, object_type, include_metadata)
)
for min_id, max_id in batch_input
]
res_objs = list(itertools.chain.from_iterable(objs))
return res_objs


def _list_objects_params(wsid, min_id, max_id, type_str, include_metadata):
"""Helper function that creates params needed for list_objects function."""
params = {
"ids": [wsid],
"minObjectID": min_id,
"maxObjectID": max_id,
"type": type_str,
"includeMetadata": int(include_metadata),
}
return params


def get_ip():
Expand Down
14 changes: 9 additions & 5 deletions src/loaders/compute_tools/tool_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ def __init__(
(default: PROD)
--load_ver LOAD_VER KBase load version (e.g. r207.kbase.1). (defaults to
the source version)
--root_dir ROOT_DIR Root directory.
--root_dir ROOT_DIR Root directory for the collections project.
(default: /global/cfs/cdirs/kbase/collections)
--threads THREADS Total number of threads used by the script. (default:
half of system cpu count)
--program_threads PROGRAM_THREADS
Expand Down Expand Up @@ -127,11 +128,12 @@ def __init__(
kbase_collection = getattr(args, loader_common_names.KBASE_COLLECTION_ARG_NAME)
source_ver = getattr(args, loader_common_names.SOURCE_VER_ARG_NAME)
load_ver = getattr(args, loader_common_names.LOAD_VER_ARG_NAME)
root_dir = getattr(args, loader_common_names.ROOT_DIR_ARG_NAME)
if not load_ver:
load_ver = source_ver

self._allow_missing_files = kbase_collection in _IGNORE_MISSING_FILES_COLLECTIONS
self._source_data_dir = Path(args.root_dir,
self._source_data_dir = Path(root_dir,
loader_common_names.COLLECTION_SOURCE_DIR,
env,
kbase_collection,
Expand All @@ -149,7 +151,7 @@ def __init__(
self._threads = max(1, self._threads)

self._work_dir = Path(
Path(args.root_dir),
Path(root_dir),
loader_common_names.COLLECTION_DATA_DIR,
env,
kbase_collection,
Expand Down Expand Up @@ -186,9 +188,11 @@ def _parse_args(self):
f'--{loader_common_names.LOAD_VER_ARG_NAME}', type=str,
help=loader_common_names.LOAD_VER_DESCR + ' (defaults to the source version)'
)

optional.add_argument(
'--root_dir', type=str, default=loader_common_names.ROOT_DIR, help='Root directory.'
f'--{loader_common_names.ROOT_DIR_ARG_NAME}',
type=str,
default=loader_common_names.ROOT_DIR,
help=f'{loader_common_names.ROOT_DIR_DESCR} (default: {loader_common_names.ROOT_DIR})'
)
optional.add_argument(
'--threads', type=int,
Expand Down
12 changes: 8 additions & 4 deletions src/loaders/genome_collection/compute_genome_taxa_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
optional arguments:
--env {CI,NEXT,APPDEV,PROD,NONE}
Environment containing the data to be processed. (default: PROD)
--root_dir ROOT_DIR Root directory for the collections project (default: /global/cfs/cdirs/kbase/collections)
--root_dir ROOT_DIR Root directory for the collections project. (default: /global/cfs/cdirs/kbase/collections)
--input_source {GTDB,genome_attributes}
Input file source
Expand Down Expand Up @@ -122,15 +122,19 @@ def main():
default='PROD',
help="Environment containing the data to be processed. (default: PROD)",
)
optional.add_argument('--root_dir', type=str, default=loader_common_names.ROOT_DIR,
help=f'Root directory for the collections project (default: {loader_common_names.ROOT_DIR})')
optional.add_argument(
f'--{loader_common_names.ROOT_DIR_ARG_NAME}',
type=str,
default=loader_common_names.ROOT_DIR,
help=f'{loader_common_names.ROOT_DIR_DESCR} (default: {loader_common_names.ROOT_DIR})'
)

optional.add_argument('--input_source', type=str, choices=VALID_SOURCE, default='GTDB',
help='Input file source')

args = parser.parse_args()
load_files = args.load_files
root_dir = args.root_dir
root_dir = getattr(args, loader_common_names.ROOT_DIR_ARG_NAME)
load_version = getattr(args, loader_common_names.LOAD_VER_ARG_NAME)
kbase_collection = getattr(args, loader_common_names.KBASE_COLLECTION_ARG_NAME)
env = getattr(args, loader_common_names.ENV_ARG_NAME)
Expand Down
10 changes: 7 additions & 3 deletions src/loaders/genome_collection/parse_tool_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -843,8 +843,12 @@ def main():
help=f'Extract results from tools. '
f'(default: retrieve all available sub-directories in the '
f'[{loader_common_names.LOAD_VER_ARG_NAME}] directory)')
optional.add_argument('--root_dir', type=str, default=loader_common_names.ROOT_DIR,
help=f'Root directory for the collections project. (default: {loader_common_names.ROOT_DIR})')
optional.add_argument(
f'--{loader_common_names.ROOT_DIR_ARG_NAME}',
type=str,
default=loader_common_names.ROOT_DIR,
help=f'{loader_common_names.ROOT_DIR_DESCR} (default: {loader_common_names.ROOT_DIR})'
)
optional.add_argument('--check_genome', action="store_true",
help='Ensure a corresponding genome exists for every assembly')
optional.add_argument(
Expand All @@ -859,9 +863,9 @@ def main():
kbase_collection = getattr(args, loader_common_names.KBASE_COLLECTION_ARG_NAME)
source_ver = getattr(args, loader_common_names.SOURCE_VER_ARG_NAME)
load_ver = getattr(args, loader_common_names.LOAD_VER_ARG_NAME)
root_dir = getattr(args, loader_common_names.ROOT_DIR_ARG_NAME)
if not load_ver:
load_ver = source_ver
root_dir = args.root_dir
check_genome = args.check_genome

if not args.skip_retrieve_sample:
Expand Down
Loading

0 comments on commit a8b506d

Please sign in to comment.