Preliminary data outputs working #45
- Publish data on public and private datasets after the job has finished
- Logs improvement #64
Javier Carnero committed Jun 21, 2018
1 parent 0e31db5 commit 7d3cc9c
Showing 18 changed files with 256 additions and 126 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
@@ -11,4 +11,5 @@ script:
- tox -e $TOX_ENV
branches:
only:
- master
- master
- canary
30 changes: 26 additions & 4 deletions hpc_plugin/external_repositories/ckan.py
@@ -1,22 +1,44 @@
########
# Copyright (c) 2017-2018 MSO4SC - [email protected]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Ckan specific communication to publish data """


from external_repository import ExternalRepository


class Ckan(ExternalRepository):

def __init__(self, publish_item):
super().__init__(publish_item)
super(Ckan, self).__init__(publish_item)

self.entrypoint = publish_item['entrypoint']
self.api_key = publish_item['api_key']
self.dataset = publish_item['dataset']
self.file_path = publish_item['file_path']
self.name = publish_item['name']
self.description = publish_item["description"]

def _build_publish_call(self, logger):
# detach call??
operation = "create"
# TODO: if resource file exists, operation="update"
call = "curl -H'Authorization: " + self.api_key + "' " + \
"'" + self.entrypoint + "/api/action/resource_" + operation + "' " + \
"--form upload=@ " + self.file_path + \
"--form package_id=" + self.package_id
"'" + self.entrypoint + "/api/action/resource_" + \
operation + "' " + \
"--form upload=@" + self.file_path + " " + \
"--form package_id=" + self.dataset + " " + \
"--form name=" + self.name + " " + \
"--form description='" + self.description + "'"
return call
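
For reference, a minimal sketch of the publish_item dictionary this class expects and of the CKAN resource_create call it assembles; every value below is a hypothetical placeholder, not taken from the repository:

publish_item = {
    'type': 'CKAN',
    'entrypoint': 'http://ckan.example.com',
    'api_key': 'XXXX-XXXX-XXXX',
    'dataset': 'my-dataset-id',         # CKAN package the resource is attached to
    'file_path': 'output/results.csv',  # resolved relative to the job workdir
    'name': 'results.csv',
    'description': 'Job output file',
}
# _build_publish_call() then returns roughly:
#   curl -H'Authorization: XXXX-XXXX-XXXX' \
#     'http://ckan.example.com/api/action/resource_create' \
#     --form upload=@output/results.csv \
#     --form package_id=my-dataset-id \
#     --form name=results.csv \
#     --form description='Job output file'
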
26 changes: 22 additions & 4 deletions hpc_plugin/external_repositories/external_repository.py
@@ -1,18 +1,35 @@
########
# Copyright (c) 2017-2018 MSO4SC - [email protected]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Holds the external repository common behaviour """


from hpc_plugin.ssh import SshClient


class ExternalRepository(object):

def factory(publish_item):
if publish_item['type'] == "CKAN": # TODO: manage key error
if publish_item['type'] == "CKAN":
from ckan import Ckan
return Ckan(publish_item)
else:
return None
factory = staticmethod(factory)

def __init__(self, publish_item):
self.er_type = publish_item['type'] # TODO: manage key error
self.er_type = publish_item['type']

def publish(self,
ssh_client,
@@ -24,7 +41,7 @@ def publish(self,
@type ssh_client: SshClient
@param ssh_client: ssh client connected to an HPC login node
@rtype string
@return TODO:
@return False if something went wrong
"""
if not SshClient.check_ssh_client(ssh_client, logger):
return False
@@ -35,7 +52,8 @@ def publish(self,

return ssh_client.execute_shell_command(
call,
workdir=workdir)
workdir=workdir,
wait_result=False) # TODO: set to True

def _build_publish_call(self, logger):
"""
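
The factory/publish pair above is what the new publish task in hpc_plugin/tasks.py (further down in this commit) drives once per entry of publish_options. A minimal usage sketch, assuming a connected ssh_client, a logger and a workdir already exist:

from cloudify.exceptions import NonRecoverableError  # same error type used in tasks.py

# publish_item, ssh_client, logger and workdir are assumed to exist already.
er = ExternalRepository.factory(publish_item)        # a Ckan instance for type 'CKAN', else None
if er is None:
    raise NonRecoverableError(
        "External repository '" + publish_item['type'] + "' not supported.")
published = er.publish(ssh_client, logger, workdir)  # False if the ssh client check fails
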
14 changes: 9 additions & 5 deletions hpc_plugin/ssh.py
@@ -1,5 +1,5 @@
########
# Copyright (c) 2017 MSO4SC - [email protected]
# Copyright (c) 2017-2018 MSO4SC - [email protected]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
"""
import select
import thread
import logging
import cStringIO

try:
@@ -30,6 +31,8 @@

from paramiko import client, RSAKey

logging.getLogger("paramiko").setLevel(logging.WARNING)


class SshClient(object):
"""Represents a ssh client"""
@@ -94,14 +97,15 @@ def execute_shell_command(self,
workdir=None,
wait_result=False):
if not workdir:
return ssh_client.send_command(cmd,
wait_result=wait_result)
return self.send_command(cmd,
wait_result=wait_result)
else:
# TODO: set scale variables as well
call = "export CURRENT_WORKDIR=" + workdir + " && "
call += "cd " + workdir + " && "
call += cmd
return ssh_client.send_command(call,
wait_result=wait_result)
return self.send_command(call,
wait_result=wait_result)

def send_command(self,
command,
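
With this change execute_shell_command delegates to self.send_command rather than an external ssh_client reference and, when a workdir is given, wraps the command so it runs inside that directory. A minimal usage sketch, mirroring how prepare_hpc and deploy_job call it below; host, credentials and paths are hypothetical placeholders:

client = SshClient({'host': 'hpc.example.com', 'user': 'user', 'password': '***'})
_, exit_code = client.execute_shell_command('uname', wait_result=True)
# With workdir set, the command is sent as:
#   export CURRENT_WORKDIR=/home/user/run_001 && cd /home/user/run_001 && ./bootstrap.sh
ok = client.execute_shell_command('./bootstrap.sh', workdir='/home/user/run_001')
client.close_connection()
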
88 changes: 52 additions & 36 deletions hpc_plugin/tasks.py
@@ -1,5 +1,5 @@
########
# Copyright (c) 2017 MSO4SC - [email protected]
# Copyright (c) 2017-2018 MSO4SC - [email protected]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
""" Holds the plugin tasks """

import requests
import traceback
from cloudify import ctx
from cloudify.decorators import operation
from cloudify.exceptions import NonRecoverableError
@@ -72,9 +73,9 @@ def prepare_hpc(config,
wm_type +
"' not supported.")
client = SshClient(config['credentials'])
_, exit_code = wm._execute_shell_command(client,
'uname',
wait_result=True)
_, exit_code = client.execute_shell_command(
'uname',
wait_result=True)

if exit_code is not 0:
client.close_connection()
@@ -113,15 +114,14 @@ def cleanup_hpc(config, skip, simulate, **kwargs): # pylint: disable=W0613
wm_type = config['workload_manager']
wm = WorkloadManager.factory(wm_type)
if not wm:
client.close_connection()
raise NonRecoverableError(
"Workload Manager '" +
wm_type +
"' not supported.")
client = SshClient(config['credentials'])
_, exit_code = wm._execute_shell_command(client,
'rm -r ' + workdir,
wait_result=True)
_, exit_code = client.execute_shell_command(
'rm -r ' + workdir,
wait_result=True)
client.close_connection()
ctx.logger.info('..all clean.')
else:
@@ -322,19 +322,19 @@ def deploy_job(script,
call = "./" + name
for dinput in inputs:
call += ' ' + dinput
_, exit_code = wm._execute_shell_command(client,
call,
workdir=workdir,
wait_result=True)
_, exit_code = client.execute_shell_command(
call,
workdir=workdir,
wait_result=True)
if exit_code is not 0:
logger.warning(
"failed to deploy job: call '" + call + "', exit code " +
str(exit_code))

if not skip_cleanup:
if not wm._execute_shell_command(client,
"rm " + name,
workdir=workdir):
if not client.execute_shell_command(
"rm " + name,
workdir=workdir):
logger.warning("failed removing bootstrap script")

client.close_connection()
@@ -393,6 +393,11 @@ def cleanup_job(job_options, skip, **kwargs): # pylint: disable=W0613

try:
simulate = ctx.instance.runtime_properties['simulate']
except KeyError:
# The job wasn't configured properly, so no cleanup needed
ctx.logger.warning('Job was not cleaned up as it was not configured.')

try:
name = kwargs['name']
if not simulate:
is_singularity = 'hpc.nodes.singularity_job' in ctx.node.\
@@ -402,7 +407,6 @@ def cleanup_job(job_options, skip, **kwargs): # pylint: disable=W0613

client = SshClient(ctx.instance.runtime_properties['credentials'])

# TODO: manage errors
wm = WorkloadManager.factory(wm_type)
if not wm:
client.close_connection()
@@ -428,17 +432,22 @@ def cleanup_job(job_options, skip, **kwargs): # pylint: disable=W0613
else:
ctx.logger.error('Job ' + name + ' (' + ctx.instance.id +
') not cleaned.')
except KeyError:
# The job wasn't configured properly, so no cleanup needed
ctx.logger.warning('Job was not cleaned up as it was not configured.')
except Exception as exp:
print(traceback.format_exc())
ctx.logger.error(
'Something happened when trying to clean up: ' + exp.message)


@operation
def stop_job(job_options, **kwargs): # pylint: disable=W0613
""" Stops a job in the HPC """
try:
simulate = ctx.instance.runtime_properties['simulate']
except KeyError:
# The job wasn't configured properly, no need to be stopped
ctx.logger.warning('Job was not stopped as it was not configured.')

try:
name = kwargs['name']
is_singularity = 'hpc.nodes.singularity_job' in ctx.node.\
type_hierarchy
@@ -448,7 +457,6 @@ def stop_job(job_options, **kwargs): # pylint: disable=W0613
wm_type = ctx.instance.runtime_properties['workload_manager']
client = SshClient(ctx.instance.runtime_properties['credentials'])

# TODO: manage errors
wm = WorkloadManager.factory(wm_type)
if not wm:
client.close_connection()
@@ -476,48 +484,56 @@ def stop_job(job_options, **kwargs): # pylint: disable=W0613
') not stopped.')
raise NonRecoverableError('Job ' + name + ' (' + ctx.instance.id +
') not stopped.')
except KeyError:
# The job wasn't configured properly, no need to be stopped
ctx.logger.warning('Job was not stopped as it was not configured.')
except Exception as exp:
print(traceback.format_exc())
ctx.logger.error(
'Something happened when trying to stop: ' + exp.message)


@operation
def publish(publish_options, **kwargs):
""" Publish the job outputs """
try:
simulate = ctx.instance.runtime_properties['simulate']
except KeyError as exp:
# The job wasn't configured properly, no need to publish
ctx.logger.warning(
'Job outputs were not published as' +
' the job was not configured properly.')
return

try:
name = kwargs['name']
is_singularity = 'hpc.nodes.singularity_job' in ctx.node.\
type_hierarchy

published = True
if not simulate:
workdir = ctx.instance.runtime_properties['workdir']
client = SshClient(ctx.instance.runtime_properties['credentials'])

for publish_item in publish_options:
if not published:
break
er = ExternalRepository.factory(publish_item)
if not er:
client.close_connection()
raise NonRecoverableError(
"External repository '" +
publish_item['type'] + # TODO: manage key error
publish_item['type'] +
"' not supported.")
published = er.publish(client, ctx.logger, workdir)

client.close_connection()
else:
ctx.logger.warning('Instance ' + ctx.instance.id + ' simulated')
is_stopped = True

if is_stopped:
if published:
ctx.logger.info(
'Job ' + name + ' (' + ctx.instance.id + ') stopped.')
'Job ' + name + ' (' + ctx.instance.id + ') published.')
else:
ctx.logger.error('Job ' + name + ' (' + ctx.instance.id +
') not stopped.')
') not published.')
raise NonRecoverableError('Job ' + name + ' (' + ctx.instance.id +
') not stopped.')
except KeyError:
# The job wasn't configured properly, no need to be stopped
ctx.logger.warning(
'Job outputs where not published as it was not configured.')
') not published.')
except Exception as exp:
print(traceback.format_exc())
ctx.logger.error(
'Cannot publish: ' + exp.message)
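
The same defensive pattern now wraps cleanup_job, stop_job and publish: a first try/except KeyError detects a job that was never configured, and a trailing catch-all logs the traceback instead of letting the operation crash the workflow. Reduced to a skeleton (names are illustrative; in this commit only publish returns early after the KeyError):

import traceback

def operation_skeleton(runtime_properties, logger):
    try:
        simulate = runtime_properties['simulate']  # raises KeyError if never configured
    except KeyError:
        logger.warning('Job was not configured; skipping.')
        return

    try:
        pass  # the actual cleanup / stop / publish work goes here
    except Exception as exp:
        print(traceback.format_exc())
        logger.error('Something happened: ' + str(exp))
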
20 changes: 5 additions & 15 deletions hpc_plugin/tests/blueprint/blueprint-inputs.yaml
@@ -22,18 +22,8 @@ mso4sc_hpc_primary:
country_tz: "Europe/Madrid"
workload_manager: "SLURM"

# Second HPC configuration
mso4sc_hpc_secondary:
credentials:
host: "HOST"
user: "USER"
password: "PASSWD"
tunnel:
host: "TUNNEL_HOST"
user: "TUNNEL_USER"
private_key: |
-----BEGIN RSA PRIVATE KEY-----
....
-----END RSA PRIVATE KEY-----
country_tz: "Europe/Paris"
workload_manager: "SLURM"
mso4sc_datacatalogue_entrypoint: "http://193.144.35.207"

mso4sc_datacatalogue_key: "****"

mso4sc_outdataset_outputs_at: "dataset_id"