diff --git a/examples/imagemagick/script.sh b/examples/imagemagick/script.sh
index 0c7654af..c4764e36 100644
--- a/examples/imagemagick/script.sh
+++ b/examples/imagemagick/script.sh
@@ -1,7 +1,7 @@
 #!/bin/bash
-echo "SCRIPT: Invoked Image Grayifier. File available in $SCAR_INPUT_FILE"
-FILE_NAME=`basename $SCAR_INPUT_FILE`
-OUTPUT_FILE=$SCAR_OUTPUT_FOLDER/$FILE_NAME
-echo "SCRIPT: Converting input image file $SCAR_INPUT_FILE to grayscale to output file $OUTPUT_FILE"
-convert $SCAR_INPUT_FILE -type Grayscale $OUTPUT_FILE
\ No newline at end of file
+echo "SCRIPT: Invoked Image Grayifier. File available in $INPUT_FILE_PATH"
+FILE_NAME=`basename $INPUT_FILE_PATH`
+OUTPUT_FILE=$STORAGE_OUTPUT_DIR/$FILE_NAME
+echo "SCRIPT: Converting input image file $INPUT_FILE_PATH to grayscale to output file $OUTPUT_FILE"
+convert $INPUT_FILE_PATH -type Grayscale $OUTPUT_FILE
diff --git a/examples/video-process/yolov3-object-detection.sh b/examples/video-process/yolov3-object-detection.sh
index 9d7deacc..34cfd9a9 100755
--- a/examples/video-process/yolov3-object-detection.sh
+++ b/examples/video-process/yolov3-object-detection.sh
@@ -1,10 +1,12 @@
 #!/bin/bash
 
-FILENAME="`basename $SCAR_INPUT_FILE`"
-RESULT="$SCAR_OUTPUT_FOLDER/$FILENAME.out"
-OUTPUT_IMAGE="$SCAR_OUTPUT_FOLDER/$FILENAME"
+FILENAME="`basename $INPUT_FILE_PATH`"
+# Remove extension from filename
+FILENAME=${FILENAME%.*}
+RESULT="$STORAGE_OUTPUT_DIR/$FILENAME.out"
+OUTPUT_IMAGE="$STORAGE_OUTPUT_DIR/$FILENAME"
 
-echo "SCRIPT: Analyzing file '$SCAR_INPUT_FILE', saving the result in '$RESULT' and the output image in '$OUTPUT_IMAGE.png'"
+echo "SCRIPT: Analyzing file '$INPUT_FILE_PATH', saving the result in '$RESULT' and the output image in '$OUTPUT_IMAGE'"
 
 cd /opt/darknet
-./darknet detect cfg/yolov3.cfg yolov3.weights $SCAR_INPUT_FILE -out $OUTPUT_IMAGE > $RESULT
\ No newline at end of file
+./darknet detect cfg/yolov3.cfg yolov3.weights $INPUT_FILE_PATH -out $OUTPUT_IMAGE > $RESULT
diff --git a/src/logger.py b/src/logger.py
new file mode 100644
index 00000000..7c212541
--- /dev/null
+++ b/src/logger.py
@@ -0,0 +1,67 @@
+# Copyright (C) GRyCAP - I3M - UPV
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import logging
+import os
+
+loglevel = logging.INFO
+if "LOG_LEVEL" in os.environ:
+    loglevel = getattr(logging, os.environ["LOG_LEVEL"].upper(), logging.INFO)  # map the level name to its numeric value
+FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+logging.basicConfig(level=loglevel, format=FORMAT)
+logger = logging.getLogger('oscar')
+
+def debug(cli_msg, log_msg=None):
+    if loglevel == logging.DEBUG:
+        print(cli_msg)
+    logger.debug(log_msg) if log_msg else logger.debug(cli_msg)
+
+def info(cli_msg=None, log_msg=None):
+    if cli_msg and loglevel == logging.INFO:
+        print(cli_msg)
+    logger.info(log_msg) if log_msg else logger.info(cli_msg)
+
+def warning(cli_msg, log_msg=None):
+    print(cli_msg)
+    logger.warning(log_msg) if log_msg else logger.warning(cli_msg)
+
+def error(cli_msg, log_msg=None):
+    if log_msg:
+        print(log_msg)
+        logger.error(log_msg)
+    else:
+        print(cli_msg)
+        logger.error(cli_msg)
+
+def exception(msg):
+    logger.exception(msg)
+
+def log_exception(error_msg, exception):
+    error(error_msg, error_msg + ": {0}".format(exception))
+
+def print_json(value):
+    print(json.dumps(value))
+
+def info_json(cli_msg, log_msg=None):
+    print_json(cli_msg)
+    logger.info(log_msg) if log_msg else logger.info(cli_msg)
+
+def warning_json(cli_msg, log_msg=None):
+    print_json(cli_msg)
+    logger.warning(log_msg) if log_msg else logger.warning(cli_msg)
+
+def error_json(cli_msg, log_msg=None):
+    print_json(cli_msg)
+    logger.error(log_msg) if log_msg else logger.error(cli_msg)
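Editorial note, outside the patch: a minimal usage sketch of the new src/logger helper, assuming the repository root is on PYTHONPATH; the bucket name and messages below are made-up examples.

    # Hypothetical usage of src/logger (illustration only, not part of the patch).
    import os

    # The level is read once at import time, so export LOG_LEVEL before importing the module.
    os.environ.setdefault("LOG_LEVEL", "INFO")

    import src.logger as logger

    logger.info("Creating minio buckets")          # echoed on the CLI when the level is INFO, sent to the 'oscar' logger
    logger.debug("Initializing OpenFaas client")   # echoed on the CLI only when the level is DEBUG
    logger.warning("Bucket exists", log_msg="Bucket 'grayify-in' already owned by you")  # CLI gets the short text, the log gets log_msg
    logger.print_json({"status": "ok"})            # machine-readable CLI output

The *_json variants (info_json, warning_json, error_json) behave the same way but print the message as JSON.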
diff --git a/src/providers/onpremises/clients/kaniko.py b/src/providers/onpremises/clients/kaniko.py
index eeef46df..00d34bcc 100644
--- a/src/providers/onpremises/clients/kaniko.py
+++ b/src/providers/onpremises/clients/kaniko.py
@@ -16,7 +16,6 @@
 import src.utils as utils
 import os
 import stat
-import logging
 
 class KanikoClient():
 
@@ -29,7 +28,7 @@ def __init__(self, function_args):
         self.root_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
         self.job_name = "{0}-build-job".format(function_args['name'])
 
-    def copy_dockerfile(self):
+    def _copy_dockerfile(self):
         # Get function Dockerfile paths
         func_dockerfile_path = utils.join_paths(self.root_path, "src", "providers", "onpremises", "function_template", "Dockerfile")
         func_dockerfile_dest_path = utils.join_paths(self.function_image_folder, "Dockerfile")
@@ -39,7 +38,7 @@ def copy_dockerfile(self):
             for line in f_in:
                 f_out.write(line.replace("FROM ubuntu", "FROM {0}".format(self.function_args['image'])))
 
-    def download_binaries(self):
+    def _download_binaries(self):
         # Download latest fwatchdog binary and set exec permissions
         utils.download_github_asset('openfaas', 'faas', 'fwatchdog', self.function_image_folder)
         fwatchdog_path = os.path.join(self.function_image_folder, 'fwatchdog')
@@ -52,24 +51,24 @@ def download_binaries(self):
         supervisor_st = os.stat(supervisor_path)
         os.chmod(supervisor_path, supervisor_st.st_mode | stat.S_IEXEC)
 
-    def copy_user_script(self):
+    def _copy_user_script(self):
         utils.create_file_with_content(utils.join_paths(self.function_image_folder, "user_script.sh"),
                                        utils.base64_to_utf8_string(self.function_args['script']))
 
-    def copy_required_files(self):
+    def _copy_required_files(self):
         os.makedirs(self.function_image_folder , exist_ok=True)
         # Get function Dockerfile paths
-        self.copy_dockerfile()
+        self._copy_dockerfile()
         # Download required binaries
-        self.download_binaries()
+        self._download_binaries()
         # Create user script
-        self.copy_user_script()
+        self._copy_user_script()
 
-    def delete_image_files(self):
+    def _delete_image_files(self):
         # Delete all the temporal files created for the image creation
         utils.delete_folder(self.function_image_folder)
 
-    def create_job_definition(self):
+    def _create_job_definition(self):
         self.registry_image_id = "{0}/{1}".format(self.registry_name, self.function_args['name'])
         job = {
             'apiVersion': 'batch/v1',
@@ -118,12 +117,12 @@ def create_job_definition(self):
 
     def create_and_push_docker_image(self, kubernetes_client):
         # Copy/create function required files
-        self.copy_required_files()
+        self._copy_required_files()
         # Build the docker image
-        job = self.create_job_definition()
+        job = self._create_job_definition()
         # Send request to the k8s api
         kubernetes_client.create_job(job, self.job_name, self.namespace)
         # Wait until build finishes
         kubernetes_client.wait_job(self.job_name, self.namespace, delete=True)
         # Avoid storing unnecessary files
-        self.delete_image_files()
+        self._delete_image_files()
diff --git a/src/providers/onpremises/clients/minio.py b/src/providers/onpremises/clients/minio.py
index 03f7bd0a..184d9144 100644
--- a/src/providers/onpremises/clients/minio.py
+++ b/src/providers/onpremises/clients/minio.py
@@ -18,10 +18,10 @@
 
 class MinioClient():
 
-    def __init__(self, function_args):
+    def __init__(self, function_args, minio_id):
         self.function_name = function_args['name']
-        if 'envVars' in function_args and 'OUTPUT_BUCKET' in function_args['envVars']:
-            self.output_bucket = function_args['envVars']['OUTPUT_BUCKET']
+        if minio_id and 'envVars' in function_args and 'STORAGE_PATH_OUTPUT_{}'.format(minio_id) in function_args['envVars']:
+            self.output_bucket = function_args['envVars']['STORAGE_PATH_OUTPUT_{}'.format(minio_id)]
         self.access_key = utils.get_environment_variable("MINIO_USER")
         self.secret_key = utils.get_environment_variable("MINIO_PASS")
         self.client = minio.Minio(utils.get_environment_variable("MINIO_ENDPOINT"),
@@ -30,14 +30,14 @@ def __init__(self, function_args):
                                   secure=False)
 
     def create_input_bucket(self):
-        self.create_bucket('{0}-in'.format(self.function_name))
-        self.set_bucket_event_notification('{0}-in'.format(self.function_name))
+        self._create_bucket('{0}-in'.format(self.function_name))
+        self._set_bucket_event_notification('{0}-in'.format(self.function_name))
 
     def create_output_bucket(self):
         if not hasattr(self, 'output_bucket'):
-            self.create_bucket('{0}-out'.format(self.function_name))
+            self._create_bucket('{0}-out'.format(self.function_name))
 
-    def create_bucket(self, bucket_name):
+    def _create_bucket(self, bucket_name):
         try:
             self.client.make_bucket(bucket_name)
         except minio.error.BucketAlreadyOwnedByYou as err:
@@ -45,45 +45,47 @@ def create_bucket(self, bucket_name):
         except minio.error.ResponseError as err:
             print(err)
 
-    def set_bucket_event_notification(self, bucket_name):
+    def _set_bucket_event_notification(self, bucket_name):
         try:
             notification = {'QueueConfigurations': [
                    {'Arn': 'arn:minio:sqs::1:webhook',
-                    'Events': ['s3:ObjectCreated:*']
-                   }
+                    'Events': ['s3:ObjectCreated:*']}
                 ]}
             self.client.set_bucket_notification(bucket_name, notification)
         except minio.error.ResponseError as err:
             print(err)
 
     def delete_input_bucket(self):
-        self.delete_bucket_event_notification('{0}-in'.format(self.function_name))
-        self.delete_bucket('{0}-in'.format(self.function_name))
+        self._delete_bucket_event_notification('{0}-in'.format(self.function_name))
+        self._delete_bucket('{0}-in'.format(self.function_name))
 
     def delete_output_bucket(self):
-        self.delete_bucket('{0}-out'.format(self.function_name))
+        self._delete_bucket('{0}-out'.format(self.function_name))
 
-    def delete_bucket_files(self, bucket_name):
+    def _delete_bucket_files(self, bucket_name):
         try:
             for file in self.client.list_objects_v2(bucket_name):
                 self.client.remove_object(bucket_name, file.object_name)
         except minio.error.ResponseError as err:
             print(err)
 
-    def delete_bucket(self, bucket_name):
+    def _delete_bucket(self, bucket_name):
         try:
-            self.delete_bucket_files(bucket_name)
+            self._delete_bucket_files(bucket_name)
             self.client.remove_bucket(bucket_name)
         except minio.error.ResponseError as err:
             print(err)
 
-    def delete_bucket_event_notification(self, bucket_name):
+    def _delete_bucket_event_notification(self, bucket_name):
         try:
             notification = {'QueueConfigurations': []}
             self.client.set_bucket_notification(bucket_name, notification)
         except minio.error.ResponseError as err:
             print(err)
 
+    def get_input_bucket_name(self):
+        return self.input_bucket if hasattr(self, 'input_bucket') else '{0}-in'.format(self.function_name)
+
     def get_output_bucket_name(self):
         return self.output_bucket if hasattr(self, 'output_bucket') else '{0}-out'.format(self.function_name)
diff --git a/src/providers/onpremises/clients/onedata.py b/src/providers/onpremises/clients/onedata.py
index 19de4e14..887a7a0c 100644
--- a/src/providers/onpremises/clients/onedata.py
+++ b/src/providers/onpremises/clients/onedata.py
@@ -14,7 +14,6 @@
 # limitations under the License.
 
 import src.utils as utils
-import os
 import logging
 import requests
 
@@ -25,14 +24,15 @@ class OnedataClient():
     cdmi_version_header = {'X-CDMI-Specification-Version': '1.1.1'}
     cdmi_container_header = {'Content-Type': 'application/cdmi-container'}
 
-    def __init__(self, function_args):
+    def __init__(self, function_args, onedata_id):
         self.function_name = function_args['name']
+        self.onedata_id = onedata_id
         self.endpoint = utils.get_environment_variable("OPENFAAS_ENDPOINT")
-        if 'envVars' in function_args and 'OUTPUT_BUCKET' in function_args['envVars']:
-            self.output_bucket = function_args['envVars']['OUTPUT_BUCKET'].strip('/ ')
-            self.oneprovider_host = function_args['envVars']['ONEPROVIDER_HOST']
-            self.onedata_access_token = function_args['envVars']['ONEDATA_ACCESS_TOKEN']
-            self.onedata_space = function_args['envVars']['ONEDATA_SPACE'].strip('/ ')
+        if 'envVars' in function_args and 'STORAGE_PATH_OUTPUT_{}'.format(onedata_id) in function_args['envVars']:
+            self.output_bucket = function_args['envVars']['STORAGE_PATH_OUTPUT_{}'.format(onedata_id)].strip('/ ')
+            self.oneprovider_host = function_args['envVars']['STORAGE_AUTH_ONEDATA_{}_HOST'.format(onedata_id)]
+            self.onedata_access_token = function_args['envVars']['STORAGE_AUTH_ONEDATA_{}_TOKEN'.format(onedata_id)]
+            self.onedata_space = function_args['envVars']['STORAGE_AUTH_ONEDATA_{}_SPACE'.format(onedata_id)].strip('/ ')
 
     @utils.lazy_property
     def onedata_auth_header(self):
@@ -75,13 +75,13 @@ def folder_exists(self, folder_name):
         return False
 
     def create_input_folder(self):
-        self.create_folder('{0}-in'.format(self.function_name))
+        self._create_folder('{0}-in'.format(self.function_name))
 
     def create_output_folder(self):
         if not hasattr(self, 'output_bucket'):
-            self.create_folder('{0}-out'.format(self.function_name))
+            self._create_folder('{0}-out'.format(self.function_name))
 
-    def create_folder(self, folder_name):
+    def _create_folder(self, folder_name):
         url = 'https://{0}{1}{2}/{3}/'.format(self.oneprovider_host, self.cdmi_path, self.onedata_space, folder_name)
         headers = {**self.cdmi_version_header, **self.cdmi_container_header, **self.onedata_auth_header}
         try:
diff --git a/src/providers/onpremises/controller.py b/src/providers/onpremises/controller.py
index 1298b5f9..6178a176 100644
--- a/src/providers/onpremises/controller.py
+++ b/src/providers/onpremises/controller.py
@@ -13,21 +13,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from src.cmdtemplate import Commands
-import src.utils as utils
 from flask import Response
+from src.cmdtemplate import Commands
 from src.providers.onpremises.clients.kaniko import KanikoClient
+from src.providers.onpremises.clients.kubernetes import KubernetesClient
 from src.providers.onpremises.clients.minio import MinioClient
 from src.providers.onpremises.clients.onedata import OnedataClient
 from src.providers.onpremises.clients.openfaas import OpenFaasClient
-from src.providers.onpremises.clients.kubernetes import KubernetesClient
 from threading import Thread
 
 import json
-import logging
-
-loglevel = logging.DEBUG
-FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
-logging.basicConfig(format=FORMAT, level=loglevel)
+import random
+import src.logger as logger
+import src.utils as utils
 
 class CustomResponse():
     def __init__(self, content=None, status_code=None, headers=None):
@@ -47,50 +44,50 @@ class OnPremises(Commands):
 
     @utils.lazy_property
     def openfaas(self):
-        logging.debug("Initializing OpenFaas client")
+        logger.debug("Initializing OpenFaas client")
         openfaas = OpenFaasClient(self.function_args)
         return openfaas
 
     @utils.lazy_property
     def minio(self):
-        logging.debug("Initializing Minio client")
-        minio = MinioClient(self.function_args)
+        logger.debug("Initializing Minio client")
+        minio = MinioClient(self.function_args, self.minio_id)
         return minio
 
     @utils.lazy_property
     def onedata(self):
-        logging.debug("Initializing Onedata client")
-        onedata = OnedataClient(self.function_args)
+        logger.debug("Initializing Onedata client")
+        onedata = OnedataClient(self.function_args, self.onedata_id)
         return onedata
 
     @utils.lazy_property
     def kaniko(self):
-        logging.debug("Initializing Kaniko client")
+        logger.debug("Initializing Kaniko client")
         kaniko = KanikoClient(self.function_args)
         return kaniko
 
     @utils.lazy_property
     def kubernetes(self):
-        logging.debug("Initializing Kubernetes client")
+        logger.debug("Initializing Kubernetes client")
         kubernetes = KubernetesClient()
         return kubernetes
 
     def __init__(self, function_args=None):
         if function_args:
-            logging.debug("Function creation arguments received: {}".format(function_args))
+            logger.debug("Function creation arguments received: {}".format(function_args))
         self.function_args = function_args if function_args else {}
         self.get_function_environment_variables()
 
     def init(self):
         function_exists, response = self.openfaas.is_function_created()
         if function_exists:
-            logging.info("Function with name '{}' found".format(self.function_args['name']))
+            logger.info("Function with name '{}' found".format(self.function_args['name']))
             kwargs = {'response' : response.content,
                       'status' : str(response.status_code),
                       'headers' : response.headers.items()}
             return Response(**kwargs)
         else:
-            logging.info("Initialize asynchronous function creation")
+            logger.info("Initialize asynchronous function creation")
             # Start initializing the function
             init_t = Thread(target=self.asynch_init)
            init_t.start()
@@ -100,20 +97,22 @@ def init(self):
 
     def asynch_init(self):
         # Create docker image
-        logging.info("Creating docker image with kaniko")
+        logger.info("Creating docker image with kaniko")
         self.kaniko.create_and_push_docker_image(self.kubernetes)
-        self.set_docker_variables()
+        # Override the function image name with the new image_id
+        self.function_args["image"] = self.kaniko.registry_image_id
         # Create minio buckets
-        logging.info("Creating minio buckets")
-        self.create_minio_buckets()
-        if self.is_onedata_defined():
-            logging.info('Creating Onedata folders')
-            self.create_onedata_folders()
-            logging.info('Creating OneTrigger deployment')
+        logger.info("Creating minio buckets")
+        self.minio_id = self._get_storage_provider_id('MINIO')
+        self._create_minio_buckets()
+        if self._is_onedata_defined():
+            logger.info('Creating Onedata folders')
+            self._create_onedata_folders()
+            logger.info('Creating OneTrigger deployment')
             self.onedata.deploy_onetrigger(self.kubernetes)
-        self.set_minio_variables()
+        self._set_minio_variables()
         # Create openfaas function
-        logging.info("Creating OpenFaas function")
+        logger.info("Creating OpenFaas function")
         self._parse_output(self.openfaas.create_function(self.function_args))
 
     @flask_response
@@ -124,12 +123,12 @@ def process_minio_event(self, minio_event):
 
     @flask_response
     def ls(self):
-        logging.info("Retrieving functions information")
+        logger.info("Retrieving functions information")
         return self.openfaas.get_functions_info()
 
     @flask_response
     def invoke(self, body, asynch=True):
-        logging.info("Invoking '{}' function".format(self.function_args['name']))
+        logger.info("Invoking '{}' function".format(self.function_args['name']))
         return self.openfaas.invoke_function(body, asynch)
 
     def run(self):
@@ -137,7 +136,7 @@ def run(self):
 
     @flask_response
     def update(self):
-        logging.info("Update functionality not implemented yet")
+        logger.info("Update functionality not implemented yet")
         # Service not implemented (yet)
         return CustomResponse(content='Update functionality not implemented', status_code=501)
 
@@ -145,18 +144,18 @@ def update(self):
     def rm(self):
         # Delete minio buckets (if selected)
         if 'deleteBuckets' in self.function_args and self.function_args['deleteBuckets']:
-            logging.info("Deleting Minio buckets")
+            logger.info("Deleting Minio buckets")
             self.minio.delete_input_bucket()
             self.minio.delete_output_bucket()
         # Delete Onetrigger deployment and Onedata folders (if selected)
-        if self.is_onedata_defined():
-            logging.info("Deleting OneTrigger deployment")
+        if self._is_onedata_defined():
+            logger.info("Deleting OneTrigger deployment")
             self.onedata.delete_onetrigger_deploy(self.kubernetes)
             if 'deleteBuckets' in self.function_args and self.function_args['deleteBuckets']:
-                logging.info("Deleting Onedata folders")
+                logger.info("Deleting Onedata folders")
                 self.onedata.delete_input_folder()
                 self.onedata.delete_output_folder()
-        logging.info("Deleting OpenFaas function")
+        logger.info("Deleting OpenFaas function")
         return self.openfaas.delete_function()
 
     def log(self):
@@ -191,34 +190,57 @@ def add_function_annotation(self, key, value):
         else:
             self.function_args["annotations"] = { key: value }
 
-    def set_docker_variables(self):
-        # Override the function image name
-        self.function_args["image"] = self.kaniko.registry_image_id
-
-    def create_minio_buckets(self):
+    def _create_minio_buckets(self):
         self.minio.create_input_bucket()
         self.minio.create_output_bucket()
 
-    def is_onedata_defined(self):
+    def _is_onedata_defined(self):
         if 'envVars' in self.function_args:
-            if 'ONEPROVIDER_HOST' in self.function_args['envVars'] and \
-               'ONEDATA_ACCESS_TOKEN' in self.function_args['envVars'] and \
-               'ONEDATA_SPACE' in self.function_args['envVars']:
+            self.onedata_id = self._get_storage_provider_id('ONEDATA')
+            if self.onedata_id and 'STORAGE_AUTH_ONEDATA_{}_HOST'.format(self.onedata_id) in self.function_args['envVars'] and \
+               'STORAGE_AUTH_ONEDATA_{}_TOKEN'.format(self.onedata_id) in self.function_args['envVars'] and \
+               'STORAGE_AUTH_ONEDATA_{}_SPACE'.format(self.onedata_id) in self.function_args['envVars']:
                 return self.onedata.check_connection()
         return False
 
-    def create_onedata_folders(self):
+    def _get_storage_provider_id(self, storage_provider):
+        '''
+        Reads the global variables to get the provider's id.
+        Variable schema: STORAGE_AUTH_$1_$2_$3
+        $1: MINIO | S3 | ONEDATA
+        $2: STORAGE_ID (specified in the function definition file; unique for each storage defined)
+        $3: USER | PASS | TOKEN | SPACE | HOST
+
+        e.g.: STORAGE_AUTH_MINIO_12345_USER
+        '''
+        for envvar in self.function_args['envVars']:
+            if envvar.startswith('STORAGE_AUTH_{}_'.format(storage_provider)):
+                '''
+                The provider_id can be composed of several fields but it is always at position [3:-1],
+                e.g.:
+                - "STORAGE_AUTH_MINIO_123_456_USER" -> ['STORAGE', 'AUTH', 'MINIO', '123', '456', 'USER']
+                - "STORAGE_AUTH_MINIO_123-456_USER" -> ['STORAGE', 'AUTH', 'MINIO', '123-456', 'USER']
+                '''
+                return "_".join(envvar.split("_")[3:-1])
+
+    def _create_onedata_folders(self):
         self.onedata.create_input_folder()
         self.onedata.create_output_folder()
+        self._set_io_folder_variables(self._get_storage_provider_id('ONEDATA'))
+
+    def _set_minio_variables(self):
+        provider_id = random.randint(1, 1000001)
+        self.add_function_environment_variable("STORAGE_AUTH_MINIO_{}_USER".format(provider_id), self.minio.get_access_key())
+        self.add_function_environment_variable("STORAGE_AUTH_MINIO_{}_PASS".format(provider_id), self.minio.get_secret_key())
+        self._set_io_folder_variables(provider_id)
 
-    def set_minio_variables(self):
-        self.add_function_environment_variable("AWS_ACCESS_KEY_ID", self.minio.get_access_key())
-        self.add_function_environment_variable("AWS_SECRET_ACCESS_KEY", self.minio.get_secret_key())
-        self.add_function_environment_variable("OUTPUT_BUCKET", self.minio.get_output_bucket_name())
+    def _set_io_folder_variables(self, provider_id):
+        self.add_function_environment_variable("STORAGE_PATH_INPUT_{}".format(provider_id), self.minio.get_input_bucket_name())
+        self.add_function_environment_variable("STORAGE_PATH_OUTPUT_{}".format(provider_id), self.minio.get_output_bucket_name())
 
     def _parse_output(self, response):
         if response:
             if response.status_code == 200:
-                logging.info("Request petition successful")
+                logger.info("Request successful")
             else:
-                logging.info("Request call returned code '{0}': {1}".format(response.status_code, response.text))
+                logger.info("Request call returned code '{0}': {1}".format(response.status_code, response.text))
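Editorial note, outside the patch: the renamed variables follow the STORAGE_AUTH_<PROVIDER>_<STORAGE_ID>_<FIELD> and STORAGE_PATH_INPUT_<STORAGE_ID> / STORAGE_PATH_OUTPUT_<STORAGE_ID> scheme documented in _get_storage_provider_id above. The standalone sketch below reproduces that lookup; the variable values, the '12345' id and the bucket names are invented for illustration.

    # Standalone sketch of the provider-id lookup (illustration only, not part of the patch).
    def get_storage_provider_id(storage_provider, env_vars):
        # Schema: STORAGE_AUTH_<PROVIDER>_<STORAGE_ID>_<FIELD>; the id is every field
        # between position 3 and the trailing field name.
        for envvar in env_vars:
            if envvar.startswith('STORAGE_AUTH_{}_'.format(storage_provider)):
                return "_".join(envvar.split("_")[3:-1])
        return None

    env_vars = {
        'STORAGE_AUTH_MINIO_12345_USER': 'minio',
        'STORAGE_AUTH_MINIO_12345_PASS': 'secret',
        'STORAGE_PATH_INPUT_12345': 'grayify-in',
        'STORAGE_PATH_OUTPUT_12345': 'grayify-out',
    }

    minio_id = get_storage_provider_id('MINIO', env_vars)                  # -> '12345'
    output_bucket = env_vars['STORAGE_PATH_OUTPUT_{}'.format(minio_id)]    # -> 'grayify-out'

At invocation time the user scripts in examples/ only see the generic INPUT_FILE_PATH and STORAGE_OUTPUT_DIR variables, which are presumably resolved from these per-provider entries by the supervisor binary installed into the function image.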