Release 0.2.4
christiam committed Mar 3, 2022 · 1 parent b87c75f · commit f5feafd
Showing 13 changed files with 152 additions and 45 deletions.
4 changes: 2 additions & 2 deletions CITATION.cff
@@ -1,8 +1,8 @@
cff-version: "1.1.0"
message: "If you use this software, please cite it using these metadata."
title: ElasticBLAST
version: "0.2.3"
date-released: 2022-02-16
version: "0.2.4"
date-released: 2022-03-04
license: "NCBI Public Domain"
repository-code: "https://github.com/ncbi/elastic-blast/"
authors:
16 changes: 16 additions & 0 deletions bin/results2clustername.sh
@@ -0,0 +1,16 @@
#!/bin/bash
# results2clustername.sh: Script to convert ElasticBLAST results to the default
# cluster name
#
# Author: Christiam Camacho ([email protected])
# Created: Thu 08 Apr 2021 04:07:29 PM EDT

if [ $# -ne 1 ] ; then
echo "Usage: $0 <ElasticBLAST results path>"
exit 1
fi
elb_results=$1
md5=md5sum
command -v $md5 >& /dev/null || md5=md5
results_hash=$(printf $elb_results | $md5 | cut -b-9)
echo elasticblast-$USER-$results_hash
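
The default cluster name is a pure function of the results path: the first 9 hex characters of its MD5 digest, prefixed with elasticblast-$USER. A minimal Python sketch of the same derivation (assuming the current login name matches $USER):

import getpass
import hashlib

def default_cluster_name(elb_results: str) -> str:
    # First 9 hex chars of the MD5 of the results path, mirroring
    # `printf $elb_results | md5sum | cut -b-9` in the script above.
    results_hash = hashlib.md5(elb_results.encode()).hexdigest()[:9]
    return f'elasticblast-{getpass.getuser()}-{results_hash}'

print(default_cluster_name('s3://mybucket/results'))
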
7 changes: 7 additions & 0 deletions docker-blast/Makefile
@@ -85,6 +85,13 @@ aws-build-from-local-sources: setup.cfg_cloud
gcloud builds submit --config=awscloudbuild.yaml --substitutions=TAG_NAME="${VERSION}",_IMG="${AWS_IMG}",_SERVER="${AWS_SERVER}",_AWS_ECR_PASSWD="`aws ecr-public get-login-password --region ${AWS_REGION}`",_DOCKERFILE='Dockerfile-build-from-local-sources' .
$(MAKE) clean-local-sources

.PHONY: gcp-check
gcp-check:
gcloud builds submit --config test-docker-image-gcp.yaml --substitutions _TAG=$(VERSION),_IMG=${IMG}

.PHONY: aws-check
aws-check:
gcloud builds submit --config test-docker-image-aws.yaml --substitutions _IMG="${AWS_IMG}:${VERSION}"

.PHONY: clean
clean:
23 changes: 23 additions & 0 deletions docker-blast/test-docker-image-aws.yaml
@@ -0,0 +1,23 @@
steps:
- name: '${_IMG}'
args: ['update_blastdb.pl', '--version']
- name: '${_IMG}'
args: ['blastn', '-version-full']
- name: '${_IMG}'
args: ['blastdb_path', '-version-full']
- name: '${_IMG}'
args: ['which', 'vmtouch']
- name: '${_IMG}'
args: ['aws', '--version']
- name: '${_IMG}'
args: ['aws', 's3', '--no-sign-request', 'ls', 's3://ncbi-blast-databases/latest-dir']
- name: '${_IMG}'
args: ['gcloud', '--version']
- name: '${_IMG}'
args: ['printenv', 'BLASTDB', 'PATH']
- name: '${_IMG}'
args: ['fasta-split', '--help']
- name: '${_IMG}'
args: ['splitq_download_db_search', '--version']
- name: '${_IMG}'
args: ['splitq_download_db_search', '--help']
23 changes: 23 additions & 0 deletions docker-blast/test-docker-image-gcp.yaml
@@ -0,0 +1,23 @@
steps:
- name: 'gcr.io/${PROJECT_ID}/${_IMG}:${_TAG}'
args: ['update_blastdb.pl', '--version']
- name: 'gcr.io/${PROJECT_ID}/${_IMG}:${_TAG}'
args: ['blastn', '-version-full']
- name: 'gcr.io/${PROJECT_ID}/${_IMG}:${_TAG}'
args: ['blastdb_path', '-version-full']
- name: 'gcr.io/${PROJECT_ID}/${_IMG}:${_TAG}'
args: ['which', 'vmtouch']
- name: 'gcr.io/${PROJECT_ID}/${_IMG}:${_TAG}'
args: ['aws', '--version']
- name: 'gcr.io/${PROJECT_ID}/${_IMG}:${_TAG}'
args: ['aws', 's3', '--no-sign-request', 'ls', 's3://ncbi-blast-databases/latest-dir']
- name: 'gcr.io/${PROJECT_ID}/${_IMG}:${_TAG}'
args: ['gcloud', '--version']
- name: 'gcr.io/${PROJECT_ID}/${_IMG}:${_TAG}'
args: ['printenv', 'BLASTDB', 'PATH']
- name: 'gcr.io/${PROJECT_ID}/${_IMG}:${_TAG}'
args: ['fasta-split', '--help']
- name: 'gcr.io/${PROJECT_ID}/${_IMG}:${_TAG}'
args: ['splitq_download_db_search', '--version']
- name: 'gcr.io/${PROJECT_ID}/${_IMG}:${_TAG}'
args: ['splitq_download_db_search', '--help']
14 changes: 5 additions & 9 deletions docker-job-submit/Makefile
@@ -28,7 +28,7 @@ SHELL=/bin/bash
.PHONY: all pre-check check clean build publish gcp-build gcp-check gcp-clean

IMG?=ncbi/elasticblast-job-submit
-VERSION?=1.2.1
+VERSION?=2.0.0
GCP_PROJECT?=$(shell gcloud config get-value project 2>/dev/null)
GCP_TEST_BUCKET?=gs://elasticblast-test/cloud-job-submission
AWS_REGION?=us-east-1
@@ -76,24 +76,20 @@ aws-build-from-local-sources:
-gcloud builds submit --config awscloudbuild.yaml --substitutions _SERVER=${AWS_SERVER},TAG_NAME=${VERSION},_IMG=${AWS_IMG},_DOCKERFILE=Dockerfile-build-from-local-sources.aws,_AWS_ECR_PASSWD="`aws ecr-public get-login-password --region ${AWS_REGION}`"
rm -fr src bin requirements setup.cfg_cloud setup.py


.PHONY: gcp-test
gcp-test:
gcloud builds submit --timeout=120 --config cloudrun.yaml --substitutions _IMG="gcr.io/${GCP_PROJECT}/${IMG}:${VERSION}",_ELB_GCP_PROJECT="${ELB_GCP_PROJECT}",_ELB_GCP_ZONE="${ELB_GCP_ZONE}",_ELB_RESULTS="${ELB_RESULTS}",_ELB_CLUSTER_NAME="${ELB_CLUSTER_NAME}"

-.PHONY: aws-test
-aws-test:
+.PHONY: aws-check
+aws-check:
gcloud builds submit --config awscloudrun.yaml --substitutions _IMG="${AWS_IMG}:${VERSION}",_ELB_RESULTS="${ELB_RESULTS}",_ELB_CLUSTER_NAME="${ELB_CLUSTER_NAME}"

gcp-clean:
-gcloud container images delete gcr.io/${GCP_PROJECT}/${IMG}:${VERSION}

.PHONY: gcp-check
gcp-check:
-    -gcloud container images list --repository=gcr.io/${GCP_PROJECT}/${IMG}
-    -gcloud container images list-tags gcr.io/${GCP_PROJECT}/${IMG}
-    -gcloud container images describe gcr.io/${GCP_PROJECT}/${IMG}:latest
-    -gcloud container images describe gcr.io/${GCP_PROJECT}/${IMG}:${VERSION}
-    #gcloud builds submit --config test-cloudbuild.yaml --substitutions _TAG=$(VERSION),_IMG=${IMG}
+    gcloud builds submit --timeout=120 --config cloudrun.yaml --substitutions _IMG="gcr.io/${GCP_PROJECT}/${IMG}:${VERSION}",_ELB_GCP_PROJECT="${ELB_GCP_PROJECT}",_ELB_GCP_ZONE="${ELB_GCP_ZONE}",_ELB_RESULTS="${ELB_RESULTS}",_ELB_CLUSTER_NAME="${ELB_CLUSTER_NAME}"

gcp-list-tagless-images:
gcloud container images list-tags gcr.io/${GCP_PROJECT}/${IMG} \
4 changes: 2 additions & 2 deletions requirements/base.txt
@@ -3,8 +3,8 @@ setuptools
importlib-resources
importlib-metadata
pex
-boto3==1.20.31
-botocore==1.23.33
+boto3==1.21.10
+botocore==1.24.10
awslimitchecker
tenacity
dataclasses_json
93 changes: 68 additions & 25 deletions src/elastic_blast/aws.py
@@ -44,6 +44,10 @@
import boto3 # type: ignore
from botocore.exceptions import ClientError, NoCredentialsError, ParamValidationError, WaiterError # type: ignore

from dataclasses_json import dataclass_json
from dataclasses import dataclass, field
from copy import deepcopy

from .util import convert_labels_to_aws_tags, convert_disk_size_to_gb
from .util import convert_memory_to_mb, UserReportError
from .util import ElbSupportedPrograms, get_usage_reporting, sanitize_aws_batch_job_name
@@ -110,6 +114,37 @@ def check_cluster(cfg: ElasticBlastConfig) -> bool:
return False


@dataclass_json
@dataclass
class JobIds:
"""Serializable store of AWS Batch job ids for query splitting, cloud job
submission and BLAST searches"""
query_splitting: str = ''
job_submission: str = ''
search: List[str] = field(default_factory=list)

def __bool__(self):
"""Boolean value of the object: True if at least one job id is set"""
return bool(self.query_splitting) or bool(self.job_submission) or bool(self.search)

def merge(self, obj):
"""Merge another JobIds object into self"""
if not self.query_splitting and obj.query_splitting:
self.query_splitting = obj.query_splitting
if not self.job_submission and obj.job_submission:
self.job_submission = obj.job_submission
self.search = list(set(self.search + obj.search))

def to_list(self) -> List[str]:
"""Return all jobs ids as a list"""
id_list = [job for job in self.search]
if self.query_splitting:
id_list.append(self.query_splitting)
if self.job_submission:
id_list.append(self.job_submission)
return id_list
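
Since JobIds carries @dataclass_json, it gains to_json/from_json (used below for the S3 metadata file), and merge reconciles ids loaded from S3 with ids collected locally. A short usage sketch with hypothetical ids:

# Hypothetical job ids, for illustration only
a = JobIds(query_splitting='qs-1', search=['s-1'])
b = JobIds(job_submission='sub-1', search=['s-1', 's-2'])
a.merge(b)                           # keeps qs-1, adopts sub-1, unions the search ids
restored = JobIds.from_json(a.to_json())
assert sorted(restored.to_list()) == sorted(a.to_list())
assert not JobIds()                  # an empty JobIds is falsy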


class ElasticBlastAws(ElasticBlast):
""" Implementation of core ElasticBLAST functionality in AWS.
Uses a CloudFormation template and AWS Batch for its main operation.
@@ -146,8 +181,7 @@ def _init(self, cfg: ElasticBlastConfig, create: bool):
self.subnets = None
self._provide_subnets()
self.cf_stack = None
-self.job_ids : List[str] = []
-self.qs_job_id = None
+self.job_ids = JobIds()

initialized = True

@@ -639,7 +673,7 @@ def cloud_query_split(self, query_files: List[str]) -> None:
jobName=jname,
parameters=parameters,
containerOverrides=overrides)
-self.qs_job_id = job['jobId']
+self.job_ids.query_splitting = job['jobId']
logging.info(f"Submitted AWS Batch job {job['jobId']} to split query {query_files[0]}")
self.upload_job_ids()
else:
@@ -653,15 +687,15 @@ def wait_for_cloud_query_split(self) -> None:
"""
if self.dry_run:
return
-if not self.qs_job_id:
+if not self.job_ids.query_splitting:
msg = 'Query splitting job was not submitted!'
logging.fatal(msg)
raise RuntimeError(msg)

while True:
-job_batch = self.batch.describe_jobs(jobs=[self.qs_job_id])['jobs']
+job_batch = self.batch.describe_jobs(jobs=[self.job_ids.query_splitting])['jobs']
job_status = job_batch[0]['status']
-logging.debug(f'Query splitting job status {job_status} for {self.qs_job_id}')
+logging.debug(f'Query splitting job status {job_status} for {self.job_ids.query_splitting}')
if job_status == 'SUCCEEDED':
break
if job_status == 'FAILED':
@@ -674,7 +708,7 @@ def wait_for_cloud_query_split(self) -> None:
for k in ['exitCode', 'reason']:
if k in container:
failure_details += f'Container{k[0].upper()+k[1:]}: {container[k]} '
-msg = f'Query splitting on the cloud failed (jobId={self.qs_job_id})'
+msg = f'Query splitting on the cloud failed (jobId={self.job_ids.query_splitting})'
if failure_details: msg += failure_details
logging.fatal(msg)
raise UserReportError(CLUSTER_ERROR, msg)
@@ -736,11 +770,11 @@ def _cloud_submit(self) -> None:
"parameters": parameters,
"containerOverrides": overrides
}
-if self.qs_job_id:
-submit_job_args["dependsOn"] = [{'jobId': self.qs_job_id}]
+if self.job_ids.query_splitting:
+submit_job_args["dependsOn"] = [{'jobId': self.job_ids.query_splitting}]
job = self.batch.submit_job(**submit_job_args)
logging.info(f'Submitted AWS Batch job {job["jobId"]} to submit search jobs')
-self.job_ids.append(job['jobId'])
+self.job_ids.job_submission = job['jobId']
self.upload_job_ids()


@@ -751,8 +785,6 @@ def client_submit(self, query_batches: List[str], one_stage_cloud_query_split: b
query_batches - list of bucket names of queries to submit
one_stage_cloud_query_split - do the query split in the cloud as a part
of executing a regular job """
-self.job_ids = []

prog = self.cfg.blast.program

if self.cfg.cluster.db_source != DBSource.AWS:
@@ -829,10 +861,10 @@ def is_int(value: str):
"parameters": parameters,
"containerOverrides": overrides
}
-if self.qs_job_id:
-submit_job_args["dependsOn"] = [{'jobId': self.qs_job_id}]
+if self.job_ids.query_splitting:
+submit_job_args["dependsOn"] = [{'jobId': self.job_ids.query_splitting}]
job = self.batch.submit_job(**submit_job_args)
-self.job_ids.append(job['jobId'])
+self.job_ids.search.append(job['jobId'])
logging.debug(f"Job definition parameters for job {job['jobId']} {parameters}")
logging.info(f"Submitted AWS Batch job {job['jobId']} with query {q}")
else:
@@ -873,15 +905,23 @@ def get_job_ids(self) -> List[str]:
def upload_job_ids(self) -> None:
"""Save AWS Batch job ids in a metadata file in S3, if the metadata
file is present, append job ids"""
+current_job_ids = deepcopy(self.job_ids)
+self._load_job_ids_from_aws()
+current_job_ids.merge(self.job_ids)
+self.job_ids = current_job_ids

bucket_name, key = parse_bucket_name_key(f'{self.results_bucket}/{ELB_METADATA_DIR}/{ELB_AWS_JOB_IDS}')
bucket = self.s3.Bucket(bucket_name)
-job_ids = self.job_ids
-if self.qs_job_id:
-job_ids.append(self.qs_job_id)
-job_ids = list(set(job_ids))
-bucket.put_object(Body=json.dumps(job_ids).encode(), Key=key)
-logging.debug(f'Uploaded {len(job_ids)} job IDs to {self.results_bucket}/{ELB_METADATA_DIR}/{ELB_AWS_JOB_IDS}')
+bucket.put_object(Body=self.job_ids.to_json().encode(), Key=key)  # type: ignore
+logging.debug(f'Uploaded job IDs to {self.results_bucket}/{ELB_METADATA_DIR}/{ELB_AWS_JOB_IDS}')

+# This code is needed for janitor backward compatibility in version
+# 0.2.4, and can be removed when the ElasticBLAST janitor is upgraded to version 0.2.4.
+ELB_AWS_OLD_JOB_IDS = 'job-ids.json'
+bucket_name, key = parse_bucket_name_key(f'{self.results_bucket}/{ELB_METADATA_DIR}/{ELB_AWS_OLD_JOB_IDS}')
+bucket = self.s3.Bucket(bucket_name)
+bucket.put_object(Body=json.dumps(self.job_ids.to_list()).encode(), Key=key)
+logging.debug(f'Uploaded job IDs to {self.results_bucket}/{ELB_METADATA_DIR}/{ELB_AWS_OLD_JOB_IDS}')
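
The net effect is a dual write: job-ids-v2.json holds the structured record while the legacy job-ids.json keeps the flat list that the 0.2.3 janitor still reads. Roughly, with hypothetical ids:

ids = JobIds(query_splitting='qs-1', job_submission='sub-1', search=['s-1'])
ids.to_json()               # job-ids-v2.json: {"query_splitting": "qs-1", "job_submission": "sub-1", "search": ["s-1"]}
json.dumps(ids.to_list())   # job-ids.json:    ["s-1", "qs-1", "sub-1"]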


def upload_query_length(self, query_length: int) -> None:
@@ -920,6 +960,8 @@ def check_status(self, extended=False) -> Tuple[ElbStatus, Dict[str, int], str]:
elif njobs == 0:
# This is likely the case when dry-run is set to True
retval = ElbStatus.UNKNOWN
elif (self.job_ids.query_splitting or self.job_ids.job_submission) and not self.job_ids.search:
retval = ElbStatus.SUBMITTING
elif running > 0 or pending > 0:
retval = ElbStatus.RUNNING
elif (pending + running + failed) == 0 and succeeded == njobs:
@@ -941,8 +983,8 @@ def _load_job_ids_from_aws(self):
try:
bucket.download_file(key, tmp.name)
with open(tmp.name) as f_ids:
-self.job_ids += json.load(f_ids)
-self.job_ids = list(set(self.job_ids))
+new_job_ids = JobIds.from_json(f_ids.read())
+self.job_ids.merge(new_job_ids)
except ClientError as err:
err_code = err.response['Error']['Code']
fnx_name = inspect.stack()[0].function
@@ -965,11 +1007,12 @@ def _check_status(self, extended) -> Tuple[Dict[str, int], str]:

if not self.job_ids:
self._load_job_ids_from_aws()
+job_ids = self.job_ids.to_list()

# check status of jobs in batches of JOB_BATCH_NUM
JOB_BATCH_NUM = 100
-for i in range(0, len(self.job_ids), JOB_BATCH_NUM):
-job_batch = self.batch.describe_jobs(jobs=self.job_ids[i:i + JOB_BATCH_NUM])['jobs']
+for i in range(0, len(job_ids), JOB_BATCH_NUM):
+job_batch = self.batch.describe_jobs(jobs=job_ids[i:i + JOB_BATCH_NUM])['jobs']
# get number for AWS Batch job states
for st in AWS_BATCH_JOB_STATES:
counts[st] += sum([j['status'] == st for j in job_batch])
4 changes: 2 additions & 2 deletions src/elastic_blast/commands/run_summary.py
@@ -42,7 +42,7 @@
from botocore.exceptions import ClientError #type: ignore
from elastic_blast.base import PositiveInteger
from elastic_blast.aws_traits import create_aws_config
-from elastic_blast.aws import handle_aws_error
+from elastic_blast.aws import handle_aws_error, JobIds
from elastic_blast.util import safe_exec
from elastic_blast.filehelper import parse_bucket_name_key
from elastic_blast.constants import ELB_AWS_JOB_IDS, ELB_QUERY_LENGTH, ELB_METADATA_DIR
@@ -494,7 +494,7 @@ def _read_job_logs_aws(cfg, write_logs):
bucket, key = parse_bucket_name_key(fname)
resp = s3.get_object(Bucket=bucket, Key=key)
body = resp['Body']
-job_list = json.loads(body.read().decode())
+job_list = JobIds.from_json(body.read().decode()).to_list()

write_logs.write('AWS job log dump\n')

2 changes: 1 addition & 1 deletion src/elastic_blast/commands/status.py
@@ -58,7 +58,7 @@ def _status(args, cfg: ElasticBlastConfig, clean_up_stack: List[Any]) -> int:
while True:
status, counts, verbose_result = elastic_blast.check_status(args.verbose)
result = str(status)
-if counts:
+if status == ElbStatus.RUNNING and counts and sum(counts.values()) > 0:
result = '\n'.join([f'{x} {counts[x.lower()]}' for x in
('Pending', 'Running', 'Succeeded', 'Failed')
])
4 changes: 2 additions & 2 deletions src/elastic_blast/constants.py
@@ -128,7 +128,7 @@ class ElbExecutionMode(Enum):
ELB_BACKEND_LOG = 'backends.log'
ELB_TAXIDLIST_FILE = 'taxidlist.txt'
ELB_META_CONFIG_FILE = 'elastic-blast-config.json'
-ELB_AWS_JOB_IDS = 'job-ids.json'
+ELB_AWS_JOB_IDS = 'job-ids-v2.json'
ELB_QUERY_LENGTH = 'query_length.txt'
ELB_GCP_BATCH_LIST = 'batch_list.txt'
# this file contents should match the number of lines in ELB_GCP_BATCH_LIST
@@ -203,7 +203,7 @@ def __str__(self):
ELB_DOCKER_VERSION = '1.0.3'
ELB_QS_DOCKER_VERSION = '0.1.2'
ELB_JANITOR_DOCKER_VERSION = '0.2.0'
-ELB_JOB_SUBMIT_DOCKER_VERSION = '1.2.1'
+ELB_JOB_SUBMIT_DOCKER_VERSION = '2.0.0'

ELB_DOCKER_IMAGE_GCP = f'gcr.io/ncbi-sandbox-blast/ncbi/elb:{ELB_DOCKER_VERSION}'
ELB_DOCKER_IMAGE_AWS = f'public.ecr.aws/ncbi-elasticblast/elasticblast-elb:{ELB_DOCKER_VERSION}'
1 change: 0 additions & 1 deletion src/elastic_blast/filehelper.py
@@ -458,7 +458,6 @@ def open_for_read(fname):
body.readable = lambda: True
body.writable = lambda: False
body.seekable = lambda: False
-body.closed = False
body.flush = lambda: None
if tarred or gzipped:
fileobj = unpack_stream(body, gzipped, tarred)
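
Dropping the closed assignment is presumably tied to the MockedStream change below: io.IOBase exposes closed as a read-only property, so assigning to it on the new io.IOBase-based mock would raise AttributeError under test. A quick check:

import io

body = io.IOBase()
print(body.closed)       # False, supplied by io.IOBase
try:
    body.closed = False  # the property has no setter
except AttributeError as err:
    print(err)
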
2 changes: 1 addition & 1 deletion tests/utils.py
@@ -593,7 +593,7 @@ def Object(self, bucket, key):
return obj


-class MockedStream(str):
+class MockedStream(io.IOBase):
"""A string stream class needed for mocked downloads from S3, used by
filehelper.open_for_read"""
def __init__(self, data):
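
A str subclass cannot stand in for the file object that open_for_read manipulates, whereas deriving from io.IOBase supplies the stream protocol (closed, readable() defaults, context-manager support) for free. A minimal sketch of such an io.IOBase-backed string stream (assumed shape, not the class's actual implementation):

import io

class StringStream(io.IOBase):
    """Illustrative stand-in for a downloaded S3 body."""
    def __init__(self, data: str):
        self._buf = data.encode()
        self._pos = 0

    def readable(self) -> bool:
        return True

    def read(self, size: int = -1) -> bytes:
        # Serve up to `size` bytes from the buffer (-1 means everything left)
        if size is None or size < 0:
            size = len(self._buf) - self._pos
        chunk = self._buf[self._pos:self._pos + size]
        self._pos += len(chunk)
        return chunk

assert StringStream('hello').read() == b'hello'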