From 0691b84b895404e9e5f527e044d0ec19df959415 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nina=20=C3=98deg=C3=A5rd?= Date: Tue, 19 Nov 2024 13:00:50 +0100 Subject: [PATCH] feat: add metadata to workflow triggers (#2002) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Håkon V. Treider --- CHANGELOG.md | 14 +- cognite/client/_api/workflows.py | 9 +- cognite/client/_version.py | 2 +- cognite/client/data_classes/workflows.py | 26 +- pyproject.toml | 2 +- .../test_api/test_data_workflows.py | 644 ------------------ .../test_api/test_workflows.py | 598 ++++++++++++++++ 7 files changed, 635 insertions(+), 660 deletions(-) delete mode 100644 tests/tests_integration/test_api/test_data_workflows.py create mode 100644 tests/tests_integration/test_api/test_workflows.py diff --git a/CHANGELOG.md b/CHANGELOG.md index bac1a994cd..159db290ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,23 +17,29 @@ Changes are grouped as follows - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. -## [7.67.0] - 2023-11-19 +## [7.67.1] - 2024-11-19 +### Added +- Workflow triggers support metadata field +### Fixed +- Workflow description is optional + +## [7.67.0] - 2024-11-19 ### Added - Convenience method `from_alias` to the UnitsAPI (`client.units.from_alias`) to help with looking up units by their aliases (similarity search is supported). -## [7.66.1] - 2023-11-18 +## [7.66.1] - 2024-11-18 ### Removed - The Core Data Model (v1) is now considered stable and the alpha warning has been removed. - Usage of `instance_id` in the FilesAPI is considered stable and the alpha warning has been removed. -## [7.66.0] - 2023-11-15 +## [7.66.0] - 2024-11-15 ### Added - User's trying to access a CDF project they do not have access to, will now be met with a more helpful exception: `CogniteProjectAccessError` will be raised and accessible projects on the given cluser will be listed, rather than just "401 - Unauthorized". 
-## [7.65.1] - 2023-11-14 +## [7.65.1] - 2024-11-14 ### Added - Workflows now support data sets diff --git a/cognite/client/_api/workflows.py b/cognite/client/_api/workflows.py index 5635d1d3f4..fec80b6183 100644 --- a/cognite/client/_api/workflows.py +++ b/cognite/client/_api/workflows.py @@ -63,11 +63,11 @@ def upsert( """`Create or update a trigger for a workflow. `_ Args: - workflow_trigger (WorkflowTriggerUpsert): The workflow trigger specitification. + workflow_trigger (WorkflowTriggerUpsert): The workflow trigger specification. client_credentials (ClientCredentials | dict | None): Specific credentials that should be used to trigger the workflow execution. When passed will take precedence over the current credentials. Returns: - WorkflowTrigger: The created or updated workflow trigger specification. + WorkflowTrigger: The created or updated workflow trigger specification. Examples: @@ -83,6 +83,7 @@ def upsert( ... workflow_external_id="my_workflow", ... workflow_version="1", ... input={"a": 1, "b": 2}, + ... metadata={"key": "value"}, ... ) ... ) @@ -133,7 +134,7 @@ def create( This method is deprecated, use '.upsert' instead. It will be completely removed October 2024. Args: - workflow_trigger (WorkflowTriggerCreate): The workflow trigger specitification. + workflow_trigger (WorkflowTriggerCreate): The workflow trigger specification. client_credentials (ClientCredentials | dict | None): Specific credentials that should be used to trigger the workflow execution. When passed will take precedence over the current credentials. 
Returns: @@ -507,7 +508,7 @@ def cancel(self, id: str, reason: str | None) -> WorkflowExecution: >>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.executions.run("foo", "1") - >>> client.workflows.executions.cancel(id="foo", reason="test cancelation") + >>> client.workflows.executions.cancel(id="foo", reason="test cancellation") """ response = self._post( url_path=f"{self._RESOURCE_PATH}/{id}/cancel", diff --git a/cognite/client/_version.py b/cognite/client/_version.py index 69b99467e7..7e6ca53b4e 100644 --- a/cognite/client/_version.py +++ b/cognite/client/_version.py @@ -1,4 +1,4 @@ from __future__ import annotations -__version__ = "7.67.0" +__version__ = "7.67.1" __api_subversion__ = "20230101" diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 5a7d9c29e1..af9be2b9f1 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -736,14 +736,14 @@ class WorkflowDefinitionCore(WriteableCogniteResource["WorkflowDefinitionUpsert" tasks (list[WorkflowTask]): The tasks of the workflow definition. description (str | None): The description of the workflow definition. Note that when updating a workflow definition description, it will always be overwritten also if it is set to None. Meaning if the - wokflow definition already has a description, and you want to keep it, you need to provide + workflow definition already has a description, and you want to keep it, you need to provide the description when updating it. """ def __init__( self, tasks: list[WorkflowTask], - description: str | None, + description: str | None = None, ) -> None: self.tasks = tasks self.description = description @@ -764,7 +764,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class WorkflowDefinitionUpsert(WorkflowDefinitionCore): """ - This class represents a workflow definition. 
This represents the write/update version of a workflow definiton. + This class represents a workflow definition. This represents the write/update version of a workflow definition. A workflow definition defines the tasks and order/dependencies of these tasks. @@ -772,14 +772,14 @@ class WorkflowDefinitionUpsert(WorkflowDefinitionCore): tasks (list[WorkflowTask]): The tasks of the workflow definition. description (str | None): The description of the workflow definition. Note that when updating a workflow definition description, it will always be overwritten also if it is set to None. Meaning if the - wokflow definition already has a description, and you want to keep it, you need to provide + workflow definition already has a description, and you want to keep it, you need to provide the description when updating it. """ def __init__( self, tasks: list[WorkflowTask], - description: str | None, + description: str | None = None, ) -> None: super().__init__(tasks, description) @@ -803,7 +803,7 @@ def as_write(self) -> WorkflowDefinitionUpsert: class WorkflowDefinition(WorkflowDefinitionCore): """ - This class represents a workflow definition. This represents the read version of a workflow definiton. + This class represents a workflow definition. This represents the read version of a workflow definition. A workflow definition defines the tasks and order/dependencies of these tasks. @@ -1352,6 +1352,7 @@ class WorkflowTriggerCore(WriteableCogniteResource["WorkflowTriggerUpsert"], ABC workflow_external_id (str): The external ID of the workflow. workflow_version (str): The version of the workflow. input (dict | None): The input data of the workflow version trigger. Defaults to None. + metadata (dict | None): Application specific metadata. Defaults to None. 
""" def __init__( @@ -1361,12 +1362,14 @@ def __init__( workflow_external_id: str, workflow_version: str, input: dict | None = None, + metadata: dict | None = None, ) -> None: self.external_id = external_id self.trigger_rule = trigger_rule self.workflow_external_id = workflow_external_id self.workflow_version = workflow_version self.input = input + self.metadata = metadata class WorkflowTriggerUpsert(WorkflowTriggerCore): @@ -1379,6 +1382,7 @@ class WorkflowTriggerUpsert(WorkflowTriggerCore): workflow_external_id (str): The external ID of the workflow. workflow_version (str): The version of the workflow. input (dict | None): The input data of the workflow version trigger. Defaults to None. + metadata (dict | None): Application specific metadata. Defaults to None. """ def dump(self, camel_case: bool = True) -> dict[str, Any]: @@ -1390,6 +1394,8 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: } if self.input: item["input"] = self.input + if self.metadata: + item["metadata"] = self.metadata if camel_case: return convert_all_keys_to_camel_case(item) return item @@ -1402,6 +1408,7 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> W workflow_version=resource["workflowVersion"], trigger_rule=WorkflowTriggerRule._load(resource["triggerRule"]), input=resource.get("input"), + metadata=resource.get("metadata"), ) def as_write(self) -> WorkflowTriggerUpsert: @@ -1422,6 +1429,7 @@ class WorkflowTrigger(WorkflowTriggerCore): workflow_external_id (str): The external ID of the workflow. workflow_version (str): The version of the workflow. input (dict | None): The input data passed to the workflow when an execution is started. Defaults to None. + metadata (dict | None): Application specific metadata. Defaults to None. created_time (int | None): The time when the workflow version trigger was created. Unix timestamp in milliseconds. Defaults to None. 
last_updated_time (int | None): The time when the workflow version trigger was last updated. Unix timestamp in milliseconds. Defaults to None. """ @@ -1433,6 +1441,7 @@ def __init__( workflow_external_id: str, workflow_version: str, input: dict | None = None, + metadata: dict | None = None, created_time: int | None = None, last_updated_time: int | None = None, ) -> None: @@ -1442,6 +1451,7 @@ def __init__( workflow_external_id=workflow_external_id, workflow_version=workflow_version, input=input, + metadata=metadata, ) self.created_time = created_time self.last_updated_time = last_updated_time @@ -1455,6 +1465,8 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: } if self.input: item["input"] = self.input + if self.metadata: + item["metadata"] = self.metadata if self.created_time: item["created_time"] = self.created_time if self.last_updated_time: @@ -1471,6 +1483,7 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> W workflow_version=resource["workflowVersion"], trigger_rule=WorkflowTriggerRule._load(resource["triggerRule"]), input=resource.get("input"), + metadata=resource.get("metadata"), created_time=resource.get("createdTime"), last_updated_time=resource.get("lastUpdatedTime"), ) @@ -1483,6 +1496,7 @@ def as_write(self) -> WorkflowTriggerUpsert: workflow_external_id=self.workflow_external_id, workflow_version=self.workflow_version, input=self.input, + metadata=self.metadata, ) diff --git a/pyproject.toml b/pyproject.toml index ad37a7bd68..fa5e0de08b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "cognite-sdk" -version = "7.67.0" +version = "7.67.1" description = "Cognite Python SDK" readme = "README.md" documentation = "https://cognite-sdk-python.readthedocs-hosted.com" diff --git a/tests/tests_integration/test_api/test_data_workflows.py b/tests/tests_integration/test_api/test_data_workflows.py deleted file mode 100644 index 7eb00651f1..0000000000 --- 
a/tests/tests_integration/test_api/test_data_workflows.py +++ /dev/null @@ -1,644 +0,0 @@ -from __future__ import annotations - -import time - -import pytest - -from cognite.client import CogniteClient -from cognite.client.data_classes import DataSet, Function -from cognite.client.data_classes.data_modeling import ViewId -from cognite.client.data_classes.data_modeling.query import NodeResultSetExpression, Select, SourceSelector -from cognite.client.data_classes.workflows import ( - CDFTaskParameters, - FunctionTaskParameters, - SubworkflowTaskParameters, - TransformationTaskParameters, - Workflow, - WorkflowDataModelingTriggerRule, - WorkflowDefinitionUpsert, - WorkflowExecutionList, - WorkflowList, - WorkflowScheduledTriggerRule, - WorkflowTask, - WorkflowTrigger, - WorkflowTriggerDataModelingQuery, - WorkflowTriggerUpsert, - WorkflowUpsert, - WorkflowVersion, - WorkflowVersionList, - WorkflowVersionUpsert, -) -from cognite.client.exceptions import CogniteAPIError -from cognite.client.utils._text import random_string - - -@pytest.fixture -def workflow_list(cognite_client: CogniteClient, data_set: DataSet) -> WorkflowList: - workflow1 = WorkflowUpsert( - external_id="integration_test-workflow1", - description="This is workflow for testing purposes", - data_set_id=data_set.id, - ) - workflow2 = WorkflowUpsert( - external_id="integration_test-workflow2", - description="This is workflow for testing purposes", - ) - workflows = [workflow1, workflow2] - listed = cognite_client.workflows.list() - existing = listed._external_id_to_item - call_list = False - for workflow in workflows: - if workflow.external_id not in existing: - call_list = True - cognite_client.workflows.upsert(workflow) - if call_list: - return cognite_client.workflows.list() - return listed - - -@pytest.fixture -def workflow_version_list(cognite_client: CogniteClient) -> WorkflowVersionList: - workflow_id = "integration_test-workflow_with_versions" - version1 = WorkflowVersionUpsert( - 
workflow_external_id=workflow_id, - version="1", - workflow_definition=WorkflowDefinitionUpsert( - tasks=[ - WorkflowTask( - external_id=f"{workflow_id}-1-task1", - parameters=TransformationTaskParameters( - external_id="None-existing-transformation", - ), - ) - ], - description=None, - ), - ) - version2 = WorkflowVersionUpsert( - workflow_external_id=workflow_id, - version="2", - workflow_definition=WorkflowDefinitionUpsert( - tasks=[ - WorkflowTask( - external_id="subworkflow1", - parameters=SubworkflowTaskParameters( - tasks=[ - WorkflowTask( - external_id="s1-task1", - parameters=CDFTaskParameters( - resource_path="/dummy/no/real/resource/path", - method="GET", - body={"limit": 1}, - ), - ), - WorkflowTask( - external_id="s1-task2", - parameters=CDFTaskParameters( - resource_path="/dummy/no/real/resource/path", - method="GET", - body={"limit": 1}, - ), - ), - ] - ), - ), - WorkflowTask( - external_id="task1", - parameters=CDFTaskParameters( - resource_path="/dummy/no/real/resource/path", - method="GET", - body={"limit": 1}, - ), - depends_on=["subworkflow1"], - ), - ], - description=None, - ), - ) - listed = cognite_client.workflows.versions.list(workflow_version_ids=workflow_id) - existing = {w.version for w in listed} - call_list = False - for version in [version1, version2]: - if version.version not in existing: - call_list = True - cognite_client.workflows.versions.upsert(version) - if call_list: - return cognite_client.workflows.versions.list(workflow_version_ids=workflow_id) - return listed - - -@pytest.fixture(scope="session") -def cdf_function_add(cognite_client: CogniteClient) -> Function: - external_id = "integration_test-workflow-cdf_function_add" - add_function = cognite_client.functions.retrieve(external_id=external_id) - if add_function is not None: - return add_function - - def handle(client, data: dict): - output = data.copy() - output["sum"] = output["a"] + output["b"] - return output - - cognite_client.functions.create(name="Add", 
external_id=external_id, function_handle=handle) - pytest.skip("Function need to be redeployed, skipping tests that need it", allow_module_level=True) - - -@pytest.fixture(scope="session") -def cdf_function_multiply(cognite_client: CogniteClient) -> Function: - external_id = "integration_test-workflow-cdf_function_multiply" - multiply_function = cognite_client.functions.retrieve(external_id=external_id) - if multiply_function is not None: - return multiply_function - - def handle(client, data: dict): - output = data.copy() - output["product"] = output["a"] * output["b"] - return output - - cognite_client.functions.create(name="Multiply", external_id=external_id, function_handle=handle) - pytest.skip( - "Function need to be redeployed, skipping tests that need it", - allow_module_level=True, - ) - - -@pytest.fixture(scope="session") -def add_multiply_workflow( - cognite_client: CogniteClient, cdf_function_add: Function, cdf_function_multiply: Function -) -> WorkflowVersion: - workflow_id = "integration_test-workflow-add_multiply" - version = WorkflowVersionUpsert( - workflow_external_id=workflow_id, - version="1", - workflow_definition=WorkflowDefinitionUpsert( - description=None, - tasks=[ - WorkflowTask( - external_id=f"{workflow_id}-1-add", - parameters=FunctionTaskParameters( - external_id=cdf_function_add.external_id, - data={"a": 1, "b": 2}, - ), - ), - WorkflowTask( - external_id=f"{workflow_id}-1-multiply", - parameters=FunctionTaskParameters( - external_id=cdf_function_multiply.external_id, - data={"a": 3, "b": 4}, - is_async_complete=True, - ), - timeout=120, - retries=2, - ), - ], - ), - ) - - retrieved = cognite_client.workflows.versions.retrieve(version.workflow_external_id, version.version) - if retrieved is not None: - return retrieved - else: - return cognite_client.workflows.versions.upsert(version) - - -@pytest.fixture(scope="session") -def workflow_execution_list( - cognite_client: CogniteClient, add_multiply_workflow: WorkflowVersion -) -> 
WorkflowExecutionList: - executions = cognite_client.workflows.executions.list(workflow_version_ids=add_multiply_workflow.as_id(), limit=5) - if executions: - return executions - # Creating at least one execution - result = cognite_client.workflows.executions.run( - add_multiply_workflow.workflow_external_id, - add_multiply_workflow.version, - {"a": 5, "b": 6}, - ) - t0 = time.time() - while result.status != "completed": - result = cognite_client.workflows.executions.retrieve_detailed(result.id) - if result.status != "running": - break - try: - cognite_client.workflows.tasks.update(result.executed_tasks[1].id, "completed") - except CogniteAPIError as e: - if e.message == f"Task with id {result.executed_tasks[1].id} is already in a terminal state": - break - time.sleep(0.5) - if time.time() - t0 > 60: - raise TimeoutError("Workflow execution did not complete in time") - return cognite_client.workflows.executions.list(workflow_version_ids=add_multiply_workflow.as_id(), limit=5) - - -@pytest.fixture() -def clean_created_sessions(cognite_client: CogniteClient) -> None: - existing_active_sessions = cognite_client.iam.sessions.list(status="active", limit=-1) - yield None - current_sessions = cognite_client.iam.sessions.list(status="active", limit=-1) - existing_ids = {session.id for session in existing_active_sessions} - to_revoked = [session.id for session in current_sessions if session.id not in existing_ids] - if len(to_revoked) > 0: - cognite_client.iam.sessions.revoke(to_revoked) - - -@pytest.fixture() -def clean_created_workflow_triggers(cognite_client: CogniteClient) -> None: - existing_workflow_triggers = cognite_client.workflows.triggers.get_triggers(limit=-1) - yield None - current_workflow_triggers = cognite_client.workflows.triggers.get_triggers(limit=-1) - existing_external_ids = {trigger.external_id for trigger in existing_workflow_triggers} - to_clean = [ - trigger.external_id for trigger in current_workflow_triggers if trigger.external_id not in 
existing_external_ids - ] - for external_id in to_clean: - cognite_client.workflows.triggers.delete(external_id) - - -@pytest.fixture() -def workflow_scheduled_trigger(cognite_client: CogniteClient, add_multiply_workflow: WorkflowVersion) -> None: - trigger = cognite_client.workflows.triggers.create( - WorkflowTriggerUpsert( - external_id="integration_test-workflow-scheduled-trigger", - trigger_rule=WorkflowScheduledTriggerRule(cron_expression="* * * * *"), - workflow_external_id="integration_test-workflow-add_multiply", - workflow_version="1", - input={"a": 1, "b": 2}, - ) - ) - yield trigger - cognite_client.workflows.triggers.delete(trigger.external_id) - - -@pytest.fixture(scope="session") -def data_set(cognite_client: CogniteClient) -> DataSet: - return cognite_client.data_sets.list(limit=1)[0] - - -@pytest.fixture() -def workflow_data_modeling_trigger(cognite_client: CogniteClient, add_multiply_workflow: WorkflowVersion) -> None: - trigger = cognite_client.workflows.triggers.create( - WorkflowTriggerUpsert( - external_id="integration_test-workflow-data-modeling-trigger", - trigger_rule=WorkflowDataModelingTriggerRule( - data_modeling_query=WorkflowTriggerDataModelingQuery( - with_={"timeseries": NodeResultSetExpression()}, - select={ - "timeseries": Select( - sources=[SourceSelector(ViewId("cdf_cdm", "CogniteTimeSeries", "v1"), ["name"])] - ) - }, - ), - batch_size=500, - batch_timeout=300, - ), - workflow_external_id="integration_test-workflow-add_multiply", - workflow_version="1", - ) - ) - yield trigger - cognite_client.workflows.triggers.delete(trigger.external_id) - - -class TestWorkflows: - def test_upsert_delete(self, cognite_client: CogniteClient, data_set: DataSet) -> None: - workflow = WorkflowUpsert( - external_id="integration_test-test_create_delete" + random_string(5), - description="This is ephemeral workflow for testing purposes", - data_set_id=data_set.id, - ) - cognite_client.workflows.delete(workflow.external_id, ignore_unknown_ids=True) - - 
created_workflow: Workflow | None = None - try: - created_workflow = cognite_client.workflows.upsert(workflow) - - assert created_workflow.external_id == workflow.external_id - assert created_workflow.description == workflow.description - assert created_workflow.created_time is not None - assert created_workflow.data_set_id == data_set.id - finally: - if created_workflow is not None: - cognite_client.workflows.delete(created_workflow.external_id) - - def test_delete_non_existing_raise(self, cognite_client: CogniteClient) -> None: - with pytest.raises(CogniteAPIError) as e: - cognite_client.workflows.delete("integration_test-non_existing_workflow", ignore_unknown_ids=False) - - assert "workflows were not found" in str(e.value) - - def test_delete_non_existing(self, cognite_client: CogniteClient) -> None: - cognite_client.workflows.delete("integration_test-non_existing_workflow", ignore_unknown_ids=True) - - def test_retrieve_workflow(self, cognite_client: CogniteClient, workflow_list: WorkflowList) -> None: - retrieved = cognite_client.workflows.retrieve(workflow_list[0].external_id) - - assert retrieved == workflow_list[0] - - def test_retrieve_non_existing_workflow(self, cognite_client: CogniteClient) -> None: - non_existing = cognite_client.workflows.retrieve("integration_test-non_existing_workflow") - - assert non_existing is None - - -class TestWorkflowVersions: - def test_upsert_delete(self, cognite_client: CogniteClient) -> None: - version = WorkflowVersionUpsert( - workflow_external_id="integration_test-workflow_versions-test_create_delete" + random_string(5), - version="1", - workflow_definition=WorkflowDefinitionUpsert( - tasks=[ - WorkflowTask( - external_id="integration_test-workflow_definitions-test_create_delete-subworkflow1" - + random_string(5), - parameters=SubworkflowTaskParameters( - tasks=[ - WorkflowTask( - external_id="integration_test-workflow_definitions-test_create_delete-task1" - + random_string(5), - parameters=FunctionTaskParameters( - 
external_id="integration_test-workflow_definitions-test_create_delete-task1-function", - data={"a": 1, "b": 2}, - ), - ) - ] - ), - ) - ], - description="This is ephemeral workflow definition for testing purposes", - ), - ) - cognite_client.workflows.versions.delete(version.as_id(), ignore_unknown_ids=True) - - created_version: WorkflowVersion | None = None - try: - created_version = cognite_client.workflows.versions.upsert(version) - - assert created_version.workflow_external_id == version.workflow_external_id - assert created_version.workflow_definition.description == version.workflow_definition.description - assert isinstance(created_version.workflow_definition.hash_, str) - finally: - if created_version is not None: - cognite_client.workflows.versions.delete( - created_version.as_id(), - ) - cognite_client.workflows.delete(created_version.workflow_external_id) - - def test_list_workflow_versions( - self, cognite_client: CogniteClient, workflow_version_list: WorkflowVersionList - ) -> None: - listed = cognite_client.workflows.versions.list(workflow_version_list.as_ids()) - - assert len(listed) == len(workflow_version_list) - assert listed == workflow_version_list - - def test_list_workflow_version_limit( - self, cognite_client: CogniteClient, workflow_version_list: WorkflowVersionList - ) -> None: - listed = cognite_client.workflows.versions.list(limit=1) - - assert len(listed) == 1 - - def test_delete_non_existing_raise(self, cognite_client: CogniteClient) -> None: - with pytest.raises(CogniteAPIError) as e: - cognite_client.workflows.versions.delete( - ("integration_test-non_existing_workflow_version", "1"), - ignore_unknown_ids=False, - ) - - assert "not found" in str(e.value) - - def test_delete_non_existing(self, cognite_client: CogniteClient) -> None: - cognite_client.workflows.versions.delete( - ("integration_test-non_existing_workflow_version", "1"), - ignore_unknown_ids=True, - ) - - def test_retrieve_workflow(self, cognite_client: CogniteClient, 
workflow_version_list: WorkflowVersionList) -> None: - retrieve_id = workflow_version_list[0].as_id() - - retrieved = cognite_client.workflows.versions.retrieve(*retrieve_id.as_primitive()) - - assert retrieved == workflow_version_list[0] - - def test_retrieve_non_existing_workflow(self, cognite_client: CogniteClient) -> None: - non_existing = cognite_client.workflows.versions.retrieve("integration_test-non_existing_workflow", "1") - - assert non_existing is None - - -class TestWorkflowExecutions: - def test_list_workflow_executions( - self, - cognite_client: CogniteClient, - workflow_execution_list: WorkflowExecutionList, - ) -> None: - workflow_ids = set(w.as_workflow_id() for w in workflow_execution_list) - - assert workflow_ids, "There should be at least one workflow execution to test list with" - listed = cognite_client.workflows.executions.list( - workflow_version_ids=list(workflow_ids), limit=len(workflow_execution_list) - ) - - assert len(listed) == len(workflow_execution_list) - assert all(w.as_workflow_id() in workflow_ids for w in listed) - - def test_list_workflow_executions_by_status( - self, - cognite_client: CogniteClient, - add_multiply_workflow: WorkflowVersion, - ) -> None: - listed_completed = cognite_client.workflows.executions.list( - workflow_version_ids=add_multiply_workflow.as_id(), statuses="completed", limit=3 - ) - for execution in listed_completed: - assert execution.status == "completed" - - listed_others = cognite_client.workflows.executions.list( - workflow_version_ids=add_multiply_workflow.as_id(), statuses=["running", "failed"], limit=3 - ) - for execution in listed_others: - assert execution.status in ["running", "failed"] - - def test_retrieve_workflow_execution_detailed( - self, - cognite_client: CogniteClient, - workflow_execution_list: WorkflowExecutionList, - ) -> None: - workflow_execution_completed = cognite_client.workflows.executions.list(statuses="completed", limit=1) - assert ( - workflow_execution_completed - ), "There 
should be at least one workflow execution to test retrieve detailed with" - retrieved = cognite_client.workflows.executions.retrieve_detailed(workflow_execution_completed[0].id) - assert retrieved.as_execution().dump() == workflow_execution_completed[0].dump() - assert retrieved.executed_tasks - - def test_retrieve_non_existing_workflow_execution(self, cognite_client: CogniteClient) -> None: - non_existing = cognite_client.workflows.executions.retrieve_detailed( - "integration_test-non_existing_workflow_execution" - ) - - assert non_existing is None - - # Each trigger creates a new execution, so we need to clean up after each test to avoid - # running out of quota - @pytest.mark.usefixtures("clean_created_sessions") - def test_trigger_retrieve_detailed_update_update_task( - self, - cognite_client: CogniteClient, - add_multiply_workflow: WorkflowVersion, - ) -> None: - workflow_execution = cognite_client.workflows.executions.run( - add_multiply_workflow.workflow_external_id, - add_multiply_workflow.version, - ) - - async_task = add_multiply_workflow.workflow_definition.tasks[1] - assert isinstance(async_task.parameters, FunctionTaskParameters) - assert async_task.parameters.is_async_complete - - workflow_execution_detailed = cognite_client.workflows.executions.retrieve_detailed(workflow_execution.id) - async_task = workflow_execution_detailed.executed_tasks[1] - - async_task = cognite_client.workflows.tasks.update(async_task.id, "completed") - assert async_task.status == "completed" - - @pytest.mark.usefixtures("clean_created_sessions") - def test_trigger_cancel_retry_workflow( - self, cognite_client: CogniteClient, add_multiply_workflow: WorkflowVersion - ) -> None: - workflow_execution = cognite_client.workflows.executions.run( - add_multiply_workflow.workflow_external_id, - add_multiply_workflow.version, - ) - - cancelled_workflow_execution = cognite_client.workflows.executions.cancel( - id=workflow_execution.id, reason="test" - ) - assert 
cancelled_workflow_execution.status == "terminated" - assert cancelled_workflow_execution.reason_for_incompletion == "test" - - retried_workflow_execution = cognite_client.workflows.executions.retry(workflow_execution.id) - assert retried_workflow_execution.status == "running" - - -class TestWorkflowTriggers: - @pytest.mark.usefixtures("clean_created_sessions", "clean_created_workflow_triggers") - def test_create_update_delete_scheduled_trigger( - self, - cognite_client: CogniteClient, - workflow_scheduled_trigger: WorkflowTrigger, - ) -> None: - assert workflow_scheduled_trigger is not None - assert workflow_scheduled_trigger.external_id == "integration_test-workflow-scheduled-trigger" - assert workflow_scheduled_trigger.trigger_rule == WorkflowScheduledTriggerRule(cron_expression="* * * * *") - assert workflow_scheduled_trigger.workflow_external_id == "integration_test-workflow-add_multiply" - assert workflow_scheduled_trigger.workflow_version == "1" - assert workflow_scheduled_trigger.input == {"a": 1, "b": 2} - assert workflow_scheduled_trigger.created_time is not None - assert workflow_scheduled_trigger.last_updated_time is not None - - updated_trigger = cognite_client.workflows.triggers.upsert( - WorkflowTriggerUpsert( - external_id=workflow_scheduled_trigger.external_id, - trigger_rule=WorkflowScheduledTriggerRule(cron_expression="0 * * * *"), - workflow_external_id=workflow_scheduled_trigger.workflow_external_id, - workflow_version=workflow_scheduled_trigger.workflow_version, - input=workflow_scheduled_trigger.input, - ) - ) - assert updated_trigger is not None - assert updated_trigger.external_id == workflow_scheduled_trigger.external_id - assert updated_trigger.trigger_rule == WorkflowScheduledTriggerRule(cron_expression="0 * * * *") - assert updated_trigger.workflow_external_id == workflow_scheduled_trigger.workflow_external_id - assert updated_trigger.workflow_version == workflow_scheduled_trigger.workflow_version - assert updated_trigger.input == 
workflow_scheduled_trigger.input - assert updated_trigger.created_time == workflow_scheduled_trigger.created_time - assert updated_trigger.last_updated_time > workflow_scheduled_trigger.last_updated_time - - @pytest.mark.usefixtures("clean_created_sessions", "clean_created_workflow_triggers") - def test_create_update_delete_data_modeling_trigger( - self, - cognite_client: CogniteClient, - workflow_data_modeling_trigger: WorkflowTrigger, - ) -> None: - assert workflow_data_modeling_trigger is not None - assert workflow_data_modeling_trigger.external_id == "integration_test-workflow-data-modeling-trigger" - assert workflow_data_modeling_trigger.trigger_rule == WorkflowDataModelingTriggerRule( - data_modeling_query=WorkflowTriggerDataModelingQuery( - with_={"timeseries": NodeResultSetExpression()}, - select={ - "timeseries": Select( - sources=[SourceSelector(ViewId("cdf_cdm", "CogniteTimeSeries", "v1"), ["name"])] - ) - }, - ), - batch_size=500, - batch_timeout=300, - ) - assert workflow_data_modeling_trigger.workflow_external_id == "integration_test-workflow-add_multiply" - assert workflow_data_modeling_trigger.workflow_version == "1" - assert workflow_data_modeling_trigger.created_time is not None - assert workflow_data_modeling_trigger.last_updated_time is not None - - updated_trigger = cognite_client.workflows.triggers.upsert( - WorkflowTriggerUpsert( - external_id=workflow_data_modeling_trigger.external_id, - trigger_rule=WorkflowDataModelingTriggerRule( - data_modeling_query=WorkflowTriggerDataModelingQuery( - with_={"timeseries": NodeResultSetExpression()}, - select={ - "timeseries": Select( - sources=[SourceSelector(ViewId("cdf_cdm", "CogniteTimeSeries", "v1"), ["name"])] - ) - }, - ), - batch_size=100, - batch_timeout=100, - ), - workflow_external_id=workflow_data_modeling_trigger.workflow_external_id, - workflow_version=workflow_data_modeling_trigger.workflow_version, - ) - ) - assert updated_trigger is not None - assert updated_trigger.external_id == 
workflow_data_modeling_trigger.external_id - assert updated_trigger.trigger_rule == WorkflowDataModelingTriggerRule( - data_modeling_query=WorkflowTriggerDataModelingQuery( - with_={"timeseries": NodeResultSetExpression()}, - select={ - "timeseries": Select( - sources=[SourceSelector(ViewId("cdf_cdm", "CogniteTimeSeries", "v1"), ["name"])] - ) - }, - ), - batch_size=100, - batch_timeout=100, - ) - assert updated_trigger.workflow_external_id == workflow_data_modeling_trigger.workflow_external_id - assert updated_trigger.workflow_version == workflow_data_modeling_trigger.workflow_version - assert updated_trigger.created_time == workflow_data_modeling_trigger.created_time - assert updated_trigger.last_updated_time > workflow_data_modeling_trigger.last_updated_time - - @pytest.mark.usefixtures("clean_created_sessions", "clean_created_workflow_triggers") - def test_trigger_list( - self, - cognite_client: CogniteClient, - workflow_scheduled_trigger: WorkflowTrigger, - workflow_data_modeling_trigger: WorkflowTrigger, - ) -> None: - triggers = cognite_client.workflows.triggers.get_triggers() - external_ids = {trigger.external_id for trigger in triggers} - - assert workflow_scheduled_trigger.external_id in external_ids - assert workflow_data_modeling_trigger.external_id in external_ids - - @pytest.mark.usefixtures("clean_created_sessions", "clean_created_workflow_triggers") - def test_trigger_run_history( - self, - cognite_client: CogniteClient, - workflow_scheduled_trigger: WorkflowTrigger, - ) -> None: - history = cognite_client.workflows.triggers.get_trigger_run_history( - external_id=workflow_scheduled_trigger.external_id - ) - # it would take too long to wait for the trigger to run, so we just check that the history is not None - assert history is not None diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py new file mode 100644 index 0000000000..368247aa59 --- /dev/null +++ 
b/tests/tests_integration/test_api/test_workflows.py @@ -0,0 +1,598 @@ +from __future__ import annotations + +import time +import unittest +from datetime import datetime + +import pytest + +from cognite.client import CogniteClient +from cognite.client.data_classes import DataSet +from cognite.client.data_classes.data_modeling import ViewId +from cognite.client.data_classes.data_modeling.query import NodeResultSetExpression, Select, SourceSelector +from cognite.client.data_classes.workflows import ( + CDFTaskParameters, + FunctionTaskParameters, + SubworkflowReferenceParameters, + SubworkflowTaskParameters, + TransformationTaskParameters, + Workflow, + WorkflowDataModelingTriggerRule, + WorkflowDefinitionUpsert, + WorkflowExecutionList, + WorkflowList, + WorkflowScheduledTriggerRule, + WorkflowTask, + WorkflowTrigger, + WorkflowTriggerDataModelingQuery, + WorkflowTriggerUpsert, + WorkflowUpsert, + WorkflowVersion, + WorkflowVersionId, + WorkflowVersionList, + WorkflowVersionUpsert, +) +from cognite.client.exceptions import CogniteAPIError +from cognite.client.utils._text import random_string + + +@pytest.fixture(scope="session") +def data_set(cognite_client: CogniteClient) -> DataSet: + return cognite_client.data_sets.list(limit=1)[0] + + +@pytest.fixture +def new_workflow(cognite_client: CogniteClient, data_set: DataSet): + workflow = WorkflowUpsert( + external_id=f"integration_test-workflow_{random_string(5)}", + data_set_id=data_set.id, + ) + yield cognite_client.workflows.upsert(workflow) + cognite_client.workflows.delete(workflow.external_id, ignore_unknown_ids=True) + assert cognite_client.workflows.retrieve(workflow.external_id) is None + + +@pytest.fixture(scope="class") +def workflow_list(cognite_client: CogniteClient, data_set: DataSet): + workflow_1 = WorkflowUpsert( + external_id=f"integration_test-workflow_1_{random_string(5)}", + description="This workflow is for testing purposes", + data_set_id=data_set.id, + ) + workflow_2 = WorkflowUpsert( + 
external_id=f"integration_test-workflow_2_{random_string(5)}", + description="This workflow is for testing purposes", + ) + for workflow in [workflow_1, workflow_2]: + cognite_client.workflows.upsert(workflow) + yield cognite_client.workflows.list() + cognite_client.workflows.delete([workflow_1.external_id, workflow_2.external_id], ignore_unknown_ids=True) + assert cognite_client.workflows.retrieve(workflow_1.external_id) is None + assert cognite_client.workflows.retrieve(workflow_2.external_id) is None + + +@pytest.fixture +def new_workflow_version(cognite_client: CogniteClient, new_workflow: Workflow): + version = WorkflowVersionUpsert( + workflow_external_id=new_workflow.external_id, + version="1", + workflow_definition=WorkflowDefinitionUpsert( + tasks=[ + WorkflowTask( + external_id=f"{new_workflow.external_id}-1-task1", + parameters=CDFTaskParameters( + resource_path="/timeseries", + method="GET", + ), + ), + ], + ), + ) + yield cognite_client.workflows.versions.upsert(version) + cognite_client.workflows.versions.delete((new_workflow.external_id, version.version), ignore_unknown_ids=True) + assert cognite_client.workflows.versions.retrieve(new_workflow.external_id, version.version) is None + + +@pytest.fixture +def async_workflow_version(cognite_client: CogniteClient, new_workflow: Workflow): + version = WorkflowVersionUpsert( + workflow_external_id=new_workflow.external_id, + version="1", + workflow_definition=WorkflowDefinitionUpsert( + tasks=[ + WorkflowTask( + external_id=f"{new_workflow.external_id}-1-multiply", + parameters=FunctionTaskParameters( + external_id="non-existing-function-async-resolve", + data={"a": 3, "b": 4}, + is_async_complete=True, + ), + timeout=120, + retries=2, + ), + ], + ), + ) + yield cognite_client.workflows.versions.upsert(version) + cognite_client.workflows.versions.delete((new_workflow.external_id, version.version), ignore_unknown_ids=True) + assert cognite_client.workflows.versions.retrieve(new_workflow.external_id, 
version.version) is None + + +@pytest.fixture +def workflow_version_list(cognite_client: CogniteClient, new_workflow: Workflow): + version_1 = WorkflowVersionUpsert( + workflow_external_id=new_workflow.external_id, + version="1", + workflow_definition=WorkflowDefinitionUpsert( + tasks=[ + WorkflowTask( + external_id=f"{new_workflow.external_id}-1-task1", + parameters=CDFTaskParameters( + resource_path="/timeseries", + method="GET", + ), + ) + ], + ), + ) + version_2 = WorkflowVersionUpsert( + workflow_external_id=new_workflow.external_id, + version="2", + workflow_definition=WorkflowDefinitionUpsert( + tasks=[ + WorkflowTask( + external_id="subworkflow1", + parameters=SubworkflowTaskParameters( + tasks=[ + WorkflowTask( + external_id="s1-task1", + parameters=FunctionTaskParameters( + external_id="non-existing-function", + data={"a": 3, "b": 4}, + is_async_complete=True, + ), + timeout=120, + retries=2, + ), + WorkflowTask( + external_id="s1-task2", + parameters=TransformationTaskParameters( + external_id="non-existing-transformation", + ), + ), + ] + ), + ), + WorkflowTask( + external_id="task1", + parameters=SubworkflowReferenceParameters( + workflow_external_id=new_workflow.external_id, + version="1", + ), + depends_on=["subworkflow1"], + ), + ], + ), + ) + for version in [version_1, version_2]: + cognite_client.workflows.versions.upsert(version) + yield cognite_client.workflows.versions.list(workflow_version_ids=new_workflow.external_id) + cognite_client.workflows.versions.delete( + [(new_workflow.external_id, version_1.version), (new_workflow.external_id, version_2.version)], + ignore_unknown_ids=True, + ) + assert cognite_client.workflows.versions.retrieve(new_workflow.external_id, version_1.version) is None + assert cognite_client.workflows.versions.retrieve(new_workflow.external_id, version_2.version) is None + + +@pytest.fixture +def workflow_execution_list(cognite_client: CogniteClient, new_workflow_version: WorkflowVersion): + run_1 = 
cognite_client.workflows.executions.run( + new_workflow_version.workflow_external_id, + new_workflow_version.version, + input={"a": 5, "b": 6}, + metadata={"test": "integration_completed"}, + ) + total_sleep = 0.0 + while run_1.status == "running" and total_sleep < 30: + time.sleep(0.5) + total_sleep += 0.5 + run_1 = cognite_client.workflows.executions.retrieve_detailed(run_1.id).as_execution() + + run_2 = cognite_client.workflows.executions.run( + new_workflow_version.workflow_external_id, + new_workflow_version.version, + input={"a": 5, "b": 6}, + metadata={"test": "integration_cancelled"}, + ) + run_2 = cognite_client.workflows.executions.cancel(id=run_2.id, reason="test cancel") + return WorkflowExecutionList([run_1, run_2]) + + +@pytest.fixture(scope="class") +def workflow_setup_pre_trigger(cognite_client: CogniteClient): + workflow = WorkflowUpsert( + external_id=f"integration_test-workflow_{random_string(5)}", + ) + cognite_client.workflows.upsert(workflow) + version = WorkflowVersionUpsert( + workflow_external_id=workflow.external_id, + version="1", + workflow_definition=WorkflowDefinitionUpsert( + tasks=[ + WorkflowTask( + external_id=f"{workflow.external_id}-1-task1", + parameters=CDFTaskParameters( + resource_path="/timeseries", + method="GET", + ), + ), + ], + ), + ) + cognite_client.workflows.versions.upsert(version) + yield version + cognite_client.workflows.versions.delete((workflow.external_id, version.version), ignore_unknown_ids=True) + assert cognite_client.workflows.versions.retrieve(workflow.external_id, version.version) is None + cognite_client.workflows.delete(workflow.external_id, ignore_unknown_ids=True) + assert cognite_client.workflows.retrieve(workflow.external_id) is None + + +@pytest.fixture(scope="class") +def workflow_scheduled_trigger(cognite_client: CogniteClient, workflow_setup_pre_trigger: WorkflowVersion): + version = workflow_setup_pre_trigger + trigger = cognite_client.workflows.triggers.upsert( + WorkflowTriggerUpsert( + 
external_id=f"scheduled-trigger_{version.workflow_external_id}", + trigger_rule=WorkflowScheduledTriggerRule(cron_expression="* * * * *"), + workflow_external_id=version.workflow_external_id, + workflow_version=version.version, + input={"a": 1, "b": 2}, + metadata={"test": "integration_schedule"}, + ) + ) + # have to sleep until workflow is triggered because it's the only way to properly test get_trigger_run_history + now = datetime.now() + seconds_til_next_minute = 60 - now.second + 5 + time.sleep(seconds_til_next_minute) + + yield trigger + cognite_client.workflows.triggers.delete(trigger.external_id) + assert cognite_client.workflows.retrieve(trigger.external_id) is None + + +@pytest.fixture(scope="class") +def workflow_data_modeling_trigger(cognite_client: CogniteClient, workflow_setup_pre_trigger: WorkflowVersion): + version = workflow_setup_pre_trigger + trigger = cognite_client.workflows.triggers.create( + WorkflowTriggerUpsert( + external_id=f"data-modeling-trigger_{version.workflow_external_id}", + trigger_rule=WorkflowDataModelingTriggerRule( + data_modeling_query=WorkflowTriggerDataModelingQuery( + with_={"timeseries": NodeResultSetExpression()}, + select={ + "timeseries": Select( + sources=[SourceSelector(ViewId("cdf_cdm", "CogniteTimeSeries", "v1"), ["name"])] + ) + }, + ), + batch_size=500, + batch_timeout=300, + ), + workflow_external_id=version.workflow_external_id, + workflow_version=version.version, + ) + ) + yield trigger + cognite_client.workflows.triggers.delete(trigger.external_id) + assert cognite_client.workflows.retrieve(trigger.external_id) is None + + +class TestWorkflows: + def test_upsert_preexisting(self, cognite_client: CogniteClient, new_workflow: Workflow) -> None: + new_workflow.description = "Updated description for testing purposes" + updated_workflow = cognite_client.workflows.upsert(new_workflow.as_write()) + + assert updated_workflow.external_id == new_workflow.external_id + assert updated_workflow.description == 
new_workflow.description + assert updated_workflow.data_set_id == new_workflow.data_set_id + + def test_delete_multiple_non_existing_raise(self, cognite_client: CogniteClient, new_workflow: Workflow) -> None: + with pytest.raises(CogniteAPIError, match="workflows were not found"): + cognite_client.workflows.delete( + [new_workflow.external_id, "integration_test-non_existing_workflow"], ignore_unknown_ids=False + ) + assert cognite_client.workflows.retrieve(new_workflow.external_id) is not None + + def test_delete_multiple_non_existing(self, cognite_client: CogniteClient, new_workflow: Workflow) -> None: + cognite_client.workflows.delete( + [new_workflow.external_id, "integration_test-non_existing_workflow"], ignore_unknown_ids=True + ) + assert cognite_client.workflows.retrieve(new_workflow.external_id) is None + + def test_retrieve_workflow(self, cognite_client: CogniteClient, workflow_list: WorkflowList) -> None: + retrieved = cognite_client.workflows.retrieve(workflow_list[0].external_id) + assert retrieved == workflow_list[0] + + def test_retrieve_non_existing_workflow(self, cognite_client: CogniteClient) -> None: + non_existing = cognite_client.workflows.retrieve("integration_test-non_existing_workflow") + assert non_existing is None + + def test_list_workflows(self, cognite_client: CogniteClient, workflow_list: WorkflowList) -> None: + listed = cognite_client.workflows.list(limit=-1) + assert len(listed) >= len(workflow_list) + assert workflow_list._external_id_to_item.keys() <= listed._external_id_to_item.keys() + + +class TestWorkflowVersions: + def test_upsert_preexisting(self, cognite_client: CogniteClient, new_workflow_version: WorkflowVersion) -> None: + new_workflow_version.workflow_definition.description = "Updated description for testing purposes" + updated_version = cognite_client.workflows.versions.upsert(new_workflow_version.as_write()) + + assert updated_version.workflow_external_id == new_workflow_version.workflow_external_id + assert 
updated_version.version == new_workflow_version.version + assert updated_version.workflow_definition.description == new_workflow_version.workflow_definition.description + + def test_list_workflow_versions( + self, cognite_client: CogniteClient, workflow_version_list: WorkflowVersionList + ) -> None: + wf_xid = workflow_version_list[0].workflow_external_id + listed_by_wf_xid = cognite_client.workflows.versions.list(wf_xid) + listed_by_wf_version_id = cognite_client.workflows.versions.list(WorkflowVersionId(wf_xid)) + listed_by_as_ids = cognite_client.workflows.versions.list(workflow_version_list.as_ids()) + + ids_tuples = [wid.as_primitive() for wid in workflow_version_list.as_ids()] + listed_by_tuples = cognite_client.workflows.versions.list(ids_tuples) + + unittest.TestCase().assertCountEqual(workflow_version_list, listed_by_wf_xid) + unittest.TestCase().assertCountEqual(listed_by_wf_xid, listed_by_wf_version_id) + unittest.TestCase().assertCountEqual(listed_by_wf_version_id, listed_by_as_ids) + unittest.TestCase().assertCountEqual(listed_by_as_ids, listed_by_tuples) + + listed_limit = cognite_client.workflows.versions.list(limit=1) + assert len(listed_limit) == 1 + + def test_delete_non_existing_raise( + self, cognite_client: CogniteClient, new_workflow_version: WorkflowVersion + ) -> None: + with pytest.raises(CogniteAPIError, match="not found"): + cognite_client.workflows.versions.delete( + [ + (new_workflow_version.workflow_external_id, new_workflow_version.version), + (new_workflow_version.workflow_external_id, "non_existing_version"), + ], + ignore_unknown_ids=False, + ) + assert cognite_client.workflows.versions.retrieve(*new_workflow_version.as_id().as_primitive()) is not None + + def test_delete_non_existing(self, cognite_client: CogniteClient, new_workflow_version: WorkflowVersion) -> None: + cognite_client.workflows.versions.delete( + [ + (new_workflow_version.workflow_external_id, new_workflow_version.version), + 
(new_workflow_version.workflow_external_id, "non_existing_version"), + ], + ignore_unknown_ids=True, + ) + assert cognite_client.workflows.versions.retrieve(*new_workflow_version.as_id().as_primitive()) is None + + def test_retrieve_workflow(self, cognite_client: CogniteClient, new_workflow_version: WorkflowVersion) -> None: + retrieved = cognite_client.workflows.versions.retrieve(*new_workflow_version.as_id().as_primitive()) + assert retrieved == new_workflow_version + + def test_retrieve_non_existing_workflow(self, cognite_client: CogniteClient) -> None: + non_existing = cognite_client.workflows.versions.retrieve("integration_test-non_existing_workflow", "1") + assert non_existing is None + + +class TestWorkflowExecutions: + def test_list_workflow_executions( + self, + cognite_client: CogniteClient, + workflow_execution_list: WorkflowExecutionList, + ) -> None: + listed = cognite_client.workflows.executions.list( + workflow_version_ids=workflow_execution_list[0].as_workflow_id() + ) + + unittest.TestCase().assertCountEqual(listed, workflow_execution_list) + + def test_list_workflow_executions_by_status( + self, + cognite_client: CogniteClient, + workflow_execution_list: WorkflowExecutionList, + ) -> None: + listed_completed = cognite_client.workflows.executions.list(statuses=["completed", "terminated"]) + for execution in listed_completed: + assert execution.status in ["completed", "terminated"] + + def test_retrieve_workflow_execution_detailed( + self, + cognite_client: CogniteClient, + workflow_execution_list: WorkflowExecutionList, + ) -> None: + retrieved = cognite_client.workflows.executions.retrieve_detailed(workflow_execution_list[0].id) + assert retrieved.as_execution().dump() == workflow_execution_list[0].dump() + assert retrieved.executed_tasks + assert retrieved.metadata == {"test": "integration_completed"} + + def test_retrieve_non_existing_workflow_execution(self, cognite_client: CogniteClient) -> None: + non_existing = 
cognite_client.workflows.executions.retrieve_detailed( + "integration_test-non_existing_workflow_execution" + ) + + assert non_existing is None + + def test_trigger_retrieve_detailed_update_async_task( + self, + cognite_client: CogniteClient, + async_workflow_version: WorkflowVersion, + ) -> None: + workflow_execution = cognite_client.workflows.executions.run( + async_workflow_version.workflow_external_id, + async_workflow_version.version, + ) + + async_task = async_workflow_version.workflow_definition.tasks[0] + assert isinstance(async_task.parameters, FunctionTaskParameters) + assert async_task.parameters.is_async_complete + + workflow_execution_detailed = cognite_client.workflows.executions.retrieve_detailed(workflow_execution.id) + async_task = workflow_execution_detailed.executed_tasks[0] + + async_task = cognite_client.workflows.tasks.update(async_task.id, "completed") + assert async_task.status == "completed" + time.sleep(5) + assert cognite_client.workflows.executions.retrieve_detailed(workflow_execution.id).status == "completed" + + def test_trigger_cancel_retry_workflow( + self, cognite_client: CogniteClient, new_workflow_version: WorkflowVersion + ) -> None: + workflow_execution = cognite_client.workflows.executions.run( + new_workflow_version.workflow_external_id, + new_workflow_version.version, + ) + + cancelled_workflow_execution = cognite_client.workflows.executions.cancel( + id=workflow_execution.id, reason="test" + ) + assert cancelled_workflow_execution.status == "terminated" + assert cancelled_workflow_execution.reason_for_incompletion == "test" + + retried_workflow_execution = cognite_client.workflows.executions.retry(workflow_execution.id) + assert retried_workflow_execution.status == "running" + + +class TestWorkflowTriggers: + def test_create_update_preexisting_scheduled_trigger( + self, + cognite_client: CogniteClient, + workflow_scheduled_trigger: WorkflowTrigger, + ) -> None: + assert workflow_scheduled_trigger is not None + assert 
workflow_scheduled_trigger.external_id.startswith("scheduled-trigger_integration_test-workflow") + assert workflow_scheduled_trigger.trigger_rule == WorkflowScheduledTriggerRule(cron_expression="* * * * *") + assert workflow_scheduled_trigger.workflow_external_id.startswith("integration_test-workflow_") + assert workflow_scheduled_trigger.workflow_version == "1" + assert workflow_scheduled_trigger.input == {"a": 1, "b": 2} + assert workflow_scheduled_trigger.metadata == {"test": "integration_schedule"} + assert workflow_scheduled_trigger.created_time is not None + assert workflow_scheduled_trigger.last_updated_time is not None + + updated_trigger = cognite_client.workflows.triggers.upsert( + WorkflowTriggerUpsert( + external_id=workflow_scheduled_trigger.external_id, + trigger_rule=WorkflowScheduledTriggerRule(cron_expression="0 * * * *"), + workflow_external_id=workflow_scheduled_trigger.workflow_external_id, + workflow_version=workflow_scheduled_trigger.workflow_version, + input=workflow_scheduled_trigger.input, + ) + ) + assert updated_trigger is not None + assert updated_trigger.external_id == workflow_scheduled_trigger.external_id + assert updated_trigger.trigger_rule == WorkflowScheduledTriggerRule(cron_expression="0 * * * *") + assert updated_trigger.workflow_external_id == workflow_scheduled_trigger.workflow_external_id + assert updated_trigger.workflow_version == workflow_scheduled_trigger.workflow_version + assert updated_trigger.input == workflow_scheduled_trigger.input + assert updated_trigger.metadata == {} + assert updated_trigger.created_time == workflow_scheduled_trigger.created_time + assert updated_trigger.last_updated_time > workflow_scheduled_trigger.last_updated_time + + def test_create_update_delete_data_modeling_trigger( + self, + cognite_client: CogniteClient, + workflow_data_modeling_trigger: WorkflowTrigger, + ) -> None: + assert workflow_data_modeling_trigger is not None + assert 
workflow_data_modeling_trigger.external_id.startswith("data-modeling-trigger_integration_test-workflow") + assert workflow_data_modeling_trigger.trigger_rule == WorkflowDataModelingTriggerRule( + data_modeling_query=WorkflowTriggerDataModelingQuery( + with_={"timeseries": NodeResultSetExpression()}, + select={ + "timeseries": Select( + sources=[SourceSelector(ViewId("cdf_cdm", "CogniteTimeSeries", "v1"), ["name"])] + ) + }, + ), + batch_size=500, + batch_timeout=300, + ) + assert workflow_data_modeling_trigger.workflow_external_id.startswith("integration_test-workflow_") + assert workflow_data_modeling_trigger.workflow_version == "1" + assert workflow_data_modeling_trigger.created_time is not None + assert workflow_data_modeling_trigger.last_updated_time is not None + updated_trigger = cognite_client.workflows.triggers.upsert( + WorkflowTriggerUpsert( + external_id=workflow_data_modeling_trigger.external_id, + trigger_rule=WorkflowDataModelingTriggerRule( + data_modeling_query=WorkflowTriggerDataModelingQuery( + with_={"timeseries": NodeResultSetExpression()}, + select={ + "timeseries": Select( + sources=[SourceSelector(ViewId("cdf_cdm", "CogniteTimeSeries", "v1"), ["name"])] + ) + }, + ), + batch_size=100, + batch_timeout=100, + ), + workflow_external_id=workflow_data_modeling_trigger.workflow_external_id, + workflow_version=workflow_data_modeling_trigger.workflow_version, + ) + ) + assert updated_trigger is not None + assert updated_trigger.external_id == workflow_data_modeling_trigger.external_id + assert updated_trigger.trigger_rule == WorkflowDataModelingTriggerRule( + data_modeling_query=WorkflowTriggerDataModelingQuery( + with_={"timeseries": NodeResultSetExpression()}, + select={ + "timeseries": Select( + sources=[SourceSelector(ViewId("cdf_cdm", "CogniteTimeSeries", "v1"), ["name"])] + ) + }, + ), + batch_size=100, + batch_timeout=100, + ) + assert updated_trigger.workflow_external_id == workflow_data_modeling_trigger.workflow_external_id + assert 
updated_trigger.workflow_version == workflow_data_modeling_trigger.workflow_version + assert updated_trigger.created_time == workflow_data_modeling_trigger.created_time + assert updated_trigger.last_updated_time > workflow_data_modeling_trigger.last_updated_time + + def test_trigger_list( + self, + cognite_client: CogniteClient, + workflow_scheduled_trigger: WorkflowTrigger, + workflow_data_modeling_trigger: WorkflowTrigger, + ) -> None: + listed = cognite_client.workflows.triggers.get_triggers(limit=-1) + assert workflow_scheduled_trigger.external_id in listed._external_id_to_item + assert workflow_data_modeling_trigger.external_id in listed._external_id_to_item + + def test_trigger_run_history( + self, + cognite_client: CogniteClient, + workflow_scheduled_trigger: WorkflowTrigger, + ) -> None: + history = cognite_client.workflows.triggers.get_trigger_run_history( + external_id=workflow_scheduled_trigger.external_id + ) + assert len(history) > 0 + assert history[0].external_id == workflow_scheduled_trigger.external_id + assert history[0].workflow_external_id == workflow_scheduled_trigger.workflow_external_id + assert history[0].workflow_version == workflow_scheduled_trigger.workflow_version + + detailed = cognite_client.workflows.executions.retrieve_detailed(history[0].workflow_execution_id) + assert detailed is not None + assert detailed.metadata == workflow_scheduled_trigger.metadata + # version gets appended to input when executed by a trigger + workflow_scheduled_trigger.input["version"] = workflow_scheduled_trigger.workflow_version + assert detailed.input == workflow_scheduled_trigger.input + + def test_trigger_run_history_non_existing( + self, + cognite_client: CogniteClient, + ) -> None: + with pytest.raises(CogniteAPIError, match="Workflow trigger not found."): + cognite_client.workflows.triggers.get_trigger_run_history( + external_id="integration_test-non_existing_trigger" + )