Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RE2022-195 - add retrieve samples logic #327

Merged
merged 18 commits into from
Jun 16, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 114 additions & 44 deletions src/loaders/workspace_downloader/workspace_downloader.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
usage: workspace_downloader.py [-h] --workspace_id WORKSPACE_ID [--kbase_collection KBASE_COLLECTION] [--source_version SOURCE_VERSION]
[--root_dir ROOT_DIR] [--kb_base_url KB_BASE_URL] [--workers WORKERS] [--token_filepath TOKEN_FILEPATH]
[--keep_job_dir]
[--keep_job_dir] [--retrieve_sample]

PROTOTYPE - Download genome files from the workspace service (WSS).

Expand All @@ -24,6 +24,7 @@
--token_filepath TOKEN_FILEPATH
A file path that stores KBase token
--keep_job_dir Keep SDK job directory after download task is completed
--retrieve_sample Retrieve sample for each genome object


e.g.
Expand Down Expand Up @@ -52,6 +53,7 @@
from multiprocessing import Pool, Queue, cpu_count

import docker
import requests
MrCreosote marked this conversation as resolved.
Show resolved Hide resolved

from src.clients.AssemblyUtilClient import AssemblyUtil
from src.clients.workspaceClient import Workspace
Expand All @@ -68,26 +70,27 @@

class Conf:
def __init__(
self,
job_dir,
output_dir,
workers,
kb_base_url,
token_filepath,
self,
job_dir,
output_dir,
workers,
kb_base_url,
token_filepath,
):
port = loader_helper.find_free_port()
token = loader_helper.get_token(token_filepath)
self.token = loader_helper.get_token(token_filepath)

self.start_callback_server(
docker.from_env(), uuid.uuid4().hex, job_dir, kb_base_url, token, port
docker.from_env(), uuid.uuid4().hex, job_dir, kb_base_url, self.token, port
)

ws_url = os.path.join(kb_base_url, "ws")
self.sample_url = os.path.join(kb_base_url, "sampleservice")
callback_url = "http://" + loader_helper.get_ip() + ":" + str(port)
print("callback_url:", callback_url)

self.ws = Workspace(ws_url, token=token)
self.asu = AssemblyUtil(callback_url, token=token)
self.ws = Workspace(ws_url, token=self.token)
self.asu = AssemblyUtil(callback_url, token=self.token)
self.queue = Queue()
self.pth = output_dir
self.job_dir = job_dir
Expand Down Expand Up @@ -115,7 +118,7 @@ def setup_callback_server_envs(self, job_dir, kb_base_url, token, port):
return env, vol

def start_callback_server(
self, client, container_name, job_dir, kb_base_url, token, port
self, client, container_name, job_dir, kb_base_url, token, port
):
env, vol = self.setup_callback_server_envs(job_dir, kb_base_url, token, port)
self.container = client.containers.run(
Expand Down Expand Up @@ -150,7 +153,7 @@ def _make_job_dir(root_dir, job_dir, username):


def _make_collection_source_dir(
root_dir, collection_source_dir, collection, source_verion
root_dir, collection_source_dir, collection, source_verion
):
"""
Helper function that creates a collection & source_version and link in data
Expand Down Expand Up @@ -213,9 +216,9 @@ def _create_softlink(csd_upa_dir, upa_dir):
"""
if os.path.exists(csd_upa_dir):
if (
os.path.isdir(csd_upa_dir)
and os.path.islink(csd_upa_dir)
and os.readlink(csd_upa_dir) == upa_dir
os.path.isdir(csd_upa_dir)
and os.path.islink(csd_upa_dir)
and os.readlink(csd_upa_dir) == upa_dir
):
return
raise ValueError(
Expand Down Expand Up @@ -245,7 +248,7 @@ def _process_object_info(obj_info, genome_upa):


def list_objects(
wsid, conf, filter_objects_name_by, include_metadata=False, batch_size=10000
wsid, conf, filter_objects_name_by, include_metadata=False, batch_size=10000
):
"""
List all objects information given a workspace ID.
Expand Down Expand Up @@ -278,31 +281,99 @@ def process_input(conf):
if not task:
print("Stopping")
break
upa, obj_info, genome_upa = task
upa, obj_info, genome_upa, retrieve_sample = task

upa_dir = os.path.join(conf.pth, upa)
if not os.path.isdir(upa_dir) or not loader_helper.is_upa_info_complete(upa_dir):

# remove legacy upa_dir to avoid FileExistsError in hard link
if os.path.isdir(upa_dir):
shutil.rmtree(upa_dir)

# cfn points to the assembly file outside of the container
# get_assembly_as_fasta writes the file to /kb/module/workdir/tmp/<filename> inside the container.
# workdir is shared between the container and the external file system
# Any file path get_assembly_as_fasta returns will be relative to inside the container, and so is not useful for this script
cfn = os.path.join(conf.job_dir, "workdir/tmp", upa)
# upa file is downloaded to cfn
conf.asu.get_assembly_as_fasta({"ref": upa.replace("_", "/"), "filename": upa})

# each upa in output_dir as a separate directory
dstd = os.path.join(conf.pth, upa)
os.makedirs(dstd, exist_ok=True)

dst = os.path.join(dstd, f"{upa}.fa")
# Hard link .fa file from job_dir to output_dir in WS
os.link(cfn, dst)

metafile = os.path.join(dstd, f"{upa}.meta")
# save meta file with relevant object_info
with open(metafile, "w", encoding="utf8") as json_file:
json.dump(_process_object_info(obj_info, genome_upa), json_file, indent=2)

print("Completed %s" % (upa))
else:
print(f"Skip downloading {upa} as it already exists")

# cfn points to the assembly file outside of the container
# get_assembly_as_fasta writes the file to /kb/module/workdir/tmp/<filename> inside the container.
# workdir is shared between the container and the external file system
# Any file path get_assembly_as_fasta returns will be relative to inside the container, and so is not useful for this script
if retrieve_sample:
_download_sample_data(conf, upa)
MrCreosote marked this conversation as resolved.
Show resolved Hide resolved

cfn = os.path.join(conf.job_dir, "workdir/tmp", upa)
# upa file is downloaded to cfn
conf.asu.get_assembly_as_fasta({"ref": upa.replace("_", "/"), "filename": upa})

# each upa in output_dir as a separate directory
dstd = os.path.join(conf.pth, upa)
os.makedirs(dstd, exist_ok=True)
def _download_sample_data(conf, upa):
    """
    Retrieve the sample linked to a genome object from the sample service
    and save it as <upa>.sample in the object's output directory.

    conf - Conf instance providing the auth token, sample service URL
           (sample_url) and the output directory root (pth)
    upa - object UPA with underscores in place of slashes, e.g. "123_45_6"

    Raises ValueError if more than one sample data link is found for the UPA.
    """
    dstd = os.path.join(conf.pth, upa)
    os.makedirs(dstd, exist_ok=True)
    sample_file = os.path.join(dstd, f"{upa}.sample")

    # skip re-downloading if the sample file is already present
    if os.path.isfile(sample_file):
        print(f"Skip downloading sample for {upa} as it already exists")
        return

    # retrieve data links associated with upa
    links_ret = _post_sample_service(
        conf.token,
        conf.sample_url,
        "get_data_links_from_data",
        {"upa": upa.replace("_", "/")},
    )
    data_links = links_ret["links"]
    if not data_links:
        print(f"No sample data links found for {upa}")
        return

    # there should only be one data link for each upa
    if len(data_links) != 1:
        raise ValueError(f"Expected 1 data link for {upa}, got {len(data_links)}")

    # retrieve sample data and save to file
    sample_id = data_links[0]["id"]
    sample_ret = _post_sample_service(
        conf.token, conf.sample_url, "get_sample", {"id": sample_id}
    )

    with open(sample_file, "w", encoding="utf8") as json_file:
        json.dump(sample_ret, json_file, indent=2)


def _post_sample_service(token, sample_url, method, params):
    """
    Send a JSON-RPC POST request to the sample service and return its result.

    token - KBase token used for the Authorization header
    sample_url - sample service endpoint URL
    method - SampleService method name, e.g. "get_sample"
    params - dict of parameters for the method

    Raises RuntimeError if the service reports an error or the response
    body is not valid JSON.
    """
    headers = {
        "Authorization": token,
        "Content-Type": "application/json",
    }
    payload = {
        "method": f"SampleService.{method}",
        # unique request id per the JSON-RPC convention
        "id": str(uuid.uuid4()),
        "params": [params],
    }
    resp = requests.post(url=sample_url, headers=headers, json=payload)
    try:
        resp_json = resp.json()
    except ValueError as e:
        # e.g. an HTML error page from a proxy; surface the HTTP status
        # instead of an opaque JSON decode error
        raise RuntimeError(
            f"Non-JSON response from SampleService "
            f"({resp.status_code}): {resp.text[:200]}"
        ) from e
    if resp_json.get('error'):
        raise RuntimeError(f"Error from SampleService - {resp_json['error']}")
    return resp_json['result'][0]


def main():
Expand Down Expand Up @@ -361,6 +432,11 @@ def main():
action="store_true",
help="Keep SDK job directory after download task is completed",
)
optional.add_argument(
"--retrieve_sample",
action="store_true",
help="Retrieve sample for each genome object",
)

args = parser.parse_args()

Expand All @@ -372,6 +448,7 @@ def main():
workers = args.workers
token_filepath = args.token_filepath
keep_job_dir = args.keep_job_dir
retrieve_sample = args.retrieve_sample

if bool(kbase_collection) ^ bool(source_version):
parser.error(
Expand Down Expand Up @@ -445,15 +522,8 @@ def main():
for obj_info in assembly_objs:
upa = "{6}_{0}_{4}".format(*obj_info)
upas.append(upa)
upa_dir = os.path.join(output_dir, upa)
if os.path.isdir(upa_dir) and loader_helper.is_upa_info_complete(upa_dir):
continue

# remove legacy upa_dir to avoid FileExistsError in hard link
if os.path.isdir(upa_dir):
shutil.rmtree(upa_dir)
genome_upa = assembly_genome_map[upa.replace("_", "/")]
conf.queue.put([upa, obj_info, genome_upa])
conf.queue.put([upa, obj_info, genome_upa, retrieve_sample])
MrCreosote marked this conversation as resolved.
Show resolved Hide resolved

for i in range(workers + 1):
conf.queue.put(None)
Expand Down