Skip to content

Commit

Permalink
Update Sentinel workflows and worker
Browse files Browse the repository at this point in the history
  • Loading branch information
mario-winkler committed Oct 25, 2024
1 parent 7e6251f commit 9f0fb80
Show file tree
Hide file tree
Showing 5 changed files with 335 additions and 213 deletions.
1 change: 1 addition & 0 deletions src/worker/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class WorkerConfig:
FLOWABLE_REST_PASSWORD: str = "eoepca"
FLOWABLE_HOST_CACERT: str = "etc/eoepca-ca-chain.pem"
FLOWABLE_USE_TLS: bool = True
LOG_LEVEL: str = "INFO"

"""
Map environment variables to class fields according to these rules:
Expand Down
23 changes: 17 additions & 6 deletions src/worker/common/log_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import logging

from worker.common.config import Config

def configure_logging():
logging.basicConfig(
level=logging.INFO,
level=__get_log_level(Config.LOG_LEVEL),
format="%(asctime)s.%(msecs)03d [%(levelname)s] [%(thread)d] %(message)s",
handlers=[logging.StreamHandler()],
datefmt="%Y-%m-%dT%H:%M:%S",
Expand All @@ -14,10 +14,18 @@ def log_with_job(message, job=None, log_level="info", **kwargs):
log_function = __get_log_function(log_level)

if job is not None:
log_function(f"[BPMN_TASK: {job.element_name}] {message}", **kwargs)
log_function(f"[JOB: {job.id} BPMN_TASK: {job.element_name}] {message}", **kwargs)
else:
log_function(message, **kwargs)

def log_variable(variable, job=None, log_level="info", **kwargs):
log_function = __get_log_function(log_level)

message = f"[TASK_VARIABLE] name={variable.name} value='{variable.value}' type={variable.type}"
if job is not None:
log_with_job(message=message, job=job)
else:
log_function(msg=message)

def log_with_context(message, context=None, log_level="info", **kwargs):
context = context if context is not None else {}
Expand All @@ -38,7 +46,10 @@ def __get_log_context_prefix(context):
log_context_prefix += f"[{k}:{v}]"
return log_context_prefix

def __get_log_level(log_level: str):
switcher = {"info": logging.INFO, "debug": logging.DEBUG, "warning": logging.WARNING, "error": logging.ERROR}
return switcher.get(log_level.lower(), logging.INFO)

def __get_log_function(log_level):
switcher = {"info": logging.info, "warning": logging.warning, "error": logging.error}
return switcher.get(log_level, logging.info)
def __get_log_function(log_level: str):
switcher = {"info": logging.info, "debug": logging.debug, "warning": logging.warning, "error": logging.error}
return switcher.get(log_level.lower(), logging.info)
98 changes: 88 additions & 10 deletions src/worker/sentinel/tasks.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,113 @@
from worker.common.log_utils import configure_logging, log_with_job
import json
from worker.common.log_utils import configure_logging, log_with_job, log_variable

configure_logging()


def sentinel_discover_data(job, worker_result_builder):
log_with_job(message="Discovering new sentinel data ...", job=job)
# job variables
# for v in job.variables:
# log_variable(v)

# execute task and build result
# discovering scenes
scenes = []
scene1 = {
"scene": {
"name": "scene1"
}
}
scene2 = {
"scene": {
"name": "scene2"
}
}
scenes.append(scene1)
#scenes.append(scene2)

# build result
result = worker_result_builder.success()
result.variable_json(name="scenes", value=scenes)

# result variables
# result.variable_string("zip_file_path", "/path/to/zip")
# for v in result._variables:
# log_variable(v)
for v in result._variables:
log_variable(variable=v, job=job, log_level="info")

return result

def sentinel_order_data(job, worker_result_builder):
log_with_job(message="Ordering data ...", job=job)
result = worker_result_builder.success()
return result

def sentinel_download_data(job, worker_result_builder):
log_with_job(message="Downloading data ...", job=job)

# get job variables
for v in job.variables:
log_variable(variable=v, job=job, log_level="info")

result = worker_result_builder.success()
return result


def sentinel_unzip(job, worker_result_builder):
log_with_job(message="Unzipping ...", job=job)

# get job variables
for v in job.variables:
log_variable(variable=v, job=job, log_level="info")

result = worker_result_builder.success()
return result


def sentinel_check_integrity(job, worker_result_builder):
log_with_job(message="Checking integrity ...", job=job)

# get job variables
for v in job.variables:
log_variable(variable=v, job=job, log_level="info")

job_vars = {}
for v in job.variables:
job_vars[v.name] = v.value

scene = job_vars["scene"]
log_with_job(f"Input variable: scene={scene}")

scene['collection'] = "sentinel-1"
#scene['collection'] = ""
log_with_job(f"Output variable: scene={scene}")
result = worker_result_builder.success()
result.variable_string(name="collection", value=scene['collection'])

return result


def sentinel_extract_metadata(job, worker_result_builder):
log_with_job(message="Extracting metadata and creating STAC item ...", job=job)

# get job variables
for v in job.variables:
log_variable(variable=v, job=job, log_level="info")

result = worker_result_builder.success()
return result


def sentinel_register_metadata(job, worker_result_builder):
log_with_job(message="Registering STAC item ...", job=job)

# get job variables
for v in job.variables:
log_variable(variable=v, job=job, log_level="info")

result = worker_result_builder.success()
return result

def sentinel_inventory_update(job, worker_result_builder):
log_with_job(message="Updating inventory ...", job=job)

# get job variables
for v in job.variables:
log_variable(variable=v, job=job, log_level="info")

result = worker_result_builder.success()
return result

Expand All @@ -59,6 +121,14 @@ def sentinel_register_metadata(job, worker_result_builder):
"wait_period_seconds": 1,
"number_of_tasks": 1,
},
"sentinel_order_data": {
"callback_handler": sentinel_order_data,
"lock_duration": "PT1M",
"number_of_retries": 5,
"scope_type": None,
"wait_period_seconds": 1,
"number_of_tasks": 1,
},
"sentinel_download_data": {
"callback_handler": sentinel_download_data,
"lock_duration": "PT1M",
Expand Down Expand Up @@ -99,4 +169,12 @@ def sentinel_register_metadata(job, worker_result_builder):
"wait_period_seconds": 1,
"number_of_tasks": 1,
},
"sentinel_inventory_update": {
"callback_handler": sentinel_inventory_update,
"lock_duration": "PT1M",
"number_of_retries": 5,
"scope_type": None,
"wait_period_seconds": 1,
"number_of_tasks": 10,
},
}
Loading

0 comments on commit 9f0fb80

Please sign in to comment.