From 34a3338ee9556a3aa13571c3fc50cac18a03dec0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Fri, 22 Nov 2024 14:31:23 +0100 Subject: [PATCH] Workflow ergonomics (#2040) --- CHANGELOG.md | 16 + cognite/client/_api/workflows.py | 436 ++++++++++++++--------- cognite/client/_api_client.py | 4 +- cognite/client/_version.py | 2 +- cognite/client/data_classes/workflows.py | 86 +++-- docs/source/data_workflows.rst | 73 ++-- pyproject.toml | 2 +- 7 files changed, 382 insertions(+), 237 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eef519bb6..8628c5484 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,22 @@ Changes are grouped as follows - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. +## [7.68.0] - 2024-11-22 +### Added +- New methods: `WorkflowTriggerAPI.[list, list_runs]` +- `WorkflowAPI.upsert` now supports upserting multiple. +- `WorkflowAPI.retrieve` now supports retrieving multiple. +- `WorkflowVersionAPI.upsert` now supports upserting multiple. +- `WorkflowVersionAPI.retrieve` now supports retrieving multiple. +- `WorkflowTriggerAPI.delete` now supports deleting multiple. +- Missing field `last_updated_time` for `Workflow`. +- Missing fields `last_updated_time` and `created_time` for `WorkflowVersion`. +### Deprecated +- `WorkflowTriggerAPI.get_triggers` is now deprecated in favor of `WorkflowTriggerAPI.list` +- `WorkflowTriggerAPI.get_trigger_run_history` is now deprecated in favor of `WorkflowTriggerAPI.list_runs` +### Fixed +- Listing the history of (workflow trigger) runs now work as expected for all external_ids (properly URL encoded). + ## [7.67.4] - 2024-11-21 ### Fixed - Creating a `CogniteClient` no longer gives a `UserWarning` for private link projects. diff --git a/cognite/client/_api/workflows.py b/cognite/client/_api/workflows.py index fec80b618..4d9272bbc 100644 --- a/cognite/client/_api/workflows.py +++ b/cognite/client/_api/workflows.py @@ -1,7 +1,7 @@ from __future__ import annotations import warnings -from collections.abc import Iterator, MutableSequence +from collections.abc import Iterator, MutableSequence, Sequence from typing import TYPE_CHECKING, Any, Literal, TypeAlias, overload from cognite.client._api_client import APIClient @@ -28,12 +28,14 @@ WorkflowVersionUpsert, ) from cognite.client.exceptions import CogniteAPIError -from cognite.client.utils._auxiliary import interpolate_and_url_encode +from cognite.client.utils._auxiliary import at_least_one_is_not_none, interpolate_and_url_encode, split_into_chunks +from cognite.client.utils._concurrency import execute_tasks from cognite.client.utils._identifier import ( IdentifierSequence, WorkflowVersionIdentifierSequence, ) from cognite.client.utils._session import create_session_and_return_nonce +from cognite.client.utils._validation import assert_type from cognite.client.utils.useful_types import SequenceNotStr if TYPE_CHECKING: @@ -55,6 +57,10 @@ def wrap_workflow_ids( class WorkflowTriggerAPI(APIClient): _RESOURCE_PATH = "/workflows/triggers" + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: + super().__init__(config, api_version, cognite_client) + self._DELETE_LIMIT = 1 + def upsert( self, workflow_trigger: WorkflowTriggerUpsert, @@ -88,6 +94,7 @@ def upsert( ... ) Create or update a data modeling trigger for a workflow: + >>> from cognite.client.data_classes.workflows import WorkflowDataModelingTriggerRule, WorkflowTriggerDataModelingQuery >>> from cognite.client.data_classes.data_modeling.query import NodeResultSetExpression, Select, SourceSelector >>> from cognite.client.data_classes.data_modeling import ViewId @@ -123,39 +130,17 @@ def upsert( ) return WorkflowTrigger._load(response.json().get("items")[0]) - # TODO: remove method and associated data classes in next release + # TODO: remove method and associated data classes in next major release def create( self, workflow_trigger: WorkflowTriggerCreate, client_credentials: ClientCredentials | dict | None = None, ) -> WorkflowTrigger: - """`[DEPRECATED] Create or update a trigger for a workflow. `_ - - This method is deprecated, use '.upsert' instead. It will be completely removed October 2024. - - Args: - workflow_trigger (WorkflowTriggerCreate): The workflow trigger specification. - client_credentials (ClientCredentials | dict | None): Specific credentials that should be used to trigger the workflow execution. When passed will take precedence over the current credentials. + """Create or update a trigger for a workflow. - Returns: - WorkflowTrigger: The created or updated workflow trigger specification. - - Examples: + .. admonition:: Deprecation Warning - Create or update a scheduled trigger for a workflow: - - >>> from cognite.client import CogniteClient - >>> from cognite.client.data_classes.workflows import WorkflowTriggerCreate, WorkflowScheduledTriggerRule - >>> client = CogniteClient() - >>> client.workflows.triggers.create( - ... WorkflowTriggerCreate( - ... external_id="my_trigger", - ... trigger_rule=WorkflowScheduledTriggerRule(cron_expression="0 0 * * *"), - ... workflow_external_id="my_workflow", - ... workflow_version="1", - ... input={"a": 1, "b": 2}, - ... ) - ... ) + This method is deprecated, use '.upsert' instead. It will be removed in the next major version. """ warnings.warn( "This method is deprecated, use '.upsert' instead. It will be removed in the next major release.", @@ -163,14 +148,11 @@ def create( ) return self.upsert(workflow_trigger, client_credentials) - def delete( - self, - external_id: str, - ) -> None: - """`Delete a trigger for a workflow. `_ + def delete(self, external_id: str | SequenceNotStr[str]) -> None: + """`Delete one or more triggers for a workflow. `_ Args: - external_id (str): The external id of the trigger to delete. + external_id (str | SequenceNotStr[str]): The external id(s) of the trigger(s) to delete. Examples: @@ -179,31 +161,45 @@ def delete( >>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> client.workflows.triggers.delete("my_trigger") + + Delete a list of triggers: + + >>> client.workflows.triggers.delete(["my_trigger", "another_trigger"]) + """ + self._delete_multiple( + identifiers=IdentifierSequence.load(external_ids=external_id), + wrap_ids=True, + ) + + def get_triggers(self, limit: int | None = DEFAULT_LIMIT_READ) -> WorkflowTriggerList: + """List the workflow triggers. + + .. admonition:: Deprecation Warning + + This method is deprecated, use '.list' instead. It will be removed in the next major version. """ - self._post( - url_path=self._RESOURCE_PATH + "/delete", - json={"items": [{"externalId": external_id}]}, + warnings.warn( + "The 'get_triggers' method is deprecated, use 'list' instead. It will be removed in the next major release.", + UserWarning, ) + return self.list(limit) - def get_triggers( - self, - limit: int = DEFAULT_LIMIT_READ, - ) -> WorkflowTriggerList: - """`Retrieve the trigger list. `_ + def list(self, limit: int | None = DEFAULT_LIMIT_READ) -> WorkflowTriggerList: + """`List the workflow triggers. `_ Args: - limit (int): Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None to return all items. + limit (int | None): Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None to return all items. Returns: - WorkflowTriggerList: The trigger list. + WorkflowTriggerList: The list of triggers. Examples: - Get all triggers: + List all triggers: >>> from cognite.client import CogniteClient >>> client = CogniteClient() - >>> res = client.workflows.triggers.get_triggers() + >>> res = client.workflows.triggers.list(limit=None) """ return self._list( method="GET", @@ -214,15 +210,26 @@ def get_triggers( ) def get_trigger_run_history( - self, - external_id: str, - limit: int = DEFAULT_LIMIT_READ, + self, external_id: str, limit: int | None = DEFAULT_LIMIT_READ ) -> WorkflowTriggerRunList: - """`List the history of runs for a trigger. `_ + """List the history of runs for a trigger. + + .. admonition:: Deprecation Warning + + This method is deprecated, use '.list_runs' instead. It will be removed in the next major version. + """ + warnings.warn( + "The 'get_trigger_run_history' method is deprecated, use 'list_runs' instead. It will be removed in the next major release.", + UserWarning, + ) + return self.list_runs(external_id, limit) + + def list_runs(self, external_id: str, limit: int | None = DEFAULT_LIMIT_READ) -> WorkflowTriggerRunList: + """`List the history of runs for a trigger. `_ Args: external_id (str): The external id of the trigger to list runs for. - limit (int): Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None to return all items. + limit (int | None): Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None to return all items. Returns: WorkflowTriggerRunList: The requested trigger runs. @@ -233,11 +240,11 @@ def get_trigger_run_history( >>> from cognite.client import CogniteClient >>> client = CogniteClient() - >>> res = client.workflows.triggers.get_trigger_run_history("my_trigger") + >>> res = client.workflows.triggers.list_runs("my_trigger", limit=None) """ return self._list( method="GET", - url_path=self._RESOURCE_PATH + f"/{external_id}/history", + url_path=interpolate_and_url_encode(self._RESOURCE_PATH + "/{}/history", external_id), resource_cls=WorkflowTriggerRun, list_cls=WorkflowTriggerRunList, limit=limit, @@ -264,13 +271,13 @@ def update( Examples: - Update task with UUID '000560bc-9080-4286-b242-a27bb4819253' to status 'completed': + Update task with id '000560bc-9080-4286-b242-a27bb4819253' to status 'completed': >>> from cognite.client import CogniteClient >>> client = CogniteClient() >>> res = client.workflows.tasks.update("000560bc-9080-4286-b242-a27bb4819253", "completed") - Update task with UUID '000560bc-9080-4286-b242-a27bb4819253' to status 'failed' with output '{"a": 1, "b": 2}': + Update task with id '000560bc-9080-4286-b242-a27bb4819253' to status 'failed' with output '{"a": 1, "b": 2}': >>> from cognite.client import CogniteClient >>> client = CogniteClient() @@ -288,10 +295,7 @@ def update( body: dict[str, Any] = {"status": status.upper()} if output is not None: body["output"] = output - response = self._post( - url_path=f"{self._RESOURCE_PATH}/{task_id}/update", - json=body, - ) + response = self._post(url_path=f"{self._RESOURCE_PATH}/{task_id}/update", json=body) return WorkflowTaskExecution.load(response.json()) @@ -309,7 +313,7 @@ def retrieve_detailed(self, id: str) -> WorkflowExecutionDetailed | None: Examples: - Retrieve workflow execution with UUID '000560bc-9080-4286-b242-a27bb4819253': + Retrieve workflow execution with id '000560bc-9080-4286-b242-a27bb4819253': >>> from cognite.client import CogniteClient >>> client = CogniteClient() @@ -339,18 +343,11 @@ def trigger( metadata: dict | None = None, client_credentials: ClientCredentials | None = None, ) -> WorkflowExecution: - """`[DEPRECATED]Trigger a workflow execution. `_ + """Trigger a workflow execution. - This method is deprecated, use '.run' instead. It will be completely removed October 2024. + .. admonition:: Deprecation Warning - Args: - workflow_external_id (str): External id of the workflow. - version (str): Version of the workflow. - input (dict | None): The input to the workflow execution. This will be available for tasks that have specified it as an input with the string "${workflow.input}" See tip below for more information. - metadata (dict | None): Application specific metadata. Keys have a maximum length of 32 characters, values a maximum of 255, and there can be a maximum of 10 key-value pairs. - client_credentials (ClientCredentials | None): Specific credentials that should be used to trigger the workflow execution. When passed will take precedence over the current credentials. - Returns: - WorkflowExecution: No description. + This method is deprecated, use '.run' instead. It will be completely removed in the next major version. """ warnings.warn( "This methods has been deprecated, use '.run' instead. It will completely removed in the next major release.", @@ -375,7 +372,7 @@ def run( input (dict | None): The input to the workflow execution. This will be available for tasks that have specified it as an input with the string "${workflow.input}" See tip below for more information. metadata (dict | None): Application specific metadata. Keys have a maximum length of 32 characters, values a maximum of 255, and there can be a maximum of 10 key-value pairs. client_credentials (ClientCredentials | None): Specific credentials that should be used to trigger the workflow execution. When passed will take precedence over the current credentials. - nonce (str | None): The nonce to use to bind the session. If not provided, a new session will be created using the current credentials. + nonce (str | None): The nonce to use to bind the session. If not provided, a new session will be created using the given 'client_credentials'. If this is not given, the current credentials will be used. Tip: The workflow input can be available in the workflow tasks. For example, if you have a Task with @@ -386,7 +383,7 @@ def run( ... external_id="my_workflow-task1", ... parameters=FunctionTaskParameters( ... external_id="cdf_deployed_function:my_function", - ... data={"workflow_data": "${workflow.input}",})) + ... data={"workflow_data": "${workflow.input}"})) Tip: You can create a session via the Sessions API, using the client.iam.session.create() method. @@ -434,7 +431,7 @@ def list( created_time_start: int | None = None, created_time_end: int | None = None, statuses: WorkflowStatus | MutableSequence[WorkflowStatus] | None = None, - limit: int = DEFAULT_LIMIT_READ, + limit: int | None = DEFAULT_LIMIT_READ, ) -> WorkflowExecutionList: """`List workflow executions in the project. `_ @@ -443,8 +440,8 @@ def list( created_time_start (int | None): Filter out executions that was created before this time. Time is in milliseconds since epoch. created_time_end (int | None): Filter out executions that was created after this time. Time is in milliseconds since epoch. statuses (WorkflowStatus | MutableSequence[WorkflowStatus] | None): Workflow status or list of workflow statuses to filter on. - limit (int): Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None - to return all items. + limit (int | None): Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None to return all items. + Returns: WorkflowExecutionList: The requested workflow executions. @@ -456,14 +453,21 @@ def list( >>> client = CogniteClient() >>> res = client.workflows.executions.list(("my_workflow", "1")) - Get all workflow executions for workflows after last 24 hours: + Get all workflow executions from the last 24 hours: >>> from cognite.client import CogniteClient - >>> from datetime import datetime, timedelta + >>> from cognite.client.utils import timestamp_to_ms >>> client = CogniteClient() - >>> res = client.workflows.executions.list(created_time_start=int((datetime.now() - timedelta(days=1)).timestamp() * 1000)) + >>> res = client.workflows.executions.list( + ... created_time_start=timestamp_to_ms("1d-ago")) """ + # Passing at least one filter criterion is required: + if not at_least_one_is_not_none(workflow_version_ids, created_time_start, created_time_end, statuses): + raise ValueError( + "At least one of 'workflow_version_ids', 'created_time_start', " + "'created_time_end', 'statuses' must be provided." + ) filter_: dict[str, Any] = {} if workflow_version_ids is not None: filter_["workflowFilters"] = WorkflowIds.load(workflow_version_ids).dump( @@ -488,15 +492,16 @@ def list( ) def cancel(self, id: str, reason: str | None) -> WorkflowExecution: - """`cancel a workflow execution. `_ + """`Cancel a workflow execution. `_ + + Note: + Cancelling a workflow will immediately cancel the `in_progress` tasks, but not their spawned work in + other services (like transformations and functions). Args: id (str): The server-generated id of the workflow execution. reason (str | None): The reason for the cancellation, this will be put within the execution's `reasonForIncompletion` field. It is defaulted to 'cancelled' if not provided. - Note: - Cancelling a workflow will immediately cancel the `in_progress` tasks, but not their spawned work in - other services (like transformations and functions). Returns: WorkflowExecution: The canceled workflow execution. @@ -512,13 +517,8 @@ def cancel(self, id: str, reason: str | None) -> WorkflowExecution: """ response = self._post( url_path=f"{self._RESOURCE_PATH}/{id}/cancel", - json={ - "reason": reason, - } - if reason - else {}, + json={"reason": reason} if reason else {}, ) - return WorkflowExecution._load(response.json()) def retry(self, id: str, client_credentials: ClientCredentials | None = None) -> WorkflowExecution: @@ -555,6 +555,8 @@ class WorkflowVersionAPI(APIClient): def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: super().__init__(config, api_version, cognite_client) + self._CREATE_LIMIT = 1 + self._RETRIEVE_LIMIT = 1 self._DELETE_LIMIT = 100 @overload @@ -602,40 +604,48 @@ def __iter__(self) -> Iterator[WorkflowVersion]: """Iterate all over workflow versions""" return self() - def upsert(self, version: WorkflowVersionUpsert, mode: Literal["replace"] = "replace") -> WorkflowVersion: - """`Create a workflow version. `_ + @overload + def upsert(self, version: WorkflowVersionUpsert) -> WorkflowVersion: ... + + @overload + def upsert(self, version: Sequence[WorkflowVersionUpsert]) -> WorkflowVersionList: ... - Note this is an upsert endpoint, so if a workflow with the same version external id already exists, it will be updated. + def upsert( + self, version: WorkflowVersionUpsert | Sequence[WorkflowVersionUpsert], mode: Literal["replace"] = "replace" + ) -> WorkflowVersion | WorkflowVersionList: + """`Create one or more workflow version(s). `_ - Furthermore, if the workflow does not exist, it will be created. + Note this is an upsert endpoint, so workflow versions that already exist will be updated, and new ones will be created. Args: - version (WorkflowVersionUpsert): The workflow version to create or update. + version (WorkflowVersionUpsert | Sequence[WorkflowVersionUpsert]): The workflow version(s) to upsert. mode (Literal['replace']): This is not an option for the API, but is included here to document that the upserts are always done in replace mode. Returns: - WorkflowVersion: The created workflow version. + WorkflowVersion | WorkflowVersionList: The created workflow version(s). Examples: - Create workflow version with one Function task: + Create one workflow version with a single Function task: >>> from cognite.client import CogniteClient - >>> from cognite.client.data_classes import WorkflowVersionUpsert, WorkflowDefinitionUpsert, WorkflowTask, FunctionTaskParameters + >>> from cognite.client.data_classes import ( + ... WorkflowVersionUpsert, WorkflowDefinitionUpsert, + ... WorkflowTask, FunctionTaskParameters, + ... ) >>> client = CogniteClient() + >>> function_task = WorkflowTask( + ... external_id="my_workflow-task1", + ... parameters=FunctionTaskParameters( + ... external_id="my_fn_xid", + ... data={"a": 1, "b": 2}, + ... ), + ... ) >>> new_version = WorkflowVersionUpsert( ... workflow_external_id="my_workflow", ... version="1", ... workflow_definition=WorkflowDefinitionUpsert( - ... tasks=[ - ... WorkflowTask( - ... external_id="my_workflow-task1", - ... parameters=FunctionTaskParameters( - ... external_id="cdf_deployed_function:my_function", - ... data={"a": 1, "b": 2}, - ... ), - ... ) - ... ], + ... tasks=[function_task], ... description="This workflow has one step", ... ), ... ) @@ -644,12 +654,14 @@ def upsert(self, version: WorkflowVersionUpsert, mode: Literal["replace"] = "rep if mode != "replace": raise ValueError("Only replace mode is supported for upserting workflow versions.") - response = self._post( - url_path=self._RESOURCE_PATH, - json={"items": [version.dump(camel_case=True)]}, - ) + assert_type(version, "workflow version", [WorkflowVersionUpsert, Sequence]) - return WorkflowVersion._load(response.json()["items"][0]) + return self._create_multiple( + list_cls=WorkflowVersionList, + resource_cls=WorkflowVersion, + items=version, + input_resource_cls=WorkflowVersionUpsert, + ) def delete( self, @@ -685,45 +697,102 @@ def delete( wrap_ids=True, ) - def retrieve(self, workflow_external_id: str, version: str) -> WorkflowVersion | None: + def retrieve( + self, + workflow_external_id: WorkflowVersionIdentifier | Sequence[WorkflowVersionIdentifier] | WorkflowIds | str, + version: str | None = None, + *, + ignore_unknown_ids: bool = False, + ) -> WorkflowVersion | WorkflowVersionList | None: """`Retrieve a workflow version. `_ Args: - workflow_external_id (str): External id of the workflow. - version (str): Version of the workflow. + workflow_external_id (WorkflowVersionIdentifier | Sequence[WorkflowVersionIdentifier] | WorkflowIds | str): External id of the workflow. + version (str | None): Version of the workflow. + ignore_unknown_ids (bool): When requesting multiple, whether to ignore external IDs that are not found rather than throwing an exception. Returns: - WorkflowVersion | None: The requested workflow version if it exists, None otherwise. + WorkflowVersion | WorkflowVersionList | None: If a single identifier is specified: the requested workflow version, or None if it does not exist. If several ids are specified: the requested workflow versions. Examples: - Retrieve workflow version 1 of workflow my workflow: + Retrieve workflow version 'v1' of workflow "my_workflow": >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes import WorkflowVersionId >>> client = CogniteClient() - >>> res = client.workflows.versions.retrieve("my workflow", "1") + >>> res = client.workflows.versions.retrieve(WorkflowVersionId("my_workflow", "v1")) + + Retrieve multiple workflow versions and ignore unknown: + + >>> res = client.workflows.versions.retrieve( + ... [WorkflowVersionId("my_workflow", "v1"), WorkflowVersionId("other", "v3.2")], + ... ignore_unknown_ids=True, + ... ) + >>> # A sequence of tuples is also accepted: + >>> res = client.workflows.versions.retrieve([("my_workflow", "v1"), ("other", "v3.2")]) + + DEPRECATED: You can also pass workflow_external_id and version as separate arguments: + + >>> res = client.workflows.versions.retrieve("my_workflow", "v1") + """ - try: - response = self._get( - url_path=interpolate_and_url_encode("/workflows/{}/versions/{}", workflow_external_id, version) - ) - except CogniteAPIError as e: - if e.code == 404: - return None - raise e + match workflow_external_id, version: + case str(), str(): + warnings.warn( + "This usage is deprecated, please pass one or more `WorkflowVersionId` instead.'", + DeprecationWarning, + ) + workflow_external_id = WorkflowVersionId(workflow_external_id, version) + case str(), None: + raise TypeError( + "You must specify which 'version' of the workflow to retrieve. Deprecation Warning: This usage is deprecated, please pass " + "one or more `WorkflowVersionId` instead." + ) + case WorkflowVersionId() | Sequence(), str(): + warnings.warn("Argument 'version' is ignored when passing one or more 'WorkflowVersionId'", UserWarning) + + # We can not use _retrieve_multiple as the backend doesn't support 'ignore_unknown_ids': + def get_single(wf_xid: WorkflowVersionId, ignore_missing: bool = ignore_unknown_ids) -> WorkflowVersion | None: + try: + response = self._get( + url_path=interpolate_and_url_encode("/workflows/{}/versions/{}", *wf_xid.as_tuple()) + ) + return WorkflowVersion._load(response.json()) + except CogniteAPIError as e: + if ignore_missing and e.code == 404: + return None + raise + + # WorkflowVersionId doesn't require 'version' to be given, so we raise in case it is missing: + given_wf_ids = WorkflowIds.load(workflow_external_id) + if any(wf_id.version is None for wf_id in given_wf_ids): + raise ValueError("Version must be specified for all workflow version IDs.") + + is_single = ( + isinstance(workflow_external_id, WorkflowVersionId) + or isinstance(workflow_external_id, tuple) + and len(given_wf_ids) == 1 + ) + if is_single: + return get_single(given_wf_ids[0], ignore_missing=True) - return WorkflowVersion._load(response.json()) + # Not really a point in splitting into chunks when chunk_size is 1, but... + tasks = list(map(tuple, split_into_chunks(given_wf_ids, self._RETRIEVE_LIMIT))) + tasks_summary = execute_tasks(get_single, tasks=tasks, max_workers=self._config.max_workers, fail_fast=True) + tasks_summary.raise_compound_exception_if_failed_tasks() + return WorkflowVersionList(list(filter(None, tasks_summary.results)), cognite_client=self._cognite_client) def list( self, workflow_version_ids: WorkflowIdentifier | MutableSequence[WorkflowIdentifier] | None = None, - limit: int = DEFAULT_LIMIT_READ, + limit: int | None = DEFAULT_LIMIT_READ, ) -> WorkflowVersionList: """`List workflow versions in the project `_ Args: workflow_version_ids (WorkflowIdentifier | MutableSequence[WorkflowIdentifier] | None): Workflow version id or list of workflow version ids to filter on. - limit (int): Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None + limit (int | None): Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None Returns: WorkflowVersionList: The requested workflow versions. @@ -741,13 +810,15 @@ def list( >>> from cognite.client import CogniteClient >>> from cognite.client.data_classes import WorkflowVersionId >>> client = CogniteClient() - >>> res = client.workflows.versions.list([WorkflowVersionId("my_workflow"), WorkflowVersionId("my_workflow_2")]) + >>> res = client.workflows.versions.list( + ... [WorkflowVersionId("my_workflow"), WorkflowVersionId("my_workflow_2")]) Get all workflow versions for workflows 'my_workflow' version '1' and 'my_workflow_2' version '2' using tuples: >>> from cognite.client import CogniteClient >>> client = CogniteClient() - >>> res = client.workflows.versions.list([("my_workflow", "1"), ("my_workflow_2", "2")]) + >>> res = client.workflows.versions.list( + ... [("my_workflow", "1"), ("my_workflow_2", "2")]) """ return self._list( @@ -773,6 +844,8 @@ def __init__( self.executions = WorkflowExecutionAPI(config, api_version, cognite_client) self.tasks = WorkflowTaskAPI(config, api_version, cognite_client) self.triggers = WorkflowTriggerAPI(config, api_version, cognite_client) + self._RETRIEVE_LIMIT = 1 + self._CREATE_LIMIT = 1 self._DELETE_LIMIT = 100 @overload @@ -802,63 +875,107 @@ def __iter__(self) -> Iterator[Workflow]: """Iterate all over workflows""" return self() - def upsert(self, workflow: WorkflowUpsert, mode: Literal["replace"] = "replace") -> Workflow: - """`Create a workflow. `_ + @overload + def upsert(self, workflow: WorkflowUpsert) -> Workflow: ... - Note this is an upsert endpoint, so if a workflow with the same external id already exists, it will be updated. + @overload + def upsert(self, workflow: Sequence[WorkflowUpsert]) -> WorkflowList: ... + + def upsert( + self, workflow: WorkflowUpsert | Sequence[WorkflowUpsert], mode: Literal["replace"] = "replace" + ) -> Workflow | WorkflowList: + """`Create one or more workflow(s). `_ + + Note this is an upsert endpoint, so workflows that already exist will be updated, and new ones will be created. Args: - workflow (WorkflowUpsert): The workflow to create or update. + workflow (WorkflowUpsert | Sequence[WorkflowUpsert]): The workflow(s) to upsert. mode (Literal['replace']): This is not an option for the API, but is included here to document that the upserts are always done in replace mode. Returns: - Workflow: The created workflow. + Workflow | WorkflowList: The created workflow(s). Examples: - Create workflow my workflow: + Create one workflow with external id "my_workflow": >>> from cognite.client import CogniteClient >>> from cognite.client.data_classes import WorkflowUpsert >>> client = CogniteClient() - >>> res = client.workflows.upsert(WorkflowUpsert(external_id="my workflow", description="my workflow description")) + >>> wf = WorkflowUpsert(external_id="my_workflow", description="my workflow description") + >>> res = client.workflows.upsert(wf) + + Create multiple workflows: + + >>> wf2 = WorkflowUpsert(external_id="other", data_set_id=123) + >>> res = client.workflows.upsert([wf, wf2]) """ if mode != "replace": raise ValueError("Only replace mode is supported for upserting workflows.") - response = self._post( - url_path=self._RESOURCE_PATH, - json={"items": [workflow.dump(camel_case=True)]}, + assert_type(workflow, "workflow", [WorkflowUpsert, Sequence]) + + return self._create_multiple( + list_cls=WorkflowList, + resource_cls=Workflow, + items=workflow, + input_resource_cls=WorkflowUpsert, ) - return Workflow._load(response.json()["items"][0]) - def retrieve(self, external_id: str) -> Workflow | None: - """`Retrieve a workflow. `_ + @overload + def retrieve(self, external_id: str) -> Workflow | None: ... + + @overload + def retrieve(self, external_id: SequenceNotStr[str]) -> WorkflowList: ... + + def retrieve( + self, external_id: str | SequenceNotStr[str], ignore_unknown_ids: bool = False + ) -> Workflow | WorkflowList | None: + """`Retrieve one or more workflows. `_ Args: - external_id (str): Identifier for a Workflow. Must be unique for the project. + external_id (str | SequenceNotStr[str]): Identifier (or sequence of identifiers) for a Workflow. Must be unique. + ignore_unknown_ids (bool): When requesting multiple workflows, whether to ignore external IDs that are not found rather than throwing an exception. Returns: - Workflow | None: The requested workflow if it exists, None otherwise. + Workflow | WorkflowList | None: If a single external ID is specified: the requested workflow, or None if it does not exist. If several external IDs are specified: the requested workflows. Examples: - Retrieve workflow my workflow: + Retrieve workflow with external ID "my_workflow": >>> from cognite.client import CogniteClient >>> client = CogniteClient() - >>> res = client.workflows.retrieve("my workflow") + >>> workflow = client.workflows.retrieve("my_workflow") + + Retrieve multiple workflows: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> workflow_list = client.workflows.retrieve(["foo", "bar"]) """ - try: - response = self._get(url_path=interpolate_and_url_encode("/workflows/{}", external_id)) - except CogniteAPIError as e: - if e.code == 404: - return None - raise e - return Workflow._load(response.json()) + + # We can not use _retrieve_multiple as the backend doesn't support 'ignore_unknown_ids': + def get_single(xid: str, ignore_missing: bool = ignore_unknown_ids) -> Workflow | None: + try: + response = self._get(url_path=interpolate_and_url_encode("/workflows/{}", xid)) + return Workflow._load(response.json()) + except CogniteAPIError as e: + if ignore_missing and e.code == 404: + return None + raise + + if isinstance(external_id, str): + return get_single(external_id, ignore_missing=True) + + # Not really a point in splitting into chunks when chunk_size is 1, but... + tasks = list(map(tuple, split_into_chunks(external_id, self._RETRIEVE_LIMIT))) + tasks_summary = execute_tasks(get_single, tasks=tasks, max_workers=self._config.max_workers, fail_fast=True) + tasks_summary.raise_compound_exception_if_failed_tasks() + return WorkflowList(list(filter(None, tasks_summary.results)), cognite_client=self._cognite_client) def delete(self, external_id: str | SequenceNotStr[str], ignore_unknown_ids: bool = False) -> None: - """`Delete one or more workflows with versions. `_ + """`Delete one or more workflows with versions. `_ Args: external_id (str | SequenceNotStr[str]): External id or list of external ids to delete. @@ -866,11 +983,11 @@ def delete(self, external_id: str | SequenceNotStr[str], ignore_unknown_ids: boo Examples: - Delete workflow my workflow: + Delete workflow with external_id "my_workflow": >>> from cognite.client import CogniteClient >>> client = CogniteClient() - >>> client.workflows.delete("my workflow") + >>> client.workflows.delete("my_workflow") """ self._delete_multiple( identifiers=IdentifierSequence.load(external_ids=external_id), @@ -879,12 +996,13 @@ def delete(self, external_id: str | SequenceNotStr[str], ignore_unknown_ids: boo ) def list(self, limit: int | None = DEFAULT_LIMIT_READ) -> WorkflowList: - """`List all workflows in the project. `_ + """`List workflows in the project. `_ Args: limit (int | None): Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None + Returns: - WorkflowList: All workflows in the CDF project. + WorkflowList: Workflows in the CDF project. Examples: @@ -892,7 +1010,7 @@ def list(self, limit: int | None = DEFAULT_LIMIT_READ) -> WorkflowList: >>> from cognite.client import CogniteClient >>> client = CogniteClient() - >>> res = client.workflows.list() + >>> res = client.workflows.list(limit=None) """ return self._list( method="GET", diff --git a/cognite/client/_api_client.py b/cognite/client/_api_client.py index ec99cd224..df93f600f 100644 --- a/cognite/client/_api_client.py +++ b/cognite/client/_api_client.py @@ -968,10 +968,10 @@ def _delete_multiple( returns_items: bool = False, executor: TaskExecutor | None = None, ) -> list | None: - resource_path = resource_path or self._RESOURCE_PATH + resource_path = (resource_path or self._RESOURCE_PATH) + "/delete" tasks = [ { - "url_path": resource_path + "/delete", + "url_path": resource_path, "json": { "items": chunk.as_dicts() if wrap_ids else chunk.as_primitives(), **(extra_body_fields or {}), diff --git a/cognite/client/_version.py b/cognite/client/_version.py index ca8a7d2df..690b6aa1d 100644 --- a/cognite/client/_version.py +++ b/cognite/client/_version.py @@ -1,4 +1,4 @@ from __future__ import annotations -__version__ = "7.67.4" +__version__ = "7.68.0" __api_subversion__ = "20230101" diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index a3e78ab78..836c7a264 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -1,5 +1,6 @@ from __future__ import annotations +import warnings from abc import ABC, abstractmethod from collections import UserList from collections.abc import Collection, Sequence @@ -75,11 +76,12 @@ def as_write(self) -> WorkflowUpsert: class Workflow(WorkflowCore): """ - This class represents a workflow. This is the reading version, used when reading or listing a workflows. + This class represents a workflow. This is the reading version, used when reading or listing workflows. Args: external_id (str): The external ID provided by the client. Must be unique for the resource type. created_time (int): The time when the workflow was created. Unix timestamp in milliseconds. + last_updated_time (int): The time when the workflow was last updated. Unix timestamp in milliseconds. description (str | None): Description of the workflow. Defaults to None. data_set_id (int | None): The id of the data set this workflow belongs to. """ @@ -88,11 +90,13 @@ def __init__( self, external_id: str, created_time: int, + last_updated_time: int, description: str | None = None, data_set_id: int | None = None, ) -> None: super().__init__(external_id, description, data_set_id) self.created_time = created_time + self.last_updated_time = last_updated_time @classmethod def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> Self: @@ -100,6 +104,7 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> S external_id=resource["externalId"], description=resource.get("description"), created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], data_set_id=resource.get("dataSetId"), ) @@ -214,7 +219,7 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> F return cls( external_id=function["externalId"], data=function.get("data"), - # API uses isAsyncComplete and asyncComplete inconsistently: + # TODO: Fix: API uses isAsyncComplete and asyncComplete inconsistently: is_async_complete=resource.get("isAsyncComplete", resource.get("asyncComplete")), ) @@ -229,7 +234,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: "function": function, } if self.is_async_complete is not None: - output[("isAsyncComplete" if camel_case else "is_async_complete")] = self.is_async_complete + output["isAsyncComplete" if camel_case else "is_async_complete"] = self.is_async_complete return output @@ -915,6 +920,8 @@ class WorkflowVersion(WorkflowVersionCore): workflow_external_id (str): The external ID of the workflow. version (str): The version of the workflow. workflow_definition (WorkflowDefinition): The workflow definition of the workflow version. + created_time (int): The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds. + last_updated_time (int): The number of milliseconds since 00:00:00 Thursday, 1 January 1970, Coordinated Universal Time (UTC), minus leap seconds. """ def __init__( @@ -922,12 +929,13 @@ def __init__( workflow_external_id: str, version: str, workflow_definition: WorkflowDefinition, + created_time: int, + last_updated_time: int, ) -> None: - super().__init__( - workflow_external_id=workflow_external_id, - version=version, - ) + super().__init__(workflow_external_id=workflow_external_id, version=version) self.workflow_definition = workflow_definition + self.created_time = created_time + self.last_updated_time = last_updated_time @classmethod def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> WorkflowVersion: @@ -936,13 +944,17 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> W workflow_external_id=resource["workflowExternalId"], version=resource["version"], workflow_definition=WorkflowDefinition._load(workflow_definition), + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], ) def dump(self, camel_case: bool = True) -> dict[str, Any]: return { - ("workflowExternalId" if camel_case else "workflow_external_id"): self.workflow_external_id, + "workflowExternalId" if camel_case else "workflow_external_id": self.workflow_external_id, "version": self.version, - ("workflowDefinition" if camel_case else "workflow_definition"): self.workflow_definition.dump(camel_case), + "workflowDefinition" if camel_case else "workflow_definition": self.workflow_definition.dump(camel_case), + "createdTime" if camel_case else "created_time": self.created_time, + "lastUpdatedTime" if camel_case else "last_updated_time": self.last_updated_time, } def as_write(self) -> WorkflowVersionUpsert: @@ -1019,10 +1031,7 @@ def __init__( self.metadata = metadata def as_workflow_id(self) -> WorkflowVersionId: - return WorkflowVersionId( - workflow_external_id=self.workflow_external_id, - version=self.version, - ) + return WorkflowVersionId(workflow_external_id=self.workflow_external_id, version=self.version) @classmethod def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> WorkflowExecution: @@ -1156,11 +1165,18 @@ class WorkflowVersionId: workflow_external_id: str version: str | None = None - def as_primitive(self) -> tuple[str, str | None]: + def as_tuple(self) -> tuple[str, str | None]: return self.workflow_external_id, self.version + def as_primitive(self) -> tuple[str, str | None]: + warnings.warn( + "as_primitive() is deprecated, use as_tuple instead. Will be removed in the next major release.", + DeprecationWarning, + ) + return self.as_tuple() + @classmethod - def load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> WorkflowVersionId: + def load(cls, resource: dict) -> Self: if "workflowExternalId" in resource: workflow_external_id = resource["workflowExternalId"] elif "externalId" in resource: @@ -1168,25 +1184,20 @@ def load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> Wo else: raise ValueError("Invalid input to WorkflowVersionId.load") - return cls( - workflow_external_id=workflow_external_id, - version=resource.get("version"), - ) + return cls(workflow_external_id=workflow_external_id, version=resource.get("version")) def dump(self, camel_case: bool = True, as_external_id_key: bool = False) -> dict[str, Any]: if as_external_id_key: - output: dict[str, Any] = {("externalId" if camel_case else "external_id"): self.workflow_external_id} + output: dict[str, Any] = {"externalId" if camel_case else "external_id": self.workflow_external_id} else: - output = {("workflowExternalId" if camel_case else "workflow_external_id"): self.workflow_external_id} + output = {"workflowExternalId" if camel_case else "workflow_external_id": self.workflow_external_id} if self.version: output["version"] = self.version return output class WorkflowIds(UserList): - """ - This class represents a list of Workflow Version Identifiers. - """ + """This class represents a list of Workflow Version Identifiers.""" def __init__(self, workflow_ids: Collection[WorkflowVersionId]) -> None: for workflow_id in workflow_ids: @@ -1197,8 +1208,11 @@ def __init__(self, workflow_ids: Collection[WorkflowVersionId]) -> None: ) super().__init__(workflow_ids) + def as_tuples(self) -> list[tuple[str, str | None]]: + return [wid.as_tuple() for wid in self] + @classmethod - def load(cls, resource: Any, cognite_client: CogniteClient | None = None) -> WorkflowIds: + def load(cls, resource: Any) -> Self: workflow_ids: Sequence[WorkflowVersionId] if isinstance(resource, tuple) and len(resource) == 2 and all(isinstance(x, str) for x in resource): workflow_ids = [WorkflowVersionId(*resource)] @@ -1208,14 +1222,18 @@ def load(cls, resource: Any, cognite_client: CogniteClient | None = None) -> Wor workflow_ids = [WorkflowVersionId(workflow_external_id=resource)] elif isinstance(resource, dict): workflow_ids = [WorkflowVersionId.load(resource)] - elif isinstance(resource, Sequence) and resource and isinstance(resource[0], tuple): - workflow_ids = [WorkflowVersionId(*x) for x in resource] - elif isinstance(resource, Sequence) and resource and isinstance(resource[0], WorkflowVersionId): - workflow_ids = resource - elif isinstance(resource, Sequence) and resource and isinstance(resource[0], str): - workflow_ids = [WorkflowVersionId(workflow_external_id=x) for x in resource] + elif isinstance(resource, Sequence) and resource: + workflow_ids = [] + for wf in resource: + match wf: + case tuple(): + workflow_ids.append(WorkflowVersionId(*wf)) + case str(): + workflow_ids.append(WorkflowVersionId(workflow_external_id=wf)) + case _: + workflow_ids.append(wf) else: - raise ValueError("Invalid input to WorkflowIds") + raise ValueError("Invalid input to WorkflowIds.load") return cls(workflow_ids) def dump(self, camel_case: bool = True, as_external_id: bool = False) -> list[dict[str, Any]]: @@ -1572,8 +1590,6 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> W class WorkflowTriggerRunList(CogniteResourceList[WorkflowTriggerRun], ExternalIDTransformerMixin): - """ - This class represents a list of workflow trigger runs. - """ + """This class represents a list of workflow trigger runs.""" _RESOURCE = WorkflowTriggerRun diff --git a/docs/source/data_workflows.rst b/docs/source/data_workflows.rst index 70b1eedfe..8c5c19b5b 100644 --- a/docs/source/data_workflows.rst +++ b/docs/source/data_workflows.rst @@ -3,34 +3,30 @@ Data Workflows Workflows ------------ -Upsert Workflow +Upsert Workflows ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. automethod:: cognite.client._api.workflows.WorkflowAPI.upsert -Delete Workflow(s) -^^^^^^^^^^^^^^^^^^^ -.. automethod:: cognite.client._api.workflows.WorkflowAPI.delete - -Retrieve Workflow -^^^^^^^^^^^^^^^^^ +Retrieve Workflows +^^^^^^^^^^^^^^^^^^ .. automethod:: cognite.client._api.workflows.WorkflowAPI.retrieve List Workflows ^^^^^^^^^^^^^^^^^^ .. automethod:: cognite.client._api.workflows.WorkflowAPI.list +Delete Workflows +^^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowAPI.delete + Workflow Versions ------------------ -Upsert Workflow Version +Upsert Workflow Versions ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. automethod:: cognite.client._api.workflows.WorkflowVersionAPI.upsert -Delete Workflow Version(s) -^^^^^^^^^^^^^^^^^^^^^^^^^^^ -.. automethod:: cognite.client._api.workflows.WorkflowVersionAPI.delete - -Retrieve Workflow Version +Retrieve Workflow Versions ^^^^^^^^^^^^^^^^^^^^^^^^^^ .. automethod:: cognite.client._api.workflows.WorkflowVersionAPI.retrieve @@ -38,24 +34,24 @@ List Workflow Versions ^^^^^^^^^^^^^^^^^^^^^^^ .. automethod:: cognite.client._api.workflows.WorkflowVersionAPI.list +Delete Workflow Versions +^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowVersionAPI.delete + Workflow Executions -------------------- -List Workflow Executions -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -.. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.list - -Retrieve Detailed Workflow Execution -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -.. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.retrieve_detailed - Run Workflow Execution ^^^^^^^^^^^^^^^^^^^^^^^^^^ .. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.run -Trigger Workflow Execution -^^^^^^^^^^^^^^^^^^^^^^^^^^ -.. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.trigger +Retrieve detailed Workflow Execution +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.retrieve_detailed + +List Workflow Executions +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.list Cancel Workflow Execution ^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -65,33 +61,32 @@ Retry Workflow Execution ^^^^^^^^^^^^^^^^^^^^^^^^^^ .. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.retry + Workflow Tasks ------------------ -Update Status of Async Task +Update status of async Task ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. automethod:: cognite.client._api.workflows.WorkflowTaskAPI.update + Workflow Triggers ------------------- -Create or update triggers for workflow executions -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Upsert Trigger +^^^^^^^^^^^^^^ .. automethod:: cognite.client._api.workflows.WorkflowTriggerAPI.upsert -Create triggers for workflow executions -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -.. automethod:: cognite.client._api.workflows.WorkflowTriggerAPI.create +List Triggers +^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowTriggerAPI.list -Delete triggers for workflow executions -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -.. automethod:: cognite.client._api.workflows.WorkflowTriggerAPI.delete +List runs for a Trigger +^^^^^^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowTriggerAPI.list_runs -Get triggers for workflow executions -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -.. automethod:: cognite.client._api.workflows.WorkflowTriggerAPI.get_triggers +Delete Triggers +^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowTriggerAPI.delete -Get trigger run history for a workflow trigger -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -.. automethod:: cognite.client._api.workflows.WorkflowTriggerAPI.get_trigger_run_history Data Workflows data classes ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/pyproject.toml b/pyproject.toml index fd494e1c8..e4e41ce1e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "cognite-sdk" -version = "7.67.4" +version = "7.68.0" description = "Cognite Python SDK" readme = "README.md" documentation = "https://cognite-sdk-python.readthedocs-hosted.com"