Skip to content

Commit

Permalink
Merge pull request #117 from grycap/devel
Browse files Browse the repository at this point in the history
Devel
  • Loading branch information
srisco authored Jul 12, 2021
2 parents 6988a11 + b88c3ee commit 0cbc948
Show file tree
Hide file tree
Showing 14 changed files with 229 additions and 19 deletions.
14 changes: 14 additions & 0 deletions examples/imagemagick/imagemagick.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
functions:
oscar:
- oscar-cluster:
name: grayify
memory: 1Gi
cpu: '1.0'
image: grycap/imagemagick
script: grayify.sh
input:
- storage_provider: minio
path: grayify/in
output:
- storage_provider: minio
path: grayify/out
9 changes: 9 additions & 0 deletions examples/mask-detector-workflow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Mask detector workflow

This example is based on the [SCAR's Mask detector hybrid workflow](https://github.com/grycap/scar/tree/master/examples/mask-detector-workflow), but, in this case, both services will be executed on top of an OSCAR cluster.

The easy way to deploy the services is via [oscar-cli](https://github.com/grycap/oscar-cli), but [you can also use the UI](https://grycap.github.io/oscar/usage.html).

![oscar-cli-apply.gif](img/oscar-cli-apply.gif)

To trigger the workflow you only have to upload a video into the `input` folder of the `mask-detector` MinIO bucket, result images will be stored into the `result` folder of the same bucket when the workflow execution has been completed.
16 changes: 16 additions & 0 deletions examples/mask-detector-workflow/blurry-faces.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

VIDEO_NAME=`basename "$INPUT_FILE_PATH"`
SUBFOLDER_NAME=`echo "$VIDEO_NAME" | cut -f 1 -d '.'`
OUTPUT_SUBFOLDER="$TMP_OUTPUT_DIR/$SUBFOLDER_NAME"

mkdir "$OUTPUT_SUBFOLDER"

echo "SCRIPT: Analyzing file '$INPUT_FILE_PATH', saving the output images in '$OUTPUT_SUBFOLDER'"

ffmpeg -i "$INPUT_FILE_PATH" -vf fps=12/60 "$OUTPUT_SUBFOLDER/img%d.jpg"

for IMAGE in "$OUTPUT_SUBFOLDER"/*
do
python auto_blur_image.py -i "$IMAGE" -o "$IMAGE"
done
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
26 changes: 26 additions & 0 deletions examples/mask-detector-workflow/mask-detector-workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
functions:
oscar:
- my_oscar:
name: anon-and-split
memory: 2Gi
cpu: '1.0'
image: grycap/blurry-faces
script: blurry-faces.sh
input:
- storage_provider: minio
path: mask-detector/input
output:
- storage_provider: minio
path: mask-detector/intermediate
- my_oscar:
name: mask-detector
memory: 1Gi
cpu: '1.0'
script: mask-detector.sh
image: grycap/mask-detector-yolo:mini
input:
- storage_provider: minio
path: mask-detector/intermediate
output:
- storage_provider: minio
path: mask-detector/result
13 changes: 13 additions & 0 deletions examples/mask-detector-workflow/mask-detector.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash

SUBFOLDER_NAME=`basename "$(dirname "$STORAGE_OBJECT_KEY")"`

mkdir "$TMP_OUTPUT_DIR/$SUBFOLDER_NAME"

IMAGE_NAME=`basename "$INPUT_FILE_PATH"`
OUTPUT_IMAGE="$TMP_OUTPUT_DIR/$SUBFOLDER_NAME/$IMAGE_NAME"

echo "SCRIPT: Analyzing file '$INPUT_FILE_PATH', saving the output image in '$OUTPUT_IMAGE'"

python mask-detector-image.py --image "$INPUT_FILE_PATH" --output "$OUTPUT_IMAGE"

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@

functions:
oscar:
- oscar-cluster:
name: plants
memory: 1Gi
cpu: '1.0'
image: grycap/oscar-theano-plants
script: plants.sh
input:
- storage_provider: minio
path: plants/in
output:
- storage_provider: minio
path: plants/out
14 changes: 14 additions & 0 deletions examples/radiomics/radiomics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
functions:
oscar:
- oscar-cluster:
name: radiomics
memory: 1Gi
cpu: '1.0'
image: grycap/oscar-radiomics
script: user-script.sh
input:
- storage_provider: minio
path: radiomics/in
output:
- storage_provider: minio
path: radiomics/out
26 changes: 26 additions & 0 deletions examples/video-process/video-process.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
functions:
oscar:
- oscar-cluster:
name: split-video
memory: 1Gi
cpu: '1.0'
image: grycap/ffmpeg
script: split-video.sh
input:
- storage_provider: minio
path: video-process/in
output:
- storage_provider: minio
path: video-process/med
- oscar-cluster:
name: darknet
memory: 1Gi
cpu: '1.0'
image: grycap/darknet-v3
script: yolov3-object-detection.sh
input:
- storage_provider: minio
path: video-process/med
output:
- storage_provider: minio
path: video-process/out
6 changes: 4 additions & 2 deletions pkg/backends/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ import (
type KubeBackend struct {
kubeClientset *kubernetes.Clientset
namespace string
config *types.Config
}

// MakeKubeBackend makes a KubeBackend with the provided k8s clientset
func MakeKubeBackend(kubeClientset *kubernetes.Clientset, cfg *types.Config) *KubeBackend {
return &KubeBackend{
kubeClientset: kubeClientset,
namespace: cfg.ServicesNamespace,
config: cfg,
}
}

Expand Down Expand Up @@ -80,7 +82,7 @@ func (k *KubeBackend) CreateService(service types.Service) error {
}

// Create podSpec from the service
podSpec, err := service.ToPodSpec()
podSpec, err := service.ToPodSpec(k.config)
if err != nil {
// Delete the previously created configMap
if delErr := deleteServiceConfigMap(service.Name, k.namespace, k.kubeClientset); delErr != nil {
Expand Down Expand Up @@ -144,7 +146,7 @@ func (k *KubeBackend) UpdateService(service types.Service) error {
}

// Create podSpec from the service
podSpec, err := service.ToPodSpec()
podSpec, err := service.ToPodSpec(k.config)
if err != nil {
// Restore the old configMap
_, resErr := k.kubeClientset.CoreV1().ConfigMaps(k.namespace).Update(context.TODO(), oldCm, metav1.UpdateOptions{})
Expand Down
6 changes: 4 additions & 2 deletions pkg/backends/openfaas.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type OpenfaasBackend struct {
namespace string
gatewayEndpoint string
scaler *utils.OpenfaasScaler
config *types.Config
}

// MakeOpenfaasBackend makes a OpenfaasBackend from the provided k8S clientset and config
Expand All @@ -62,6 +63,7 @@ func MakeOpenfaasBackend(kubeClientset *kubernetes.Clientset, kubeConfig *rest.C
namespace: cfg.ServicesNamespace,
gatewayEndpoint: fmt.Sprintf("gateway.%s:%d", cfg.OpenfaasNamespace, cfg.OpenfaasPort),
scaler: utils.NewOFScaler(kubeClientset, cfg),
config: cfg,
}
}

Expand Down Expand Up @@ -168,7 +170,7 @@ func (of *OpenfaasBackend) CreateService(service types.Service) error {
}

// Create podSpec from the service
podSpec, err := service.ToPodSpec()
podSpec, err := service.ToPodSpec(of.config)
if err != nil {
// Delete the function
delErr := of.ofClientset.OpenfaasV1().Functions(of.namespace).Delete(context.TODO(), service.Name, metav1.DeleteOptions{})
Expand Down Expand Up @@ -245,7 +247,7 @@ func (of *OpenfaasBackend) UpdateService(service types.Service) error {
}

// Create podSpec from the service
podSpec, err := service.ToPodSpec()
podSpec, err := service.ToPodSpec(of.config)
if err != nil {
// Restore the old configMap
_, resErr := of.kubeClientset.CoreV1().ConfigMaps(of.namespace).Update(context.TODO(), oldCm, metav1.UpdateOptions{})
Expand Down
2 changes: 1 addition & 1 deletion pkg/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back
}

// Get podSpec from the service
podSpec, err := service.ToPodSpec()
podSpec, err := service.ToPodSpec(cfg)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
Expand Down
77 changes: 71 additions & 6 deletions pkg/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ const (
defaultOpenfaasScalerEnable = false
defaultOpenfaasScalerInterval = "2m"
defaultOpenfaasScalerInactivityDuration = "10m"
defaultWatchdogMaxInflight = 1
defaultWatchdogWriteDebug = true
defaultWatchdogExecTimeout = 0
defaultWatchdogReadTimeout = 300
defaultWatchdogWriteTimeout = 300
)

// Config stores the configuration for the OSCAR server
Expand Down Expand Up @@ -91,6 +96,21 @@ type Config struct {
// OpenfaasScalerInactivityDuration
OpenfaasScalerInactivityDuration string `json:"-"`

// WatchdogMaxInflight
WatchdogMaxInflight int `json:"-"`

// WatchdogWriteDebug
WatchdogWriteDebug bool `json:"-"`

// WatchdogExecTimeout
WatchdogExecTimeout int `json:"-"`

// WatchdogReadTimeout
WatchdogReadTimeout int `json:"-"`

// WatchdogWriteTimeout
WatchdogWriteTimeout int `json:"-"`

// HTTP timeout for reading the payload (default: 300)
ReadTimeout time.Duration `json:"-"`

Expand Down Expand Up @@ -156,7 +176,7 @@ func ReadConfig() (*Config, error) {
if len(os.Getenv("MINIO_ENDPOINT")) > 0 {
config.MinIOProvider.Endpoint = os.Getenv("MINIO_ENDPOINT")
if _, err = url.Parse(config.MinIOProvider.Endpoint); err != nil {
return nil, fmt.Errorf("The MINIO_ENDPOINT value is not valid. Error: %s", err)
return nil, fmt.Errorf("The MINIO_ENDPOINT value is not valid. Error: %v", err)
}
} else {
config.MinIOProvider.Endpoint = defaultMinIOEndpoint
Expand Down Expand Up @@ -197,7 +217,7 @@ func ReadConfig() (*Config, error) {
if len(os.Getenv("OPENFAAS_PORT")) > 0 {
config.OpenfaasPort, err = strconv.Atoi(os.Getenv("OPENFAAS_PORT"))
if err != nil {
return nil, fmt.Errorf("The OPENFAAS_PORT value is not valid. Error: %s", err)
return nil, fmt.Errorf("The OPENFAAS_PORT value is not valid. Error: %v", err)
}
} else {
config.OpenfaasPort = defaultOpenfaasPort
Expand All @@ -212,7 +232,7 @@ func ReadConfig() (*Config, error) {
if len(os.Getenv("OPENFAAS_PROMETHEUS_PORT")) > 0 {
config.OpenfaasPrometheusPort, err = strconv.Atoi(os.Getenv("OPENFAAS_PROMETHEUS_PORT"))
if err != nil {
return nil, fmt.Errorf("The OPENFAAS_PORT value is not valid. Error: %s", err)
return nil, fmt.Errorf("The OPENFAAS_PORT value is not valid. Error: %v", err)
}
} else {
config.OpenfaasPrometheusPort = defaultOpenfaasPrometheusPort
Expand Down Expand Up @@ -240,10 +260,55 @@ func ReadConfig() (*Config, error) {
}
}

if len(os.Getenv("WATCHDOG_MAX_INFLIGHT")) > 0 {
config.WatchdogMaxInflight, err = strconv.Atoi(os.Getenv("WATCHDOG_MAX_INFLIGHT"))
if err != nil {
return nil, fmt.Errorf("The WATCHDOG_MAX_INFLIGHT value is not valid. Error: %v", err)
}
} else {
config.WatchdogMaxInflight = defaultWatchdogMaxInflight
}

if len(os.Getenv("WATCHDOG_WRITE_DEBUG")) > 0 {
config.WatchdogWriteDebug, err = strconv.ParseBool(os.Getenv("WATCHDOG_WRITE_DEBUG"))
if err != nil {
return nil, fmt.Errorf("The WATCHDOG_WRITE_DEBUG value must be a boolean")
}
} else {
config.WatchdogWriteDebug = defaultWatchdogWriteDebug
}

if len(os.Getenv("WATCHDOG_EXEC_TIMEOUT")) > 0 {
config.WatchdogExecTimeout, err = strconv.Atoi(os.Getenv("WATCHDOG_EXEC_TIMEOUT"))
if err != nil {
return nil, fmt.Errorf("The WATCHDOG_EXEC_TIMEOUT value is not valid. Error: %v", err)
}
} else {
config.WatchdogExecTimeout = defaultWatchdogExecTimeout
}

if len(os.Getenv("WATCHDOG_READ_TIMEOUT")) > 0 {
config.WatchdogReadTimeout, err = strconv.Atoi(os.Getenv("WATCHDOG_READ_TIMEOUT"))
if err != nil {
return nil, fmt.Errorf("The WATCHDOG_READ_TIMEOUT value is not valid. Error: %v", err)
}
} else {
config.WatchdogReadTimeout = defaultWatchdogReadTimeout
}

if len(os.Getenv("WATCHDOG_WRITE_TIMEOUT")) > 0 {
config.WatchdogWriteTimeout, err = strconv.Atoi(os.Getenv("WATCHDOG_WRITE_TIMEOUT"))
if err != nil {
return nil, fmt.Errorf("The WATCHDOG_WRITE_TIMEOUT value is not valid. Error: %v", err)
}
} else {
config.WatchdogWriteTimeout = defaultWatchdogWriteTimeout
}

if len(os.Getenv("READ_TIMEOUT")) > 0 {
config.ReadTimeout, err = parseSeconds(os.Getenv("READ_TIMEOUT"))
if err != nil {
return nil, fmt.Errorf("The READ_TIMEOUT value is not valid. Error: %s", err)
return nil, fmt.Errorf("The READ_TIMEOUT value is not valid. Error: %v", err)
}
} else {
config.ReadTimeout = defaultTimeout
Expand All @@ -252,7 +317,7 @@ func ReadConfig() (*Config, error) {
if len(os.Getenv("WRITE_TIMEOUT")) > 0 {
config.WriteTimeout, err = parseSeconds(os.Getenv("WRITE_TIMEOUT"))
if err != nil {
return nil, fmt.Errorf("The WRITE_TIMEOUT value is not valid. Error: %s", err)
return nil, fmt.Errorf("The WRITE_TIMEOUT value is not valid. Error: %v", err)
}
} else {
config.WriteTimeout = defaultTimeout
Expand All @@ -261,7 +326,7 @@ func ReadConfig() (*Config, error) {
if len(os.Getenv("OSCAR_SERVICE_PORT")) > 0 {
config.ServicePort, err = strconv.Atoi(os.Getenv("OSCAR_SERVICE_PORT"))
if err != nil {
return nil, fmt.Errorf("The OSCAR_SERVICE_PORT value is not valid. Error: %s", err)
return nil, fmt.Errorf("The OSCAR_SERVICE_PORT value is not valid. Error: %v", err)
}
} else {
config.ServicePort = defaultServicePort
Expand Down
Loading

0 comments on commit 0cbc948

Please sign in to comment.