diff --git a/hpc_plugin/external_repositories/__init__.py b/hpc_plugin/external_repositories/__init__.py new file mode 100644 index 0000000..fac5f99 --- /dev/null +++ b/hpc_plugin/external_repositories/__init__.py @@ -0,0 +1,14 @@ +######## +# Copyright (c) 2017-2018 MSO4SC - javier.carnero@atos.net +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/hpc_plugin/external_repositories/ckan.py b/hpc_plugin/external_repositories/ckan.py new file mode 100644 index 0000000..d3afa1c --- /dev/null +++ b/hpc_plugin/external_repositories/ckan.py @@ -0,0 +1,22 @@ +from external_repository import ExternalRepository + + +class Ckan(ExternalRepository): + + def __init__(self, publish_item): + super(Ckan, self).__init__(publish_item) + + self.entrypoint = publish_item['entrypoint'] + self.api_key = publish_item['api_key'] + self.dataset = publish_item['dataset'] + self.file_path = publish_item['file_path'] + + def _build_publish_call(self, logger): + # detach call?? 
+ operation = "create" + # TODO: if resource file exists, operation="update" + call = "curl -H'Authorization: " + self.api_key + "' " + \ "'" + self.entrypoint + "/api/action/resource_" + operation + "' " + \ "--form upload=@" + self.file_path + " " + \ "--form package_id=" + self.dataset + return call diff --git a/hpc_plugin/external_repositories/external_repository.py b/hpc_plugin/external_repositories/external_repository.py new file mode 100644 index 0000000..878af6f --- /dev/null +++ b/hpc_plugin/external_repositories/external_repository.py @@ -0,0 +1,47 @@ +from hpc_plugin.ssh import SshClient + + +class ExternalRepository(object): + + def factory(publish_item): + if publish_item['type'] == "CKAN": # TODO: manage key error + from ckan import Ckan + return Ckan(publish_item) + else: + return None + factory = staticmethod(factory) + + def __init__(self, publish_item): + self.er_type = publish_item['type'] # TODO: manage key error + + def publish(self, + ssh_client, + logger, + workdir=None): + """ + Publish the local file in the external repository + + @type ssh_client: SshClient + @param ssh_client: ssh client connected to an HPC login node + @rtype string + @return TODO: + """ + if not SshClient.check_ssh_client(ssh_client, logger): + return False + + call = self._build_publish_call(logger) + if call is None: + return False + + return ssh_client.execute_shell_command( + call, + workdir=workdir) + + def _build_publish_call(self, logger): + """ + Creates a script to publish the local file + + @rtype string + @return string with the publish call. None if an error arise. 
+ """ + raise NotImplementedError("'_build_publish_call' not implemented.") diff --git a/hpc_plugin/ssh.py b/hpc_plugin/ssh.py index e881633..daaa9f4 100644 --- a/hpc_plugin/ssh.py +++ b/hpc_plugin/ssh.py @@ -89,6 +89,20 @@ def close_connection(self): if self._tunnel is not None: self._tunnel.close() + def execute_shell_command(self, + cmd, + workdir=None, + wait_result=False): + if not workdir: + return self.send_command(cmd, + wait_result=wait_result) + else: + call = "export CURRENT_WORKDIR=" + workdir + " && " + call += "cd " + workdir + " && " + call += cmd + return self.send_command(call, + wait_result=wait_result) + def send_command(self, command, exec_timeout=3000, @@ -178,6 +192,14 @@ def send_command(self, else: return False + @staticmethod + def check_ssh_client(ssh_client, + logger): + if not isinstance(ssh_client, SshClient) or not ssh_client.is_open(): + logger.error("SSH Client can't be used") + return False + return True + class SshForward(object): """Represents a ssh port forwarding""" diff --git a/hpc_plugin/tasks.py b/hpc_plugin/tasks.py index f26385c..cc1ff8f 100644 --- a/hpc_plugin/tasks.py +++ b/hpc_plugin/tasks.py @@ -21,6 +21,7 @@ from ssh import SshClient from workload_managers.workload_manager import WorkloadManager +from external_repositories.external_repository import ExternalRepository @operation @@ -76,6 +77,7 @@ def prepare_hpc(config, wait_result=True) if exit_code is not 0: + client.close_connection() raise NonRecoverableError( "failed to connect to HPC: exit code " + str(exit_code)) @@ -111,6 +113,7 @@ def cleanup_hpc(config, skip, simulate, **kwargs): # pylint: disable=W0613 wm_type = config['workload_manager'] wm = WorkloadManager.factory(wm_type) if not wm: + client.close_connection() raise NonRecoverableError( "Workload Manager '" + wm_type + @@ -355,6 +358,7 @@ def send_job(job_options, **kwargs): # pylint: disable=W0613 wm = WorkloadManager.factory(wm_type) if not wm: + client.close_connection() raise 
NonRecoverableError( "Workload Manager '" + wm_type + @@ -398,9 +402,10 @@ def cleanup_job(job_options, skip, **kwargs): # pylint: disable=W0613 client = SshClient(ctx.instance.runtime_properties['credentials']) - # TODO(emepetres): manage errors + # TODO: manage errors wm = WorkloadManager.factory(wm_type) if not wm: + client.close_connection() raise NonRecoverableError( "Workload Manager '" + wm_type + @@ -443,9 +448,10 @@ def stop_job(job_options, **kwargs): # pylint: disable=W0613 wm_type = ctx.instance.runtime_properties['workload_manager'] client = SshClient(ctx.instance.runtime_properties['credentials']) - # TODO(emepetres): manage errors + # TODO: manage errors wm = WorkloadManager.factory(wm_type) if not wm: + client.close_connection() raise NonRecoverableError( "Workload Manager '" + wm_type + @@ -473,3 +479,47 @@ def stop_job(job_options, **kwargs): # pylint: disable=W0613 except KeyError: # The job wasn't configured properly, no need to be stopped ctx.logger.warning('Job was not stopped as it was not configured.') + + +@operation +def publish(publish_options, **kwargs): + """ Publish the job outputs """ + try: + simulate = ctx.instance.runtime_properties['simulate'] + + name = kwargs['name'] + is_singularity = 'hpc.nodes.singularity_job' in ctx.node.\ + type_hierarchy + + if not simulate: + workdir = ctx.instance.runtime_properties['workdir'] + client = SshClient(ctx.instance.runtime_properties['credentials']) + + is_stopped = True + for publish_item in publish_options: + er = ExternalRepository.factory(publish_item) + if not er: + client.close_connection() + raise NonRecoverableError( + "External repository '" + + publish_item['type'] + # TODO: manage key error + "' not supported.") + is_stopped = er.publish(client, ctx.logger, workdir=workdir) and is_stopped + + client.close_connection() + else: + ctx.logger.warning('Instance ' + ctx.instance.id + ' simulated') + is_stopped = True + + if is_stopped: + ctx.logger.info( + 'Job ' + name + ' (' + ctx.instance.id + ') stopped.') + else: + ctx.logger.error('Job ' + name + ' (' + ctx.instance.id + + ') 
not stopped.') + raise NonRecoverableError('Job ' + name + ' (' + ctx.instance.id + + ') not stopped.') + except KeyError: + # The job wasn't configured properly, no need to be stopped + ctx.logger.warning( + 'Job outputs where not published as it was not configured.') diff --git a/hpc_plugin/tests/blueprint/blueprint_sbatch_output.yaml b/hpc_plugin/tests/blueprint/blueprint_sbatch_output.yaml new file mode 100644 index 0000000..682cb79 --- /dev/null +++ b/hpc_plugin/tests/blueprint/blueprint_sbatch_output.yaml @@ -0,0 +1,100 @@ +######## +# Copyright (c) 2017 MSO4SC - javier.carnero@atos.net +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +tosca_definitions_version: cloudify_dsl_1_3 + +imports: + # to speed things up, it is possible downloading this file, + # - http://www.getcloudify.org/spec/cloudify/4.1/types.yaml + - http://raw.githubusercontent.com/mso4sc/cloudify-hpc-plugin/master/resources/types/cfy_types.yaml + # relative import of plugin.yaml that resides in the blueprint directory + - hpc_plugin/test_plugin.yaml + +inputs: + # Monitor + monitor_entrypoint: + description: Monitor entrypoint IP + default: "" + type: string + + # Job prefix name + job_prefix: + description: Job name prefix in HPCs + default: "cfyhpc" + type: string + + partition: + description: Partition in which the jobs will run + default: "public" + type: string + + mso4sc_hpc_primary: + description: Configuration for the primary HPC to be used + default: {} + + mso4sc_datacatalogue: + description: entrypoint of the data catalogue + default: "" + + mso4sc_publish_key: + description: API Key to publish the outputs + default: "" + + publish_dataset_id: + description: ID of the CKAN dataset + default: "" + + publish_outputs_path: + description: Local path to the outputs to be uploaded + default: "" + +node_templates: + first_hpc: + type: hpc.nodes.Compute + properties: + config: { get_input: mso4sc_hpc_primary } + external_monitor_entrypoint: { get_input: monitor_entrypoint } + job_prefix: { get_input: job_prefix } + workdir_prefix: "single_sbatch" + skip_cleanup: True + simulate: True # COMMENT to test against a real HPC + + single_job: + type: hpc.nodes.job + properties: + job_options: + type: 'SBATCH' + command: "touch.script single.test" + deployment: + bootstrap: 'scripts/bootstrap_sbatch_example.sh' + revert: 'scripts/revert_sbatch_example.sh' + inputs: + - 'single' + - { get_input: partition } + publish: + - type: "CKAN" + entrypoint: { get_input: mso4sc_datacatalogue} + api_key: { get_input: mso4sc_publish_key } + dataset: { get_input: publish_dataset_id } + file_path: { get_input: publish_outputs_path } + 
skip_cleanup: True + relationships: + - type: job_contained_in_hpc + target: first_hpc + +outputs: + single_job_name: + description: single job name in the HPC + value: { get_attribute: [single_job, job_name] } diff --git a/hpc_plugin/tests/blueprint/hpc_plugin/test_plugin.yaml b/hpc_plugin/tests/blueprint/hpc_plugin/test_plugin.yaml index 0745326..574415a 100644 --- a/hpc_plugin/tests/blueprint/hpc_plugin/test_plugin.yaml +++ b/hpc_plugin/tests/blueprint/hpc_plugin/test_plugin.yaml @@ -126,6 +126,9 @@ node_types: default: {} job_options: description: Job main command and options + publish: + description: Config to publish its outputs + default: [] skip_cleanup: description: True to not clean after execution (debug purposes) type: boolean @@ -154,6 +157,11 @@ node_types: inputs: job_options: default: { get_property: [SELF, job_options] } + publish: + implementation: hpc.hpc_plugin.tasks.publish + inputs: + publish_options: + default: { get_property: [SELF, publish] } cleanup: implementation: hpc.hpc_plugin.tasks.cleanup_job inputs: diff --git a/hpc_plugin/workflows.py b/hpc_plugin/workflows.py index b2b4bdb..2ae25f0 100644 --- a/hpc_plugin/workflows.py +++ b/hpc_plugin/workflows.py @@ -99,6 +99,22 @@ def queue(self): # print result.task.dump() return result.task + def publish(self): + """ Send the instance to the HPC queue if it is a Job """ + if not self.parent_node.is_job: + return + + self.winstance.send_event('Publishing HPC job..') + result = self.winstance.execute_operation('hpc.interfaces.' + 'lifecycle.publish', + kwargs={"name": self.name}) + # TODO: How to do it in non-blocking?? 
+ result.task.wait_for_terminated() + if result.task.get_state() != tasks.TASK_FAILED: + self.winstance.send_event('..HPC job published') + + return result.task + def set_status(self, status): """ Update the instance state """ if not status == self._status: @@ -201,6 +217,17 @@ def queue_all_instances(self): self.status = 'QUEUED' return tasks_list + def publish(self): + """ Send all instances to the HPC queue if it represents a Job """ + if not self.is_job: + return [] + + tasks_list = [] + for job_instance in self.instances: + tasks_list.append(job_instance.publish()) + + return tasks_list + def is_ready(self): """ True if it has no more dependencies to satisfy """ return self.parent_depencencies_left == 0 diff --git a/hpc_plugin/workload_managers/workload_manager.py b/hpc_plugin/workload_managers/workload_manager.py index fbdd3f2..0a259e3 100644 --- a/hpc_plugin/workload_managers/workload_manager.py +++ b/hpc_plugin/workload_managers/workload_manager.py @@ -131,7 +131,7 @@ def submit_job(self, @rtype string @return Slurm's job name sent. None if an error arise. 
""" - if not self._checkSshClient(ssh_client, logger): + if not SshClient.check_ssh_client(ssh_client, logger): return False if is_singularity: @@ -176,8 +176,7 @@ def submit_job(self, # prepare the scale env variables if 'scale_env_mapping_call' in response: scale_env_mapping_call = response['scale_env_mapping_call'] - output, exit_code = self._execute_shell_command( - ssh_client, + output, exit_code = ssh_client.execute_shell_command( scale_env_mapping_call, workdir=workdir, wait_result=True) @@ -190,10 +189,10 @@ def submit_job(self, # submit the job call = response['call'] - output, exit_code = self._execute_shell_command(ssh_client, - call, - workdir=workdir, - wait_result=True) + output, exit_code = ssh_client.execute_shell_command( + call, + workdir=workdir, + wait_result=True) if exit_code is not 0: logger.error("Job submission '" + call + "' exited with code " + str(exit_code) + ":\n" + output) @@ -221,13 +220,13 @@ def clean_job_aux_files(self, @rtype string @return Slurm's job name stopped. None if an error arise. """ - if not self._checkSshClient(ssh_client, logger): + if not SshClient.check_ssh_client(ssh_client, logger): return False if is_singularity: - return self._execute_shell_command(ssh_client, - "rm " + name + ".script", - workdir=workdir) + return ssh_client.execute_shell_command( + "rm " + name + ".script", + workdir=workdir) return True def stop_job(self, @@ -251,7 +250,7 @@ def stop_job(self, @rtype string @return Slurm's job name stopped. None if an error arise. 
""" - if not self._checkSshClient(ssh_client, logger): + if not SshClient.check_ssh_client(ssh_client, logger): return False call = self._build_job_cancellation_call(name, @@ -260,9 +259,9 @@ def stop_job(self, if call is None: return False - return self._execute_shell_command(ssh_client, - call, - workdir=workdir) + return ssh_client.execute_shell_command( + call, + workdir=workdir) def create_new_workdir(self, ssh_client, base_dir, base_name): workdir = self._get_time_name(base_name) @@ -273,8 +272,7 @@ def create_new_workdir(self, ssh_client, base_dir, base_name): workdir = self._get_random_name(base_name) full_path = base_dir + "/" + workdir - if self._execute_shell_command( - ssh_client, + if ssh_client.execute_shell_command( "mkdir -p " + base_dir + "/" + workdir): return full_path else: @@ -348,14 +346,6 @@ def _build_job_cancellation_call(self, raise NotImplementedError( "'_build_job_cancellation_call' not implemented.") - def _checkSshClient(self, - ssh_client, - logger): - if not isinstance(ssh_client, SshClient) or not ssh_client.is_open(): - logger.error("SSH Client can't be used") - return False - return True - def _create_shell_script(self, ssh_client, name, @@ -371,10 +361,10 @@ def _create_shell_script(self, create_call = "echo \"" + script_data + "\" >> " + name + \ "; chmod +x " + name - _, exit_code = self._execute_shell_command(ssh_client, - create_call, - workdir=workdir, - wait_result=True) + _, exit_code = ssh_client.execute_shell_command( + create_call, + workdir=workdir, + wait_result=True) if exit_code is not 0: logger.error( "failed to create script: call '" + create_call + @@ -383,21 +373,6 @@ def _create_shell_script(self, return True - def _execute_shell_command(self, - ssh_client, - cmd, - workdir=None, - wait_result=False): - if not workdir: - return ssh_client.send_command(cmd, - wait_result=wait_result) - else: - call = "export CURRENT_WORKDIR=" + workdir + " && " - call += "cd " + workdir + " && " - call += cmd - return 
ssh_client.send_command(call, - wait_result=wait_result) - def _get_random_name(self, base_name): """ Get a random name with a prefix """ return base_name + '_' + self.__id_generator() @@ -413,9 +388,9 @@ def __id_generator(self, for _ in range(size)) def _exists_path(self, ssh_client, path): - _, exit_code = self._execute_shell_command(ssh_client, - '[ -d "' + path + '" ]', - wait_result=True) + _, exit_code = ssh_client.execute_shell_command( + '[ -d "' + path + '" ]', + wait_result=True) if exit_code == 0: return True diff --git a/plugin.yaml b/plugin.yaml index 4f0926e..0d1b2b6 100644 --- a/plugin.yaml +++ b/plugin.yaml @@ -10,7 +10,7 @@ plugins: # "plugins" directory. source: https://github.com/MSO4SC/cloudify-hpc-plugin/archive/canary.zip package_name: cloudify-hpc-plugin - package_version: '1.2' + package_version: '1.3' workflows: run_jobs: @@ -119,6 +119,9 @@ node_types: default: {} job_options: description: Job main command and options + publish: + description: Config to publish its outputs + default: [] skip_cleanup: description: True to not clean after execution (debug purposes) type: boolean @@ -147,6 +150,11 @@ node_types: inputs: job_options: default: { get_property: [SELF, job_options] } + publish: + implementation: hpc.hpc_plugin.tasks.publish + inputs: + publish_options: + default: { get_property: [SELF, publish] } cleanup: implementation: hpc.hpc_plugin.tasks.cleanup_job inputs: