Skip to content

Commit

Permalink
Merge pull request #76 from grycap/storage
Browse files Browse the repository at this point in the history
 Update OSCAR to use the new supervisor
  • Loading branch information
Alfonso Pérez authored Apr 1, 2019
2 parents ecd4e70 + 61222e8 commit 46552aa
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 102 deletions.
10 changes: 5 additions & 5 deletions examples/imagemagick/script.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash
# Convert the input image (provided by the supervisor via INPUT_FILE_PATH)
# to grayscale and write the result into STORAGE_OUTPUT_DIR.
# All expansions are quoted so paths containing spaces work correctly.

echo "SCRIPT: Invoked Image Grayifier. File available in $INPUT_FILE_PATH"
FILE_NAME="$(basename "$INPUT_FILE_PATH")"
OUTPUT_FILE="$STORAGE_OUTPUT_DIR/$FILE_NAME"
echo "SCRIPT: Converting input image file $INPUT_FILE_PATH to grayscale to output file $OUTPUT_FILE"
convert "$INPUT_FILE_PATH" -type Grayscale "$OUTPUT_FILE"
12 changes: 7 additions & 5 deletions examples/video-process/yolov3-object-detection.sh
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#!/bin/bash
# Run YOLOv3 object detection on the input file (INPUT_FILE_PATH, set by
# the supervisor) and store both the textual result (.out) and the output
# image in STORAGE_OUTPUT_DIR. Expansions are quoted so paths with spaces work.

FILENAME="$(basename "$INPUT_FILE_PATH")"
# Remove extension from filename
FILENAME="${FILENAME%.*}"
RESULT="$STORAGE_OUTPUT_DIR/$FILENAME.out"
OUTPUT_IMAGE="$STORAGE_OUTPUT_DIR/$FILENAME"

echo "SCRIPT: Analyzing file '$INPUT_FILE_PATH', saving the result in '$RESULT' and the output image in '$OUTPUT_IMAGE'"

# Fail fast if the darknet installation folder is missing instead of
# running the detector from the wrong working directory.
cd /opt/darknet || exit 1
./darknet detect cfg/yolov3.cfg yolov3.weights "$INPUT_FILE_PATH" -out "$OUTPUT_IMAGE" > "$RESULT"
67 changes: 67 additions & 0 deletions src/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright (C) GRyCAP - I3M - UPV
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import os

# Resolve the log level from the LOG_LEVEL environment variable.
# The original code stored the raw env string (e.g. "DEBUG"), but the
# helpers below compare `loglevel` against logging.DEBUG / logging.INFO,
# which are ints — the comparison silently never matched. Normalize the
# level name to its numeric value; unknown names fall back to INFO.
loglevel = logging.INFO
if "LOG_LEVEL" in os.environ:
    loglevel = logging.getLevelName(os.environ["LOG_LEVEL"].upper())
    if not isinstance(loglevel, int):
        # getLevelName returns a "Level %s" string for unknown names.
        loglevel = logging.INFO
FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=loglevel, format=FORMAT)
logger = logging.getLogger('oscar')

def debug(cli_msg, log_msg=None):
    """Log a debug record; also print cli_msg when running at DEBUG level.

    log_msg, when given, replaces cli_msg in the log record only.
    """
    if loglevel == logging.DEBUG:
        print(cli_msg)
    # Replaced ternary-for-side-effect with a single explicit call.
    logger.debug(log_msg if log_msg else cli_msg)

def info(cli_msg=None, log_msg=None):
    """Log an info record; also print cli_msg when running at INFO level.

    log_msg, when given, replaces cli_msg in the log record only.
    """
    if cli_msg and loglevel == logging.INFO:
        print(cli_msg)
    # Replaced ternary-for-side-effect with a single explicit call.
    logger.info(log_msg if log_msg else cli_msg)

def warning(cli_msg, log_msg=None):
    """Print cli_msg and log a warning record (log_msg when given, else cli_msg)."""
    print(cli_msg)
    # Replaced ternary-for-side-effect with a single explicit call.
    logger.warning(log_msg if log_msg else cli_msg)

def error(cli_msg, log_msg=None):
    """Print an error to stdout and record it on the 'oscar' logger.

    When log_msg is provided it is used for both outputs; otherwise
    cli_msg is printed and logged.
    """
    msg = log_msg if log_msg else cli_msg
    print(msg)
    logger.error(msg)

def exception(msg):
    """Log msg at ERROR level together with the active exception traceback."""
    logger.exception(msg)

def log_exception(error_msg, exception):
    """Report error_msg on the CLI and log it with the exception appended."""
    error(error_msg, "{0}: {1}".format(error_msg, exception))

def print_json(value):
    """Serialize value to JSON and write it to stdout."""
    serialized = json.dumps(value)
    print(serialized)

def info_json(cli_msg, log_msg=None):
    """Print cli_msg as JSON and log an info record (log_msg when given, else cli_msg)."""
    print_json(cli_msg)
    # Replaced ternary-for-side-effect with a single explicit call.
    logger.info(log_msg if log_msg else cli_msg)

def warning_json(cli_msg, log_msg=None):
    """Print cli_msg as JSON and log a warning record (log_msg when given, else cli_msg)."""
    print_json(cli_msg)
    # Replaced ternary-for-side-effect with a single explicit call.
    logger.warning(log_msg if log_msg else cli_msg)

def error_json(cli_msg, log_msg=None):
    """Print cli_msg as JSON and log an error record (log_msg when given, else cli_msg)."""
    print_json(cli_msg)
    # Replaced ternary-for-side-effect with a single explicit call.
    logger.error(log_msg if log_msg else cli_msg)
25 changes: 12 additions & 13 deletions src/providers/onpremises/clients/kaniko.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import src.utils as utils
import os
import stat
import logging

class KanikoClient():

Expand All @@ -29,7 +28,7 @@ def __init__(self, function_args):
self.root_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
self.job_name = "{0}-build-job".format(function_args['name'])

def copy_dockerfile(self):
def _copy_dockerfile(self):
# Get function Dockerfile paths
func_dockerfile_path = utils.join_paths(self.root_path, "src", "providers", "onpremises", "function_template", "Dockerfile")
func_dockerfile_dest_path = utils.join_paths(self.function_image_folder, "Dockerfile")
Expand All @@ -39,7 +38,7 @@ def copy_dockerfile(self):
for line in f_in:
f_out.write(line.replace("FROM ubuntu", "FROM {0}".format(self.function_args['image'])))

def download_binaries(self):
def _download_binaries(self):
# Download latest fwatchdog binary and set exec permissions
utils.download_github_asset('openfaas', 'faas', 'fwatchdog', self.function_image_folder)
fwatchdog_path = os.path.join(self.function_image_folder, 'fwatchdog')
Expand All @@ -52,24 +51,24 @@ def download_binaries(self):
supervisor_st = os.stat(supervisor_path)
os.chmod(supervisor_path, supervisor_st.st_mode | stat.S_IEXEC)

def _copy_user_script(self):
    """Decode the base64-encoded user script and write it as user_script.sh
    into the function image folder."""
    script_path = utils.join_paths(self.function_image_folder, "user_script.sh")
    utils.create_file_with_content(script_path,
                                   utils.base64_to_utf8_string(self.function_args['script']))

def _copy_required_files(self):
    """Assemble every file needed to build the function image."""
    os.makedirs(self.function_image_folder, exist_ok=True)
    # Function Dockerfile.
    self._copy_dockerfile()
    # Required binaries.
    self._download_binaries()
    # User-provided script.
    self._copy_user_script()

def _delete_image_files(self):
    """Remove the temporary folder used to assemble the image build context."""
    utils.delete_folder(self.function_image_folder)

def create_job_definition(self):
def _create_job_definition(self):
self.registry_image_id = "{0}/{1}".format(self.registry_name, self.function_args['name'])
job = {
'apiVersion': 'batch/v1',
Expand Down Expand Up @@ -118,12 +117,12 @@ def create_job_definition(self):

def create_and_push_docker_image(self, kubernetes_client):
    """Build the function's Docker image via a Kubernetes build job.

    Stages the build context, submits the job through kubernetes_client,
    waits for it to finish, and removes the temporary files afterwards.
    """
    # Stage Dockerfile, binaries and user script for the build.
    self._copy_required_files()
    job_definition = self._create_job_definition()
    # Submit the build job to the k8s API and block until it completes.
    kubernetes_client.create_job(job_definition, self.job_name, self.namespace)
    kubernetes_client.wait_job(self.job_name, self.namespace, delete=True)
    # Avoid keeping the temporary build context around.
    self._delete_image_files()
36 changes: 19 additions & 17 deletions src/providers/onpremises/clients/minio.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

class MinioClient():

def __init__(self, function_args):
def __init__(self, function_args, minio_id):
self.function_name = function_args['name']
if 'envVars' in function_args and 'OUTPUT_BUCKET' in function_args['envVars']:
self.output_bucket = function_args['envVars']['OUTPUT_BUCKET']
if minio_id and 'envVars' in function_args and 'STORAGE_PATH_OUTPUT_'.format(minio_id) in function_args['envVars']:
self.output_bucket = function_args['envVars']['STORAGE_PATH_OUTPUT_'.format(minio_id)]
self.access_key = utils.get_environment_variable("MINIO_USER")
self.secret_key = utils.get_environment_variable("MINIO_PASS")
self.client = minio.Minio(utils.get_environment_variable("MINIO_ENDPOINT"),
Expand All @@ -30,60 +30,62 @@ def __init__(self, function_args):
secure=False)

def create_input_bucket(self):
    """Create the '<function>-in' bucket and attach its event notification."""
    bucket = '{0}-in'.format(self.function_name)
    self._create_bucket(bucket)
    self._set_bucket_event_notification(bucket)

def create_output_bucket(self):
    """Create the default '<function>-out' bucket unless a custom output
    path was configured in the constructor."""
    if not hasattr(self, 'output_bucket'):
        self._create_bucket('{0}-out'.format(self.function_name))

def _create_bucket(self, bucket_name):
    """Create bucket_name; Minio errors are printed instead of raised."""
    try:
        self.client.make_bucket(bucket_name)
    except (minio.error.BucketAlreadyOwnedByYou, minio.error.ResponseError) as err:
        # Re-creating an existing bucket is harmless; just report it.
        print(err)

def _set_bucket_event_notification(self, bucket_name):
    """Subscribe the webhook queue to object-creation events on bucket_name."""
    queue_config = {
        'Arn': 'arn:minio:sqs::1:webhook',
        'Events': ['s3:ObjectCreated:*'],
    }
    try:
        self.client.set_bucket_notification(bucket_name,
                                            {'QueueConfigurations': [queue_config]})
    except minio.error.ResponseError as err:
        print(err)

def delete_input_bucket(self):
    """Remove the '<function>-in' bucket after detaching its event notification."""
    bucket = '{0}-in'.format(self.function_name)
    self._delete_bucket_event_notification(bucket)
    self._delete_bucket(bucket)

def delete_output_bucket(self):
    """Remove the '<function>-out' bucket."""
    bucket = '{0}-out'.format(self.function_name)
    self._delete_bucket(bucket)

def _delete_bucket_files(self, bucket_name):
    """Remove every object stored in bucket_name; Minio errors are printed."""
    try:
        stored_objects = self.client.list_objects_v2(bucket_name)
        for stored in stored_objects:
            self.client.remove_object(bucket_name, stored.object_name)
    except minio.error.ResponseError as err:
        print(err)

def _delete_bucket(self, bucket_name):
    """Empty bucket_name and then remove it; Minio errors are printed."""
    try:
        # Buckets must be empty before removal.
        self._delete_bucket_files(bucket_name)
        self.client.remove_bucket(bucket_name)
    except minio.error.ResponseError as err:
        print(err)

def _delete_bucket_event_notification(self, bucket_name):
    """Detach all queue notifications from bucket_name."""
    try:
        # An empty configuration clears existing subscriptions.
        self.client.set_bucket_notification(bucket_name,
                                            {'QueueConfigurations': []})
    except minio.error.ResponseError as err:
        print(err)

def get_input_bucket_name(self):
    """Return the input bucket name, defaulting to '<function>-in'.

    Bug fix: the original tested for an 'input_bucket' attribute but
    returned self.output_bucket (copy-paste error), so a configured
    input bucket could never be returned correctly.
    """
    return self.input_bucket if hasattr(self, 'input_bucket') else '{0}-in'.format(self.function_name)

def get_output_bucket_name(self):
    """Return the output bucket name, defaulting to '<function>-out'."""
    if hasattr(self, 'output_bucket'):
        return self.output_bucket
    return '{0}-out'.format(self.function_name)

Expand Down
20 changes: 10 additions & 10 deletions src/providers/onpremises/clients/onedata.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# limitations under the License.

import src.utils as utils
import os
import logging
import requests

Expand All @@ -25,14 +24,15 @@ class OnedataClient():
cdmi_version_header = {'X-CDMI-Specification-Version': '1.1.1'}
cdmi_container_header = {'Content-Type': 'application/cdmi-container'}

def __init__(self, function_args):
def __init__(self, function_args, onedata_id):
    """Store the Onedata connection settings for the given function.

    onedata_id identifies the storage provider; the optional output path
    and the provider credentials are read from the function's envVars
    (STORAGE_PATH_OUTPUT_<id> / STORAGE_AUTH_ONEDATA_<id>_*).
    """
    self.function_name = function_args['name']
    self.onedata_id = onedata_id
    self.endpoint = utils.get_environment_variable("OPENFAAS_ENDPOINT")
    env_vars = function_args.get('envVars', {})
    # Bug fix: the original used 'STORAGE_PATH_OUTPUT_'.format(onedata_id),
    # whose format string has no '{0}' placeholder, so .format() was a
    # no-op and the provider id was never appended to the variable name.
    output_key = 'STORAGE_PATH_OUTPUT_{0}'.format(onedata_id)
    if output_key in env_vars:
        self.output_bucket = env_vars[output_key].strip('/ ')
        self.oneprovider_host = env_vars['STORAGE_AUTH_ONEDATA_{0}_HOST'.format(onedata_id)]
        self.onedata_access_token = env_vars['STORAGE_AUTH_ONEDATA_{0}_TOKEN'.format(onedata_id)]
        self.onedata_space = env_vars['STORAGE_AUTH_ONEDATA_{0}_SPACE'.format(onedata_id)].strip('/ ')

@utils.lazy_property
def onedata_auth_header(self):
Expand Down Expand Up @@ -75,13 +75,13 @@ def folder_exists(self, folder_name):
return False

def create_input_folder(self):
    """Create the '<function>-in' folder in the Onedata space."""
    folder = '{0}-in'.format(self.function_name)
    self._create_folder(folder)

def create_output_folder(self):
    """Create the default '<function>-out' folder unless a custom output
    path was configured in the constructor."""
    if not hasattr(self, 'output_bucket'):
        self._create_folder('{0}-out'.format(self.function_name))

def create_folder(self, folder_name):
def _create_folder(self, folder_name):
url = 'https://{0}{1}{2}/{3}/'.format(self.oneprovider_host, self.cdmi_path, self.onedata_space, folder_name)
headers = {**self.cdmi_version_header, **self.cdmi_container_header, **self.onedata_auth_header}
try:
Expand Down
Loading

0 comments on commit 46552aa

Please sign in to comment.