Skip to content

Commit

Permalink
Update BPMN models and tasks implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mario-winkler committed Nov 6, 2024
1 parent 735145f commit edd43bc
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 271 deletions.
21 changes: 21 additions & 0 deletions src/worker/common/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from flowable.external_worker_client import ExternalWorkerClient
from worker.common.config import Config
from requests.auth import HTTPBasicAuth


def customize_session(session):
    """Attach the configured CA bundle to *session* when TLS is enabled.

    Returns the customized session, or ``None`` (meaning: let the client
    use its default session) when TLS is disabled or no session was given.
    """
    tls_ready = Config.FLOWABLE_USE_TLS and session is not None
    if not tls_ready:
        return None
    session.verify = Config.FLOWABLE_HOST_CACERT
    return session


# Shared Flowable external-worker client: host + basic-auth credentials come
# from Config; customize_session optionally enables TLS verification.
_auth = HTTPBasicAuth(
    Config.FLOWABLE_REST_USER,
    Config.FLOWABLE_REST_PASSWORD,
)
flowableClient = ExternalWorkerClient(
    flowable_host=Config.FLOWABLE_HOST,
    auth=_auth,
    customize_session=customize_session,
)
23 changes: 3 additions & 20 deletions src/worker/common/log_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,6 @@ def configure_logging():
)


def log_with_job(message, job=None, log_level="info", **kwargs):
    """Log *message* at *log_level*, prefixed with the job id and BPMN task
    name when *job* is provided; extra kwargs go to the logging call."""
    emit = __get_log_function(log_level)

    if job is None:
        emit(message, **kwargs)
    else:
        emit(f"[JOB: {job.id} BPMN_TASK: {job.element_name}] {message}", **kwargs)


def log_variable(variable, job=None, log_level="info", **kwargs):
    """Log a task variable's name, value and type, tagged with *job* if given.

    Fix: ``log_level`` and ``**kwargs`` were previously accepted but silently
    dropped on both branches; they are now forwarded to the logging call.
    """
    message = f"[TASK_VARIABLE] name={variable.name} value='{variable.value}' type={variable.type}"
    if job is not None:
        log_with_job(message=message, job=job, log_level=log_level, **kwargs)
    else:
        log_function = __get_log_function(log_level)
        log_function(msg=message, **kwargs)


def log_with_context(message, context=None, log_level="info", **kwargs):
context = context if context is not None else {}
log_function = __get_log_function(log_level)
Expand All @@ -44,9 +25,11 @@ def log_with_context(message, context=None, log_level="info", **kwargs):
def __get_log_context_prefix(context):
    """Build a bracketed log prefix like ``[JOB: 1BPMN_TASK: t]`` from *context*.

    Entries whose value is None are skipped; an empty/None context yields "".
    Fix: the block contained BOTH the old ``+= f"[{k}:{v}]"`` append and the
    new ``+= f"{k}: {v}"`` append (a diff-merge artifact), which would emit
    every entry twice in two formats; only the new format is kept.
    NOTE(review): consecutive entries are concatenated with no separator —
    confirm whether a delimiter between entries is intended.
    """
    if not context:
        return ""
    parts = [f"{k}: {v}" for k, v in context.items() if v is not None]
    return "[" + "".join(parts) + "]"


Expand Down
21 changes: 2 additions & 19 deletions src/worker/common/manager.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,17 @@
import logging
from datetime import datetime
from flowable.external_worker_client import ExternalWorkerClient
from requests.auth import HTTPBasicAuth
from worker.common.config import Config
from worker.common.log_utils import configure_logging
from worker.common.client import flowableClient

logger = logging.getLogger()
configure_logging()


def customize_session(session):
    """Enable TLS certificate verification on *session* via the configured
    CA cert; return None (use client default) when TLS is off or session
    is missing."""
    if Config.FLOWABLE_USE_TLS and session is not None:
        session.verify = Config.FLOWABLE_HOST_CACERT
        return session
    return None


class SubscriptionManager:
""" """

def __init__(self):
self.client = ExternalWorkerClient(
flowable_host=Config.FLOWABLE_HOST,
auth=HTTPBasicAuth(
Config.FLOWABLE_REST_USER,
Config.FLOWABLE_REST_PASSWORD,
),
customize_session=customize_session,
)
self.client = flowableClient
self.subscriptions = {}

def subscriptions_info(self):
Expand Down
95 changes: 52 additions & 43 deletions src/worker/sentinel/tasks.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,50 @@
from worker.common.log_utils import configure_logging, log_with_job
import datetime
from dateutil.parser import parse
from worker.common.log_utils import configure_logging, log_with_context
from worker.common.types import ExternalJob, JobResultBuilder, JobResult
from worker.common.client import flowableClient

configure_logging()


def sentinel_discover_data(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
log_with_job(message="Discovering new sentinel data ...", job=job)
"""
Searches for new data since last workflow execution
log_with_job(f"Process Instance Id: {job.process_instance_id}", job=job)
Variables needed:
start_time
end_time
order_id
# get order id
Variables set:
scenes: List of scenes found
"""

log_context = {"JOB": job.id, "BPMN_TASK": job.element_name}
log_with_context("Discovering new sentinel data ...", log_context)

# Workflow variables
start_time = job.get_variable("start_time")
end_time = job.get_variable("end_time")
order_id = job.get_variable("order_id")
if order_id is None:
order_id = job.process_instance_id

if start_time is None and end_time is None:
history = flowableClient.get_process_instance_history(job.process_instance_id)
if "startTime" in history:
current_time = parse(history["startTime"]) # 2024-03-17T01:02:22.487+0000
log_with_context("use startTime from workflow: %s" % current_time, log_context)
else:
current_time = datetime.datetime.now()
log_with_context("use datetime.now() as startTime: %s" % current_time, log_context)
end_time = datetime.datetime(current_time.year, current_time.month, current_time.day, current_time.hour)
start_time = end_time - datetime.timedelta(hours=1)
start_time = start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
end_time = end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

log_with_context(f"Search interval: {start_time} - {end_time}", log_context)

# discovering scenes
scenes = []
scene1 = {"scene": {"name": "scene1"}}
Expand All @@ -25,36 +56,34 @@ def sentinel_discover_data(job: ExternalJob, result: JobResultBuilder, config: d
return result.success().variable_json(name="scenes", value=scenes)


def sentinel_order_data(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
    """Order the discovered sentinel data (stub: only logs the action)."""
    log_with_job("Ordering data ...", job=job)
    return result.success()


def sentinel_download_data(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
    """Download the scene handed to this task (stub: only logs the variable).

    Fix: removed the stale ``log_with_job(...)`` call left over from the diff
    merge — it duplicated the "Downloading data ..." log via the old helper.
    """
    log_context = {"JOB": job.id, "BPMN_TASK": job.element_name}
    log_with_context("Downloading data ...", log_context)

    # get job variables
    log_with_context(job.get_variable("scene"), log_context)

    return result.success()


def sentinel_unzip(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
    """Unzip the downloaded archive (stub: only logs the action).

    Fix: removed the stale ``log_with_job(...)`` call left over from the diff
    merge — it duplicated the "Unzipping ..." log via the old helper.
    """
    log_context = {"JOB": job.id, "BPMN_TASK": job.element_name}
    log_with_context("Unzipping ...", log_context)

    return result.success()


def sentinel_check_integrity(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
log_with_job(message="Checking integrity ...", job=job)
log_context = {"JOB": job.id, "BPMN_TASK": job.element_name}
log_with_context("Checking integrity ...", log_context)

# get job variables
scene = job.get_variable("scene")
log_with_job(f"Input variable: scene={scene}", job)
log_with_context(f"Input variable: scene={scene}", log_context)

# scene["collection"] = "sentinel-1"
scene["collection"] = ""
log_with_job(f"Output variable: scene={scene}", job)
log_with_context(f"Output variable: scene={scene}", log_context)

return (
result.success()
Expand All @@ -64,27 +93,23 @@ def sentinel_check_integrity(job: ExternalJob, result: JobResultBuilder, config:


def sentinel_extract_metadata(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
    """Extract metadata and create a STAC item (stub: logs its inputs).

    Fixes: removed the stale ``log_with_job(...)`` lines left by the diff
    merge, and switched to single quotes inside the f-strings — nested double
    quotes in an f-string are a SyntaxError before Python 3.12.
    """
    log_context = {"JOB": job.id, "BPMN_TASK": job.element_name}
    log_with_context("Extracting metadata and creating STAC item ...", log_context)

    # get job variables
    log_with_context(f"scene={job.get_variable('scene')}", log_context)
    log_with_context(f"collection={job.get_variable('collection')}", log_context)

    return result.success()


def sentinel_register_metadata(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
    """Register the generated STAC item (stub: only logs the action)."""
    log_with_job("Registering STAC item ...", job=job)
    return result.success()


def sentinel_inventory_update(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
    """Update the inventory with the processed scene (stub: logs its inputs).

    Fixes: the diff merge left this function logging BOTH the old
    "Updating inventory ..." (via the removed ``log_with_job`` helper) and
    "Registering STAC item ..." — the latter belongs to
    ``sentinel_register_metadata``. Kept a single "Updating inventory ..."
    log via ``log_with_context``, and switched to single quotes inside the
    f-strings (nested double quotes are a SyntaxError before Python 3.12).
    """
    log_context = {"JOB": job.id, "BPMN_TASK": job.element_name}
    log_with_context("Updating inventory ...", log_context)

    # get job variables
    log_with_context(f"scene={job.get_variable('scene')}", log_context)
    log_with_context(f"collection={job.get_variable('collection')}", log_context)

    return result.success()

Expand All @@ -98,14 +123,6 @@ def sentinel_inventory_update(job: ExternalJob, result: JobResultBuilder, config
"wait_period_seconds": 1,
"number_of_tasks": 1,
},
"sentinel_order_data": {
"callback_handler": sentinel_order_data,
"lock_duration": "PT1M",
"number_of_retries": 5,
"scope_type": None,
"wait_period_seconds": 1,
"number_of_tasks": 1,
},
"sentinel_download_data": {
"callback_handler": sentinel_download_data,
"lock_duration": "PT1M",
Expand Down Expand Up @@ -146,12 +163,4 @@ def sentinel_inventory_update(job: ExternalJob, result: JobResultBuilder, config
"wait_period_seconds": 1,
"number_of_tasks": 1,
},
"sentinel_inventory_update": {
"callback_handler": sentinel_inventory_update,
"lock_duration": "PT1M",
"number_of_retries": 5,
"scope_type": None,
"wait_period_seconds": 1,
"number_of_tasks": 10,
},
}
Loading

0 comments on commit edd43bc

Please sign in to comment.