Skip to content

Commit

Permalink
Improve handling of job variables
Browse files Browse the repository at this point in the history
  • Loading branch information
mario-winkler committed Oct 28, 2024
1 parent 9dd02f2 commit 735145f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 58 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ dependencies = [
"requests",
"fastapi[standard]",
"registration-library @ git+https://github.com/EOEPCA/registration-library",
"flowable.external-worker-client @ git+https://github.com/EOEPCA/eoepca-flowable-external-client-python@main"
"flowable.external-worker-client @ git+https://github.com/EOEPCA/eoepca-flowable-external-client-python"
]

[project.optional-dependencies]
Expand Down
7 changes: 7 additions & 0 deletions src/worker/common/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from flowable.external_worker_client.external_worker_acquire_job_response import ExternalWorkerAcquireJobResponse
from flowable.external_worker_client.worker_result import WorkResult
from flowable.external_worker_client import WorkerResultBuilder

ExternalJob = ExternalWorkerAcquireJobResponse
JobResultBuilder = WorkerResultBuilder
JobResult = WorkResult
96 changes: 39 additions & 57 deletions src/worker/sentinel/tasks.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
from worker.common.log_utils import configure_logging, log_with_job, log_variable
from worker.common.log_utils import configure_logging, log_with_job
from worker.common.types import ExternalJob, JobResultBuilder, JobResult

configure_logging()


def sentinel_discover_data(job, worker_result_builder):
def sentinel_discover_data(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
log_with_job(message="Discovering new sentinel data ...", job=job)

log_with_job(f"Process Instance Id: {job.process_instance_id}", job=job)

# get order id
order_id = job.get_variable("order_id")
if order_id is None:
order_id = job.process_instance_id

# discovering scenes
scenes = []
scene1 = {"scene": {"name": "scene1"}}
Expand All @@ -14,97 +22,71 @@ def sentinel_discover_data(job, worker_result_builder):
# scenes.append(scene2)

# build result
result = worker_result_builder.success()
result.variable_json(name="scenes", value=scenes)

for v in result._variables:
log_variable(variable=v, job=job, log_level="info")
return result.success().variable_json(name="scenes", value=scenes)

return result


def sentinel_order_data(job, worker_result_builder):
def sentinel_order_data(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
log_with_job(message="Ordering data ...", job=job)
result = worker_result_builder.success()
return result
return result.success()


def sentinel_download_data(job, worker_result_builder):
def sentinel_download_data(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
log_with_job(message="Downloading data ...", job=job)

# get job variables
for v in job.variables:
log_variable(variable=v, job=job, log_level="info")
log_with_job(job.get_variable("scene"), job)

result = worker_result_builder.success()
return result
return result.success()


def sentinel_unzip(job, worker_result_builder):
def sentinel_unzip(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
log_with_job(message="Unzipping ...", job=job)

# get job variables
for v in job.variables:
log_variable(variable=v, job=job, log_level="info")

result = worker_result_builder.success()
return result
return result.success()


def sentinel_check_integrity(job, worker_result_builder):
def sentinel_check_integrity(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
log_with_job(message="Checking integrity ...", job=job)

# get job variables
for v in job.variables:
log_variable(variable=v, job=job, log_level="info")

job_vars = {}
for v in job.variables:
job_vars[v.name] = v.value

scene = job_vars["scene"]
log_with_job(f"Input variable: scene={scene}")
scene = job.get_variable("scene")
log_with_job(f"Input variable: scene={scene}", job)

scene["collection"] = "sentinel-1"
# scene['collection'] = ""
log_with_job(f"Output variable: scene={scene}")
result = worker_result_builder.success()
result.variable_string(name="collection", value=scene["collection"])
# scene["collection"] = "sentinel-1"
scene["collection"] = ""
log_with_job(f"Output variable: scene={scene}", job)

return result
return (
result.success()
.variable_string(name="collection", value=scene["collection"])
.variable_json(name="scene", value=scene)
)


def sentinel_extract_metadata(job, worker_result_builder):
def sentinel_extract_metadata(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
log_with_job(message="Extracting metadata and creating STAC item ...", job=job)

# get job variables
for v in job.variables:
log_variable(variable=v, job=job, log_level="info")
log_with_job(f"scene={job.get_variable("scene")}", job)
log_with_job(f"collection={job.get_variable("collection")}", job)

result = worker_result_builder.success()
return result
return result.success()


def sentinel_register_metadata(job, worker_result_builder):
def sentinel_register_metadata(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
log_with_job(message="Registering STAC item ...", job=job)

# get job variables
for v in job.variables:
log_variable(variable=v, job=job, log_level="info")

result = worker_result_builder.success()
return result
return result.success()


def sentinel_inventory_update(job, worker_result_builder):
def sentinel_inventory_update(job: ExternalJob, result: JobResultBuilder, config: dict) -> JobResult:
log_with_job(message="Updating inventory ...", job=job)

# get job variables
for v in job.variables:
log_variable(variable=v, job=job, log_level="info")
log_with_job(f"scene={job.get_variable("scene")}", job)
log_with_job(f"collection={job.get_variable("collection")}", job)

result = worker_result_builder.success()
return result
return result.success()


tasks_config = {
Expand Down

0 comments on commit 735145f

Please sign in to comment.