From 5f338a9b28092db601b64e0426aaaa732985fed9 Mon Sep 17 00:00:00 2001 From: Markus Kunze Date: Wed, 13 Nov 2024 10:05:26 +0100 Subject: [PATCH] Add TaskHandler (includes YAML configuration) --- etc/config.yaml | 23 ++++ src/worker/common/manager.py | 30 +++++ src/worker/common/task_handler.py | 21 ++++ src/worker/sentinel/main.py | 7 -- src/worker/sentinel/tasks.py | 200 +++++++++++++++--------------- 5 files changed, 176 insertions(+), 105 deletions(-) create mode 100644 etc/config.yaml create mode 100644 src/worker/common/task_handler.py diff --git a/etc/config.yaml b/etc/config.yaml new file mode 100644 index 0000000..748ac5d --- /dev/null +++ b/etc/config.yaml @@ -0,0 +1,23 @@ +flowable: + +worker: + topics: + sentinel_discover_data: + module: worker.sentinel.tasks + handler: SentinelDiscoverHandler + sentinel_download_data: + module: worker.sentinel.tasks + handler: SentinelDownloadHandler + + handlers: + SentinelDiscoverHandler: + subscription_config: + number_of_retries: 10 + + SentinelDownloadHandler: + subscription_config: + number_of_retries: 20 + handler_config: + download_timeout: 10 + base_dir: /data/sentinel + diff --git a/src/worker/common/manager.py b/src/worker/common/manager.py index 6da8473..3264021 100644 --- a/src/worker/common/manager.py +++ b/src/worker/common/manager.py @@ -1,4 +1,7 @@ import logging +import yaml +import importlib +from pathlib import Path from datetime import datetime from worker.common.log_utils import configure_logging from worker.common.client import flowableClient @@ -13,6 +16,33 @@ class SubscriptionManager: def __init__(self): self.client = flowableClient self.subscriptions = {} + self._subscribe_handlers_from_config() + + + def _subscribe_handlers_from_config(self): + # Load config + config_path = Path(__file__).parent.parent.parent.parent / "etc/config.yaml" + with open(config_path) as f: + config_all = yaml.safe_load(f) + config_worker = config_all["worker"] + + # Create handlers map using config + handler_instances = {} + for topic, handler_config in config_worker["topics"].items(): + module = importlib.import_module(handler_config["module"]) + handler_class = getattr(module, handler_config["handler"]) + handler = handler_class(config_worker["handlers"]) + handler_instances[topic] = handler + + # Subscribe handlers + for topic, handler in handler_instances.items(): + self.subscribe( + topic=topic, + settings={ + "callback_handler": handler.execute, + **handler.subscription_config, + } + ) def subscriptions_info(self): subscriptions = {} diff --git a/src/worker/common/task_handler.py b/src/worker/common/task_handler.py new file mode 100644 index 0000000..76329d2 --- /dev/null +++ b/src/worker/common/task_handler.py @@ -0,0 +1,21 @@ +from worker.common.types import ExternalJob, JobResultBuilder, JobResult + +class TaskHandler: + def __init__(self, handlers_config: dict): + self.log_context = {} + handler_name = self.__class__.__name__ + self.config_all = handlers_config.get(handler_name, {}) + + # Merge with base config + self.subscription_config = { + "lock_duration": "PT1M", + "number_of_retries": 5, + "scope_type": None, + "wait_period_seconds": 1, + "number_of_tasks": 1, + **self.config_all.get("subscription_config", {}), + } + self.handler_config = self.config_all.get("handler_config", {}) + + def execute(self, job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult: + raise NotImplementedError \ No newline at end of file diff --git a/src/worker/sentinel/main.py b/src/worker/sentinel/main.py index d1cb983..a845b80 100644 --- a/src/worker/sentinel/main.py +++ b/src/worker/sentinel/main.py @@ -3,11 +3,9 @@ from worker.common.config import Config from worker.common.manager import SubscriptionManager from worker.common.log_utils import configure_logging -from worker.sentinel.tasks import tasks_config manager = SubscriptionManager() - @asynccontextmanager async def lifespan(app: FastAPI): configure_logging() @@ -19,13 +17,8 @@ async def lifespan(app: FastAPI): # end all subs before fastapi server shutdown manager.unsubscribe_all() - app = FastAPI(lifespan=lifespan) -for topic in tasks_config: - manager.subscribe(topic=topic, settings=tasks_config[topic]) - - @app.get("/subscriptions") def get_subscriptions(): return {"subscriptions": manager.subscriptions_info()} diff --git a/src/worker/sentinel/tasks.py b/src/worker/sentinel/tasks.py index ffd32fa..3a0bce9 100644 --- a/src/worker/sentinel/tasks.py +++ b/src/worker/sentinel/tasks.py @@ -1,59 +1,62 @@ +import os import datetime +import time +import json from dateutil.parser import parse from worker.common.log_utils import configure_logging, log_with_context from worker.common.types import ExternalJob, JobResultBuilder, JobResult from worker.common.client import flowableClient +from registration_library.providers import esa_cdse as cdse +from worker.common.task_handler import TaskHandler configure_logging() - -def sentinel_discover_data(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult: - """ - Searches for new data since last workflow execution - - Variables needed: - start_time - end_time - order_id - - Variables set: - scenes: List of scenes found - """ - - log_context = {"JOB": job.id, "BPMN_TASK": job.element_name} - log_with_context("Discovering new sentinel data ...", log_context) - - # Workflow variables - start_time = job.get_variable("start_time") - end_time = job.get_variable("end_time") - order_id = job.get_variable("order_id") - if order_id is None: - order_id = job.process_instance_id - - if start_time is None and end_time is None: - history = flowableClient.get_process_instance_history(job.process_instance_id) - if "startTime" in history: - current_time = parse(history["startTime"]) # 2024-03-17T01:02:22.487+0000 - log_with_context("use startTime from workflow: %s" % current_time, log_context) - else: - current_time = datetime.datetime.now() - log_with_context("use datetime.now() as startTime: %s" % current_time, log_context) - end_time = datetime.datetime(current_time.year, current_time.month, current_time.day, current_time.hour) - start_time = end_time - datetime.timedelta(hours=1) - start_time = start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") - end_time = end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") - - log_with_context(f"Search interval: {start_time} - {end_time}", log_context) - - # discovering scenes - scenes = [] - scene1 = {"scene": {"name": "scene1"}} - # scene2 = {"scene": {"name": "scene2"}} - scenes.append(scene1) - # scenes.append(scene2) - - # build result - return result.success().variable_json(name="scenes", value=scenes) +class SentinelDiscoverHandler(TaskHandler): + def execute(self, job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult : + """ + Searches for new data since last workflow execution + + Variables needed: + start_time + end_time + order_id + + Variables set: + scenes: List of scenes found + """ + + log_context = {"JOB": job.id, "BPMN_TASK": job.element_name} + log_with_context("Discovering new sentinel data ...", log_context) + + # Workflow variables + start_time = job.get_variable("start_time") + end_time = job.get_variable("end_time") + order_id = job.get_variable("order_id") + if order_id is None: + order_id = job.process_instance_id + + if start_time is None and end_time is None: + history = flowableClient.get_process_instance_history(job.process_instance_id) + if "startTime" in history: + current_time = parse(history["startTime"]) # 2024-03-17T01:02:22.487+0000 + log_with_context("use startTime from workflow: %s" % current_time, log_context) + else: + current_time = datetime.datetime.now() + log_with_context("use datetime.now() as startTime: %s" % current_time, log_context) + end_time = datetime.datetime(current_time.year, current_time.month, current_time.day, current_time.hour) + start_time = end_time - datetime.timedelta(hours=1) + start_time = start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + end_time = end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + log_with_context(f"Search interval: {start_time} - {end_time}", log_context) + + # Discovering scenes + try: + scenes = cdse.search_scenes_ingestion(date_from=start_time, date_to=end_time, filters=None) + except Exception as e: + log_with_context(f"Error searching scenes: {e}", log_context) + return result.error(f"Error searching scenes: {e}") + + return result.success().variable_json(name="scenes", value=scenes) def sentinel_download_data(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult: @@ -114,53 +117,54 @@ def sentinel_register_metadata(job: ExternalJob, result: JobResultBuilder, confi return result.success() -tasks_config = { - "sentinel_discover_data": { - "callback_handler": sentinel_discover_data, - "lock_duration": "PT1M", - "number_of_retries": 5, - "scope_type": None, - "wait_period_seconds": 1, - "number_of_tasks": 1, - }, - "sentinel_download_data": { - "callback_handler": sentinel_download_data, - "lock_duration": "PT1M", - "number_of_retries": 5, - "scope_type": None, - "wait_period_seconds": 1, - "number_of_tasks": 1, - }, - "sentinel_unzip": { - "callback_handler": sentinel_unzip, - "lock_duration": "PT1M", - "number_of_retries": 5, - "scope_type": None, - "wait_period_seconds": 1, - "number_of_tasks": 1, - }, - "sentinel_check_integrity": { - "callback_handler": sentinel_check_integrity, - "lock_duration": "PT1M", - "number_of_retries": 5, - "scope_type": None, - "wait_period_seconds": 1, - "number_of_tasks": 1, - }, - "sentinel_extract_metadata": { - "callback_handler": sentinel_extract_metadata, - "lock_duration": "PT1M", - "number_of_retries": 5, - "scope_type": None, - "wait_period_seconds": 1, - "number_of_tasks": 1, - }, - "sentinel_register_metadata": { - "callback_handler": sentinel_register_metadata, - "lock_duration": "PT1M", - "number_of_retries": 5, - "scope_type": None, - "wait_period_seconds": 1, - "number_of_tasks": 1, - }, -} +# tasks_config = { +# "sentinel_discover_data": { +# "callback_handler": sentinel_discover_data, +# "lock_duration": "PT1M", +# "number_of_retries": 5, +# "scope_type": None, +# "wait_period_seconds": 1, +# "number_of_tasks": 1, +# }, + # "sentinel_download_data": { + # "callback_handler": sentinel_download_data, + # "lock_duration": "PT1M", + # "number_of_retries": 5, + # "scope_type": None, + # "wait_period_seconds": 1, + # "number_of_tasks": 1, + # }, + + # "sentinel_unzip": { + # "callback_handler": sentinel_unzip, + # "lock_duration": "PT1M", + # "number_of_retries": 5, + # "scope_type": None, + # "wait_period_seconds": 1, + # "number_of_tasks": 1, + # }, + # "sentinel_check_integrity": { + # "callback_handler": sentinel_check_integrity, + # "lock_duration": "PT1M", + # "number_of_retries": 5, + # "scope_type": None, + # "wait_period_seconds": 1, + # "number_of_tasks": 1, + # }, + # "sentinel_extract_metadata": { + # "callback_handler": sentinel_extract_metadata, + # "lock_duration": "PT1M", + # "number_of_retries": 5, + # "scope_type": None, + # "wait_period_seconds": 1, + # "number_of_tasks": 1, + # }, + # "sentinel_register_metadata": { + # "callback_handler": sentinel_register_metadata, + # "lock_duration": "PT1M", + # "number_of_retries": 5, + # "scope_type": None, + # "wait_period_seconds": 1, + # "number_of_tasks": 1, + # }, +# }