Skip to content

Commit

Permalink
Workflow ergonomics (#2040)
Browse files Browse the repository at this point in the history
  • Loading branch information
haakonvt authored Nov 22, 2024
1 parent b751b9b commit 34a3338
Show file tree
Hide file tree
Showing 7 changed files with 382 additions and 237 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,22 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.68.0] - 2024-11-22
### Added
- New methods: `WorkflowTriggerAPI.[list, list_runs]`
- `WorkflowAPI.upsert` now supports upserting multiple.
- `WorkflowAPI.retrieve` now supports retrieving multiple.
- `WorkflowVersionAPI.upsert` now supports upserting multiple.
- `WorkflowVersionAPI.retrieve` now supports retrieving multiple.
- `WorkflowTriggerAPI.delete` now supports deleting multiple.
- Missing field `last_updated_time` for `Workflow`.
- Missing fields `last_updated_time` and `created_time` for `WorkflowVersion`.
### Deprecated
- `WorkflowTriggerAPI.get_triggers` is now deprecated in favor of `WorkflowTriggerAPI.list`
- `WorkflowTriggerAPI.get_trigger_run_history` is now deprecated in favor of `WorkflowTriggerAPI.list_runs`
### Fixed
- Listing the history of (workflow trigger) runs now work as expected for all external_ids (properly URL encoded).

## [7.67.4] - 2024-11-21
### Fixed
- Creating a `CogniteClient` no longer gives a `UserWarning` for private link projects.
Expand Down
436 changes: 277 additions & 159 deletions cognite/client/_api/workflows.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions cognite/client/_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -968,10 +968,10 @@ def _delete_multiple(
returns_items: bool = False,
executor: TaskExecutor | None = None,
) -> list | None:
resource_path = resource_path or self._RESOURCE_PATH
resource_path = (resource_path or self._RESOURCE_PATH) + "/delete"
tasks = [
{
"url_path": resource_path + "/delete",
"url_path": resource_path,
"json": {
"items": chunk.as_dicts() if wrap_ids else chunk.as_primitives(),
**(extra_body_fields or {}),
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.67.4"
__version__ = "7.68.0"
__api_subversion__ = "20230101"
86 changes: 51 additions & 35 deletions cognite/client/data_classes/workflows.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import warnings
from abc import ABC, abstractmethod
from collections import UserList
from collections.abc import Collection, Sequence
Expand Down Expand Up @@ -75,11 +76,12 @@ def as_write(self) -> WorkflowUpsert:

class Workflow(WorkflowCore):
"""
This class represents a workflow. This is the reading version, used when reading or listing a workflows.
This class represents a workflow. This is the reading version, used when reading or listing workflows.
Args:
external_id (str): The external ID provided by the client. Must be unique for the resource type.
created_time (int): The time when the workflow was created. Unix timestamp in milliseconds.
last_updated_time (int): The time when the workflow was last updated. Unix timestamp in milliseconds.
description (str | None): Description of the workflow. Defaults to None.
data_set_id (int | None): The id of the data set this workflow belongs to.
"""
Expand All @@ -88,18 +90,21 @@ def __init__(
self,
external_id: str,
created_time: int,
last_updated_time: int,
description: str | None = None,
data_set_id: int | None = None,
) -> None:
super().__init__(external_id, description, data_set_id)
self.created_time = created_time
self.last_updated_time = last_updated_time

@classmethod
def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> Self:
return cls(
external_id=resource["externalId"],
description=resource.get("description"),
created_time=resource["createdTime"],
last_updated_time=resource["lastUpdatedTime"],
data_set_id=resource.get("dataSetId"),
)

Expand Down Expand Up @@ -214,7 +219,7 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> F
return cls(
external_id=function["externalId"],
data=function.get("data"),
# API uses isAsyncComplete and asyncComplete inconsistently:
# TODO: Fix: API uses isAsyncComplete and asyncComplete inconsistently:
is_async_complete=resource.get("isAsyncComplete", resource.get("asyncComplete")),
)

Expand All @@ -229,7 +234,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]:
"function": function,
}
if self.is_async_complete is not None:
output[("isAsyncComplete" if camel_case else "is_async_complete")] = self.is_async_complete
output["isAsyncComplete" if camel_case else "is_async_complete"] = self.is_async_complete
return output


Expand Down Expand Up @@ -915,19 +920,22 @@ class WorkflowVersion(WorkflowVersionCore):
workflow_external_id (str): The external ID of the workflow.
version (str): The version of the workflow.
workflow_definition (WorkflowDefinition): The workflow definition of the workflow version.
created_time (int): The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
last_updated_time (int): The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds.
"""

def __init__(
self,
workflow_external_id: str,
version: str,
workflow_definition: WorkflowDefinition,
created_time: int,
last_updated_time: int,
) -> None:
super().__init__(
workflow_external_id=workflow_external_id,
version=version,
)
super().__init__(workflow_external_id=workflow_external_id, version=version)
self.workflow_definition = workflow_definition
self.created_time = created_time
self.last_updated_time = last_updated_time

@classmethod
def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> WorkflowVersion:
Expand All @@ -936,13 +944,17 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> W
workflow_external_id=resource["workflowExternalId"],
version=resource["version"],
workflow_definition=WorkflowDefinition._load(workflow_definition),
created_time=resource["createdTime"],
last_updated_time=resource["lastUpdatedTime"],
)

def dump(self, camel_case: bool = True) -> dict[str, Any]:
return {
("workflowExternalId" if camel_case else "workflow_external_id"): self.workflow_external_id,
"workflowExternalId" if camel_case else "workflow_external_id": self.workflow_external_id,
"version": self.version,
("workflowDefinition" if camel_case else "workflow_definition"): self.workflow_definition.dump(camel_case),
"workflowDefinition" if camel_case else "workflow_definition": self.workflow_definition.dump(camel_case),
"createdTime" if camel_case else "created_time": self.created_time,
"lastUpdatedTime" if camel_case else "last_updated_time": self.last_updated_time,
}

def as_write(self) -> WorkflowVersionUpsert:
Expand Down Expand Up @@ -1019,10 +1031,7 @@ def __init__(
self.metadata = metadata

def as_workflow_id(self) -> WorkflowVersionId:
return WorkflowVersionId(
workflow_external_id=self.workflow_external_id,
version=self.version,
)
return WorkflowVersionId(workflow_external_id=self.workflow_external_id, version=self.version)

@classmethod
def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> WorkflowExecution:
Expand Down Expand Up @@ -1156,37 +1165,39 @@ class WorkflowVersionId:
workflow_external_id: str
version: str | None = None

def as_primitive(self) -> tuple[str, str | None]:
def as_tuple(self) -> tuple[str, str | None]:
return self.workflow_external_id, self.version

def as_primitive(self) -> tuple[str, str | None]:
warnings.warn(
"as_primitive() is deprecated, use as_tuple instead. Will be removed in the next major release.",
DeprecationWarning,
)
return self.as_tuple()

@classmethod
def load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> WorkflowVersionId:
def load(cls, resource: dict) -> Self:
if "workflowExternalId" in resource:
workflow_external_id = resource["workflowExternalId"]
elif "externalId" in resource:
workflow_external_id = resource["externalId"]
else:
raise ValueError("Invalid input to WorkflowVersionId.load")

return cls(
workflow_external_id=workflow_external_id,
version=resource.get("version"),
)
return cls(workflow_external_id=workflow_external_id, version=resource.get("version"))

def dump(self, camel_case: bool = True, as_external_id_key: bool = False) -> dict[str, Any]:
if as_external_id_key:
output: dict[str, Any] = {("externalId" if camel_case else "external_id"): self.workflow_external_id}
output: dict[str, Any] = {"externalId" if camel_case else "external_id": self.workflow_external_id}
else:
output = {("workflowExternalId" if camel_case else "workflow_external_id"): self.workflow_external_id}
output = {"workflowExternalId" if camel_case else "workflow_external_id": self.workflow_external_id}
if self.version:
output["version"] = self.version
return output


class WorkflowIds(UserList):
"""
This class represents a list of Workflow Version Identifiers.
"""
"""This class represents a list of Workflow Version Identifiers."""

def __init__(self, workflow_ids: Collection[WorkflowVersionId]) -> None:
for workflow_id in workflow_ids:
Expand All @@ -1197,8 +1208,11 @@ def __init__(self, workflow_ids: Collection[WorkflowVersionId]) -> None:
)
super().__init__(workflow_ids)

def as_tuples(self) -> list[tuple[str, str | None]]:
return [wid.as_tuple() for wid in self]

@classmethod
def load(cls, resource: Any, cognite_client: CogniteClient | None = None) -> WorkflowIds:
def load(cls, resource: Any) -> Self:
workflow_ids: Sequence[WorkflowVersionId]
if isinstance(resource, tuple) and len(resource) == 2 and all(isinstance(x, str) for x in resource):
workflow_ids = [WorkflowVersionId(*resource)]
Expand All @@ -1208,14 +1222,18 @@ def load(cls, resource: Any, cognite_client: CogniteClient | None = None) -> Wor
workflow_ids = [WorkflowVersionId(workflow_external_id=resource)]
elif isinstance(resource, dict):
workflow_ids = [WorkflowVersionId.load(resource)]
elif isinstance(resource, Sequence) and resource and isinstance(resource[0], tuple):
workflow_ids = [WorkflowVersionId(*x) for x in resource]
elif isinstance(resource, Sequence) and resource and isinstance(resource[0], WorkflowVersionId):
workflow_ids = resource
elif isinstance(resource, Sequence) and resource and isinstance(resource[0], str):
workflow_ids = [WorkflowVersionId(workflow_external_id=x) for x in resource]
elif isinstance(resource, Sequence) and resource:
workflow_ids = []
for wf in resource:
match wf:
case tuple():
workflow_ids.append(WorkflowVersionId(*wf))
case str():
workflow_ids.append(WorkflowVersionId(workflow_external_id=wf))
case _:
workflow_ids.append(wf)
else:
raise ValueError("Invalid input to WorkflowIds")
raise ValueError("Invalid input to WorkflowIds.load")
return cls(workflow_ids)

def dump(self, camel_case: bool = True, as_external_id: bool = False) -> list[dict[str, Any]]:
Expand Down Expand Up @@ -1572,8 +1590,6 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> W


class WorkflowTriggerRunList(CogniteResourceList[WorkflowTriggerRun], ExternalIDTransformerMixin):
"""
This class represents a list of workflow trigger runs.
"""
"""This class represents a list of workflow trigger runs."""

_RESOURCE = WorkflowTriggerRun
73 changes: 34 additions & 39 deletions docs/source/data_workflows.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,59 +3,55 @@ Data Workflows

Workflows
------------
Upsert Workflow
Upsert Workflows
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowAPI.upsert

Delete Workflow(s)
^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowAPI.delete

Retrieve Workflow
^^^^^^^^^^^^^^^^^
Retrieve Workflows
^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowAPI.retrieve

List Workflows
^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowAPI.list

Delete Workflows
^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowAPI.delete


Workflow Versions
------------------
Upsert Workflow Version
Upsert Workflow Versions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowVersionAPI.upsert

Delete Workflow Version(s)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowVersionAPI.delete

Retrieve Workflow Version
Retrieve Workflow Versions
^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowVersionAPI.retrieve

List Workflow Versions
^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowVersionAPI.list

Delete Workflow Versions
^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowVersionAPI.delete


Workflow Executions
--------------------
List Workflow Executions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.list

Retrieve Detailed Workflow Execution
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.retrieve_detailed

Run Workflow Execution
^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.run

Trigger Workflow Execution
^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.trigger
Retrieve detailed Workflow Execution
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.retrieve_detailed

List Workflow Executions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.list

Cancel Workflow Execution
^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand All @@ -65,33 +61,32 @@ Retry Workflow Execution
^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.retry


Workflow Tasks
------------------
Update Status of Async Task
Update status of async Task
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowTaskAPI.update


Workflow Triggers
-------------------
Create or update triggers for workflow executions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Upsert Trigger
^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowTriggerAPI.upsert

Create triggers for workflow executions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowTriggerAPI.create
List Triggers
^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowTriggerAPI.list

Delete triggers for workflow executions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowTriggerAPI.delete
List runs for a Trigger
^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowTriggerAPI.list_runs

Get triggers for workflow executions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowTriggerAPI.get_triggers
Delete Triggers
^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowTriggerAPI.delete

Get trigger run history for a workflow trigger
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. automethod:: cognite.client._api.workflows.WorkflowTriggerAPI.get_trigger_run_history

Data Workflows data classes
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "7.67.4"
version = "7.68.0"
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
Expand Down

0 comments on commit 34a3338

Please sign in to comment.