From 72b474cf122e8dcf8ad44a4b4b9b2ac32f5edc5c Mon Sep 17 00:00:00 2001 From: Bert Verstraete Date: Wed, 4 Oct 2023 12:10:41 +0200 Subject: [PATCH] Workflows: dynamic task fixes (#1395) Co-authored-by: Dmytro Donukis Co-authored-by: Dmytro Donukis <110604171+ddonukis@users.noreply.github.com> --- CHANGELOG.md | 4 + cognite/client/_api/workflows.py | 2 +- cognite/client/_version.py | 2 +- cognite/client/data_classes/workflows.py | 143 ++- pyproject.toml | 2 +- .../data/workflow_execution.json | 987 ++++++++++++++++++ .../test_data_classes/test_workflows.py | 136 ++- 7 files changed, 1211 insertions(+), 65 deletions(-) create mode 100644 tests/tests_unit/test_data_classes/data/workflow_execution.json diff --git a/CHANGELOG.md b/CHANGELOG.md index aa6ac587fc..9dbac261ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,10 @@ Changes are grouped as follows - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. +## [6.28.5] - 2023-10-03 +### Fixed +- Bugfix for serialization of Workflows' `DynamicTasksParameters` during `workflows.versions.upsert` and `workflows.execution.retrieve_detailed` + ## [6.28.4] - 2023-10-03 ### Fixed - Overload data_set/create for improved type safety diff --git a/cognite/client/_api/workflows.py b/cognite/client/_api/workflows.py index bd3f8712eb..db0c81dfee 100644 --- a/cognite/client/_api/workflows.py +++ b/cognite/client/_api/workflows.py @@ -279,7 +279,7 @@ def upsert(self, version: WorkflowVersionUpsert, mode: Literal["replace"] = "rep >>> from cognite.client import CogniteClient >>> from cognite.client.data_classes import WorkflowVersionUpsert, WorkflowDefinitionUpsert, WorkflowTask, FunctionTaskParameters >>> c = CogniteClient() - >>> new_version =WorkflowVersionUpsert( + >>> new_version = WorkflowVersionUpsert( ... workflow_external_id="my_workflow", ... version="1", ... workflow_definition=WorkflowDefinitionUpsert( diff --git a/cognite/client/_version.py b/cognite/client/_version.py index 22dd31b2e6..4bf19bdfa6 100644 --- a/cognite/client/_version.py +++ b/cognite/client/_version.py @@ -1,4 +1,4 @@ from __future__ import annotations -__version__ = "6.28.4" +__version__ = "6.28.5" __api_subversion__ = "V20220125" diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index c63e0ca95e..8953fbbb60 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -94,7 +94,7 @@ def as_external_ids(self) -> list[str]: class WorkflowTaskParameters(CogniteResource, ABC): - task_type: ClassVar[str] + task_type: ClassVar[Literal["function", "transformation", "cdf", "dynamic"]] @classmethod def load_parameters(cls, data: dict) -> WorkflowTaskParameters: @@ -124,7 +124,7 @@ class FunctionTaskParameters(WorkflowTaskParameters): Args: external_id (str): The external ID of the function to be called. data (dict | str | None): The data to be passed to the function. Defaults to None. The data can be used to specify the input to the function from previous tasks or the workflow input. See the tip below for more information. - is_async_complete (bool): Whether the function is asynchronous. Defaults to False. + is_async_complete (bool | None): Whether the function is asynchronous. Defaults to None, which the API will interpret as False. If a function is asynchronous, you need to call the client.workflows.tasks.update() endpoint to update the status of the task. While synchronous tasks update the status automatically. @@ -136,6 +136,7 @@ class FunctionTaskParameters(WorkflowTaskParameters): - `${workflow.input}`: The workflow input. - `${.output}`: The output of the task with the given external id. - `${.input}`: The input of the task with the given external id. + - `${.input.someKey}`: A specific key within the input of the task with the given external id. For example, if you have a workflow containing two tasks, and the external_id of the first task is `task1` then, you can specify the data for the second task as follows: @@ -154,13 +155,13 @@ class FunctionTaskParameters(WorkflowTaskParameters): ... ) """ - task_type: ClassVar[str] = "function" + task_type = "function" def __init__( self, external_id: str, data: dict | str | None = None, - is_async_complete: bool = False, + is_async_complete: bool | None = None, ) -> None: self.external_id = external_id self.data = data @@ -174,8 +175,7 @@ def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None return cls( external_id=function["externalId"], data=function.get("data"), - # Allow default to come from the API. - is_async_complete=resource.get("isAsyncComplete"), # type: ignore[arg-type] + is_async_complete=resource.get("isAsyncComplete") or resource.get("asyncComplete"), ) def dump(self, camel_case: bool = False) -> dict[str, Any]: @@ -187,8 +187,9 @@ def dump(self, camel_case: bool = False) -> dict[str, Any]: output: dict[str, Any] = { "function": function, - "isAsyncComplete": self.is_async_complete, } + if self.is_async_complete is not None: + output[("isAsyncComplete" if camel_case else "is_async_complete")] = self.is_async_complete return output @@ -201,7 +202,7 @@ class TransformationTaskParameters(WorkflowTaskParameters): """ - task_type: ClassVar[str] = "transformation" + task_type = "transformation" def __init__(self, external_id: str) -> None: self.external_id = external_id @@ -244,7 +245,7 @@ class CDFTaskParameters(WorkflowTaskParameters): """ - task_type: ClassVar[str] = "cdf" + task_type = "cdf" def __init__( self, @@ -272,7 +273,7 @@ def _load(cls: type[Self], resource: dict | str, cognite_client: CogniteClient | def dump(self, camel_case: bool = False) -> dict[str, Any]: output = super().dump(camel_case) return { - ("cdfRequest" if camel_case else "cdfRequest"): output, + ("cdfRequest" if camel_case else "cdf_request"): output, } @@ -280,21 +281,51 @@ class DynamicTaskParameters(WorkflowTaskParameters): """ The dynamic task parameters are used to specify a dynamic task. - When the tasks and their order of execution are determined at runtime, we use dynamic tasks. It takes the tasks parameter, - which is an array of function, transformation, and cdf task definitions. - This array should then be generated and returned by a previous step in the workflow, for instance, + When the tasks and their order of execution are determined at runtime, we use dynamic tasks. It takes the tasks parameter which is a Reference to + an array of function, transformation, and cdf task definitions. This array should be generated and returned by a previous step in the workflow, for instance, a Cognite Function task. - Args: - dynamic (list[WorkflowTask] | str): The dynamic task to be called. The dynamic task is a string that is evaluated - during the workflow's execution. + Tip: + You can reference data from other tasks or the workflow. You do this by following the format + `${prefix.jsonPath}` in the expression. Some valid option are: + + - `${workflow.input}`: The workflow input. + - `${.output}`: The output of the task with the given external id. + - `${.input}`: The input of the task with the given external id. + - `${.input.someKey}`: A specific key within the input of the task with the given external id. + Args: + tasks (list[WorkflowTask] | str): The tasks to be dynamically executed. The dynamic task is a string that is evaluated + during the workflow's execution. When calling Version Upsert, the tasks parameter must be a Reference string. + When calling Execution details, the tasks parameter will be a list of WorkflowTask objects. """ - task_type: ClassVar[str] = "dynamic" + task_type = "dynamic" + + def __init__(self, tasks: list[WorkflowTask] | str) -> None: + self.tasks = tasks + + @classmethod + def _load(cls: type[Self], resource: dict | str, cognite_client: CogniteClient | None = None) -> Self: + resource = json.loads(resource) if isinstance(resource, str) else resource + + dynamic: dict[str, Any] = resource[cls.task_type] - def __init__(self, dynamic: list[WorkflowTask] | str) -> None: - self.dynamic = dynamic + # can either be a reference string (i.e., in case of WorkflowDefinitions) + if isinstance(dynamic["tasks"], str): + return cls(dynamic["tasks"]) + + # or can be resolved to a list of Tasks (i.e., during or after execution) + return cls( + [WorkflowTask._load(task) for task in dynamic["tasks"]], + ) + + def dump(self, camel_case: bool = False) -> dict[str, Any]: + return { + self.task_type: { + "tasks": self.tasks if isinstance(self.tasks, str) else [task.dump(camel_case) for task in self.tasks] + } + } class WorkflowTask(CogniteResource): @@ -331,6 +362,10 @@ def __init__( self.timeout = timeout self.depends_on = depends_on + @property + def type(self) -> Literal["function", "transformation", "cdf", "dynamic"]: + return self.parameters.task_type + @classmethod def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None) -> WorkflowTask: resource = json.loads(resource) if isinstance(resource, str) else resource @@ -342,15 +377,13 @@ def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None # Allow default to come from the API. retries=resource.get("retries"), # type: ignore[arg-type] timeout=resource.get("timeout"), # type: ignore[arg-type] - depends_on=[dep["externalId"] for dep in resource.get("depends_on", [])] or None, + depends_on=[dep["externalId"] for dep in resource.get("dependsOn", [])] or None, ) def dump(self, camel_case: bool = False) -> dict[str, Any]: - type_ = self.parameters.task_type - output: dict[str, Any] = { ("externalId" if camel_case else "external_id"): self.external_id, - "type": type_, + "type": self.type, "parameters": self.parameters.dump(camel_case), "retries": self.retries, "timeout": self.timeout, @@ -418,12 +451,9 @@ def load(cls, data: dict[str, Any]) -> FunctionTaskOutput: def dump(self, camel_case: bool = False) -> dict[str, Any]: return { - "output": { - ("callId" if camel_case else "call_id"): self.call_id, - ("functionId" if camel_case else "function_id"): self.function_id, - "response": self.response, - }, - ("taskType" if camel_case else "task_type"): self.task_type, + ("callId" if camel_case else "call_id"): self.call_id, + ("functionId" if camel_case else "function_id"): self.function_id, + "response": self.response, } @@ -446,10 +476,7 @@ def load(cls, data: dict[str, Any]) -> TransformationTaskOutput: return cls(output["jobId"]) def dump(self, camel_case: bool = False) -> dict[str, Any]: - return { - "output": {("jobId" if camel_case else "job_id"): self.job_id}, - ("taskType" if camel_case else "task_type"): self.task_type, - } + return {("jobId" if camel_case else "job_id"): self.job_id} class CDFTaskOutput(WorkflowTaskOutput): @@ -474,37 +501,27 @@ def load(cls, data: dict[str, Any]) -> CDFTaskOutput: def dump(self, camel_case: bool = False) -> dict[str, Any]: return { - "output": { - "response": self.response, - ("statusCode" if camel_case else "status_code"): self.status_code, - }, - ("taskType" if camel_case else "task_type"): self.task_type, + "response": self.response, + ("statusCode" if camel_case else "status_code"): self.status_code, } class DynamicTaskOutput(WorkflowTaskOutput): """ The dynamic task output is used to specify the output of a dynamic task. - - Args: - dynamic_tasks (list[WorkflowTask]): The dynamic tasks to be created on the fly. """ task_type: ClassVar[str] = "dynamic" - def __init__(self, dynamic_tasks: list[WorkflowTask]) -> None: - self.dynamic_tasks = dynamic_tasks + def __init__(self) -> None: + ... @classmethod def load(cls, data: dict[str, Any]) -> DynamicTaskOutput: - output = data["output"] - return cls([WorkflowTask._load(task) for task in output["dynamicTasks"]]) + return cls() def dump(self, camel_case: bool = False) -> dict[str, Any]: - return { - "output": {"dynamicTasks": [task.dump(camel_case) for task in self.dynamic_tasks]}, - ("taskType" if camel_case else "task_type"): self.task_type, - } + return {} class WorkflowTaskExecution(CogniteResource): @@ -521,7 +538,6 @@ class WorkflowTaskExecution(CogniteResource): start_time (int | None): The start time of the task execution. Unix timestamp in milliseconds. Defaults to None. end_time (int | None): The end time of the task execution. Unix timestamp in milliseconds. Defaults to None. reason_for_incompletion (str | None): Provides the reason if the workflow did not complete successfully. Defaults to None. - """ def __init__( @@ -546,6 +562,10 @@ def __init__( self.end_time = end_time self.reason_for_incompletion = reason_for_incompletion + @property + def task_type(self) -> Literal["function", "transformation", "cdf", "dynamic"]: + return self.input.task_type + @classmethod def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None) -> WorkflowTaskExecution: resource = json.loads(resource) if isinstance(resource, str) else resource @@ -563,8 +583,15 @@ def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None def dump(self, camel_case: bool = False) -> dict[str, Any]: output: dict[str, Any] = super().dump(camel_case) - task_type_key = "taskType" if camel_case else "task_type" - output[task_type_key] = self.output.task_type + output["input"] = self.input.dump(camel_case) + output["status"] = self.status.upper() + output[("taskType" if camel_case else "task_type")] = self.task_type + # API uses isAsyncComplete and asyncComplete inconsistently: + if self.task_type == "function": + if (is_async_complete := output["input"].get("isAsyncComplete")) is not None: + output["input"]["asyncComplete"] = is_async_complete + del output["input"]["isAsyncComplete"] + output["output"] = self.output.dump(camel_case) return output @@ -814,6 +841,8 @@ class WorkflowExecutionDetailed(WorkflowExecution): start_time (int | None): The start time of the workflow execution. Unix timestamp in milliseconds. Defaults to None. end_time (int | None): The end time of the workflow execution. Unix timestamp in milliseconds. Defaults to None. reason_for_incompletion (str | None): Provides the reason if the workflow did not complete successfully. Defaults to None. + input (dict | None): Input arguments the workflow was triggered with. + metadata (dict | None): Metadata set when the workflow was triggered. """ def __init__( @@ -828,12 +857,16 @@ def __init__( start_time: int | None = None, end_time: int | None = None, reason_for_incompletion: str | None = None, + input: dict | None = None, + metadata: dict | None = None, ) -> None: super().__init__( id, workflow_external_id, status, created_time, version, start_time, end_time, reason_for_incompletion ) self.workflow_definition = workflow_definition self.executed_tasks = executed_tasks + self.input = input + self.metadata = metadata @classmethod def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None) -> WorkflowExecutionDetailed: @@ -852,6 +885,8 @@ def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None reason_for_incompletion=resource.get("reasonForIncompletion"), workflow_definition=WorkflowDefinition._load(resource["workflowDefinition"]), executed_tasks=[WorkflowTaskExecution._load(task) for task in resource["executedTasks"]], + input=resource.get("input"), + metadata=resource.get("metadata"), ) def dump(self, camel_case: bool = False) -> dict[str, Any]: @@ -862,6 +897,10 @@ def dump(self, camel_case: bool = False) -> dict[str, Any]: output[("executedTasks" if camel_case else "executed_tasks")] = [ task.dump(camel_case) for task in self.executed_tasks ] + if self.input: + output["input"] = self.input + if self.metadata: + output["metadata"] = self.metadata return output def as_execution(self) -> WorkflowExecution: diff --git a/pyproject.toml b/pyproject.toml index 4ddf92634f..fa894dda4f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "cognite-sdk" -version = "6.28.4" +version = "6.28.5" description = "Cognite Python SDK" readme = "README.md" documentation = "https://cognite-sdk-python.readthedocs-hosted.com" diff --git a/tests/tests_unit/test_data_classes/data/workflow_execution.json b/tests/tests_unit/test_data_classes/data/workflow_execution.json new file mode 100644 index 0000000000..523ebe9015 --- /dev/null +++ b/tests/tests_unit/test_data_classes/data/workflow_execution.json @@ -0,0 +1,987 @@ +{ + "id": "7b6bf517-4812-4874-b227-fa7db36830a3", + "workflowExternalId": "TestWorkflowTypeBidProcess", + "workflowDefinition": { + "hash": "8AE17296EE6BCCD0B7D9C184E100A5F98069553C", + "tasks": [ + { + "externalId": "testTaskDispatcher", + "type": "function", + "parameters": { + "function": { + "externalId": "bid_process_task_dispatcher", + "data": { + "workflowType": "TestWorkflowType", + "applicationVersion": "123456", + "testProcessEventExternalId": "${workflow.input.triggerEvent.externalId}" + } + } + }, + "retries": 2, + "timeout": 300, + "dependsOn": [] + }, + { + "externalId": "applicationExecution", + "type": "dynamic", + "description": "Run a collection of preprocessor and app runs concurrently", + "parameters": { + "dynamic": { + "tasks": "${testTaskDispatcher.output.response.testTasks}" + } + }, + "retries": 0, + "timeout": 3600, + "dependsOn": [ + { + "externalId": "testTaskDispatcher" + } + ] + }, + { + "externalId": "testMatrixCalculation", + "type": "dynamic", + "description": "Run a collection of test matrix calcs concurrently.", + "parameters": { + "dynamic": { + "tasks": "${testTaskDispatcher.output.response.partialBidMatrixTasks}" + } + }, + "retries": 0, + "timeout": 3600, + "dependsOn": [ + { + "externalId": "testTaskDispatcher" + }, + { + "externalId": "applicationExecution" + } + ] + }, + { + "externalId": "totalTestMatrixCalculation", + "type": "function", + "parameters": { + "function": { + "externalId": "${testTaskDispatcher.output.response.totalTestMatrix.functionExternalId}", + "data": "${testTaskDispatcher.output.response.totalTestMatrix.data}" + } + }, + "retries": 2, + "timeout": 300, + "dependsOn": [ + { + "externalId": "testMatrixCalculation" + } + ] + } + ] + }, + "version": "latest", + "status": "COMPLETED", + "engineExecutionId": "4afb51bd-2015-4630-b567-fd3b46523e84", + "executedTasks": [ + { + "id": "38b3e696-adcb-4bf8-9217-747449f55289", + "externalId": "testTaskDispatcher", + "status": "COMPLETED", + "taskType": "function", + "input": { + "asyncComplete": false, + "function": { + "data": { + "applicationVersion": "123456", + "workflowType": "TestWorkflowType", + "testProcessEventExternalId": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "bid_process_task_dispatcher" + } + }, + "output": { + "callId": 1453249902969082, + "functionId": 8863311186661323, + "response": { + "testTasks": [ + { + "externalId": "preprocessor_b0b3d8e1-e3a2-4529-a178-24b2b430886c", + "parameters": { + "function": { + "data": { + "datasetId": 4272372867810023, + "testEventExternalId": "TEST_RUN_2023-10-02T09:56:18.190902Z_112945" + }, + "externalId": "preprocessor" + }, + "isAsyncComplete": false + }, + "retries": 2, + "timeout": 600, + "type": "function" + }, + { + "dependsOn": [ + { + "externalId": "preprocessor_b0b3d8e1-e3a2-4529-a178-24b2b430886c" + } + ], + "externalId": "trigger-2_dcc1e79d-bb6b-4f0d-8e84-7846479bc677", + "parameters": { + "function": { + "data": { + "applicationVersion": "123456", + "testEventExternalId": "TEST_RUN_2023-10-02T09:56:18.190902Z_112945" + }, + "externalId": "trigger-2" + }, + "isAsyncComplete": true + }, + "retries": 1, + "timeout": 3600, + "type": "function" + }, + { + "externalId": "preprocessor-1_1513aeee-04cf-43af-838c-c2e4323b3638", + "parameters": { + "function": { + "data": { + "datasetId": 4272372867810023, + "testEventExternalId": "TEST_RUN_2023-10-02T09:56:18.191236Z_740acd" + }, + "externalId": "preprocessor-1" + }, + "isAsyncComplete": false + }, + "retries": 2, + "timeout": 600, + "type": "function" + }, + { + "dependsOn": [ + { + "externalId": "preprocessor-1_1513aeee-04cf-43af-838c-c2e4323b3638" + } + ], + "externalId": "trigger_3ace0bf8-9dbc-44cc-a937-cd1b775c02fc", + "parameters": { + "function": { + "data": { + "applicationVersion": "123456", + "testEventExternalId": "TEST_RUN_2023-10-02T09:56:18.191236Z_740acd" + }, + "externalId": "trigger" + }, + "isAsyncComplete": true + }, + "retries": 1, + "timeout": 3600, + "type": "function" + } + ], + "partialBidMatrixTasks": [ + { + "externalId": "generate_test_matrix_03d5de1e-01a7-4de9-9f70-302b85d682a3", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_1a1896c2-c101-41b7-a818-01985fd692aa_Holen", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + }, + { + "externalId": "generate_test_matrix_77e0ba96-9d52-4fc3-b2e2-63e12d581c3b", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_dc0d195d-4cfc-4f0f-a9c6-76f9eafa374b_Lund", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + }, + { + "externalId": "generate_test_matrix_df17612f-cdca-479b-9589-a348d6dfaf5b", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_a3319f4d-58be-439d-931a-42d5eadc3b06_Lien_krv", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + }, + { + "externalId": "generate_test_matrix_56342599-0e72-4ff0-94f8-7bd41cf6cda8", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_4169d9ec-8bd2-484f-84a7-02127c775478_Landet", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + }, + { + "externalId": "generate_test_matrix_d1e3ba33-2c1d-475c-afeb-19ff8c907b11", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_75647fd5-9858-48da-9e41-a14c5190bd80_Dalby", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + }, + { + "externalId": "generate_test_matrix_34e9b702-11c8-4736-8584-44bf73e49eae", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_79e59dc0-e901-474a-9a57-8c89c592d4ee_Rull1", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + }, + { + "externalId": "generate_test_matrix_8c11940d-ef66-4eb3-afe8-a3923e9e1625", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_8ecf77d2-1702-4eb4-a86e-395d87801a69_Rull2", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + }, + { + "externalId": "generate_test_matrix_acf2518f-7464-4ac8-bc33-601b2e2a6200", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_068c9f10-c8d9-4e6c-82aa-839b5e6dd1df_Scott", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + }, + { + "externalId": "generate_test_matrix_cabbe8cb-ff28-4e09-8fa0-e518e507f00c", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_5eb65417-bfd8-4702-9579-fda7d1e73bcb_Strand_krv", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + } + ], + "totalTestMatrix": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_ff877fea-65fa-40f6-b536-7bc2c79bb4dc", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "functionExternalId": "calculate_total_test_matrix" + } + } + }, + "startTime": 1696240548031, + "endTime": 1696240580027 + }, + { + "id": "e9ca204f-2031-46f4-9567-162a54f5eb38", + "externalId": "applicationExecution", + "status": "COMPLETED", + "taskType": "dynamic", + "input": { + "dynamic": { + "tasks": [ + { + "externalId": "preprocessor_b0b3d8e1-e3a2-4529-a178-24b2b430886c", + "parameters": { + "function": { + "data": { + "datasetId": 4272372867810023, + "testEventExternalId": "TEST_RUN_2023-10-02T09:56:18.190902Z_112945" + }, + "externalId": "preprocessor" + }, + "isAsyncComplete": false + }, + "retries": 2, + "timeout": 600, + "type": "function" + }, + { + "dependsOn": [ + { + "externalId": "preprocessor_b0b3d8e1-e3a2-4529-a178-24b2b430886c" + } + ], + "externalId": "trigger-2_dcc1e79d-bb6b-4f0d-8e84-7846479bc677", + "parameters": { + "function": { + "data": { + "applicationVersion": "123456", + "testEventExternalId": "TEST_RUN_2023-10-02T09:56:18.190902Z_112945" + }, + "externalId": "trigger-2" + }, + "isAsyncComplete": true + }, + "retries": 1, + "timeout": 3600, + "type": "function" + }, + { + "externalId": "preprocessor-1_1513aeee-04cf-43af-838c-c2e4323b3638", + "parameters": { + "function": { + "data": { + "datasetId": 4272372867810023, + "testEventExternalId": "TEST_RUN_2023-10-02T09:56:18.191236Z_740acd" + }, + "externalId": "preprocessor-1" + }, + "isAsyncComplete": false + }, + "retries": 2, + "timeout": 600, + "type": "function" + }, + { + "dependsOn": [ + { + "externalId": "preprocessor-1_1513aeee-04cf-43af-838c-c2e4323b3638" + } + ], + "externalId": "trigger_3ace0bf8-9dbc-44cc-a937-cd1b775c02fc", + "parameters": { + "function": { + "data": { + "applicationVersion": "123456", + "testEventExternalId": "TEST_RUN_2023-10-02T09:56:18.191236Z_740acd" + }, + "externalId": "trigger" + }, + "isAsyncComplete": true + }, + "retries": 1, + "timeout": 3600, + "type": "function" + } + ] + } + }, + "output": {}, + "startTime": 1696240580101, + "endTime": 1696240580149 + }, + { + "id": "475d03b3-6e6c-44ad-b8bb-534e1e5560c4", + "externalId": "testMatrixCalculation", + "status": "COMPLETED", + "taskType": "dynamic", + "input": { + "dynamic": { + "tasks": [ + { + "externalId": "generate_test_matrix_03d5de1e-01a7-4de9-9f70-302b85d682a3", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_1a1896c2-c101-41b7-a818-01985fd692aa_Holen", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + }, + { + "externalId": "generate_test_matrix_77e0ba96-9d52-4fc3-b2e2-63e12d581c3b", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_dc0d195d-4cfc-4f0f-a9c6-76f9eafa374b_Lund", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + }, + { + "externalId": "generate_test_matrix_df17612f-cdca-479b-9589-a348d6dfaf5b", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_a3319f4d-58be-439d-931a-42d5eadc3b06_Lien_krv", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + }, + { + "externalId": "generate_test_matrix_56342599-0e72-4ff0-94f8-7bd41cf6cda8", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_4169d9ec-8bd2-484f-84a7-02127c775478_Landet", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + }, + { + "externalId": "generate_test_matrix_d1e3ba33-2c1d-475c-afeb-19ff8c907b11", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_75647fd5-9858-48da-9e41-a14c5190bd80_Dalby", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + }, + { + "externalId": "generate_test_matrix_34e9b702-11c8-4736-8584-44bf73e49eae", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_79e59dc0-e901-474a-9a57-8c89c592d4ee_Rull1", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + }, + { + "externalId": "generate_test_matrix_8c11940d-ef66-4eb3-afe8-a3923e9e1625", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_8ecf77d2-1702-4eb4-a86e-395d87801a69_Rull2", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + }, + { + "externalId": "generate_test_matrix_acf2518f-7464-4ac8-bc33-601b2e2a6200", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_068c9f10-c8d9-4e6c-82aa-839b5e6dd1df_Scott", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + }, + { + "externalId": "generate_test_matrix_cabbe8cb-ff28-4e09-8fa0-e518e507f00c", + "parameters": { + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_5eb65417-bfd8-4702-9579-fda7d1e73bcb_Strand_krv", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + }, + "isAsyncComplete": false + }, + "retries": 1, + "timeout": 600, + "type": "function" + } + ] + } + }, + "output": {}, + "startTime": 1696240747163, + "endTime": 1696240747181 + }, + { + "id": "3f41e58f-cf5d-4391-b276-4698384918fc", + "externalId": "totalTestMatrixCalculation", + "status": "COMPLETED", + "taskType": "function", + "input": { + "asyncComplete": false, + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_ff877fea-65fa-40f6-b536-7bc2c79bb4dc", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "calculate_total_test_matrix" + } + }, + "output": { + "callId": 6958901858387174, + "functionId": 3328142046480504, + "response": { + "asset_ids": [ + 958879706384559 + ] + } + }, + "startTime": 1696240819513, + "endTime": 1696240833889 + }, + { + "id": "f7a73a85-73df-4a84-bcd2-3a6a05b48673", + "externalId": "preprocessor_b0b3d8e1-e3a2-4529-a178-24b2b430886c", + "status": "COMPLETED", + "taskType": "function", + "input": { + "asyncComplete": false, + "function": { + "data": { + "testEventExternalId": "TEST_RUN_2023-10-02T09:56:18.190902Z_112945", + "datasetId": 4272372867810023 + }, + "externalId": "preprocessor" + } + }, + "output": { + "callId": 7668632761093605, + "functionId": 2468751184837488, + "response": { + "cog_shop_case_file": { + "data_set_id": 4272372867810023, + "external_id": "cog_preprocessor/cogShop/Case_2023-10-02T09:56:18.190902Z_112945/65620938-de5b-4f58-98f7-16ee8c0476a6", + "id": 2595300370727380, + "name": "cogShop-Case_2023-10-02T09:56:18.190902Z_112945-case.yaml" + }, + "cog_shop_file_list": [ + { + "external_id": "SHOP_Fornebu_water_value_cut_file_reservoir_mapping", + "file_type": "ascii" + }, + { + "external_id": "SHOP_Fornebu_water_value_cut_file", + "file_type": "ascii" + } + ], + "shop_version": "14.4.3.0" + } + }, + "startTime": 1696240580647, + "endTime": 1696240604356 + }, + { + "id": "9d65f598-5cef-4476-b6be-cc04bb832163", + "externalId": "preprocessor-1_1513aeee-04cf-43af-838c-c2e4323b3638", + "status": "COMPLETED", + "taskType": "function", + "input": { + "asyncComplete": false, + "function": { + "data": { + "testEventExternalId": "TEST_RUN_2023-10-02T09:56:18.191236Z_740acd", + "datasetId": 4272372867810023 + }, + "externalId": "preprocessor-1" + } + }, + "output": { + "callId": 1952977430849976, + "functionId": 162527280303730, + "response": { + "cog_shop_case_file": { + "data_set_id": 4272372867810023, + "external_id": "cog_preprocessor/cogShop/Case_2023-10-02T09:56:18.191236Z_740acd/68978016-aac4-461f-ae8b-e79451889b4a", + "id": 2718311250375720, + "name": "cogShop-Case_2023-10-02T09:56:18.191236Z_740acd-case.yaml" + }, + "cog_shop_file_list": [ + { + "external_id": "SHOP_Fornebu_water_value_cut_file_reservoir_mapping", + "file_type": "ascii" + }, + { + "external_id": "SHOP_Fornebu_water_value_cut_file", + "file_type": "ascii" + } + ], + "shop_version": "14.4.3.0" + } + }, + "startTime": 1696240580725, + "endTime": 1696240609457 + }, + { + "id": "f69f7b1c-6aea-4213-b5d0-deb2fbabe5a0", + "externalId": "trigger-2_dcc1e79d-bb6b-4f0d-8e84-7846479bc677", + "status": "COMPLETED", + "taskType": "function", + "input": { + "asyncComplete": true, + "function": { + "data": { + "applicationVersion": "123456", + "testEventExternalId": "TEST_RUN_2023-10-02T09:56:18.190902Z_112945" + }, + "externalId": "trigger-2" + } + }, + "output": { + "callId": 3675281144107549, + "functionId": 5835690711631694, + "response": { + "cogniteOrchestrationTaskId": "f69f7b1c-6aea-4213-b5d0-deb2fbabe5a0", + "requestBody": { + "applicationVersion": "123456", + "cogniteOrchestrationTaskId": "f69f7b1c-6aea-4213-b5d0-deb2fbabe5a0", + "testEventExternalId": "TEST_RUN_2023-10-02T09:56:18.190902Z_112945" + }, + "requestUrl": "https://power-ops-api.staging.bluefield.cognite.ai/power-ops-staging/run-shop", + "responseBody": { + "message": "Shop run triggered" + }, + "responseStatus": 200 + } + }, + "startTime": 1696240605108, + "endTime": 1696240734843 + }, + { + "id": "eb240dbc-0062-4314-b4d6-fb51442d02e7", + "externalId": "trigger_3ace0bf8-9dbc-44cc-a937-cd1b775c02fc", + "status": "COMPLETED", + "taskType": "function", + "input": { + "asyncComplete": true, + "function": { + "data": { + "applicationVersion": "123456", + "testEventExternalId": "TEST_RUN_2023-10-02T09:56:18.191236Z_740acd" + }, + "externalId": "trigger" + } + }, + "output": { + "callId": 2967264911514077, + "functionId": 7745404375267235, + "response": { + "cogniteOrchestrationTaskId": "eb240dbc-0062-4314-b4d6-fb51442d02e7", + "requestBody": { + "applicationVersion": "123456", + "cogniteOrchestrationTaskId": "eb240dbc-0062-4314-b4d6-fb51442d02e7", + "testEventExternalId": "TEST_RUN_2023-10-02T09:56:18.191236Z_740acd" + }, + "requestUrl": "https://power-ops-api.staging.bluefield.cognite.ai/power-ops-staging/run-shop", + "responseBody": { + "message": "Shop run triggered" + }, + "responseStatus": 200 + } + }, + "startTime": 1696240610180, + "endTime": 1696240739918 + }, + { + "id": "f7ced93f-e686-4a15-9676-5b32b1f4e52c", + "externalId": "generate_test_matrix_03d5de1e-01a7-4de9-9f70-302b85d682a3", + "status": "COMPLETED", + "taskType": "function", + "input": { + "asyncComplete": false, + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_1a1896c2-c101-41b7-a818-01985fd692aa_Holen", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + } + }, + "output": { + "callId": 8805488770050088, + "functionId": 2774602009950522, + "response": null + }, + "startTime": 1696240747371, + "endTime": 1696240795785 + }, + { + "id": "e326b41e-302a-4f29-847b-f0eb76fcde59", + "externalId": "generate_test_matrix_77e0ba96-9d52-4fc3-b2e2-63e12d581c3b", + "status": "COMPLETED", + "taskType": "function", + "input": { + "asyncComplete": false, + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_dc0d195d-4cfc-4f0f-a9c6-76f9eafa374b_Lund", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + } + }, + "output": { + "callId": 5300981250266599, + "functionId": 2774602009950522, + "response": null + }, + "startTime": 1696240747711, + "endTime": 1696240794709 + }, + { + "id": "dc9a55a8-eeaf-4beb-9842-8767de7385ec", + "externalId": "generate_test_matrix_df17612f-cdca-479b-9589-a348d6dfaf5b", + "status": "COMPLETED", + "taskType": "function", + "input": { + "asyncComplete": false, + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_a3319f4d-58be-439d-931a-42d5eadc3b06_Lien_krv", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + } + }, + "output": { + "callId": 4673999868714216, + "functionId": 2774602009950522, + "response": null + }, + "startTime": 1696240749725, + "endTime": 1696240799814 + }, + { + "id": "3ee0ed1f-dd5c-4334-a239-9e076c68230e", + "externalId": "generate_test_matrix_56342599-0e72-4ff0-94f8-7bd41cf6cda8", + "status": "COMPLETED", + "taskType": "function", + "input": { + "asyncComplete": false, + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_4169d9ec-8bd2-484f-84a7-02127c775478_Landet", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + } + }, + "output": { + "callId": 1736067993713132, + "functionId": 2774602009950522, + "response": null + }, + "startTime": 1696240750389, + "endTime": 1696240800688 + }, + { + "id": "185a8630-0a60-4adc-b0bb-4fa27fd915df", + "externalId": "generate_test_matrix_d1e3ba33-2c1d-475c-afeb-19ff8c907b11", + "status": "COMPLETED", + "taskType": "function", + "input": { + "asyncComplete": false, + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_75647fd5-9858-48da-9e41-a14c5190bd80_Dalby", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + } + }, + "output": { + "callId": 3044388486346549, + "functionId": 2774602009950522, + "response": null + }, + "startTime": 1696240750742, + "endTime": 1696240803588 + }, + { + "id": "51bcfd5e-3ba7-41a2-bc27-34eb9d18e8d0", + "externalId": "generate_test_matrix_34e9b702-11c8-4736-8584-44bf73e49eae", + "status": "COMPLETED", + "taskType": "function", + "input": { + "asyncComplete": false, + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_79e59dc0-e901-474a-9a57-8c89c592d4ee_Rull1", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + } + }, + "output": { + "callId": 5421875616701543, + "functionId": 2774602009950522, + "response": null + }, + "startTime": 1696240751407, + "endTime": 1696240805996 + }, + { + "id": "11653aa1-391c-4efa-93be-13b0e05f9e9f", + "externalId": "generate_test_matrix_8c11940d-ef66-4eb3-afe8-a3923e9e1625", + "status": "COMPLETED", + "taskType": "function", + "input": { + "asyncComplete": false, + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_8ecf77d2-1702-4eb4-a86e-395d87801a69_Rull2", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + } + }, + "output": { + "callId": 244237303353736, + "functionId": 2774602009950522, + "response": null + }, + "startTime": 1696240751758, + "endTime": 1696240808009 + }, + { + "id": "c17875c7-6f8f-4ada-84ef-43b4191428aa", + "externalId": "generate_test_matrix_acf2518f-7464-4ac8-bc33-601b2e2a6200", + "status": "COMPLETED", + "taskType": "function", + "input": { + "asyncComplete": false, + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_068c9f10-c8d9-4e6c-82aa-839b5e6dd1df_Scott", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + } + }, + "output": { + "callId": 8400742455854509, + "functionId": 2774602009950522, + "response": null + }, + "startTime": 1696240752429, + "endTime": 1696240810695 + }, + { + "id": "2c3f97df-12a7-4d9e-a488-b3e2400d2434", + "externalId": "generate_test_matrix_cabbe8cb-ff28-4e09-8fa0-e518e507f00c", + "status": "COMPLETED", + "taskType": "function", + "input": { + "asyncComplete": false, + "function": { + "data": { + "event_external_id": "TEST_FUNCTION_CALL_5eb65417-bfd8-4702-9579-fda7d1e73bcb_Strand_krv", + "workflow_event_external_id": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "externalId": "generate_test_matrix" + } + }, + "output": { + "callId": 2097899365071295, + "functionId": 2774602009950522, + "response": null + }, + "startTime": 1696240752781, + "endTime": 1696240812630 + } + ], + "input": { + "triggerEvent": { + "externalId": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26" + }, + "version": "latest", + "snake_case_lets_go": "yes" + }, + "createdTime": 1696240547972, + "startTime": 1696240547886, + "endTime": 1696240836564, + "metadata": { + "supervisor": "Jimmy", + "best_number": 42 + } +} diff --git a/tests/tests_unit/test_data_classes/test_workflows.py b/tests/tests_unit/test_data_classes/test_workflows.py index 479e30db97..46de26abe0 100644 --- a/tests/tests_unit/test_data_classes/test_workflows.py +++ b/tests/tests_unit/test_data_classes/test_workflows.py @@ -1,3 +1,5 @@ +import json +from pathlib import Path from typing import Any import pytest @@ -5,9 +7,11 @@ from cognite.client.data_classes.workflows import ( CDFTaskOutput, DynamicTaskOutput, + DynamicTaskParameters, FunctionTaskOutput, FunctionTaskParameters, TransformationTaskOutput, + WorkflowExecutionDetailed, WorkflowIds, WorkflowTask, WorkflowTaskOutput, @@ -17,20 +21,19 @@ class TestWorkflowTaskOutput: @pytest.mark.parametrize( - "output", + ["output", "expected"], [ - FunctionTaskOutput(call_id=123, function_id=3456, response={"test": 1}), - DynamicTaskOutput( - dynamic_tasks=[ - WorkflowTask(external_id="abc", name="abc", parameters=FunctionTaskParameters(external_id="def")) - ] + ( + FunctionTaskOutput(call_id=123, function_id=3456, response={"test": 1}), + {"callId": 123, "functionId": 3456, "response": {"test": 1}}, ), - CDFTaskOutput(response={"test": 1}, status_code=200), - TransformationTaskOutput(job_id=789), + (DynamicTaskOutput(), {}), + (CDFTaskOutput(response={"test": 1}, status_code=200), {"response": {"test": 1}, "statusCode": 200}), + (TransformationTaskOutput(job_id=789), {"jobId": 789}), ], ) - def test_serialization(self, output: WorkflowTaskOutput): - assert WorkflowTaskOutput.load_output(output.dump(camel_case=True)).dump() == output.dump() + def test_serialization(self, output: WorkflowTaskOutput, expected: dict): + assert output.dump(camel_case=True) == expected class TestWorkflowId: @@ -71,3 +74,116 @@ class TestWorkflowIds: ) def test_load(self, resource: Any, expected: WorkflowIds): assert WorkflowIds._load(resource) == expected + + +class TestWorkflowExecutionDetailed: + @pytest.fixture(scope="class") + def execution_data(self) -> dict: + test_data = Path(__file__).parent / "data/workflow_execution.json" + with test_data.open() as f: + return json.load(f) + + def test_dump(self, execution_data: dict): + wf_execution = WorkflowExecutionDetailed._load(execution_data) + dumped = wf_execution.dump(camel_case=False) + dumped_camel = wf_execution.dump(camel_case=True) + assert dumped["metadata"] == {"supervisor": "Jimmy", "best_number": 42} + assert dumped["input"] == { + "triggerEvent": { + "externalId": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26", + }, + "version": "latest", + "snake_case_lets_go": "yes", + } + assert dumped["metadata"] == dumped_camel["metadata"] + assert dumped["input"] == dumped_camel["input"] + + def test_load(self, execution_data: dict): + wf_execution = WorkflowExecutionDetailed._load(execution_data) + assert wf_execution.id == "7b6bf517-4812-4874-b227-fa7db36830a3" + assert wf_execution.workflow_external_id == "TestWorkflowTypeBidProcess" + assert wf_execution.version == "latest" + assert wf_execution.status == "completed" + assert wf_execution.created_time == 1696240547972 + assert wf_execution.start_time == 1696240547886 + assert wf_execution.end_time == 1696240836564 + assert wf_execution.metadata == {"supervisor": "Jimmy", "best_number": 42} + assert wf_execution.input == { + "triggerEvent": { + "externalId": "TEST_test_7ca14a56-c807-4bd6-b287-64936078ef26", + }, + "version": "latest", + "snake_case_lets_go": "yes", + } + + def test_definition_parsed_correctly(self, execution_data: dict): + wf_execution = WorkflowExecutionDetailed._load(execution_data) + assert wf_execution.workflow_definition.hash_ == "8AE17296EE6BCCD0B7D9C184E100A5F98069553C" + + expected = [ + WorkflowTask( + external_id="testTaskDispatcher", + parameters=FunctionTaskParameters( + external_id="bid_process_task_dispatcher", + data={ + "workflowType": "TestWorkflowType", + "applicationVersion": "123456", + "testProcessEventExternalId": "${workflow.input.triggerEvent.externalId}", + }, + ), + retries=2, + timeout=300, + ), + WorkflowTask( + external_id="applicationExecution", + description="Run a collection of preprocessor and app runs concurrently", + parameters=DynamicTaskParameters(tasks="${testTaskDispatcher.output.response.testTasks}"), + retries=0, + timeout=3600, + depends_on=["testTaskDispatcher"], + ), + ] + assert len(wf_execution.workflow_definition.tasks) == 4 + + for expected_task, actual_task in zip(expected, wf_execution.workflow_definition.tasks): + assert actual_task.external_id == expected_task.external_id + assert actual_task.type == expected_task.type + assert actual_task.parameters.dump() == expected_task.parameters.dump() + assert actual_task.retries == expected_task.retries + assert actual_task.timeout == expected_task.timeout + assert actual_task.depends_on == expected_task.depends_on + + def test_executed_tasks_parsed_correctly(self, execution_data: dict): + wf_execution = WorkflowExecutionDetailed._load(execution_data) + + expected = [ + ("38b3e696-adcb-4bf8-9217-747449f55289", "function", "completed", 1453249902969082), + ("e9ca204f-2031-46f4-9567-162a54f5eb38", "dynamic", "completed", None), + ("475d03b3-6e6c-44ad-b8bb-534e1e5560c4", "dynamic", "completed", None), + ("3f41e58f-cf5d-4391-b276-4698384918fc", "function", "completed", 6958901858387174), + ("f7a73a85-73df-4a84-bcd2-3a6a05b48673", "function", "completed", 7668632761093605), + ("9d65f598-5cef-4476-b6be-cc04bb832163", "function", "completed", 1952977430849976), + ("f69f7b1c-6aea-4213-b5d0-deb2fbabe5a0", "function", "completed", 3675281144107549), + ("eb240dbc-0062-4314-b4d6-fb51442d02e7", "function", "completed", 2967264911514077), + ("f7ced93f-e686-4a15-9676-5b32b1f4e52c", "function", "completed", 8805488770050088), + ("e326b41e-302a-4f29-847b-f0eb76fcde59", "function", "completed", 5300981250266599), + ("dc9a55a8-eeaf-4beb-9842-8767de7385ec", "function", "completed", 4673999868714216), + ("3ee0ed1f-dd5c-4334-a239-9e076c68230e", "function", "completed", 1736067993713132), + ("185a8630-0a60-4adc-b0bb-4fa27fd915df", "function", "completed", 3044388486346549), + ("51bcfd5e-3ba7-41a2-bc27-34eb9d18e8d0", "function", "completed", 5421875616701543), + ("11653aa1-391c-4efa-93be-13b0e05f9e9f", "function", "completed", 244237303353736), + ("c17875c7-6f8f-4ada-84ef-43b4191428aa", "function", "completed", 8400742455854509), + ("2c3f97df-12a7-4d9e-a488-b3e2400d2434", "function", "completed", 2097899365071295), + ] + + for (exp_id, exp_type, exp_status, exp_call_id), actual_task in zip(expected, wf_execution.executed_tasks): + assert actual_task.id == exp_id + assert actual_task.task_type == exp_type + assert actual_task.status == exp_status + if actual_task.task_type == "function": + assert actual_task.output.call_id == exp_call_id + if actual_task.task_type == "dynamic": + assert actual_task.output.dump() == {} + + if actual_task.id == "38b3e696-adcb-4bf8-9217-747449f55289": + assert actual_task.dump(camel_case=True) == execution_data["executedTasks"][0]