From 272152ed99dcd4f8ab3390881a22fde3007b7811 Mon Sep 17 00:00:00 2001 From: Jaime Silva Date: Thu, 26 Oct 2023 10:19:35 +0200 Subject: [PATCH 01/10] add subworkflow tasks to workflows --- cognite/client/data_classes/workflows.py | 63 ++++++++++++++++++++++-- 1 file changed, 59 insertions(+), 4 deletions(-) diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 6c602c96cf..4851e97c74 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -94,7 +94,7 @@ def as_external_ids(self) -> list[str]: class WorkflowTaskParameters(CogniteResource, ABC): - task_type: ClassVar[Literal["function", "transformation", "cdf", "dynamic"]] + task_type: ClassVar[Literal["function", "transformation", "cdf", "dynamic", "subworkflow"]] @classmethod def load_parameters(cls, data: dict) -> WorkflowTaskParameters: @@ -114,8 +114,12 @@ def load_parameters(cls, data: dict) -> WorkflowTaskParameters: return CDFTaskParameters._load(parameters) elif type_ == "dynamic": return DynamicTaskParameters._load(parameters) + elif type == "subworkflow": + return SubworkflowTaskParameters._load(parameters) else: - raise ValueError(f"Unknown task type: {type_}. Expected 'function', 'transformation', 'cdf, or 'dynamic'") + raise ValueError( + f"Unknown task type: {type_}. Expected 'function', 'transformation', 'cdf, 'dynamic' or 'subworkflow'" + ) class FunctionTaskParameters(WorkflowTaskParameters): @@ -277,6 +281,37 @@ def dump(self, camel_case: bool = False) -> dict[str, Any]: } +class SubworkflowTaskParameters(WorkflowTaskParameters): + """ + The subworkflow task parameters are used to specify a subworkflow task. + + When a workflow is made of stages with dependencies between them, we can use subworkflow tasks for conveniece. It takes the tasks parameter which is an array of + function, transformation, and cdf task definitions. This array needs to be statically set on the worklow definition (if it needs to be defined at runtime, use a + dynamic task). + + Args: + tasks (list[WorkflowTask]): The tasks belonging to the subworkflow. + """ + + task_type = "subworkflow" + + def __init__(self, tasks: list[WorkflowTask]) -> None: + self.tasks = tasks + + @classmethod + def _load(cls: type[Self], resource: dict | str, cognite_client: CogniteClient | None = None) -> Self: + resource = json.loads(resource) if isinstance(resource, str) else resource + + subworkflow: dict[str, Any] = resource[cls.task_type] + + return cls( + [WorkflowTask._load(task) for task in subworkflow["tasks"]], + ) + + def dump(self, camel_case: bool = False) -> dict[str, Any]: + return {self.task_type: {"tasks": [task.dump(camel_case) for task in self.tasks]}} + + class DynamicTaskParameters(WorkflowTaskParameters): """ The dynamic task parameters are used to specify a dynamic task. @@ -363,7 +398,7 @@ def __init__( self.depends_on = depends_on @property - def type(self) -> Literal["function", "transformation", "cdf", "dynamic"]: + def type(self) -> Literal["function", "transformation", "cdf", "dynamic", "subworkflow"]: return self.parameters.task_type @classmethod @@ -418,6 +453,8 @@ def load_output(cls, data: dict) -> WorkflowTaskOutput: return CDFTaskOutput.load(data) elif task_type == "dynamic": return DynamicTaskOutput.load(data) + elif task_type == "subworkflow": + return SubworkflowTaskOutput.load(data) else: raise ValueError(f"Unknown task type: {task_type}") @@ -524,6 +561,24 @@ def dump(self, camel_case: bool = False) -> dict[str, Any]: return {} +class SubworkflowTaskOutput(WorkflowTaskOutput): + """ + The subworkflow task output is used to specify the output of a subworkflow task. + """ + + task_type: ClassVar[str] = "subworkflow" + + def __init__(self) -> None: + ... + + @classmethod + def load(cls, data: dict[str, Any]) -> SubworkflowTaskOutput: + return cls() + + def dump(self, camel_case: bool = False) -> dict[str, Any]: + return {} + + class WorkflowTaskExecution(CogniteResource): """ This class represents a task execution. @@ -563,7 +618,7 @@ def __init__( self.reason_for_incompletion = reason_for_incompletion @property - def task_type(self) -> Literal["function", "transformation", "cdf", "dynamic"]: + def task_type(self) -> Literal["function", "transformation", "cdf", "dynamic", "subworkflow"]: return self.input.task_type @classmethod From 81ad3492f16f37feb665d25bc5ecaa4db93e091f Mon Sep 17 00:00:00 2001 From: Jaime Silva Date: Thu, 26 Oct 2023 10:23:15 +0200 Subject: [PATCH 02/10] bump version --- CHANGELOG.md | 4 ++++ cognite/client/_version.py | 2 +- pyproject.toml | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3da05cf9fb..3a4ee5d5c4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,10 @@ Changes are grouped as follows - `Removed` for now removed features. - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. + +## [6.34.3] - 2023-10-26 +### Added +- Support for `subworkflow` tasks in `workflows`. ## [6.34.2] - 2023-10-23 ### Fixed diff --git a/cognite/client/_version.py b/cognite/client/_version.py index 92bee28a82..9e4edfbe9d 100644 --- a/cognite/client/_version.py +++ b/cognite/client/_version.py @@ -1,4 +1,4 @@ from __future__ import annotations -__version__ = "6.34.2" +__version__ = "6.34.3" __api_subversion__ = "V20220125" diff --git a/pyproject.toml b/pyproject.toml index afcf2eb0cf..03057122bb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "cognite-sdk" -version = "6.34.2" +version = "6.34.3" description = "Cognite Python SDK" readme = "README.md" documentation = "https://cognite-sdk-python.readthedocs-hosted.com" From f462bb4280db619737553b74fbc674d3b75c523a Mon Sep 17 00:00:00 2001 From: Jaime Silva Date: Wed, 22 Nov 2023 15:47:11 +0100 Subject: [PATCH 03/10] make default camel case true --- cognite/client/data_classes/workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 070e7f282b..0dadbdcb6b 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -320,7 +320,7 @@ def _load(cls: type[Self], resource: dict | str, cognite_client: CogniteClient | [WorkflowTask._load(task) for task in subworkflow["tasks"]], ) - def dump(self, camel_case: bool = False) -> dict[str, Any]: + def dump(self, camel_case: bool = True) -> dict[str, Any]: return {self.task_type: {"tasks": [task.dump(camel_case) for task in self.tasks]}} From 39f7eba0235584575a748c9ce281290442bff369 Mon Sep 17 00:00:00 2001 From: Jaime Silva Date: Wed, 22 Nov 2023 16:28:36 +0100 Subject: [PATCH 04/10] add test --- .../test_api/test_workflows.py | 57 ++++++++++++++++--- 1 file changed, 48 insertions(+), 9 deletions(-) diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index a922d64fe7..fbc3f5c7bb 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -9,6 +9,7 @@ from cognite.client.data_classes.workflows import ( CDFTaskParameters, FunctionTaskParameters, + SubworkflowTaskParameters, TransformationTaskParameters, Workflow, WorkflowDefinitionUpsert, @@ -70,13 +71,37 @@ def workflow_version_list(cognite_client: CogniteClient) -> WorkflowVersionList: workflow_definition=WorkflowDefinitionUpsert( tasks=[ WorkflowTask( - external_id=f"{workflow_id}-2-task1", + external_id="subworkflow1", + parameters=SubworkflowTaskParameters( + tasks=[ + WorkflowTask( + external_id="s1-task1", + parameters=CDFTaskParameters( + resource_path="/dummy/no/real/resource/path", + method="GET", + body={"limit": 1}, + ), + ), + WorkflowTask( + external_id="s1-task2", + parameters=CDFTaskParameters( + resource_path="/dummy/no/real/resource/path", + method="GET", + body={"limit": 1}, + ), + ), + ] + ), + ), + WorkflowTask( + external_id="task1", parameters=CDFTaskParameters( resource_path="/dummy/no/real/resource/path", method="GET", body={"limit": 1}, ), - ) + depends_on=["subworkflow1"], + ), ], description=None, ), @@ -106,7 +131,10 @@ def handle(client, data: dict): return output cognite_client.functions.create(name="Add", external_id=external_id, function_handle=handle) - pytest.skip("Function need to be redeployed, skipping tests that need it", allow_module_level=True) + pytest.skip( + "Function need to be redeployed, skipping tests that need it", + allow_module_level=True, + ) @pytest.fixture(scope="session") @@ -122,7 +150,10 @@ def handle(client, data: dict): return output cognite_client.functions.create(name="Multiply", external_id=external_id, function_handle=handle) - pytest.skip("Function need to be redeployed, skipping tests that need it", allow_module_level=True) + pytest.skip( + "Function need to be redeployed, skipping tests that need it", + allow_module_level=True, + ) @pytest.fixture @@ -171,7 +202,9 @@ def workflow_execution_list( return executions # Creating at least one execution result = cognite_client.workflows.executions.trigger( - add_multiply_workflow.workflow_external_id, add_multiply_workflow.version, {"a": 5, "b": 6} + add_multiply_workflow.workflow_external_id, + add_multiply_workflow.version, + {"a": 5, "b": 6}, ) t0 = time.time() while result.status != "completed": @@ -274,14 +307,16 @@ def test_list_workflow_version_limit( def test_delete_non_existing_raise(self, cognite_client: CogniteClient) -> None: with pytest.raises(CogniteAPIError) as e: cognite_client.workflows.versions.delete( - ("integration_test-non_existing_workflow_version", "1"), ignore_unknown_ids=False + ("integration_test-non_existing_workflow_version", "1"), + ignore_unknown_ids=False, ) assert "not found" in str(e.value) def test_delete_non_existing(self, cognite_client: CogniteClient) -> None: cognite_client.workflows.versions.delete( - ("integration_test-non_existing_workflow_version", "1"), ignore_unknown_ids=True + ("integration_test-non_existing_workflow_version", "1"), + ignore_unknown_ids=True, ) def test_retrieve_workflow(self, cognite_client: CogniteClient, workflow_version_list: WorkflowVersionList) -> None: @@ -299,7 +334,9 @@ def test_retrieve_non_existing_workflow(self, cognite_client: CogniteClient) -> class TestWorkflowExecutions: def test_list_workflow_executions( - self, cognite_client: CogniteClient, workflow_execution_list: WorkflowExecutionList + self, + cognite_client: CogniteClient, + workflow_execution_list: WorkflowExecutionList, ) -> None: workflow_ids = set(w.as_workflow_id() for w in workflow_execution_list) @@ -311,7 +348,9 @@ def test_list_workflow_executions( assert all(w.as_workflow_id() in workflow_ids for w in listed) def test_retrieve_workflow_execution_detailed( - self, cognite_client: CogniteClient, workflow_execution_list: WorkflowExecutionList + self, + cognite_client: CogniteClient, + workflow_execution_list: WorkflowExecutionList, ) -> None: retrieved = cognite_client.workflows.executions.retrieve_detailed(workflow_execution_list[0].id) From d8d0078aa6e4034bc1ddd63204b6ecedc00b3086 Mon Sep 17 00:00:00 2001 From: Jaime Silva Date: Tue, 5 Dec 2023 16:33:08 +0100 Subject: [PATCH 05/10] import SubworkflowTaskParameters on data_classes --- cognite/client/data_classes/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cognite/client/data_classes/__init__.py b/cognite/client/data_classes/__init__.py index c8cf6bfc13..b361c777a0 100644 --- a/cognite/client/data_classes/__init__.py +++ b/cognite/client/data_classes/__init__.py @@ -214,6 +214,7 @@ DynamicTaskParameters, FunctionTaskOutput, FunctionTaskParameters, + SubworkflowTaskParameters, TransformationTaskOutput, TransformationTaskParameters, Workflow, @@ -413,6 +414,7 @@ "TransformationTaskParameters", "CDFTaskParameters", "DynamicTaskParameters", + "SubworkflowTaskParameters", "FunctionTaskOutput", "TransformationTaskOutput", "CDFTaskOutput", From b957cfe90d732d1c5f2e6630432aeb92243b8cf5 Mon Sep 17 00:00:00 2001 From: Jaime Silva Date: Tue, 5 Dec 2023 16:37:10 +0100 Subject: [PATCH 06/10] fix var name --- cognite/client/data_classes/workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 0dadbdcb6b..873ec6662f 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -117,7 +117,7 @@ def load_parameters(cls, data: dict) -> WorkflowTaskParameters: return CDFTaskParameters._load(parameters) elif type_ == "dynamic": return DynamicTaskParameters._load(parameters) - elif type == "subworkflow": + elif type_ == "subworkflow": return SubworkflowTaskParameters._load(parameters) else: raise ValueError( From b9d3bb33a430bfa139f0ead79798c884ce0ef08f Mon Sep 17 00:00:00 2001 From: Jaime Silva Date: Wed, 6 Dec 2023 13:36:34 +0100 Subject: [PATCH 07/10] Update cognite/client/data_classes/workflows.py Co-authored-by: Anders Albert <60234212+doctrino@users.noreply.github.com> --- cognite/client/data_classes/workflows.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 873ec6662f..0f8b4a12c4 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -311,8 +311,7 @@ def __init__(self, tasks: list[WorkflowTask]) -> None: self.tasks = tasks @classmethod - def _load(cls: type[Self], resource: dict | str, cognite_client: CogniteClient | None = None) -> Self: - resource = json.loads(resource) if isinstance(resource, str) else resource + def _load(cls: type[Self], resource: dict, cognite_client: CogniteClient | None = None) -> Self: subworkflow: dict[str, Any] = resource[cls.task_type] From 180daa8db411cb4e9b9db72c4acf9dcc60e6ba8c Mon Sep 17 00:00:00 2001 From: Jaime Silva Date: Wed, 6 Dec 2023 13:37:18 +0100 Subject: [PATCH 08/10] Update tests/tests_integration/test_api/test_workflows.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: HÃ¥kon V. Treider --- tests/tests_integration/test_api/test_workflows.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index fbc3f5c7bb..070709ac87 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -131,10 +131,7 @@ def handle(client, data: dict): return output cognite_client.functions.create(name="Add", external_id=external_id, function_handle=handle) - pytest.skip( - "Function need to be redeployed, skipping tests that need it", - allow_module_level=True, - ) + pytest.skip("Function need to be redeployed, skipping tests that need it", allow_module_level=True) @pytest.fixture(scope="session") From 68a33d695e55aeead2ff1ed9b663b6ba5fcdbd66 Mon Sep 17 00:00:00 2001 From: Jaime Silva Date: Wed, 6 Dec 2023 13:40:29 +0100 Subject: [PATCH 09/10] review comments --- cognite/client/data_classes/workflows.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 873ec6662f..4c4f1ea59a 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -96,8 +96,11 @@ def as_external_ids(self) -> list[str]: return [workflow.external_id for workflow in self.data] +ValidTaskType = Literal["function", "transformation", "cdf", "dynamic", "subworkflow"] + + class WorkflowTaskParameters(CogniteObject, ABC): - task_type: ClassVar[Literal["function", "transformation", "cdf", "dynamic", "subworkflow"]] + task_type: ClassVar[ValidTaskType] @classmethod def load_parameters(cls, data: dict) -> WorkflowTaskParameters: @@ -120,9 +123,7 @@ def load_parameters(cls, data: dict) -> WorkflowTaskParameters: elif type_ == "subworkflow": return SubworkflowTaskParameters._load(parameters) else: - raise ValueError( - f"Unknown task type: {type_}. Expected 'function', 'transformation', 'cdf, 'dynamic' or 'subworkflow'" - ) + raise ValueError(f"Unknown task type: {type_}. Expected {ValidTaskType}") class FunctionTaskParameters(WorkflowTaskParameters): @@ -413,7 +414,7 @@ def __init__( self.depends_on = depends_on @property - def type(self) -> Literal["function", "transformation", "cdf", "dynamic", "subworkflow"]: + def type(self) -> ValidTaskType: return self.parameters.task_type @classmethod @@ -634,7 +635,7 @@ def __init__( self.reason_for_incompletion = reason_for_incompletion @property - def task_type(self) -> Literal["function", "transformation", "cdf", "dynamic", "subworkflow"]: + def task_type(self) -> ValidTaskType: return self.input.task_type @classmethod From 4e2b88d31b4a3cbd6138afb997e95152ffa2ac5e Mon Sep 17 00:00:00 2001 From: Jaime Silva Date: Wed, 6 Dec 2023 13:47:33 +0100 Subject: [PATCH 10/10] update test --- .../tests_integration/test_api/test_workflows.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py index a0e8e3cab3..872b751a7f 100644 --- a/tests/tests_integration/test_api/test_workflows.py +++ b/tests/tests_integration/test_api/test_workflows.py @@ -260,10 +260,17 @@ def test_upsert_delete(self, cognite_client: CogniteClient) -> None: workflow_definition=WorkflowDefinitionUpsert( tasks=[ WorkflowTask( - external_id="integration_test-workflow_definitions-test_create_delete-task1", - parameters=FunctionTaskParameters( - external_id="integration_test-workflow_definitions-test_create_delete-task1-function", - data={"a": 1, "b": 2}, + external_id="integration_test-workflow_definitions-test_create_delete-subworkflow1", + parameters=SubworkflowTaskParameters( + tasks=[ + WorkflowTask( + external_id="integration_test-workflow_definitions-test_create_delete-task1", + parameters=FunctionTaskParameters( + external_id="integration_test-workflow_definitions-test_create_delete-task1-function", + data={"a": 1, "b": 2}, + ), + ) + ] ), ) ],