Ongoing outputs implementation #45
Javier Carnero committed Jun 20, 2018
1 parent 8280a90 commit 0e31db5
Showing 10 changed files with 321 additions and 50 deletions.
14 changes: 14 additions & 0 deletions hpc_plugin/external_repositories/__init__.py
@@ -0,0 +1,14 @@
########
# Copyright (c) 2017-2018 MSO4SC - [email protected]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
22 changes: 22 additions & 0 deletions hpc_plugin/external_repositories/ckan.py
@@ -0,0 +1,22 @@
from external_repository import ExternalRepository


class Ckan(ExternalRepository):

    def __init__(self, publish_item):
        super(Ckan, self).__init__(publish_item)

        self.entrypoint = publish_item['entrypoint']
        self.api_key = publish_item['api_key']
        self.dataset = publish_item['dataset']
        self.file_path = publish_item['file_path']

    def _build_publish_call(self, logger):
        # TODO: detach call?
        operation = "create"
        # TODO: if the resource file already exists, operation = "update"
        call = "curl -H 'Authorization: " + self.api_key + "' " + \
               "'" + self.entrypoint + "/api/action/resource_" + \
               operation + "' " + \
               "--form upload=@" + self.file_path + " " + \
               "--form package_id=" + self.dataset
        return call
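For illustration, a minimal sketch of the shell command this builds; the entrypoint, API key, dataset id and file path below are hypothetical values, not part of the commit:

    from hpc_plugin.external_repositories.ckan import Ckan

    # All values below are hypothetical, for illustration only.
    publish_item = {
        'type': 'CKAN',
        'entrypoint': 'https://ckan.example.org',
        'api_key': 'xxxx-xxxx',
        'dataset': 'simulation-outputs',
        'file_path': 'output/results.csv',
    }

    print(Ckan(publish_item)._build_publish_call(logger=None))
    # curl -H 'Authorization: xxxx-xxxx'
    #   'https://ckan.example.org/api/action/resource_create'
    #   --form upload=@output/results.csv --form package_id=simulation-outputs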
47 changes: 47 additions & 0 deletions hpc_plugin/external_repositories/external_repository.py
@@ -0,0 +1,47 @@
from hpc_plugin.ssh import SshClient


class ExternalRepository(object):

    @staticmethod
    def factory(publish_item):
        if publish_item['type'] == "CKAN":  # TODO: manage key error
            from ckan import Ckan
            return Ckan(publish_item)
        else:
            return None

    def __init__(self, publish_item):
        self.er_type = publish_item['type']  # TODO: manage key error

    def publish(self,
                ssh_client,
                logger,
                workdir=None):
        """
        Publish the local file in the external repository

        @type ssh_client: SshClient
        @param ssh_client: ssh client connected to an HPC login node
        @rtype string
        @return TODO:
        """
        if not SshClient.check_ssh_client(ssh_client, logger):
            return False

        call = self._build_publish_call(logger)
        if call is None:
            return False

        return ssh_client.execute_shell_command(
            call,
            workdir=workdir)

    def _build_publish_call(self, logger):
        """
        Creates the script that publishes the local file

        @rtype string
        @return string with the publish call. None if an error arises.
        """
        raise NotImplementedError("'_build_publish_call' not implemented.")
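As a usage sketch, this is how the factory and publish fit together with the SSH client; the credentials dict, host, paths and values are hypothetical:

    import logging

    from hpc_plugin.ssh import SshClient
    from hpc_plugin.external_repositories.external_repository \
        import ExternalRepository

    logger = logging.getLogger('publish-sketch')

    # Hypothetical credentials dict and publish entry.
    client = SshClient({'host': 'hpc.example.org',
                        'user': 'jdoe',
                        'password': '...'})
    er = ExternalRepository.factory({'type': 'CKAN',
                                     'entrypoint': 'https://ckan.example.org',
                                     'api_key': 'xxxx-xxxx',
                                     'dataset': 'simulation-outputs',
                                     'file_path': 'output/results.csv'})
    if er is not None:
        # Runs the curl call on the HPC login node, inside the job workdir.
        er.publish(client, logger, workdir='/home/jdoe/run_01')
    client.close_connection()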
22 changes: 22 additions & 0 deletions hpc_plugin/ssh.py
@@ -89,6 +89,20 @@ def close_connection(self):
        if self._tunnel is not None:
            self._tunnel.close()

    def execute_shell_command(self,
                              cmd,
                              workdir=None,
                              wait_result=False):
        if not workdir:
            return self.send_command(cmd,
                                     wait_result=wait_result)
        else:
            call = "export CURRENT_WORKDIR=" + workdir + " && "
            call += "cd " + workdir + " && "
            call += cmd
            return self.send_command(call,
                                     wait_result=wait_result)

    def send_command(self,
                     command,
                     exec_timeout=3000,
@@ -178,6 +192,14 @@ def send_command(self,
        else:
            return False

    @staticmethod
    def check_ssh_client(ssh_client,
                         logger):
        if not isinstance(ssh_client, SshClient) or not ssh_client.is_open():
            logger.error("SSH Client can't be used")
            return False
        return True


class SshForward(object):
    """Represents a ssh port forwarding"""
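For clarity, a sketch of what execute_shell_command actually sends when a working directory is given; the client, script name and path are hypothetical:

    # 'client' is an open SshClient; values are hypothetical.
    client.execute_shell_command('sbatch job.script',
                                 workdir='/home/jdoe/run_01',
                                 wait_result=True)
    # Command sent over SSH:
    #   export CURRENT_WORKDIR=/home/jdoe/run_01 && \
    #   cd /home/jdoe/run_01 && sbatch job.script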
52 changes: 50 additions & 2 deletions hpc_plugin/tasks.py
@@ -21,6 +21,7 @@

from ssh import SshClient
from workload_managers.workload_manager import WorkloadManager
from external_repositories.external_repository import ExternalRepository


@operation
@@ -76,6 +77,7 @@ def prepare_hpc(config,
                                               wait_result=True)

        if exit_code != 0:
            client.close_connection()
            raise NonRecoverableError(
                "failed to connect to HPC: exit code " + str(exit_code))
@@ -111,6 +113,7 @@ def cleanup_hpc(config, skip, simulate, **kwargs):  # pylint: disable=W0613
        wm_type = config['workload_manager']
        wm = WorkloadManager.factory(wm_type)
        if not wm:
            client.close_connection()
            raise NonRecoverableError(
                "Workload Manager '" +
                wm_type +
@@ -355,6 +358,7 @@ def send_job(job_options, **kwargs):  # pylint: disable=W0613

    wm = WorkloadManager.factory(wm_type)
    if not wm:
        client.close_connection()
        raise NonRecoverableError(
            "Workload Manager '" +
            wm_type +
@@ -398,9 +402,10 @@ def cleanup_job(job_options, skip, **kwargs):  # pylint: disable=W0613

    client = SshClient(ctx.instance.runtime_properties['credentials'])

    # TODO(emepetres): manage errors
    # TODO: manage errors
    wm = WorkloadManager.factory(wm_type)
    if not wm:
        client.close_connection()
        raise NonRecoverableError(
            "Workload Manager '" +
            wm_type +
@@ -443,9 +448,10 @@ def stop_job(job_options, **kwargs):  # pylint: disable=W0613
    wm_type = ctx.instance.runtime_properties['workload_manager']
    client = SshClient(ctx.instance.runtime_properties['credentials'])

    # TODO(emepetres): manage errors
    # TODO: manage errors
    wm = WorkloadManager.factory(wm_type)
    if not wm:
        client.close_connection()
        raise NonRecoverableError(
            "Workload Manager '" +
            wm_type +
@@ -473,3 +479,45 @@ def stop_job(job_options, **kwargs):  # pylint: disable=W0613
    except KeyError:
        # The job wasn't configured properly, no need to be stopped
        ctx.logger.warning('Job was not stopped as it was not configured.')


@operation
def publish(publish_options, **kwargs):
    """ Publish the job outputs """
    try:
        simulate = ctx.instance.runtime_properties['simulate']

        name = kwargs['name']
        is_singularity = 'hpc.nodes.singularity_job' in ctx.node.\
            type_hierarchy

        published = True
        if not simulate:
            workdir = ctx.instance.runtime_properties['workdir']
            client = SshClient(ctx.instance.runtime_properties['credentials'])

            for publish_item in publish_options:
                er = ExternalRepository.factory(publish_item)
                if not er:
                    client.close_connection()
                    raise NonRecoverableError(
                        "External repository '" +
                        publish_item['type'] +  # TODO: manage key error
                        "' not supported.")
                published = er.publish(client,
                                       ctx.logger,
                                       workdir=workdir) and published

            client.close_connection()
        else:
            ctx.logger.warning('Instance ' + ctx.instance.id + ' simulated')

        if published:
            ctx.logger.info(
                'Job ' + name + ' (' + ctx.instance.id + ') published.')
        else:
            ctx.logger.error('Job ' + name + ' (' + ctx.instance.id +
                             ') not published.')
            raise NonRecoverableError('Job ' + name + ' (' + ctx.instance.id +
                                      ') not published.')
    except KeyError:
        # The job wasn't configured properly, nothing to publish
        ctx.logger.warning(
            'Job outputs were not published as the job was not configured.')
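For reference, a sketch of the publish_options list this task receives, as mapped from the blueprint 'publish' property further down; all values are hypothetical:

    publish_options = [{
        'type': 'CKAN',                            # selects the Ckan repository
        'entrypoint': 'https://ckan.example.org',  # hypothetical
        'api_key': 'xxxx-xxxx',                    # hypothetical
        'dataset': 'simulation-outputs',           # hypothetical dataset id
        'file_path': 'output/results.csv',         # hypothetical output path
    }]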
100 changes: 100 additions & 0 deletions hpc_plugin/tests/blueprint/blueprint_sbatch_output.yaml
@@ -0,0 +1,100 @@
########
# Copyright (c) 2017 MSO4SC - [email protected]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

tosca_definitions_version: cloudify_dsl_1_3

imports:
    # to speed things up, it is possible to download this file:
    # - http://www.getcloudify.org/spec/cloudify/4.1/types.yaml
    - http://raw.githubusercontent.com/mso4sc/cloudify-hpc-plugin/master/resources/types/cfy_types.yaml
    # relative import of plugin.yaml that resides in the blueprint directory
    - hpc_plugin/test_plugin.yaml

inputs:
    # Monitor
    monitor_entrypoint:
        description: Monitor entrypoint IP
        default: ""
        type: string

    # Job prefix name
    job_prefix:
        description: Job name prefix in HPCs
        default: "cfyhpc"
        type: string

    partition:
        description: Partition in which the jobs will run
        default: "public"
        type: string

    mso4sc_hpc_primary:
        description: Configuration for the primary HPC to be used
        default: {}

    mso4sc_datacatalogue:
        description: entrypoint of the data catalogue
        default: ""

    mso4sc_publish_key:
        description: API Key to publish the outputs
        default: ""

    publish_dataset_id:
        description: ID of the CKAN dataset
        default: ""

    publish_outputs_path:
        description: Local path to the outputs to be uploaded
        default: ""

node_templates:
    first_hpc:
        type: hpc.nodes.Compute
        properties:
            config: { get_input: mso4sc_hpc_primary }
            external_monitor_entrypoint: { get_input: monitor_entrypoint }
            job_prefix: { get_input: job_prefix }
            workdir_prefix: "single_sbatch"
            skip_cleanup: True
            simulate: True  # COMMENT to test against a real HPC

    single_job:
        type: hpc.nodes.job
        properties:
            job_options:
                type: 'SBATCH'
                command: "touch.script single.test"
            deployment:
                bootstrap: 'scripts/bootstrap_sbatch_example.sh'
                revert: 'scripts/revert_sbatch_example.sh'
                inputs:
                    - 'single'
                    - { get_input: partition }
            publish:
                - type: "CKAN"
                  entrypoint: { get_input: mso4sc_datacatalogue }
                  api_key: { get_input: mso4sc_publish_key }
                  dataset: { get_input: publish_dataset_id }
                  file_path: { get_input: publish_outputs_path }
            skip_cleanup: True
        relationships:
            - type: job_contained_in_hpc
              target: first_hpc

outputs:
    single_job_name:
        description: single job name in the HPC
        value: { get_attribute: [single_job, job_name] }
8 changes: 8 additions & 0 deletions hpc_plugin/tests/blueprint/hpc_plugin/test_plugin.yaml
@@ -126,6 +126,9 @@ node_types:
                default: {}
            job_options:
                description: Job main command and options
            publish:
                description: Config to publish its outputs
                default: []
            skip_cleanup:
                description: True to not clean after execution (debug purposes)
                type: boolean
@@ -154,6 +157,11 @@ node_types:
                    inputs:
                        job_options:
                            default: { get_property: [SELF, job_options] }
                publish:
                    implementation: hpc.hpc_plugin.tasks.publish
                    inputs:
                        publish_options:
                            default: { get_property: [SELF, publish] }
                cleanup:
                    implementation: hpc.hpc_plugin.tasks.cleanup_job
                    inputs:
27 changes: 27 additions & 0 deletions hpc_plugin/workflows.py
@@ -99,6 +99,22 @@ def queue(self):
        # print result.task.dump()
        return result.task

    def publish(self):
        """ Publish the instance outputs if it is a Job """
        if not self.parent_node.is_job:
            return

        self.winstance.send_event('Publishing HPC job..')
        result = self.winstance.execute_operation('hpc.interfaces.'
                                                  'lifecycle.publish',
                                                  kwargs={"name": self.name})
        # TODO: How to do it in non-blocking??
        result.task.wait_for_terminated()
        if result.task.get_state() != tasks.TASK_FAILED:
            self.winstance.send_event('..HPC job published')

        return result.task

    def set_status(self, status):
        """ Update the instance state """
        if not status == self._status:
@@ -201,6 +217,17 @@ def queue_all_instances(self):
        self.status = 'QUEUED'
        return tasks_list

    def publish(self):
        """ Publish the outputs of all instances if it represents a Job """
        if not self.is_job:
            return []

        tasks_list = []
        for job_instance in self.instances:
            tasks_list.append(job_instance.publish())

        return tasks_list

    def is_ready(self):
        """ True if it has no more dependencies to satisfy """
        return self.parent_depencencies_left == 0
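A minimal usage sketch of the node-level publish above; 'job_nodes' is a hypothetical name for the workflow's list of job node wrappers:

    # Hypothetical usage inside the workflow: publish the outputs of every
    # job node once its execution has finished.
    for node in job_nodes:
        node.publish()  # blocking for now, see the TODO above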