Skip to content

Commit

Permalink
[CDF-21657] Add status filter to workflow executions list (#1793)
Browse files Browse the repository at this point in the history
Allows filtering data workflow executions by status
  • Loading branch information
VerstraeteBert authored Jun 5, 2024
1 parent d894006 commit a50067b
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 13 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.49.0] - 2024-06-05
### Added
- `WorkfowExecutionAPI.list` now allows filtering by execution status.

## [7.48.1] - 2024-06-04
### Fixed
- A bug introduced in `7.45.0` that would short-circuit raw datapoint queries too early when a lot of time series was
Expand Down
9 changes: 8 additions & 1 deletion cognite/client/_api/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
WorkflowExecutionList,
WorkflowIds,
WorkflowList,
WorkflowStatus,
WorkflowTaskExecution,
WorkflowUpsert,
WorkflowVersion,
Expand Down Expand Up @@ -191,6 +192,7 @@ def list(
workflow_version_ids: WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier] | None = None,
created_time_start: int | None = None,
created_time_end: int | None = None,
statuses: WorkflowStatus | MutableSequence[WorkflowStatus] | None = None,
limit: int = DEFAULT_LIMIT_READ,
) -> WorkflowExecutionList:
"""`List workflow executions in the project. <https://api-docs.cognite.com/20230101-beta/tag/Workflow-executions/operation/ListWorkflowExecutions>`_
Expand All @@ -199,9 +201,9 @@ def list(
workflow_version_ids (WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier] | None): Workflow version id or list of workflow version ids to filter on.
created_time_start (int | None): Filter out executions that was created before this time. Time is in milliseconds since epoch.
created_time_end (int | None): Filter out executions that was created after this time. Time is in milliseconds since epoch.
statuses (WorkflowStatus | MutableSequence[WorkflowStatus] | None): Workflow status or list of workflow statuses to filter on.
limit (int): Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None
to return all items.
Returns:
WorkflowExecutionList: The requested workflow executions.
Expand Down Expand Up @@ -230,6 +232,11 @@ def list(
filter_["createdTimeStart"] = created_time_start
if created_time_end is not None:
filter_["createdTimeEnd"] = created_time_end
if statuses is not None:
if isinstance(statuses, MutableSequence):
filter_["status"] = [status.upper() for status in statuses]
else: # Assume it is a stringy type
filter_["status"] = [statuses.upper()]

return self._list(
method="POST",
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.48.1"
__version__ = "7.49.0"
__api_subversion__ = "20230101"
22 changes: 12 additions & 10 deletions cognite/client/data_classes/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
if TYPE_CHECKING:
from cognite.client import CogniteClient

WorkflowStatus: TypeAlias = Literal[
TaskStatus: TypeAlias = Literal[
"in_progress",
"cancelled",
"failed",
Expand All @@ -32,6 +32,8 @@
"skipped",
]

WorkflowStatus: TypeAlias = Literal["completed", "failed", "running", "terminated", "timed_out"]


class WorkflowCore(WriteableCogniteResource["WorkflowUpsert"], ABC):
def __init__(self, external_id: str, description: str | None) -> None:
Expand Down Expand Up @@ -617,7 +619,7 @@ class WorkflowTaskExecution(CogniteObject):
Args:
id (str): The server generated id of the task execution.
external_id (str): The external ID provided by the client. Must be unique for the resource type.
status (WorkflowStatus): The status of the task execution.
status (TaskStatus): The status of the task execution.
input (WorkflowTaskParameters): The input parameters of the task execution.
output (WorkflowTaskOutput): The output of the task execution.
version (str | None): The version of the task execution. Defaults to None.
Expand All @@ -630,7 +632,7 @@ def __init__(
self,
id: str,
external_id: str,
status: WorkflowStatus,
status: TaskStatus,
input: WorkflowTaskParameters,
output: WorkflowTaskOutput,
version: str | None = None,
Expand All @@ -657,7 +659,7 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> W
return cls(
id=resource["id"],
external_id=resource["externalId"],
status=cast(WorkflowStatus, to_snake_case(resource["status"])),
status=cast(TaskStatus, to_snake_case(resource["status"])),
input=WorkflowTaskParameters.load_parameters(resource),
output=WorkflowTaskOutput.load_output(resource),
version=resource.get("version"),
Expand Down Expand Up @@ -940,7 +942,7 @@ class WorkflowExecution(CogniteResource):
Args:
id (str): The server generated id of the workflow execution.
workflow_external_id (str): The external ID of the workflow.
status (Literal["running", "completed", "failed", "timed_out", "terminated", "paused"]): The status of the workflow execution.
status (WorkflowStatus): The status of the workflow execution.
created_time (int): The time when the workflow execution was created. Unix timestamp in milliseconds.
version (str | None): The version of the workflow. Defaults to None.
start_time (int | None): The start time of the workflow execution. Unix timestamp in milliseconds. Defaults to None.
Expand All @@ -953,7 +955,7 @@ def __init__(
self,
id: str,
workflow_external_id: str,
status: Literal["running", "completed", "failed", "timed_out", "terminated", "paused"],
status: WorkflowStatus,
created_time: int,
version: str | None = None,
start_time: int | None = None,
Expand Down Expand Up @@ -984,7 +986,7 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> W
workflow_external_id=resource["workflowExternalId"],
version=resource.get("version"),
status=cast(
Literal["running", "completed", "failed", "timed_out", "terminated", "paused"],
WorkflowStatus,
to_snake_case(resource["status"]),
),
created_time=resource["createdTime"],
Expand Down Expand Up @@ -1014,7 +1016,7 @@ class WorkflowExecutionDetailed(WorkflowExecution):
id (str): The server generated id of the workflow execution.
workflow_external_id (str): The external ID of the workflow.
workflow_definition (WorkflowDefinition): The workflow definition of the workflow.
status (Literal["running", "completed", "failed", "timed_out", "terminated", "paused"]): The status of the workflow execution.
status (WorkflowStatus): The status of the workflow execution.
executed_tasks (list[WorkflowTaskExecution]): The executed tasks of the workflow execution.
created_time (int): The time when the workflow execution was created. Unix timestamp in milliseconds.
version (str | None): The version of the workflow. Defaults to None.
Expand All @@ -1030,7 +1032,7 @@ def __init__(
id: str,
workflow_external_id: str,
workflow_definition: WorkflowDefinition,
status: Literal["running", "completed", "failed", "timed_out", "terminated", "paused"],
status: WorkflowStatus,
executed_tasks: list[WorkflowTaskExecution],
created_time: int,
version: str | None = None,
Expand All @@ -1055,7 +1057,7 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> W
workflow_external_id=resource["workflowExternalId"],
version=resource.get("version"),
status=cast(
Literal["running", "completed", "failed", "timed_out", "terminated", "paused"],
WorkflowStatus,
to_snake_case(resource["status"]),
),
created_time=resource["createdTime"],
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "7.48.1"
version = "7.49.0"
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
Expand Down
17 changes: 17 additions & 0 deletions tests/tests_integration/test_api/test_data_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,23 @@ def test_list_workflow_executions(
assert len(listed) == len(workflow_execution_list)
assert all(w.as_workflow_id() in workflow_ids for w in listed)

def test_list_workflow_executions_by_status(
self,
cognite_client: CogniteClient,
add_multiply_workflow: WorkflowVersion,
) -> None:
listed_completed = cognite_client.workflows.executions.list(
workflow_version_ids=add_multiply_workflow.as_id(), statuses="completed", limit=3
)
for execution in listed_completed:
assert execution.status == "completed"

listed_others = cognite_client.workflows.executions.list(
workflow_version_ids=add_multiply_workflow.as_id(), statuses=["running", "failed"], limit=3
)
for execution in listed_others:
assert execution.status in ["running", "failed"]

def test_retrieve_workflow_execution_detailed(
self,
cognite_client: CogniteClient,
Expand Down

0 comments on commit a50067b

Please sign in to comment.