From a50067bcec4aa29d9b1ce11f3180b02d9fd7f37c Mon Sep 17 00:00:00 2001 From: Bert Verstraete Date: Wed, 5 Jun 2024 11:19:10 +0200 Subject: [PATCH] [CDF-21657] Add status filter to workflow executions list (#1793) Allows filtering data workflow executions by status --- CHANGELOG.md | 4 ++++ cognite/client/_api/workflows.py | 9 +++++++- cognite/client/_version.py | 2 +- cognite/client/data_classes/workflows.py | 22 ++++++++++--------- pyproject.toml | 2 +- .../test_api/test_data_workflows.py | 17 ++++++++++++++ 6 files changed, 43 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 13680eb016..2d02abbe14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,10 @@ Changes are grouped as follows - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. +## [7.49.0] - 2024-06-05 +### Added +- `WorkfowExecutionAPI.list` now allows filtering by execution status. + ## [7.48.1] - 2024-06-04 ### Fixed - A bug introduced in `7.45.0` that would short-circuit raw datapoint queries too early when a lot of time series was diff --git a/cognite/client/_api/workflows.py b/cognite/client/_api/workflows.py index 8403efdca8..16339fea2b 100644 --- a/cognite/client/_api/workflows.py +++ b/cognite/client/_api/workflows.py @@ -14,6 +14,7 @@ WorkflowExecutionList, WorkflowIds, WorkflowList, + WorkflowStatus, WorkflowTaskExecution, WorkflowUpsert, WorkflowVersion, @@ -191,6 +192,7 @@ def list( workflow_version_ids: WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier] | None = None, created_time_start: int | None = None, created_time_end: int | None = None, + statuses: WorkflowStatus | MutableSequence[WorkflowStatus] | None = None, limit: int = DEFAULT_LIMIT_READ, ) -> WorkflowExecutionList: """`List workflow executions in the project. `_ @@ -199,9 +201,9 @@ def list( workflow_version_ids (WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier] | None): Workflow version id or list of workflow version ids to filter on. created_time_start (int | None): Filter out executions that was created before this time. Time is in milliseconds since epoch. created_time_end (int | None): Filter out executions that was created after this time. Time is in milliseconds since epoch. + statuses (WorkflowStatus | MutableSequence[WorkflowStatus] | None): Workflow status or list of workflow statuses to filter on. limit (int): Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None to return all items. - Returns: WorkflowExecutionList: The requested workflow executions. @@ -230,6 +232,11 @@ def list( filter_["createdTimeStart"] = created_time_start if created_time_end is not None: filter_["createdTimeEnd"] = created_time_end + if statuses is not None: + if isinstance(statuses, MutableSequence): + filter_["status"] = [status.upper() for status in statuses] + else: # Assume it is a stringy type + filter_["status"] = [statuses.upper()] return self._list( method="POST", diff --git a/cognite/client/_version.py b/cognite/client/_version.py index 351b8967d8..77306f4221 100644 --- a/cognite/client/_version.py +++ b/cognite/client/_version.py @@ -1,4 +1,4 @@ from __future__ import annotations -__version__ = "7.48.1" +__version__ = "7.49.0" __api_subversion__ = "20230101" diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 0db5355a60..2e18f9a7ff 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -21,7 +21,7 @@ if TYPE_CHECKING: from cognite.client import CogniteClient -WorkflowStatus: TypeAlias = Literal[ +TaskStatus: TypeAlias = Literal[ "in_progress", "cancelled", "failed", @@ -32,6 +32,8 @@ "skipped", ] +WorkflowStatus: TypeAlias = Literal["completed", "failed", "running", "terminated", "timed_out"] + class WorkflowCore(WriteableCogniteResource["WorkflowUpsert"], ABC): def __init__(self, external_id: str, description: str | None) -> None: @@ -617,7 +619,7 @@ class WorkflowTaskExecution(CogniteObject): Args: id (str): The server generated id of the task execution. external_id (str): The external ID provided by the client. Must be unique for the resource type. - status (WorkflowStatus): The status of the task execution. + status (TaskStatus): The status of the task execution. input (WorkflowTaskParameters): The input parameters of the task execution. output (WorkflowTaskOutput): The output of the task execution. version (str | None): The version of the task execution. Defaults to None. @@ -630,7 +632,7 @@ def __init__( self, id: str, external_id: str, - status: WorkflowStatus, + status: TaskStatus, input: WorkflowTaskParameters, output: WorkflowTaskOutput, version: str | None = None, @@ -657,7 +659,7 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> W return cls( id=resource["id"], external_id=resource["externalId"], - status=cast(WorkflowStatus, to_snake_case(resource["status"])), + status=cast(TaskStatus, to_snake_case(resource["status"])), input=WorkflowTaskParameters.load_parameters(resource), output=WorkflowTaskOutput.load_output(resource), version=resource.get("version"), @@ -940,7 +942,7 @@ class WorkflowExecution(CogniteResource): Args: id (str): The server generated id of the workflow execution. workflow_external_id (str): The external ID of the workflow. - status (Literal["running", "completed", "failed", "timed_out", "terminated", "paused"]): The status of the workflow execution. + status (WorkflowStatus): The status of the workflow execution. created_time (int): The time when the workflow execution was created. Unix timestamp in milliseconds. version (str | None): The version of the workflow. Defaults to None. start_time (int | None): The start time of the workflow execution. Unix timestamp in milliseconds. Defaults to None. @@ -953,7 +955,7 @@ def __init__( self, id: str, workflow_external_id: str, - status: Literal["running", "completed", "failed", "timed_out", "terminated", "paused"], + status: WorkflowStatus, created_time: int, version: str | None = None, start_time: int | None = None, @@ -984,7 +986,7 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> W workflow_external_id=resource["workflowExternalId"], version=resource.get("version"), status=cast( - Literal["running", "completed", "failed", "timed_out", "terminated", "paused"], + WorkflowStatus, to_snake_case(resource["status"]), ), created_time=resource["createdTime"], @@ -1014,7 +1016,7 @@ class WorkflowExecutionDetailed(WorkflowExecution): id (str): The server generated id of the workflow execution. workflow_external_id (str): The external ID of the workflow. workflow_definition (WorkflowDefinition): The workflow definition of the workflow. - status (Literal["running", "completed", "failed", "timed_out", "terminated", "paused"]): The status of the workflow execution. + status (WorkflowStatus): The status of the workflow execution. executed_tasks (list[WorkflowTaskExecution]): The executed tasks of the workflow execution. created_time (int): The time when the workflow execution was created. Unix timestamp in milliseconds. version (str | None): The version of the workflow. Defaults to None. @@ -1030,7 +1032,7 @@ def __init__( id: str, workflow_external_id: str, workflow_definition: WorkflowDefinition, - status: Literal["running", "completed", "failed", "timed_out", "terminated", "paused"], + status: WorkflowStatus, executed_tasks: list[WorkflowTaskExecution], created_time: int, version: str | None = None, @@ -1055,7 +1057,7 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> W workflow_external_id=resource["workflowExternalId"], version=resource.get("version"), status=cast( - Literal["running", "completed", "failed", "timed_out", "terminated", "paused"], + WorkflowStatus, to_snake_case(resource["status"]), ), created_time=resource["createdTime"], diff --git a/pyproject.toml b/pyproject.toml index 9b9ac3cb03..9d2d6dd4d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "cognite-sdk" -version = "7.48.1" +version = "7.49.0" description = "Cognite Python SDK" readme = "README.md" documentation = "https://cognite-sdk-python.readthedocs-hosted.com" diff --git a/tests/tests_integration/test_api/test_data_workflows.py b/tests/tests_integration/test_api/test_data_workflows.py index 197bd5e10e..355e1e4b17 100644 --- a/tests/tests_integration/test_api/test_data_workflows.py +++ b/tests/tests_integration/test_api/test_data_workflows.py @@ -370,6 +370,23 @@ def test_list_workflow_executions( assert len(listed) == len(workflow_execution_list) assert all(w.as_workflow_id() in workflow_ids for w in listed) + def test_list_workflow_executions_by_status( + self, + cognite_client: CogniteClient, + add_multiply_workflow: WorkflowVersion, + ) -> None: + listed_completed = cognite_client.workflows.executions.list( + workflow_version_ids=add_multiply_workflow.as_id(), statuses="completed", limit=3 + ) + for execution in listed_completed: + assert execution.status == "completed" + + listed_others = cognite_client.workflows.executions.list( + workflow_version_ids=add_multiply_workflow.as_id(), statuses=["running", "failed"], limit=3 + ) + for execution in listed_others: + assert execution.status in ["running", "failed"] + def test_retrieve_workflow_execution_detailed( self, cognite_client: CogniteClient,