From 3ae83281a9c46b889e8a21b9a75ab185ddc9f6e3 Mon Sep 17 00:00:00 2001 From: Jaime Silva Date: Wed, 6 Dec 2023 13:54:37 +0100 Subject: [PATCH] add subworkflow tasks to workflows (#1445) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Anders Albert <60234212+doctrino@users.noreply.github.com> Co-authored-by: Håkon V. Treider --- CHANGELOG.md | 4 ++ cognite/client/_version.py | 2 +- cognite/client/data_classes/__init__.py | 2 + cognite/client/data_classes/workflows.py | 62 +++++++++++++++-- pyproject.toml | 2 +- .../test_api/test_workflows.py | 67 +++++++++++++++---- 6 files changed, 121 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d3935bc970..ddfb7708f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,10 @@ Changes are grouped as follows - `Removed` for now removed features. - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. + +## [7.5.3] - 2023-12-06 +### Added +- Support for `subworkflow` tasks in `workflows`.
## [7.5.2] - 2023-12-05 ### Fixed diff --git a/cognite/client/_version.py b/cognite/client/_version.py index 1787025460..1b44126097 100644 --- a/cognite/client/_version.py +++ b/cognite/client/_version.py @@ -1,4 +1,4 @@ from __future__ import annotations -__version__ = "7.5.2" +__version__ = "7.5.3" __api_subversion__ = "V20220125" diff --git a/cognite/client/data_classes/__init__.py b/cognite/client/data_classes/__init__.py index c8cf6bfc13..b361c777a0 100644 --- a/cognite/client/data_classes/__init__.py +++ b/cognite/client/data_classes/__init__.py @@ -214,6 +214,7 @@ DynamicTaskParameters, FunctionTaskOutput, FunctionTaskParameters, + SubworkflowTaskParameters, TransformationTaskOutput, TransformationTaskParameters, Workflow, @@ -413,6 +414,7 @@ "TransformationTaskParameters", "CDFTaskParameters", "DynamicTaskParameters", + "SubworkflowTaskParameters", "FunctionTaskOutput", "TransformationTaskOutput", "CDFTaskOutput", diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index f4332e0b9c..7ea2d53574 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -95,8 +95,11 @@ def as_external_ids(self) -> list[str]: return [workflow.external_id for workflow in self.data] +ValidTaskType = Literal["function", "transformation", "cdf", "dynamic", "subworkflow"] + + class WorkflowTaskParameters(CogniteObject, ABC): - task_type: ClassVar[Literal["function", "transformation", "cdf", "dynamic"]] + task_type: ClassVar[ValidTaskType] @classmethod def load_parameters(cls, data: dict) -> WorkflowTaskParameters: @@ -116,8 +119,10 @@ def load_parameters(cls, data: dict) -> WorkflowTaskParameters: return CDFTaskParameters._load(parameters) elif type_ == "dynamic": return DynamicTaskParameters._load(parameters) + elif type_ == "subworkflow": + return SubworkflowTaskParameters._load(parameters) else: - raise ValueError(f"Unknown task type: {type_}. 
Expected 'function', 'transformation', 'cdf, or 'dynamic'") + raise ValueError(f"Unknown task type: {type_}. Expected {ValidTaskType}") class FunctionTaskParameters(WorkflowTaskParameters): @@ -288,6 +293,35 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: } +class SubworkflowTaskParameters(WorkflowTaskParameters): + """ + The subworkflow task parameters are used to specify a subworkflow task. + + When a workflow is made of stages with dependencies between them, we can use subworkflow tasks for convenience. It takes the tasks parameter which is an array of + function, transformation, and cdf task definitions. This array needs to be statically set on the workflow definition (if it needs to be defined at runtime, use a + dynamic task). + + Args: + tasks (list[WorkflowTask]): The tasks belonging to the subworkflow. + """ + + task_type = "subworkflow" + + def __init__(self, tasks: list[WorkflowTask]) -> None: + self.tasks = tasks + + @classmethod + def _load(cls: type[Self], resource: dict, cognite_client: CogniteClient | None = None) -> Self: + subworkflow: dict[str, Any] = resource[cls.task_type] + + return cls( + [WorkflowTask._load(task) for task in subworkflow["tasks"]], + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {self.task_type: {"tasks": [task.dump(camel_case) for task in self.tasks]}} + + class DynamicTaskParameters(WorkflowTaskParameters): """ The dynamic task parameters are used to specify a dynamic task.
@@ -377,7 +411,7 @@ def __init__( self.depends_on = depends_on @property - def type(self) -> Literal["function", "transformation", "cdf", "dynamic"]: + def type(self) -> ValidTaskType: return self.parameters.task_type @classmethod @@ -433,6 +467,8 @@ def load_output(cls, data: dict) -> WorkflowTaskOutput: return CDFTaskOutput.load(data) elif task_type == "dynamic": return DynamicTaskOutput.load(data) + elif task_type == "subworkflow": + return SubworkflowTaskOutput.load(data) else: raise ValueError(f"Unknown task type: {task_type}") @@ -539,6 +575,24 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: return {} +class SubworkflowTaskOutput(WorkflowTaskOutput): + """ + The subworkflow task output is used to specify the output of a subworkflow task. + """ + + task_type: ClassVar[str] = "subworkflow" + + def __init__(self) -> None: + ... + + @classmethod + def load(cls, data: dict[str, Any]) -> SubworkflowTaskOutput: + return cls() + + def dump(self, camel_case: bool = False) -> dict[str, Any]: + return {} + + class WorkflowTaskExecution(CogniteObject): """ This class represents a task execution. 
@@ -578,7 +632,7 @@ def __init__( self.reason_for_incompletion = reason_for_incompletion @property - def task_type(self) -> Literal["function", "transformation", "cdf", "dynamic"]: + def task_type(self) -> ValidTaskType: return self.input.task_type @classmethod diff --git a/pyproject.toml b/pyproject.toml index 249781f1c7..986ba7d802 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "cognite-sdk" -version = "7.5.2" +version = "7.5.3" description = "Cognite Python SDK" readme = "README.md" documentation = "https://cognite-sdk-python.readthedocs-hosted.com" diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index 0f1993e9fd..872b751a7f 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -9,6 +9,7 @@ from cognite.client.data_classes.workflows import ( CDFTaskParameters, FunctionTaskParameters, + SubworkflowTaskParameters, TransformationTaskParameters, Workflow, WorkflowDefinitionUpsert, @@ -70,13 +71,37 @@ def workflow_version_list(cognite_client: CogniteClient) -> WorkflowVersionList: workflow_definition=WorkflowDefinitionUpsert( tasks=[ WorkflowTask( - external_id=f"{workflow_id}-2-task1", + external_id="subworkflow1", + parameters=SubworkflowTaskParameters( + tasks=[ + WorkflowTask( + external_id="s1-task1", + parameters=CDFTaskParameters( + resource_path="/dummy/no/real/resource/path", + method="GET", + body={"limit": 1}, + ), + ), + WorkflowTask( + external_id="s1-task2", + parameters=CDFTaskParameters( + resource_path="/dummy/no/real/resource/path", + method="GET", + body={"limit": 1}, + ), + ), + ] + ), + ), + WorkflowTask( + external_id="task1", parameters=CDFTaskParameters( resource_path="/dummy/no/real/resource/path", method="GET", body={"limit": 1}, ), - ) + depends_on=["subworkflow1"], + ), ], description=None, ), @@ -122,7 +147,10 @@ def handle(client, data: dict): return output 
cognite_client.functions.create(name="Multiply", external_id=external_id, function_handle=handle) - pytest.skip("Function need to be redeployed, skipping tests that need it", allow_module_level=True) + pytest.skip( + "Function need to be redeployed, skipping tests that need it", + allow_module_level=True, + ) @pytest.fixture @@ -171,7 +199,9 @@ def workflow_execution_list( return executions # Creating at least one execution result = cognite_client.workflows.executions.trigger( - add_multiply_workflow.workflow_external_id, add_multiply_workflow.version, {"a": 5, "b": 6} + add_multiply_workflow.workflow_external_id, + add_multiply_workflow.version, + {"a": 5, "b": 6}, ) t0 = time.time() while result.status != "completed": @@ -230,10 +260,17 @@ def test_upsert_delete(self, cognite_client: CogniteClient) -> None: workflow_definition=WorkflowDefinitionUpsert( tasks=[ WorkflowTask( - external_id="integration_test-workflow_definitions-test_create_delete-task1", - parameters=FunctionTaskParameters( - external_id="integration_test-workflow_definitions-test_create_delete-task1-function", - data={"a": 1, "b": 2}, + external_id="integration_test-workflow_definitions-test_create_delete-subworkflow1", + parameters=SubworkflowTaskParameters( + tasks=[ + WorkflowTask( + external_id="integration_test-workflow_definitions-test_create_delete-task1", + parameters=FunctionTaskParameters( + external_id="integration_test-workflow_definitions-test_create_delete-task1-function", + data={"a": 1, "b": 2}, + ), + ) + ] ), ) ], @@ -274,14 +311,16 @@ def test_list_workflow_version_limit( def test_delete_non_existing_raise(self, cognite_client: CogniteClient) -> None: with pytest.raises(CogniteAPIError) as e: cognite_client.workflows.versions.delete( - ("integration_test-non_existing_workflow_version", "1"), ignore_unknown_ids=False + ("integration_test-non_existing_workflow_version", "1"), + ignore_unknown_ids=False, ) assert "not found" in str(e.value) def test_delete_non_existing(self, 
cognite_client: CogniteClient) -> None: cognite_client.workflows.versions.delete( - ("integration_test-non_existing_workflow_version", "1"), ignore_unknown_ids=True + ("integration_test-non_existing_workflow_version", "1"), + ignore_unknown_ids=True, ) def test_retrieve_workflow(self, cognite_client: CogniteClient, workflow_version_list: WorkflowVersionList) -> None: @@ -299,7 +338,9 @@ def test_retrieve_non_existing_workflow(self, cognite_client: CogniteClient) -> class TestWorkflowExecutions: def test_list_workflow_executions( - self, cognite_client: CogniteClient, workflow_execution_list: WorkflowExecutionList + self, + cognite_client: CogniteClient, + workflow_execution_list: WorkflowExecutionList, ) -> None: workflow_ids = set(w.as_workflow_id() for w in workflow_execution_list) @@ -311,7 +352,9 @@ def test_list_workflow_executions( assert all(w.as_workflow_id() in workflow_ids for w in listed) def test_retrieve_workflow_execution_detailed( - self, cognite_client: CogniteClient, workflow_execution_list: WorkflowExecutionList + self, + cognite_client: CogniteClient, + workflow_execution_list: WorkflowExecutionList, ) -> None: retrieved = cognite_client.workflows.executions.retrieve_detailed(workflow_execution_list[0].id)