diff --git a/src/worker/common/config.py b/src/worker/common/config.py
index aa19ee8..d8e5fa5 100644
--- a/src/worker/common/config.py
+++ b/src/worker/common/config.py
@@ -25,6 +25,7 @@ class WorkerConfig:
     FLOWABLE_REST_PASSWORD: str = "eoepca"
     FLOWABLE_HOST_CACERT: str = "etc/eoepca-ca-chain.pem"
     FLOWABLE_USE_TLS: bool = True
+    LOG_LEVEL: str = "INFO"
 
     """
     Map environment variables to class fields according to these rules:
diff --git a/src/worker/common/log_utils.py b/src/worker/common/log_utils.py
index cd7d7d9..919dda2 100644
--- a/src/worker/common/log_utils.py
+++ b/src/worker/common/log_utils.py
@@ -1,9 +1,9 @@
 import logging
-
+from worker.common.config import Config
 
 def configure_logging():
     logging.basicConfig(
-        level=logging.INFO,
+        level=__get_log_level(Config.LOG_LEVEL),
         format="%(asctime)s.%(msecs)03d [%(levelname)s] [%(thread)d] %(message)s",
         handlers=[logging.StreamHandler()],
         datefmt="%Y-%m-%dT%H:%M:%S",
@@ -14,10 +14,18 @@ def log_with_job(message, job=None, log_level="info", **kwargs):
     log_function = __get_log_function(log_level)
 
     if job is not None:
-        log_function(f"[BPMN_TASK: {job.element_name}] {message}", **kwargs)
+        log_function(f"[JOB: {job.id} BPMN_TASK: {job.element_name}] {message}", **kwargs)
     else:
         log_function(message, **kwargs)
 
+def log_variable(variable, job=None, log_level="info", **kwargs):
+    log_function = __get_log_function(log_level)
+
+    message = f"[TASK_VARIABLE] name={variable.name} value='{variable.value}' type={variable.type}"
+    if job is not None:
+        log_with_job(message=message, job=job)
+    else:
+        log_function(msg=message)
 
 def log_with_context(message, context=None, log_level="info", **kwargs):
     context = context if context is not None else {}
@@ -38,7 +46,10 @@ def __get_log_context_prefix(context):
         log_context_prefix += f"[{k}:{v}]"
     return log_context_prefix
 
+def __get_log_level(log_level: str):
+    switcher = {"info": logging.INFO, "debug": logging.DEBUG, "warning": logging.WARNING, "error": logging.ERROR}
+    return switcher.get(log_level.lower(), logging.INFO)
 
-def __get_log_function(log_level):
-    switcher = {"info": logging.info, "warning": logging.warning, "error": logging.error}
-    return switcher.get(log_level, logging.info)
+def __get_log_function(log_level: str):
+    switcher = {"info": logging.info, "debug": logging.debug, "warning": logging.warning, "error": logging.error}
+    return switcher.get(log_level.lower(), logging.info)
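For orientation (not part of the patch): a minimal, self-contained sketch of how a LOG_LEVEL string such as the new `WorkerConfig.LOG_LEVEL` field selects a stdlib logging level. The `resolve_log_level` helper and the direct `os.environ` read are illustrative stand-ins; the real code resolves the value through the worker config and the private `__get_log_level` switcher added above.

```python
import logging
import os

# Illustrative mirror of the __get_log_level switcher: unknown or missing
# values fall back to INFO, matching the patch's default.
_LEVELS = {"info": logging.INFO, "debug": logging.DEBUG,
           "warning": logging.WARNING, "error": logging.ERROR}


def resolve_log_level(raw: str) -> int:
    return _LEVELS.get(raw.lower(), logging.INFO)


logging.basicConfig(
    level=resolve_log_level(os.environ.get("LOG_LEVEL", "INFO")),
    format="%(asctime)s.%(msecs)03d [%(levelname)s] [%(thread)d] %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)
logging.debug("emitted only when LOG_LEVEL=DEBUG")
logging.info("emitted at the default INFO level")
```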
diff --git a/src/worker/sentinel/tasks.py b/src/worker/sentinel/tasks.py
index 891dbe7..90ee7c6 100644
--- a/src/worker/sentinel/tasks.py
+++ b/src/worker/sentinel/tasks.py
@@ -1,51 +1,113 @@
-from worker.common.log_utils import configure_logging, log_with_job
+import json
+from worker.common.log_utils import configure_logging, log_with_job, log_variable
 
 configure_logging()
 
 
 def sentinel_discover_data(job, worker_result_builder):
     log_with_job(message="Discovering new sentinel data ...", job=job)
 
-    # job variables
-    # for v in job.variables:
-    #     log_variable(v)
 
-    # execute task and build result
+    # discovering scenes
+    scenes = []
+    scene1 = {
+        "scene": {
+            "name": "scene1"
+        }
+    }
+    scene2 = {
+        "scene": {
+            "name": "scene2"
+        }
+    }
+    scenes.append(scene1)
+    #scenes.append(scene2)
+
+    # build result
     result = worker_result_builder.success()
+    result.variable_json(name="scenes", value=scenes)
 
-    # result variables
-    # result.variable_string("zip_file_path", "/path/to/zip")
-    # for v in result._variables:
-    #     log_variable(v)
+    for v in result._variables:
+        log_variable(variable=v, job=job, log_level="info")
 
     return result
 
+def sentinel_order_data(job, worker_result_builder):
+    log_with_job(message="Ordering data ...", job=job)
+    result = worker_result_builder.success()
+    return result
 
 def sentinel_download_data(job, worker_result_builder):
     log_with_job(message="Downloading data ...", job=job)
+
+    # get job variables
+    for v in job.variables:
+        log_variable(variable=v, job=job, log_level="info")
+
     result = worker_result_builder.success()
     return result
 
-
 def sentinel_unzip(job, worker_result_builder):
     log_with_job(message="Unzipping ...", job=job)
+
+    # get job variables
+    for v in job.variables:
+        log_variable(variable=v, job=job, log_level="info")
+
     result = worker_result_builder.success()
     return result
 
 
 def sentinel_check_integrity(job, worker_result_builder):
     log_with_job(message="Checking integrity ...", job=job)
+
+    # get job variables
+    for v in job.variables:
+        log_variable(variable=v, job=job, log_level="info")
+
+    job_vars = {}
+    for v in job.variables:
+        job_vars[v.name] = v.value
+
+    scene = job_vars["scene"]
+    log_with_job(f"Input variable: scene={scene}")
+
+    scene['collection'] = "sentinel-1"
+    #scene['collection'] = ""
+    log_with_job(f"Output variable: scene={scene}")
     result = worker_result_builder.success()
+    result.variable_string(name="collection", value=scene['collection'])
+
     return result
 
 
 def sentinel_extract_metadata(job, worker_result_builder):
     log_with_job(message="Extracting metadata and creating STAC item ...", job=job)
+
+    # get job variables
+    for v in job.variables:
+        log_variable(variable=v, job=job, log_level="info")
+
     result = worker_result_builder.success()
     return result
 
 
 def sentinel_register_metadata(job, worker_result_builder):
     log_with_job(message="Registering STAC item ...", job=job)
+
+    # get job variables
+    for v in job.variables:
+        log_variable(variable=v, job=job, log_level="info")
+
+    result = worker_result_builder.success()
+    return result
+
+def sentinel_inventory_update(job, worker_result_builder):
+    log_with_job(message="Updating inventory ...", job=job)
+
+    # get job variables
+    for v in job.variables:
+        log_variable(variable=v, job=job, log_level="info")
+
     result = worker_result_builder.success()
     return result
@@ -59,6 +121,14 @@ def sentinel_register_metadata(job, worker_result_builder):
         "wait_period_seconds": 1,
         "number_of_tasks": 1,
     },
+    "sentinel_order_data": {
+        "callback_handler": sentinel_order_data,
+        "lock_duration": "PT1M",
+        "number_of_retries": 5,
+        "scope_type": None,
+        "wait_period_seconds": 1,
+        "number_of_tasks": 1,
+    },
     "sentinel_download_data": {
         "callback_handler": sentinel_download_data,
         "lock_duration": "PT1M",
@@ -99,4 +169,12 @@ def sentinel_register_metadata(job, worker_result_builder):
         "wait_period_seconds": 1,
         "number_of_tasks": 1,
     },
+    "sentinel_inventory_update": {
+        "callback_handler": sentinel_inventory_update,
+        "lock_duration": "PT1M",
+        "number_of_retries": 5,
+        "scope_type": None,
+        "wait_period_seconds": 1,
+        "number_of_tasks": 10,
+    },
 }
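A note on the handler-registration dictionary extended above (its variable name sits outside the visible hunks): every entry pairs a callback with its external-worker subscription settings. The sketch below only illustrates that shape; `REQUIRED_KEYS` and `validate_handler_config` are hypothetical helpers, not part of the worker codebase.

```python
# Hedged sketch: sanity-check entries shaped like the ones added in tasks.py
# ("sentinel_order_data", "sentinel_inventory_update"). Only the key names
# are taken from the diff above; everything else is illustrative.
REQUIRED_KEYS = {
    "callback_handler", "lock_duration", "number_of_retries",
    "scope_type", "wait_period_seconds", "number_of_tasks",
}


def validate_handler_config(name: str, entry: dict) -> None:
    missing = REQUIRED_KEYS - entry.keys()
    if missing:
        raise ValueError(f"{name}: missing keys {sorted(missing)}")
    if not callable(entry["callback_handler"]):
        raise ValueError(f"{name}: callback_handler must be callable")
    if not str(entry["lock_duration"]).startswith("PT"):
        raise ValueError(f"{name}: lock_duration should be an ISO-8601 duration such as 'PT1M'")


validate_handler_config("sentinel_order_data", {
    "callback_handler": lambda job, builder: None,  # stand-in callback
    "lock_duration": "PT1M",
    "number_of_retries": 5,
    "scope_type": None,
    "wait_period_seconds": 1,
    "number_of_tasks": 1,
})
```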
diff --git a/workflows/sentinel-registration-hourly.bpmn b/workflows/sentinel-registration-hourly.bpmn
index d72f617..7fd8ce4 100644
[BPMN XML hunks not reproduced: the element markup was stripped during extraction and cannot be reconstructed faithfully. What remains legible: the diagram modification timestamp changes from 2024-10-24T11:22:48.559Z to 2024-10-25T12:59:08.992Z; a new task reference (bpmnTask_9) with an ExternalWorkerTask definition, a message start event (bpmnStartEvent_9, StartMessageEvent) and sequence flows bpmnSequenceFlow_11 / bpmnSequenceFlow_12 are added alongside the existing bpmnCallActivity_6, bpmnTask_1, bpmnGateway_4, bpmnStartEvent_3, bpmnStartEvent_4 and bpmnEndEvent_6; and the gateway condition expressions are rewritten from #{!S(scenes).elements().isEmpty()} / #{S(scenes).elements().isEmpty()} to ${var:isNotEmpty(scenes)} / ${var:isEmpty(scenes)}.]
diff --git a/workflows/sentinel-scene-ingestion.bpmn b/workflows/sentinel-scene-ingestion.bpmn
index 5077d56..1580ffd 100644
[BPMN XML hunks not reproduced for the same reason; the whole file is re-serialised (@@ -1,260 +1,245 @@). What remains legible: the diagram metadata block carries the timestamps 2024-10-25T09:20:17.915Z and 2024-10-25T15:11:36.706Z; six ExternalWorkerTask definitions now back bpmnTask_1, bpmnTask_3, bpmnTask_4, bpmnTask_9, bpmnTask_10 and bpmnTask_11, replacing the task bodies previously labelled "Download scene", "Check file integrity", "Extract Metadata", "Register Metadata" and "Update inventory"; the text annotation "if no collection given, skip metadata part" and its associations bpmnAssociation_29 / bpmnAssociation_33 are dropped in favour of explicit gateway condition expressions ${var:isNotEmpty(collection)} and ${var:isEmpty(collection)} on bpmnGateway_5; and the file now ends with a trailing newline (the old version was marked "\ No newline at end of file").]
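To tie the worker code to the BPMN conditions above, here is a self-contained, hedged sketch of the variable flow. `FakeVariable`, `FakeJob`, `FakeResult` and `FakeResultBuilder` are hypothetical stand-ins for the framework classes, which are not part of this diff; only the handler body mirrors the `sentinel_check_integrity` logic, whose `collection` result variable is what the `${var:isNotEmpty(collection)}` gateway condition evaluates.

```python
from dataclasses import dataclass, field


@dataclass
class FakeVariable:            # stand-in for the framework's task variable
    name: str
    value: object
    type: str = "json"


@dataclass
class FakeJob:                 # stand-in for the external-worker job
    id: str
    element_name: str
    variables: list


@dataclass
class FakeResult:              # stand-in for the result returned to Flowable
    _variables: list = field(default_factory=list)

    def variable_string(self, name, value):
        self._variables.append(FakeVariable(name, value, "string"))


class FakeResultBuilder:
    def success(self):
        return FakeResult()


def check_integrity(job, worker_result_builder):
    # Mirrors sentinel_check_integrity: read the "scene" job variable,
    # attach a collection, and expose it as the "collection" result variable
    # that the ${var:isNotEmpty(collection)} gateway condition tests.
    job_vars = {v.name: v.value for v in job.variables}
    scene = job_vars["scene"]
    scene["collection"] = "sentinel-1"
    result = worker_result_builder.success()
    result.variable_string(name="collection", value=scene["collection"])
    return result


job = FakeJob(id="42", element_name="Check file integrity",
              variables=[FakeVariable("scene", {"name": "scene1"})])
result = check_integrity(job, FakeResultBuilder())
print([(v.name, v.value) for v in result._variables])   # [('collection', 'sentinel-1')]
```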