Skip to content

Commit

Permalink
Merge pull request #278 from 4dn-dcic/ec2booting
Browse files Browse the repository at this point in the history
Ec2booting
  • Loading branch information
SooLee authored Apr 13, 2020
2 parents 64c88b0 + 49b9e8b commit 5a6bf3f
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 18 deletions.
25 changes: 22 additions & 3 deletions tests/tibanna/unicorn/check_task_awsem/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@
from tibanna.exceptions import (
EC2StartingException,
StillRunningException,
MetricRetrievalException
MetricRetrievalException,
EC2IdleException
)
import pytest
import boto3
import random
import string
import json
from datetime import datetime
from datetime import datetime, timedelta
from dateutil.tz import tzutc
from tibanna.vars import AWSEM_TIME_STAMP_FORMAT


@pytest.fixture()
def check_task_input():
Expand All @@ -31,19 +35,34 @@ def test_check_task_awsem_fails_if_no_job_started(check_task_input, s3):
jobid = 'notmyjobid'
check_task_input_modified = check_task_input
check_task_input_modified['jobid'] = jobid
check_task_input_modified['config']['start_time'] = datetime.strftime(datetime.now(tzutc()) - timedelta(minutes=4),
AWSEM_TIME_STAMP_FORMAT)
job_started = "%s.job_started" % jobid
s3.delete_objects(Delete={'Objects': [{'Key': job_started}]})
with pytest.raises(EC2StartingException) as excinfo:
service.handler(check_task_input_modified, '')
assert 'Failed to find jobid' in str(excinfo.value)


@pytest.mark.webtest
def test_check_task_awsem_fails_if_no_job_started_for_too_long(check_task_input, s3):
# ensure there is no job started
jobid = 'notmyjobid'
check_task_input_modified = check_task_input
check_task_input_modified['jobid'] = jobid
check_task_input_modified['config']['start_time'] = datetime.strftime(datetime.now(tzutc()) - timedelta(minutes=13),
AWSEM_TIME_STAMP_FORMAT)
job_started = "%s.job_started" % jobid
s3.delete_objects(Delete={'Objects': [{'Key': job_started}]})
with pytest.raises(EC2IdleException) as excinfo:
service.handler(check_task_input_modified, '')
assert 'Failed to find jobid' in str(excinfo.value)


@pytest.mark.webtest
def test_check_task_awsem_throws_exception_if_not_done(check_task_input):
with pytest.raises(StillRunningException) as excinfo:
service.handler(check_task_input, '')

assert 'still running' in str(excinfo.value)
assert 'error' not in check_task_input

Expand Down
2 changes: 1 addition & 1 deletion tibanna/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Version information."""

# The following line *must* be the last in the module, exactly as formatted:
__version__ = "0.17.0"
__version__ = "0.17.1"
5 changes: 3 additions & 2 deletions tibanna/awsem.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from .base import SerializableObject
from .ec2_utils import Config
from .exceptions import MalFormattedPostrunJsonException
from .vars import AWSEM_TIME_STAMP_FORMAT


class AwsemRunJson(SerializableObject):
Expand Down Expand Up @@ -31,7 +32,7 @@ def update(self, **kwargs):

@property
def start_time_as_str(self):
return datetime.strptime(self.start_time, '%Y%m%d-%H:%M:%S-UTC')
return datetime.strptime(self.start_time, AWSEM_TIME_STAMP_FORMAT)


class AwsemRunJsonApp(SerializableObject):
Expand Down Expand Up @@ -130,7 +131,7 @@ def create_Output(self, Output):
@property
def end_time_as_str(self):
try:
return datetime.strptime(self.end_time, '%Y%m%d-%H:%M:%S-UTC')
return datetime.strptime(self.end_time, AWSEM_TIME_STAMP_FORMAT)
except:
return None

Expand Down
32 changes: 20 additions & 12 deletions tibanna/check_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
MetricRetrievalException,
AWSEMErrorHandler
)
from .vars import PARSE_AWSEM_TIME
from .core import API


Expand All @@ -40,32 +41,40 @@ def __init__(self, input_json):
self.input_json = copy.deepcopy(input_json)

def run(self):
input_json_copy = self.input_json

# s3 bucket that stores the output
bucket_name = input_json_copy['config']['log_bucket']
bucket_name = self.input_json['config']['log_bucket']
instance_id = self.input_json['config'].get('instance_id', '')

# info about the jobby job
jobid = input_json_copy['jobid']
jobid = self.input_json['jobid']
job_started = "%s.job_started" % jobid
job_success = "%s.success" % jobid
job_error = "%s.error" % jobid

public_postrun_json = input_json_copy['config'].get('public_postrun_json', False)
public_postrun_json = self.input_json['config'].get('public_postrun_json', False)

# check to see ensure this job has started else fail
if not does_key_exist(bucket_name, job_started):
start_time = PARSE_AWSEM_TIME(self.input_json['config']['start_time'])
now = datetime.now(tzutc())
# terminate the instance if EC2 is not booting for more than 10 min.
if start_time + timedelta(minutes=10) < now:
try:
boto3.client('ec2').terminate_instances(InstanceIds=[instance_id])
except:
pass # most likely already terminated or never initiated
raise EC2IdleException("Failed to find jobid %s, ec2 is not initializing for too long. Terminating the instance." % jobid)
raise EC2StartingException("Failed to find jobid %s, ec2 is probably still booting" % jobid)

# check to see if job has error, report if so
if does_key_exist(bucket_name, job_error):
try:
self.handle_postrun_json(bucket_name, jobid, input_json_copy, public_read=public_postrun_json)
self.handle_postrun_json(bucket_name, jobid, self.input_json, public_read=public_postrun_json)
except Exception as e:
printlog("error handling postrun json %s" % str(e))
eh = AWSEMErrorHandler()
if 'custom_errors' in input_json_copy['args']:
eh.add_custom_errors(input_json_copy['args']['custom_errors'])
if 'custom_errors' in self.input_json['args']:
eh.add_custom_errors(self.input_json['args']['custom_errors'])
log = API().log(job_id=jobid)
ex = eh.parse_log(log)
if ex:
Expand All @@ -76,13 +85,12 @@ def run(self):

# check to see if job has completed
if does_key_exist(bucket_name, job_success):
self.handle_postrun_json(bucket_name, jobid, input_json_copy, public_read=public_postrun_json)
self.handle_postrun_json(bucket_name, jobid, self.input_json, public_read=public_postrun_json)
print("completed successfully")
return input_json_copy
return self.input_json

# checking if instance is terminated for no reason
instance_id = input_json_copy['config'].get('instance_id', '')
if instance_id: # skip test for instance_id by not giving it to input_json_copy
if instance_id: # skip test for instance_id by not giving it to self.input_json
try:
res = boto3.client('ec2').describe_instances(InstanceIds=[instance_id])
except Exception as e:
Expand Down
10 changes: 10 additions & 0 deletions tibanna/vars.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os
import boto3
import sys
from datetime import datetime
from dateutil.tz import tzutc


if boto3.session.Session().get_credentials() is None:
Expand Down Expand Up @@ -78,6 +80,14 @@
# field name reserved for Tibanna setting
_tibanna = '_tibanna'

# Awsem time stamp format
AWSEM_TIME_STAMP_FORMAT = '%Y%m%d-%H:%M:%S-UTC'


def PARSE_AWSEM_TIME(t_str):
t = datetime.strptime(t_str, AWSEM_TIME_STAMP_FORMAT)
return t.replace(tzinfo=tzutc())


SFN_TYPE = 'unicorn'
LAMBDA_TYPE = ''
Expand Down

0 comments on commit 5a6bf3f

Please sign in to comment.