From fce47879d02c7cab4a4f54be5fdde4f38eb83c3b Mon Sep 17 00:00:00 2001 From: Mario Winkler Date: Mon, 7 Oct 2024 12:09:13 +0200 Subject: [PATCH] Update registration harvester worker --- worker/worker/config.py | 42 ++++- worker/worker/log_utils.py | 8 + worker/worker/tasks/sentinel.py | 41 +++-- workflows/sentinel-registration-hourly.bpmn | 183 ++++++++++++++++++++ 4 files changed, 253 insertions(+), 21 deletions(-) create mode 100644 workflows/sentinel-registration-hourly.bpmn diff --git a/worker/worker/config.py b/worker/worker/config.py index 3e40db8..91adf1c 100644 --- a/worker/worker/config.py +++ b/worker/worker/config.py @@ -1,7 +1,7 @@ import os from typing import get_type_hints, Union from dotenv import load_dotenv -from worker.tasks.sentinel import sentinel_check_data, sentinel_log_data +from worker.tasks.sentinel import sentinel_discover_data, sentinel_download_data, sentinel_unzip, sentinel_check_integrity, sentinel_extract_metadata, sentinel_register_metadata # Load configuration # The value of a variable is the first of the values found in: @@ -26,22 +26,54 @@ class HarvesterConfig: # Name in lower case to skip mapping of env variables default_subscriptions = { - "sentinel_check_data": { - "callback_handler": sentinel_check_data, + "sentinel_discover_data": { + "callback_handler": sentinel_discover_data, "lock_duration": "PT1M", "number_of_retries": 5, "scope_type": None, "wait_period_seconds": 1, "number_of_tasks": 1 }, - "sentinel_log_data": { - "callback_handler": sentinel_log_data, + "sentinel_download_data": { + "callback_handler": sentinel_download_data, "lock_duration": "PT1M", "number_of_retries": 5, "scope_type": None, "wait_period_seconds": 1, "number_of_tasks": 1 }, + "sentinel_unzip": { + "callback_handler": sentinel_unzip, + "lock_duration": "PT1M", + "number_of_retries": 5, + "scope_type": None, + "wait_period_seconds": 1, + "number_of_tasks": 1 + }, + "sentinel_check_integrity": { + "callback_handler": sentinel_check_integrity, + "lock_duration": "PT1M", + "number_of_retries": 5, + "scope_type": None, + "wait_period_seconds": 1, + "number_of_tasks": 1 + }, + "sentinel_extract_metadata": { + "callback_handler": sentinel_extract_metadata, + "lock_duration": "PT1M", + "number_of_retries": 5, + "scope_type": None, + "wait_period_seconds": 1, + "number_of_tasks": 1 + }, + "sentinel_register_metadata": { + "callback_handler": sentinel_register_metadata, + "lock_duration": "PT1M", + "number_of_retries": 5, + "scope_type": None, + "wait_period_seconds": 1, + "number_of_tasks": 1 + }, } """ diff --git a/worker/worker/log_utils.py b/worker/worker/log_utils.py index 054f87d..7c39823 100644 --- a/worker/worker/log_utils.py +++ b/worker/worker/log_utils.py @@ -8,6 +8,14 @@ def configure_logging(): datefmt="%Y-%m-%dT%H:%M:%S" ) +def log_with_job(message, job=None, log_level='info', **kwargs): + log_function = __get_log_function(log_level) + + if job is not None: + log_function(f"[BPMN_TASK: {job.element.id}] {message}", **kwargs) + else: + log_function(message, **kwargs) + def log_with_context(message, context=None, log_level='info', **kwargs): context = context if context is not None else {} log_function = __get_log_function(log_level) diff --git a/worker/worker/tasks/sentinel.py b/worker/worker/tasks/sentinel.py index 224f306..ce318e2 100644 --- a/worker/worker/tasks/sentinel.py +++ b/worker/worker/tasks/sentinel.py @@ -1,14 +1,9 @@ -from worker.log_utils import configure_logging, log_with_context +from worker.log_utils import configure_logging, log_with_context, log_with_job configure_logging() -def sentinel_check_data(job, worker_result_builder): - log_context = { - "JOB": job.id, - "PROCESS_INSTANCE": job.process_instance_id, - "TASK": job.element_name, - } - log_with_context("sentinel_check_data", log_context) +def sentinel_discover_data(job, worker_result_builder): + log_with_job(message="Discovering new sentinel data ...", job=job) # job variables # for v in job.variables: @@ -24,13 +19,27 @@ def sentinel_check_data(job, worker_result_builder): return result -def sentinel_log_data(job, worker_result_builder): - log_context = { - "JOB": job.id, - "PROCESS_INSTANCE": job.process_instance_id, - "TASK": job.element_name, - } - log_with_context("sentinel_log_data", log_context) - +def sentinel_download_data(job, worker_result_builder): + log_with_job(message="Downloading data ...", job=job) + result = worker_result_builder.success() + return result + +def sentinel_unzip(job, worker_result_builder): + log_with_job(message="Unzipping ...", job=job) + result = worker_result_builder.success() + return result + +def sentinel_check_integrity(job, worker_result_builder): + log_with_job(message="Checking integrity ...", job=job) + result = worker_result_builder.success() + return result + +def sentinel_extract_metadata(job, worker_result_builder): + log_with_job(message="Extracting metadata and creating STAC item ...", job=job) + result = worker_result_builder.success() + return result + +def sentinel_register_metadata(job, worker_result_builder): + log_with_job(message="Registering STAC item ...", job=job) result = worker_result_builder.success() return result diff --git a/workflows/sentinel-registration-hourly.bpmn b/workflows/sentinel-registration-hourly.bpmn new file mode 100644 index 0000000..eeab8cd --- /dev/null +++ b/workflows/sentinel-registration-hourly.bpmn @@ -0,0 +1,183 @@ + + + + + + + + + + + + + + discoverSentinelScenes + bpmnTask_11 + bpmnTask_13 + bpmnTask_15 + bpmnTask_17 + bpmnTask_19 + startEachHour + bpmnEndEvent_21 + bpmnSequenceFlow_12 + bpmnSequenceFlow_14 + bpmnSequenceFlow_16 + bpmnSequenceFlow_18 + bpmnSequenceFlow_20 + bpmnSequenceFlow_22 + bpmnSequenceFlow_3 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + R/P0Y0M0DT1H0M0S + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file