diff --git a/CHANGELOG.md b/CHANGELOG.md index f6fc000a55..f8fd57051c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,10 @@ Changes are grouped as follows - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. +## [6.28.0] - 2023-09-26 +### Added +- Support for the WorkflowOrchestrationAPI with the implementation `client.workflows`. + ## [6.27.0] - 2023-09-13 ### Changed - Reduce concurrency in data modeling client to 1 diff --git a/cognite/client/_api/functions.py b/cognite/client/_api/functions.py index 304988cb15..6d605fd5a8 100644 --- a/cognite/client/_api/functions.py +++ b/cognite/client/_api/functions.py @@ -18,7 +18,6 @@ from cognite.client import utils from cognite.client._api_client import APIClient from cognite.client._constants import DEFAULT_LIMIT_READ -from cognite.client.credentials import OAuthClientCertificate from cognite.client.data_classes import ( ClientCredentials, Function, @@ -34,9 +33,9 @@ TimestampRange, ) from cognite.client.data_classes.functions import FunctionCallsFilter, FunctionsStatus -from cognite.client.exceptions import CogniteAuthError from cognite.client.utils._auxiliary import is_unlimited from cognite.client.utils._identifier import Identifier, IdentifierSequence +from cognite.client.utils._session import create_session_and_return_nonce if TYPE_CHECKING: from cognite.client import CogniteClient @@ -395,7 +394,7 @@ def call( """ identifier = IdentifierSequence.load(ids=id, external_ids=external_id).as_singleton()[0] id = _get_function_internal_id(self._cognite_client, identifier) - nonce = _create_session_and_return_nonce(self._cognite_client) + nonce = create_session_and_return_nonce(self._cognite_client, api_name="Functions API") if data is None: data = {} @@ -523,18 +522,6 @@ def status(self) -> FunctionsStatus: return FunctionsStatus._load(res.json()) -def _create_session_and_return_nonce( - client: CogniteClient, - client_credentials: dict | ClientCredentials | None = None, -) -> str: - if 
client_credentials is None: - if isinstance(client._config.credentials, OAuthClientCertificate): - raise CogniteAuthError("Client certificate credentials is not supported with the Functions API") - elif isinstance(client_credentials, dict): - client_credentials = ClientCredentials(client_credentials["client_id"], client_credentials["client_secret"]) - return client.iam.sessions.create(client_credentials).nonce - - def get_handle_function_node(file_path: Path) -> ast.FunctionDef | None: return next( ( @@ -1037,7 +1024,9 @@ def create( """ _get_function_identifier(function_id, function_external_id) - nonce = _create_session_and_return_nonce(self._cognite_client, client_credentials) + nonce = create_session_and_return_nonce( + self._cognite_client, api_name="Functions API", client_credentials=client_credentials + ) body: dict[str, list[dict[str, str | int | None | dict]]] = { "items": [ { diff --git a/cognite/client/_api/workflows.py b/cognite/client/_api/workflows.py new file mode 100644 index 0000000000..91c7488594 --- /dev/null +++ b/cognite/client/_api/workflows.py @@ -0,0 +1,548 @@ +from __future__ import annotations + +from abc import ABC +from typing import TYPE_CHECKING, Any, Literal, MutableSequence, Sequence, Tuple, Union +from warnings import warn + +from typing_extensions import TypeAlias + +from cognite.client._api_client import APIClient +from cognite.client._constants import DEFAULT_LIMIT_READ +from cognite.client.data_classes.workflows import ( + Workflow, + WorkflowExecution, + WorkflowExecutionDetailed, + WorkflowExecutionList, + WorkflowIds, + WorkflowList, + WorkflowTaskExecution, + WorkflowUpsert, + WorkflowVersion, + WorkflowVersionId, + WorkflowVersionList, + WorkflowVersionUpsert, +) +from cognite.client.exceptions import CogniteAPIError +from cognite.client.utils._identifier import ( + IdentifierSequence, + WorkflowVersionIdentifierSequence, +) +from cognite.client.utils._session import create_session_and_return_nonce + +if TYPE_CHECKING: + 
from cognite.client import ClientConfig, CogniteClient + + +class BetaWorkflowAPIClient(APIClient, ABC): + def __init__( + self, + config: ClientConfig, + api_version: str | None, + cognite_client: CogniteClient, + ) -> None: + super().__init__(config, api_version, cognite_client) + self._api_subversion = "beta" + + @staticmethod + def _experimental_warning() -> None: + warn( + "Workflow Orchestration endpoints are experimental and may be subject to breaking changes in future versions without notice.", + FutureWarning, + ) + + +WorkflowIdentifier: TypeAlias = Union[WorkflowVersionId, Tuple[str, str], str] +WorkflowVersionIdentifier: TypeAlias = Union[WorkflowVersionId, Tuple[str, str]] + + +class WorkflowTaskAPI(BetaWorkflowAPIClient): + _RESOURCE_PATH = "/workflows/tasks" + + def update( + self, task_id: str, status: Literal["completed", "failed"], output: dict | None = None + ) -> WorkflowTaskExecution: + """`Update status of async task. `_ + + For tasks that has been marked with 'is_async = True', the status must be updated by calling this endpoint with either 'completed' or 'failed'. + + Args: + task_id (str): The server-generated id of the task. + status (Literal["completed", "failed"]): The new status of the task. Must be either 'completed' or 'failed'. + output (dict | None): The output of the task. This will be available for tasks that has specified it as an output with the string "${.output}" + + Returns: + WorkflowTaskExecution: The updated task execution. 
+ + Examples: + + Update task with UUID '000560bc-9080-4286-b242-a27bb4819253' to status 'completed': + + >>> from cognite.client import CogniteClient + >>> c = CogniteClient() + >>> res = c.workflows.tasks.update("000560bc-9080-4286-b242-a27bb4819253", "completed") + + Update task with UUID '000560bc-9080-4286-b242-a27bb4819253' to status 'failed' with output '{"a": 1, "b": 2}': + + >>> from cognite.client import CogniteClient + >>> c = CogniteClient() + >>> res = c.workflows.tasks.update("000560bc-9080-4286-b242-a27bb4819253", "failed", output={"a": 1, "b": 2}) + + Trigger workflow, retrieve detailed task execution and update status of the second task (assumed to be async) to 'completed': + + >>> from cognite.client import CogniteClient + >>> c = CogniteClient() + >>> res = c.workflows.executions.trigger("my workflow", "1") + >>> res = c.workflows.executions.retrieve_detailed(res.id) + >>> res = c.workflows.tasks.update(res.tasks[1].id, "completed") + + """ + self._experimental_warning() + + body: dict[str, Any] = {"status": status.upper()} + if output is not None: + body["output"] = output + response = self._post( + url_path=f"{self._RESOURCE_PATH}/{task_id}/update", + json=body, + ) + return WorkflowTaskExecution._load(response.json()) + + +class WorkflowExecutionAPI(BetaWorkflowAPIClient): + _RESOURCE_PATH = "/workflows/executions" + + def retrieve_detailed(self, id: str) -> WorkflowExecutionDetailed | None: + """`Retrieve a workflow execution with detailed information. `_ + + Args: + id (str): The server-generated id of the workflow execution. + + Returns: + WorkflowExecutionDetailed | None: The requested workflow execution if it exists, None otherwise. 
+ + Examples: + + Retrieve workflow execution with UUID '000560bc-9080-4286-b242-a27bb4819253': + + >>> from cognite.client import CogniteClient + >>> c = CogniteClient() + >>> res = c.workflows.executions.retrieve_detailed("000560bc-9080-4286-b242-a27bb4819253") + + List workflow executions and retrieve detailed information for the first one: + + >>> from cognite.client import CogniteClient + >>> c = CogniteClient() + >>> res = c.workflows.executions.list() + >>> res = c.workflows.executions.retrieve_detailed(res[0].id) + + """ + self._experimental_warning() + try: + response = self._get(url_path=f"{self._RESOURCE_PATH}/{id}") + except CogniteAPIError as e: + if e.code == 400: + return None + raise + return WorkflowExecutionDetailed._load(response.json()) + + def trigger( + self, + workflow_external_id: str, + version: str, + input: dict | None = None, + ) -> WorkflowExecution: + """`Trigger a workflow execution. `_ + + Args: + workflow_external_id (str): External id of the workflow. + version (str): Version of the workflow. + input (dict | None): The input to the workflow execution. This will be available for tasks that have specified it as an input with the strind "${workflow.input}" + See tip below for more information. + + Tip: + The workflow input can be available in the workflow tasks. For example, if you have a Task with + function parameters then you can specify it as follows + + >>> from cognite.client.data_classes import WorkflowTask, FunctionTaskParameters + >>> task = WorkflowTask( + ... external_id="my_workflow-task1", + ... parameters=FunctionTaskParameters( + ... external_id="cdf_deployed_function:my_function", + ... data={"workflow_data": "${workflow.input}",})) + + Returns: + WorkflowExecution: The created workflow execution. 
+ + Examples: + + Trigger workflow execution for workflow my workflow version 1: + + >>> from cognite.client import CogniteClient + >>> c = CogniteClient() + >>> res = c.workflows.executions.trigger("my workflow", "1") + + Trigger workflow execution for workflow my workflow version 1 with input data '{"a": 1, "b": 2}: + + >>> from cognite.client import CogniteClient + >>> c = CogniteClient() + >>> res = c.workflows.executions.trigger("my workflow", "1", input={"a": 1, "b": 2}) + + """ + self._experimental_warning() + nonce = create_session_and_return_nonce(self._cognite_client, api_name="Workflow API") + body = {"authentication": {"nonce": nonce}} + if input is not None: + body["input"] = input + + response = self._post( + url_path=f"/workflows/{workflow_external_id}/versions/{version}/run", + json=body, + ) + return WorkflowExecution._load(response.json()) + + def list( + self, + workflow_version_ids: WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier] | None = None, + created_time_start: int | None = None, + created_time_end: int | None = None, + limit: int = DEFAULT_LIMIT_READ, + ) -> WorkflowExecutionList: + """`List workflow executions in the project. `_ + + Args: + workflow_version_ids (WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier] | None): Workflow version id or list of workflow version ids to filter on. + created_time_start (int | None): Filter out executions that was created before this time. Time is in milliseconds since epoch. + created_time_end (int | None): Filter out executions that was created after this time. Time is in milliseconds since epoch. + limit (int): Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None + to return all items. + + Returns: + WorkflowExecutionList: The requested workflow executions. 
+ + Examples: + + Get all workflow executions for workflows 'my_workflow' version '1': + + >>> from cognite.client import CogniteClient + >>> c = CogniteClient() + >>> res = c.workflows.executions.list(("my_workflow", "1")) + + Get all workflow executions for workflows after last 24 hours: + + >>> from cognite.client import CogniteClient + >>> from datetime import datetime, timedelta + >>> c = CogniteClient() + >>> res = c.workflows.executions.list(created_time_start=int((datetime.now() - timedelta(days=1)).timestamp() * 1000)) + + """ + self._experimental_warning() + filter_: dict[str, Any] = {} + if workflow_version_ids is not None: + filter_["workflowFilters"] = WorkflowIds._load(workflow_version_ids).dump( + camel_case=True, as_external_id=True + ) + if created_time_start is not None: + filter_["createdTimeStart"] = created_time_start + if created_time_end is not None: + filter_["createdTimeEnd"] = created_time_end + + return self._list( + method="POST", + resource_cls=WorkflowExecution, + list_cls=WorkflowExecutionList, + filter=filter_, + limit=limit, + ) + + +class WorkflowVersionAPI(BetaWorkflowAPIClient): + _RESOURCE_PATH = "/workflows/versions" + + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: + super().__init__(config, api_version, cognite_client) + self._DELETE_LIMIT = 100 + + def upsert(self, version: WorkflowVersionUpsert, mode: Literal["replace"] = "replace") -> WorkflowVersion: + """`Create a workflow version. `_ + + Note this is an upsert endpoint, so if a workflow with the same version external id already exists, it will be updated. + + Furthermore, if the workflow does not exist, it will be created. + + Args: + version (WorkflowVersionUpsert): The workflow version to create or update. + mode (Literal['replace']): This is not an option for the API, but is included here to document that the upserts are always done in replace mode. 
+ + Returns: + WorkflowVersion: The created workflow version. + + Examples: + + Create workflow version with one Function task: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes import WorkflowVersionUpsert, WorkflowDefinitionUpsert, WorkflowTask, FunctionTaskParameters + >>> c = CogniteClient() + >>> new_version =WorkflowVersionUpsert( + ... workflow_external_id="my_workflow", + ... version="1", + ... workflow_definition=WorkflowDefinitionUpsert( + ... tasks=[ + ... WorkflowTask( + ... external_id="my_workflow-task1", + ... parameters=FunctionTaskParameters( + ... external_id="cdf_deployed_function:my_function", + ... data={"a": 1, "b": 2}, + ... ), + ... ) + ... ], + ... description="This workflow has one step", + ... ), + ... ) + >>> res = c.workflows.upsert(new_version) + """ + self._experimental_warning() + if mode != "replace": + raise ValueError("Only replace mode is supported for upserting workflow versions.") + + response = self._post( + url_path=self._RESOURCE_PATH, + json={"items": [version.dump(camel_case=True)]}, + ) + + return WorkflowVersion._load(response.json()["items"][0]) + + def delete( + self, + workflow_version_id: WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier], + ignore_unknown_ids: bool = False, + ) -> None: + """`Delete a workflow version(s). `_ + + Args: + workflow_version_id (WorkflowVersionIdentifier | MutableSequence[WorkflowVersionIdentifier]): Workflow version id or list of workflow version ids to delete. + ignore_unknown_ids (bool): Ignore external ids that are not found rather than throw an exception. 
+ + Examples: + + Delete workflow version "1" of workflow "my workflow" specified by using a tuple: + + >>> from cognite.client import CogniteClient + >>> c = CogniteClient() + >>> c.workflows.versions.delete(("my workflow", "1")) + + Delete workflow version "1" of workflow "my workflow" and workflow version "2" of workflow "my workflow 2" using the WorkflowVersionId class: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes import WorkflowVersionId + >>> c = CogniteClient() + >>> c.workflows.versions.delete([WorkflowVersionId("my workflow", "1"), WorkflowVersionId("my workflow 2", "2")]) + + """ + self._experimental_warning() + identifiers = WorkflowIds._load(workflow_version_id).dump(camel_case=True) + self._delete_multiple( + identifiers=WorkflowVersionIdentifierSequence.load(identifiers), + params={"ignoreUnknownIds": ignore_unknown_ids}, + wrap_ids=True, + ) + + def retrieve(self, workflow_external_id: str, version: str) -> WorkflowVersion | None: + """`Retrieve a workflow version. `_ + + Args: + workflow_external_id (str): External id of the workflow. + version (str): Version of the workflow. + + Returns: + WorkflowVersion | None: The requested workflow version if it exists, None otherwise. 
+ + Examples: + + Retrieve workflow version 1 of workflow my workflow: + + >>> from cognite.client import CogniteClient + >>> c = CogniteClient() + >>> res = c.workflows.versions.retrieve("my workflow", "1") + """ + self._experimental_warning() + try: + response = self._get( + url_path=f"/workflows/{workflow_external_id}/versions/{version}", + ) + except CogniteAPIError as e: + if e.code == 404: + return None + raise e + + return WorkflowVersion._load(response.json()) + + def list( + self, + workflow_version_ids: WorkflowIdentifier | MutableSequence[WorkflowIdentifier] | None = None, + limit: int = DEFAULT_LIMIT_READ, + ) -> WorkflowVersionList: + """`List workflow versions in the project `_ + + Args: + workflow_version_ids (WorkflowIdentifier | MutableSequence[WorkflowIdentifier] | None): Workflow version id or list of workflow version ids to filter on. + limit (int): Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None + + Returns: + WorkflowVersionList: The requested workflow versions. 
+ + Examples: + + Get all workflow version for workflows 'my_workflow' and 'my_workflow_2': + + >>> from cognite.client import CogniteClient + >>> c = CogniteClient() + >>> res = c.workflows.versions.list(["my_workflow", "my_workflow_2"]) + + Get all workflow versions for workflows 'my_workflow' and 'my_workflow_2' using the WorkflowVersionId class: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes import WorkflowVersionId + >>> c = CogniteClient() + >>> res = c.workflows.versions.list([WorkflowVersionId("my_workflow"), WorkflowVersionId("my_workflow_2")]) + + Get all workflow versions for workflows 'my_workflow' version '1' and 'my_workflow_2' version '2' using tuples: + + >>> from cognite.client import CogniteClient + >>> c = CogniteClient() + >>> res = c.workflows.versions.list([("my_workflow", "1"), ("my_workflow_2", "2")]) + + """ + self._experimental_warning() + if workflow_version_ids is None: + workflow_ids_dumped = [] + else: + workflow_ids_dumped = WorkflowIds._load(workflow_version_ids).dump(camel_case=True, as_external_id=True) + + return self._list( + method="POST", + resource_cls=WorkflowVersion, + list_cls=WorkflowVersionList, + filter={"workflowFilters": workflow_ids_dumped}, + limit=limit, + ) + + +class WorkflowAPI(BetaWorkflowAPIClient): + _RESOURCE_PATH = "/workflows" + + def __init__( + self, + config: ClientConfig, + api_version: str | None, + cognite_client: CogniteClient, + ) -> None: + super().__init__(config, api_version, cognite_client) + self.versions = WorkflowVersionAPI(config, api_version, cognite_client) + self.executions = WorkflowExecutionAPI(config, api_version, cognite_client) + self.tasks = WorkflowTaskAPI(config, api_version, cognite_client) + self._DELETE_LIMIT = 100 + + def upsert(self, workflow: WorkflowUpsert, mode: Literal["replace"] = "replace") -> Workflow: + """`Create a workflow. 
`_ + + Note this is an upsert endpoint, so if a workflow with the same external id already exists, it will be updated. + + Args: + workflow (WorkflowUpsert): The workflow to create or update. + mode (Literal['replace']): This is not an option for the API, but is included here to document that the upserts are always done in replace mode. + + Returns: + Workflow: The created workflow. + + Examples: + + Create workflow my workflow: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes import WorkflowUpsert + >>> c = CogniteClient() + >>> res = c.workflows.upsert(WorkflowUpsert(external_id="my workflow", description="my workflow description")) + """ + self._experimental_warning() + if mode != "replace": + raise ValueError("Only replace mode is supported for upserting workflows.") + + response = self._post( + url_path=self._RESOURCE_PATH, + json={"items": [workflow.dump(camel_case=True)]}, + ) + return Workflow._load(response.json()["items"][0]) + + def retrieve(self, external_id: str) -> Workflow | None: + """`Retrieve a workflow. `_ + + Args: + external_id (str): Identifier for a Workflow. Must be unique for the project. + + Returns: + Workflow | None: The requested workflow if it exists, None otherwise. + + Examples: + + Retrieve workflow my workflow: + + >>> from cognite.client import CogniteClient + >>> c = CogniteClient() + >>> res = c.workflows.retrieve("my workflow") + """ + self._experimental_warning() + try: + response = self._get(url_path=self._RESOURCE_PATH + f"/{external_id}") + except CogniteAPIError as e: + if e.code == 404: + return None + raise e + return Workflow._load(response.json()) + + def delete(self, external_id: str | Sequence[str], ignore_unknown_ids: bool = False) -> None: + """`Delete one or more workflows with versions. `_ + + Args: + external_id (str | Sequence[str]): External id or list of external ids to delete. 
+ ignore_unknown_ids (bool): Ignore external ids that are not found rather than throw an exception. + + Examples: + + Delete workflow my workflow: + + >>> from cognite.client import CogniteClient + >>> c = CogniteClient() + >>> c.workflows.delete("my workflow") + """ + self._experimental_warning() + self._delete_multiple( + identifiers=IdentifierSequence.load(external_ids=external_id), + params={"ignoreUnknownIds": ignore_unknown_ids}, + wrap_ids=True, + ) + + def list(self, limit: int = DEFAULT_LIMIT_READ) -> WorkflowList: + """`List all workflows in the project. `_ + + Args: + limit (int): Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None + Returns: + WorkflowList: All workflows in the CDF project. + + Examples: + + List all workflows: + + >>> from cognite.client import CogniteClient + >>> c = CogniteClient() + >>> res = c.workflows.list() + """ + self._experimental_warning() + + return self._list( + method="GET", + resource_cls=Workflow, + list_cls=WorkflowList, + limit=limit, + ) diff --git a/cognite/client/_cognite_client.py b/cognite/client/_cognite_client.py index 06074a1d88..16ad129b75 100644 --- a/cognite/client/_cognite_client.py +++ b/cognite/client/_cognite_client.py @@ -27,6 +27,7 @@ from cognite.client._api.time_series import TimeSeriesAPI from cognite.client._api.transformations import TransformationsAPI from cognite.client._api.vision import VisionAPI +from cognite.client._api.workflows import WorkflowAPI from cognite.client._api_client import APIClient from cognite.client.config import ClientConfig, global_config from cognite.client.credentials import CredentialProvider, OAuthClientCredentials, OAuthInteractive @@ -75,6 +76,7 @@ def __init__(self, config: ClientConfig | None = None) -> None: self.functions = FunctionsAPI(self._config, self._API_VERSION, self) self.data_modeling = DataModelingAPI(self._config, self._API_VERSION, self) self.documents = DocumentsAPI(self._config, self._API_VERSION, self) + 
self.workflows = WorkflowAPI(self._config, self._API_VERSION, self) # APIs just using base_url: self._api_client = APIClient(self._config, api_version=None, cognite_client=self) diff --git a/cognite/client/_version.py b/cognite/client/_version.py index bef7f3ed65..ecfef7f884 100644 --- a/cognite/client/_version.py +++ b/cognite/client/_version.py @@ -1,4 +1,4 @@ from __future__ import annotations -__version__ = "6.27.0" +__version__ = "6.28.0" __api_subversion__ = "V20220125" diff --git a/cognite/client/data_classes/__init__.py b/cognite/client/data_classes/__init__.py index e1bdbb284a..8deb4da571 100644 --- a/cognite/client/data_classes/__init__.py +++ b/cognite/client/data_classes/__init__.py @@ -209,6 +209,30 @@ TransformationSchemaColumnList, ) from cognite.client.data_classes.user_profiles import UserProfile, UserProfileList +from cognite.client.data_classes.workflows import ( + CDFTaskOutput, + CDFTaskParameters, + DynamicTaskOutput, + DynamicTaskParameters, + FunctionTaskOutput, + FunctionTaskParameters, + TransformationTaskOutput, + TransformationTaskParameters, + Workflow, + WorkflowDefinition, + WorkflowDefinitionUpsert, + WorkflowExecution, + WorkflowExecutionDetailed, + WorkflowExecutionList, + WorkflowList, + WorkflowTask, + WorkflowTaskExecution, + WorkflowUpsert, + WorkflowVersion, + WorkflowVersionId, + WorkflowVersionList, + WorkflowVersionUpsert, +) __all__ = [ "Annotation", @@ -380,4 +404,26 @@ "CoordinateReferenceSystem", "UserProfile", "UserProfileList", + "WorkflowUpsert", + "WorkflowExecution", + "WorkflowExecutionDetailed", + "WorkflowExecutionList", + "WorkflowList", + "WorkflowVersion", + "WorkflowVersionUpsert", + "WorkflowVersionId", + "WorkflowVersionList", + "FunctionTaskParameters", + "TransformationTaskParameters", + "CDFTaskParameters", + "DynamicTaskParameters", + "FunctionTaskOutput", + "TransformationTaskOutput", + "CDFTaskOutput", + "DynamicTaskOutput", + "WorkflowDefinition", + "WorkflowDefinitionUpsert", + 
"WorkflowTaskExecution", + "Workflow", + "WorkflowTask", ] diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py new file mode 100644 index 0000000000..c63e0ca95e --- /dev/null +++ b/cognite/client/data_classes/workflows.py @@ -0,0 +1,957 @@ +from __future__ import annotations + +import json +from abc import ABC, abstractmethod +from collections import UserList +from collections.abc import Collection +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, ClassVar, Literal, Sequence, cast + +from typing_extensions import Self, TypeAlias + +from cognite.client.data_classes._base import ( + CogniteResource, + CogniteResourceList, + T_CogniteResource, +) +from cognite.client.utils._text import convert_all_keys_to_snake_case, to_snake_case + +if TYPE_CHECKING: + from cognite.client import CogniteClient + +WorkflowStatus: TypeAlias = Literal[ + "in_progress", + "cancelled", + "failed", + "failed_with_terminal_error", + "completed", + "completed_with_errors", + "timed_out", + "skipped", +] + + +class WorkflowUpsert(CogniteResource): + """ + This class represents a workflow. This is the write version, used when creating or updating a workflow. + + Args: + external_id (str): The external ID provided by the client. Must be unique for the resource type. + description (str | None): Description of the workflow. Note that when updating a workflow, the description will + always be overwritten also if it is set to None. Meaning if the wokflow already has a description, + and you want to keep it, you need to provide the description when updating the workflow. 
+ """ + + def __init__(self, external_id: str, description: str | None) -> None: + self.external_id = external_id + self.description = description + + @classmethod + def _load( + cls: type[T_CogniteResource], + resource: dict | str, + cognite_client: CogniteClient | None = None, + ) -> T_CogniteResource: + resource = json.loads(resource) if isinstance(resource, str) else resource + + resource = convert_all_keys_to_snake_case(resource) + return cls(**resource) + + +class Workflow(WorkflowUpsert): + """ + This class represents a workflow. This is the reading version, used when reading or listing a workflows. + + Args: + external_id (str): The external ID provided by the client. Must be unique for the resource type. + created_time (int): The time when the workflow was created. Unix timestamp in milliseconds. + description (str | None): Description of the workflow. Defaults to None. + + """ + + def __init__( + self, + external_id: str, + created_time: int, + description: str | None = None, + ) -> None: + super().__init__(external_id, description) + self.created_time = created_time + + +class WorkflowList(CogniteResourceList[Workflow]): + """This class represents a list of workflows.""" + + _RESOURCE = Workflow + + def as_external_ids(self) -> list[str]: + """Returns a list of external ids for the workflows in the list. + + Returns: + list[str]: List of external ids. + """ + return [workflow.external_id for workflow in self.data] + + +class WorkflowTaskParameters(CogniteResource, ABC): + task_type: ClassVar[str] + + @classmethod + def load_parameters(cls, data: dict) -> WorkflowTaskParameters: + type_ = data.get("type", data.get("taskType")) + parameters = data.get("parameters", data.get("input")) + if parameters is None: + raise ValueError( + "You must provide parameter data either with key " + "'parameter' or 'input', with parameter taking precedence." 
+ ) + + if type_ == "function": + return FunctionTaskParameters._load(parameters) + elif type_ == "transformation": + return TransformationTaskParameters._load(parameters) + elif type_ == "cdf": + return CDFTaskParameters._load(parameters) + elif type_ == "dynamic": + return DynamicTaskParameters._load(parameters) + else: + raise ValueError(f"Unknown task type: {type_}. Expected 'function', 'transformation', 'cdf, or 'dynamic'") + + +class FunctionTaskParameters(WorkflowTaskParameters): + """The function parameters are used to specify the Cognite Function to be called. + + Args: + external_id (str): The external ID of the function to be called. + data (dict | str | None): The data to be passed to the function. Defaults to None. The data can be used to specify the input to the function from previous tasks or the workflow input. See the tip below for more information. + is_async_complete (bool): Whether the function is asynchronous. Defaults to False. + + If a function is asynchronous, you need to call the client.workflows.tasks.update() endpoint to update the status of the task. + While synchronous tasks update the status automatically. + + Tip: + You can dynamically specify data from other tasks or the workflow. You do this by following the format + `${prefix.jsonPath}` in the expression. The valid are: + + - `${workflow.input}`: The workflow input. + - `${.output}`: The output of the task with the given external id. + - `${.input}`: The input of the task with the given external id. + + For example, if you have a workflow containing two tasks, and the external_id of the first task is `task1` then, + you can specify the data for the second task as follows: + + >>> from cognite.client.data_classes import WorkflowTask, FunctionTaskParameters + >>> task = WorkflowTask( + ... external_id="task2", + ... parameters=FunctionTaskParameters( + ... external_id="cdf_deployed_function", + ... data={ + ... "workflow_data": "${workflow.input}", + ... 
"task1_input": "${task1.input}", + ... "task1_output": "${task1.output}" + ... }, + ... ), + ... ) + """ + + task_type: ClassVar[str] = "function" + + def __init__( + self, + external_id: str, + data: dict | str | None = None, + is_async_complete: bool = False, + ) -> None: + self.external_id = external_id + self.data = data + self.is_async_complete = is_async_complete + + @classmethod + def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None) -> FunctionTaskParameters: + resource = json.loads(resource) if isinstance(resource, str) else resource + function: dict[str, Any] = resource["function"] + + return cls( + external_id=function["externalId"], + data=function.get("data"), + # Allow default to come from the API. + is_async_complete=resource.get("isAsyncComplete"), # type: ignore[arg-type] + ) + + def dump(self, camel_case: bool = False) -> dict[str, Any]: + function: dict[str, Any] = { + ("externalId" if camel_case else "external_id"): self.external_id, + } + if self.data: + function["data"] = self.data + + output: dict[str, Any] = { + "function": function, + "isAsyncComplete": self.is_async_complete, + } + return output + + +class TransformationTaskParameters(WorkflowTaskParameters): + """ + The transformation parameters are used to specify the transformation to be called. + + Args: + external_id (str): The external ID of the transformation to be called. 
+ + """ + + task_type: ClassVar[str] = "transformation" + + def __init__(self, external_id: str) -> None: + self.external_id = external_id + + @classmethod + def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None) -> TransformationTaskParameters: + resource = json.loads(resource) if isinstance(resource, str) else resource + return cls( + resource["transformation"]["externalId"], + ) + + def dump(self, camel_case: bool = False) -> dict[str, Any]: + return {"transformation": {("externalId" if camel_case else "external_id"): self.external_id}} + + +class CDFTaskParameters(WorkflowTaskParameters): + """ + The CDF request parameters are used to specify a request to the Cognite Data Fusion API. + + Args: + resource_path (str): The resource path of the request. Note the path of the request which is prefixed by '{cluster}.cognitedata.com/api/v1/project/{project}' based on the cluster and project of the request. + method (Literal["GET", "POST", "PUT", "DELETE"] | str): The HTTP method of the request. + query_parameters (dict | str | None): The query parameters of the request. Defaults to None. + body (dict | str | None): The body of the request. Defaults to None. Limited to 1024KiB in size + request_timeout_in_millis (int | str): The timeout of the request in milliseconds. Defaults to 10000. + + Examples: + + Call the asset/list endpoint with a limit of 10: + + >>> from cognite.client.data_classes import WorkflowTask, CDFTaskParameters + >>> task = WorkflowTask( + ... external_id="task1", + ... parameters=CDFTaskParameters( + ... resource_path="/assets/list", + ... method="GET", + ... query_parameters={"limit": 10}, + ... ), + ... 
) + + """ + + task_type: ClassVar[str] = "cdf" + + def __init__( + self, + resource_path: str, + method: Literal["GET", "POST", "PUT", "DELETE"] | str, + query_parameters: dict | str | None = None, + body: dict | str | None = None, + request_timeout_in_millis: int | str = 10000, + ) -> None: + self.resource_path = resource_path + self.method = method + self.query_parameters = query_parameters or {} + self.body = body or {} + self.request_timeout_in_millis = request_timeout_in_millis + + @classmethod + def _load(cls: type[Self], resource: dict | str, cognite_client: CogniteClient | None = None) -> Self: + resource = json.loads(resource) if isinstance(resource, str) else resource + + cdf_request: dict[str, Any] = resource["cdfRequest"] + + arguments = convert_all_keys_to_snake_case(cdf_request) + return cls(**arguments) + + def dump(self, camel_case: bool = False) -> dict[str, Any]: + output = super().dump(camel_case) + return { + ("cdfRequest" if camel_case else "cdfRequest"): output, + } + + +class DynamicTaskParameters(WorkflowTaskParameters): + """ + The dynamic task parameters are used to specify a dynamic task. + + When the tasks and their order of execution are determined at runtime, we use dynamic tasks. It takes the tasks parameter, + which is an array of function, transformation, and cdf task definitions. + This array should then be generated and returned by a previous step in the workflow, for instance, + a Cognite Function task. + + Args: + dynamic (list[WorkflowTask] | str): The dynamic task to be called. The dynamic task is a string that is evaluated + during the workflow's execution. + + """ + + task_type: ClassVar[str] = "dynamic" + + def __init__(self, dynamic: list[WorkflowTask] | str) -> None: + self.dynamic = dynamic + + +class WorkflowTask(CogniteResource): + """ + This class represents a workflow task. + + Note: tasks do not distinguish between write and read versions. + + Args: + external_id (str): The external ID provided by the client. 
Must be unique for the resource type. + parameters (WorkflowTaskParameters): The parameters of the task. + name (str | None): The name of the task. Defaults to None. + description (str | None): The description of the task. Defaults to None. + retries (int): The number of retries for the task. Defaults to 3. + timeout (int): The timeout of the task in seconds. Defaults to 3600. + depends_on (list[str] | None): The external ids of the tasks that this task depends on. Defaults to None. + """ + + def __init__( + self, + external_id: str, + parameters: WorkflowTaskParameters, + name: str | None = None, + description: str | None = None, + retries: int = 3, + timeout: int = 3600, + depends_on: list[str] | None = None, + ) -> None: + self.external_id = external_id + self.parameters = parameters + self.name = name + self.description = description + self.retries = retries + self.timeout = timeout + self.depends_on = depends_on + + @classmethod + def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None) -> WorkflowTask: + resource = json.loads(resource) if isinstance(resource, str) else resource + return cls( + external_id=resource["externalId"], + parameters=WorkflowTaskParameters.load_parameters(resource), + name=resource.get("name"), + description=resource.get("description"), + # Allow default to come from the API. 
+ retries=resource.get("retries"), # type: ignore[arg-type] + timeout=resource.get("timeout"), # type: ignore[arg-type] + depends_on=[dep["externalId"] for dep in resource.get("dependsOn", resource.get("depends_on", []))] or None, + ) + + def dump(self, camel_case: bool = False) -> dict[str, Any]: + type_ = self.parameters.task_type + + output: dict[str, Any] = { + ("externalId" if camel_case else "external_id"): self.external_id, + "type": type_, + "parameters": self.parameters.dump(camel_case), + "retries": self.retries, + "timeout": self.timeout, + } + if self.name: + output["name"] = self.name + if self.description: + output["description"] = self.description + if self.depends_on: + output[("dependsOn" if camel_case else "depends_on")] = [ + {("externalId" if camel_case else "external_id"): dependency} for dependency in self.depends_on + ] + return output + + +class WorkflowTaskOutput(ABC): + task_type: ClassVar[str] + + @classmethod + @abstractmethod + def load(cls: type[Self], data: dict) -> Self: + raise NotImplementedError + + @classmethod + def load_output(cls, data: dict) -> WorkflowTaskOutput: + task_type = data["taskType"] + if task_type == "function": + return FunctionTaskOutput.load(data) + elif task_type == "transformation": + return TransformationTaskOutput.load(data) + elif task_type == "cdf": + return CDFTaskOutput.load(data) + elif task_type == "dynamic": + return DynamicTaskOutput.load(data) + else: + raise ValueError(f"Unknown task type: {task_type}") + + @abstractmethod + def dump(self, camel_case: bool = False) -> dict[str, Any]: + raise NotImplementedError + + +class FunctionTaskOutput(WorkflowTaskOutput): + """ + The class represents the output of Cognite Function task. + + Args: + call_id (int | None): The call_id of the CDF Function call. + function_id (int | None): The function_id of the CDF Function. + response (dict | None): The response of the CDF Function call. 
+ + """ + + task_type: ClassVar[str] = "function" + + def __init__(self, call_id: int | None, function_id: int | None, response: dict | None) -> None: + self.call_id = call_id + self.function_id = function_id + self.response = response + + @classmethod + def load(cls, data: dict[str, Any]) -> FunctionTaskOutput: + output = data["output"] + return cls(output.get("callId"), output.get("functionId"), output.get("response")) + + def dump(self, camel_case: bool = False) -> dict[str, Any]: + return { + "output": { + ("callId" if camel_case else "call_id"): self.call_id, + ("functionId" if camel_case else "function_id"): self.function_id, + "response": self.response, + }, + ("taskType" if camel_case else "task_type"): self.task_type, + } + + +class TransformationTaskOutput(WorkflowTaskOutput): + """ + The transformation output is used to specify the output of a transformation task. + + Args: + job_id (int): The job id of the transformation job. + """ + + task_type: ClassVar[str] = "transformation" + + def __init__(self, job_id: int) -> None: + self.job_id = job_id + + @classmethod + def load(cls, data: dict[str, Any]) -> TransformationTaskOutput: + output = data["output"] + return cls(output["jobId"]) + + def dump(self, camel_case: bool = False) -> dict[str, Any]: + return { + "output": {("jobId" if camel_case else "job_id"): self.job_id}, + ("taskType" if camel_case else "task_type"): self.task_type, + } + + +class CDFTaskOutput(WorkflowTaskOutput): + """ + The CDF Request output is used to specify the output of a CDF Request. + + Args: + response (str | dict | None): The response of the CDF Request. Will be a JSON object if content-type is application/json, otherwise will be a string. + status_code (int | None): The status code of the CDF Request. 
+ """ + + task_type: ClassVar[str] = "cdf" + + def __init__(self, response: str | dict | None, status_code: int | None) -> None: + self.response = response + self.status_code = status_code + + @classmethod + def load(cls, data: dict[str, Any]) -> CDFTaskOutput: + output = data["output"] + return cls(output.get("response"), output.get("statusCode")) + + def dump(self, camel_case: bool = False) -> dict[str, Any]: + return { + "output": { + "response": self.response, + ("statusCode" if camel_case else "status_code"): self.status_code, + }, + ("taskType" if camel_case else "task_type"): self.task_type, + } + + +class DynamicTaskOutput(WorkflowTaskOutput): + """ + The dynamic task output is used to specify the output of a dynamic task. + + Args: + dynamic_tasks (list[WorkflowTask]): The dynamic tasks to be created on the fly. + """ + + task_type: ClassVar[str] = "dynamic" + + def __init__(self, dynamic_tasks: list[WorkflowTask]) -> None: + self.dynamic_tasks = dynamic_tasks + + @classmethod + def load(cls, data: dict[str, Any]) -> DynamicTaskOutput: + output = data["output"] + return cls([WorkflowTask._load(task) for task in output["dynamicTasks"]]) + + def dump(self, camel_case: bool = False) -> dict[str, Any]: + return { + "output": {"dynamicTasks": [task.dump(camel_case) for task in self.dynamic_tasks]}, + ("taskType" if camel_case else "task_type"): self.task_type, + } + + +class WorkflowTaskExecution(CogniteResource): + """ + This class represents a task execution. + + Args: + id (str): The server generated id of the task execution. + external_id (str): The external ID provided by the client. Must be unique for the resource type. + status (WorkflowStatus): The status of the task execution. + input (WorkflowTaskParameters): The input parameters of the task execution. + output (WorkflowTaskOutput): The output of the task execution. + version (str | None): The version of the task execution. Defaults to None. 
+ start_time (int | None): The start time of the task execution. Unix timestamp in milliseconds. Defaults to None. + end_time (int | None): The end time of the task execution. Unix timestamp in milliseconds. Defaults to None. + reason_for_incompletion (str | None): Provides the reason if the workflow did not complete successfully. Defaults to None. + + """ + + def __init__( + self, + id: str, + external_id: str, + status: WorkflowStatus, + input: WorkflowTaskParameters, + output: WorkflowTaskOutput, + version: str | None = None, + start_time: int | None = None, + end_time: int | None = None, + reason_for_incompletion: str | None = None, + ) -> None: + self.id = id + self.external_id = external_id + self.status = status + self.input = input + self.output = output + self.version = version + self.start_time = start_time + self.end_time = end_time + self.reason_for_incompletion = reason_for_incompletion + + @classmethod + def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None) -> WorkflowTaskExecution: + resource = json.loads(resource) if isinstance(resource, str) else resource + return cls( + id=resource["id"], + external_id=resource["externalId"], + status=cast(WorkflowStatus, to_snake_case(resource["status"])), + input=WorkflowTaskParameters.load_parameters(resource), + output=WorkflowTaskOutput.load_output(resource), + version=resource.get("version"), + start_time=resource.get("startTime"), + end_time=resource.get("endTime"), + reason_for_incompletion=resource.get("reasonForIncompletion"), + ) + + def dump(self, camel_case: bool = False) -> dict[str, Any]: + output: dict[str, Any] = super().dump(camel_case) + task_type_key = "taskType" if camel_case else "task_type" + output[task_type_key] = self.output.task_type + output["output"] = self.output.dump(camel_case) + return output + + +class WorkflowDefinitionUpsert(CogniteResource): + """ + This class represents a workflow definition. 
This represents the write/update version of a workflow definition. + + A workflow definition defines the tasks and order/dependencies of these tasks. + + Args: + tasks (list[WorkflowTask]): The tasks of the workflow definition. + description (str | None): The description of the workflow definition. Note that when updating a workflow definition + description, it will always be overwritten also if it is set to None. Meaning if the + workflow definition already has a description, and you want to keep it, you need to provide + the description when updating it. + """ + + def __init__( + self, + tasks: list[WorkflowTask], + description: str | None, + ) -> None: + # NOTE(review): removed erroneous 'self.hash = hash' (it bound the builtin hash(); the server-computed hash lives on the read model as WorkflowDefinition.hash_) + self.tasks = tasks + self.description = description + + @classmethod + def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None) -> WorkflowDefinitionUpsert: + resource = json.loads(resource) if isinstance(resource, str) else resource + return cls( + tasks=[WorkflowTask._load(task) for task in resource["tasks"]], + description=resource.get("description"), + ) + + def dump(self, camel_case: bool = False) -> dict[str, Any]: + output: dict[str, Any] = {"tasks": [task.dump(camel_case) for task in self.tasks]} + if self.description: + output["description"] = self.description + return output + + +class WorkflowDefinition(WorkflowDefinitionUpsert): + """ + This class represents a workflow definition. This represents the read version of a workflow definition. + + A workflow definition defines the tasks and order/dependencies of these tasks. + + Args: + hash_ (str): The hash of the tasks and description. This is used to uniquely identify the workflow definition as you can overwrite a workflow version. + tasks (list[WorkflowTask]): The tasks of the workflow definition. + description (str | None): The description of the workflow definition. Defaults to None. 
+ """ + + def __init__( + self, + hash_: str, + tasks: list[WorkflowTask], + description: str | None = None, + ) -> None: + super().__init__(tasks, description) + self.hash_ = hash_ + + @classmethod + def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None) -> WorkflowDefinition: + resource = json.loads(resource) if isinstance(resource, str) else resource + return cls( + hash_=resource["hash"], + tasks=[WorkflowTask._load(task) for task in resource["tasks"]], + description=resource.get("description"), + ) + + def dump(self, camel_case: bool = False) -> dict[str, Any]: + output = super().dump(camel_case) + output["hash"] = self.hash_ + return output + + +class WorkflowVersionUpsert(CogniteResource): + """ + This class represents a workflow version. This is the write-variant, used when creating or updating a workflow variant. + + Args: + workflow_external_id (str): The external ID of the workflow. + version (str): The version of the workflow. + workflow_definition (WorkflowDefinitionUpsert): The workflow definition of the workflow version. 
+ + """ + + def __init__( + self, + workflow_external_id: str, + version: str, + workflow_definition: WorkflowDefinitionUpsert, + ) -> None: + self.workflow_external_id = workflow_external_id + self.version = version + self.workflow_definition = workflow_definition + + @classmethod + def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None) -> Self: + resource = json.loads(resource) if isinstance(resource, str) else resource + workflow_definition: dict[str, Any] = resource["workflowDefinition"] + return cls( + workflow_external_id=resource["workflowExternalId"], + version=resource["version"], + workflow_definition=WorkflowDefinitionUpsert._load(workflow_definition), + ) + + def dump(self, camel_case: bool = False) -> dict[str, Any]: + return { + ("workflowExternalId" if camel_case else "workflow_external_id"): self.workflow_external_id, + "version": self.version, + ("workflowDefinition" if camel_case else "workflow_definition"): self.workflow_definition.dump(camel_case), + } + + def as_id(self) -> WorkflowVersionId: + return WorkflowVersionId( + workflow_external_id=self.workflow_external_id, + version=self.version, + ) + + +class WorkflowVersion(WorkflowVersionUpsert): + """ + This class represents a workflow version. This is the read variant, used when retrieving/listing a workflow variant. + + Args: + workflow_external_id (str): The external ID of the workflow. + version (str): The version of the workflow. + workflow_definition (WorkflowDefinition): The workflow definition of the workflow version. 
+ """ + + def __init__( + self, + workflow_external_id: str, + version: str, + workflow_definition: WorkflowDefinition, + ) -> None: + super().__init__(workflow_external_id, version, workflow_definition) + + @classmethod + def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None) -> WorkflowVersion: + resource = json.loads(resource) if isinstance(resource, str) else resource + workflow_definition: dict[str, Any] = resource["workflowDefinition"] + return cls( + workflow_external_id=resource["workflowExternalId"], + version=resource["version"], + workflow_definition=WorkflowDefinition._load(workflow_definition), + ) + + +class WorkflowVersionList(CogniteResourceList[WorkflowVersion]): + """ + This class represents a list of workflow versions. + """ + + _RESOURCE = WorkflowVersion + + def as_ids(self) -> WorkflowIds: + """Returns a WorkflowIds object with the workflow version ids.""" + return WorkflowIds([workflow_version.as_id() for workflow_version in self.data]) + + +class WorkflowExecution(CogniteResource): + """ + This class represents a workflow execution. + + Args: + id (str): The server generated id of the workflow execution. + workflow_external_id (str): The external ID of the workflow. + status (Literal["running", "completed", "failed", "timed_out", "terminated", "paused"]): The status of the workflow execution. + created_time (int): The time when the workflow execution was created. Unix timestamp in milliseconds. + version (str | None): The version of the workflow. Defaults to None. + start_time (int | None): The start time of the workflow execution. Unix timestamp in milliseconds. Defaults to None. + end_time (int | None): The end time of the workflow execution. Unix timestamp in milliseconds. Defaults to None. + reason_for_incompletion (str | None): Provides the reason if the workflow did not complete successfully. Defaults to None. 
+ """ + + def __init__( + self, + id: str, + workflow_external_id: str, + status: Literal["running", "completed", "failed", "timed_out", "terminated", "paused"], + created_time: int, + version: str | None = None, + start_time: int | None = None, + end_time: int | None = None, + reason_for_incompletion: str | None = None, + ) -> None: + self.id = id + self.workflow_external_id = workflow_external_id + self.version = version + self.status = status + self.created_time = created_time + self.start_time = start_time + self.end_time = end_time + self.reason_for_incompletion = reason_for_incompletion + + def as_workflow_id(self) -> WorkflowVersionId: + return WorkflowVersionId( + workflow_external_id=self.workflow_external_id, + version=self.version, + ) + + @classmethod + def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None) -> WorkflowExecution: + resource = json.loads(resource) if isinstance(resource, str) else resource + return cls( + id=resource["id"], + workflow_external_id=resource["workflowExternalId"], + version=resource["version"], + status=cast( + Literal["running", "completed", "failed", "timed_out", "terminated", "paused"], + to_snake_case(resource["status"]), + ), + created_time=resource["createdTime"], + start_time=resource.get("startTime"), + end_time=resource.get("endTime"), + reason_for_incompletion=resource.get("reasonForIncompletion"), + ) + + +class WorkflowExecutionList(CogniteResourceList[WorkflowExecution]): + """ + This class represents a list of workflow executions. + """ + + _RESOURCE = WorkflowExecution + + +class WorkflowExecutionDetailed(WorkflowExecution): + """ + This class represents a detailed workflow execution. + + A detailed workflow execution contains the input and output of each task in the workflow execution. In addition, + it contains the workflow definition of the workflow. + + Args: + id (str): The server generated id of the workflow execution. 
+ workflow_external_id (str): The external ID of the workflow. + workflow_definition (WorkflowDefinition): The workflow definition of the workflow. + status (Literal["running", "completed", "failed", "timed_out", "terminated", "paused"]): The status of the workflow execution. + executed_tasks (list[WorkflowTaskExecution]): The executed tasks of the workflow execution. + created_time (int): The time when the workflow execution was created. Unix timestamp in milliseconds. + version (str | None): The version of the workflow. Defaults to None. + start_time (int | None): The start time of the workflow execution. Unix timestamp in milliseconds. Defaults to None. + end_time (int | None): The end time of the workflow execution. Unix timestamp in milliseconds. Defaults to None. + reason_for_incompletion (str | None): Provides the reason if the workflow did not complete successfully. Defaults to None. + """ + + def __init__( + self, + id: str, + workflow_external_id: str, + workflow_definition: WorkflowDefinition, + status: Literal["running", "completed", "failed", "timed_out", "terminated", "paused"], + executed_tasks: list[WorkflowTaskExecution], + created_time: int, + version: str | None = None, + start_time: int | None = None, + end_time: int | None = None, + reason_for_incompletion: str | None = None, + ) -> None: + super().__init__( + id, workflow_external_id, status, created_time, version, start_time, end_time, reason_for_incompletion + ) + self.workflow_definition = workflow_definition + self.executed_tasks = executed_tasks + + @classmethod + def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None) -> WorkflowExecutionDetailed: + resource = json.loads(resource) if isinstance(resource, str) else resource + return cls( + id=resource["id"], + workflow_external_id=resource["workflowExternalId"], + version=resource.get("version"), + status=cast( + Literal["running", "completed", "failed", "timed_out", "terminated", "paused"], + 
to_snake_case(resource["status"]), + ), + created_time=resource["createdTime"], + start_time=resource.get("startTime"), + end_time=resource.get("endTime"), + reason_for_incompletion=resource.get("reasonForIncompletion"), + workflow_definition=WorkflowDefinition._load(resource["workflowDefinition"]), + executed_tasks=[WorkflowTaskExecution._load(task) for task in resource["executedTasks"]], + ) + + def dump(self, camel_case: bool = False) -> dict[str, Any]: + output = super().dump(camel_case) + output[("workflowDefinition" if camel_case else "workflow_definition")] = self.workflow_definition.dump( + camel_case + ) + output[("executedTasks" if camel_case else "executed_tasks")] = [ + task.dump(camel_case) for task in self.executed_tasks + ] + return output + + def as_execution(self) -> WorkflowExecution: + return WorkflowExecution( + id=self.id, + workflow_external_id=self.workflow_external_id, + version=self.version, + status=self.status, + created_time=self.created_time, + start_time=self.start_time, + end_time=self.end_time, + reason_for_incompletion=self.reason_for_incompletion, + ) + + +@dataclass(frozen=True) +class WorkflowVersionId: + """ + This class represents a Workflow Version Identifier. + + Args: + workflow_external_id (str): The external ID of the workflow. + version (str, optional): The version of the workflow. Defaults to None. 
+ """ + + workflow_external_id: str + version: str | None = None + + def as_primitive(self) -> tuple[str, str | None]: + return self.workflow_external_id, self.version + + @classmethod + def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None) -> WorkflowVersionId: + resource = json.loads(resource) if isinstance(resource, str) else resource + if "workflowExternalId" in resource: + workflow_external_id = resource["workflowExternalId"] + elif "externalId" in resource: + workflow_external_id = resource["externalId"] + else: + raise ValueError("Invalid input to WorkflowVersionId._load") + + return cls( + workflow_external_id=workflow_external_id, + version=resource.get("version"), + ) + + def dump(self, camel_case: bool = False, as_external_id_key: bool = False) -> dict[str, Any]: + if as_external_id_key: + output: dict[str, Any] = {("externalId" if camel_case else "external_id"): self.workflow_external_id} + else: + output = {("workflowExternalId" if camel_case else "workflow_external_id"): self.workflow_external_id} + if self.version: + output["version"] = self.version + return output + + +class WorkflowIds(UserList): + """ + This class represents a list of Workflow Version Identifiers. + """ + + def __init__(self, workflow_ids: Collection[WorkflowVersionId]) -> None: + for workflow_id in workflow_ids: + if not isinstance(workflow_id, WorkflowVersionId): + raise TypeError( + f"All resources for class '{type(self).__name__}' must be of type " + f"'{type(WorkflowVersionId).__name__}', not '{type(workflow_id)}'." 
+ ) + super().__init__(workflow_ids) + + @classmethod + def _load(cls, resource: Any, cognite_client: CogniteClient | None = None) -> WorkflowIds: + workflow_ids: Sequence[WorkflowVersionId] + if isinstance(resource, tuple) and len(resource) == 2 and all(isinstance(x, str) for x in resource): + workflow_ids = [WorkflowVersionId(*resource)] + elif isinstance(resource, WorkflowVersionId): + workflow_ids = [resource] + elif isinstance(resource, str): + workflow_ids = [WorkflowVersionId(workflow_external_id=resource)] + elif isinstance(resource, dict): + workflow_ids = [WorkflowVersionId._load(resource)] + elif isinstance(resource, Sequence) and resource and isinstance(resource[0], tuple): + workflow_ids = [WorkflowVersionId(*x) for x in resource] + elif isinstance(resource, Sequence) and resource and isinstance(resource[0], WorkflowVersionId): + workflow_ids = resource + elif isinstance(resource, Sequence) and resource and isinstance(resource[0], str): + workflow_ids = [WorkflowVersionId(workflow_external_id=x) for x in resource] + else: + raise ValueError("Invalid input to WorkflowIds") + return cls(workflow_ids) + + def dump(self, camel_case: bool = False, as_external_id: bool = False) -> list[dict[str, Any]]: + return [workflow_id.dump(camel_case, as_external_id_key=as_external_id) for workflow_id in self.data] diff --git a/cognite/client/testing.py b/cognite/client/testing.py index 649fc389c8..e9751125eb 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -59,6 +59,7 @@ ) from cognite.client._api.user_profiles import UserProfilesAPI from cognite.client._api.vision import VisionAPI +from cognite.client._api.workflows import WorkflowAPI, WorkflowExecutionAPI, WorkflowTaskAPI, WorkflowVersionAPI class CogniteClientMock(MagicMock): @@ -153,6 +154,11 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self.vision = MagicMock(spec_set=VisionAPI) + self.workflows = MagicMock(spec=WorkflowAPI) + self.workflows.versions = 
MagicMock(spec_set=WorkflowVersionAPI) + self.workflows.executions = MagicMock(spec_set=WorkflowExecutionAPI) + self.workflows.tasks = MagicMock(spec_set=WorkflowTaskAPI) + @contextmanager def monkeypatch_cognite_client() -> Iterator[CogniteClientMock]: diff --git a/cognite/client/utils/_identifier.py b/cognite/client/utils/_identifier.py index 5acd081fec..365b18e0cb 100644 --- a/cognite/client/utils/_identifier.py +++ b/cognite/client/utils/_identifier.py @@ -87,6 +87,21 @@ def as_primitive(self) -> str: return self.__value +class WorkflowVersionIdentifier: + def __init__(self, version: str, workflow_external_id: str) -> None: + self.__version: str = version + self.__workflow_external_id: str = workflow_external_id + + def as_dict(self, camel_case: bool = True) -> dict[str, str]: + return { + "version": self.__version, + ("workflowExternalId" if camel_case else "workflow_external_id"): self.__workflow_external_id, + } + + def as_primitive(self) -> NoReturn: + raise AttributeError(f"Not supported for {type(self).__name__} implementation") + + class DataModelingIdentifier: def __init__( self, @@ -259,3 +274,35 @@ def load(cls, user_identifiers: str | Sequence[str]) -> UserIdentifierSequence: def assert_singleton(self) -> None: if not self.is_singleton(): raise ValueError("Exactly one user identifier (string) must be specified") + + +class WorkflowVersionIdentifierSequence(IdentifierSequenceCore[WorkflowVersionIdentifier]): + @classmethod + def load(cls, workflow_ids: Sequence[dict]) -> WorkflowVersionIdentifierSequence: + if len(workflow_ids) == 1 and isinstance(workflow_ids[0], dict): + return cls( + [ + WorkflowVersionIdentifier( + version=workflow_ids[0]["version"], workflow_external_id=workflow_ids[0]["workflowExternalId"] + ) + ], + is_singleton=True, + ) + elif isinstance(workflow_ids, Sequence) and workflow_ids and isinstance(workflow_ids[0], dict): + return cls( + [WorkflowVersionIdentifier(entry["version"], entry["workflowExternalId"]) for entry in 
workflow_ids], + is_singleton=False, + ) + raise TypeError(f"WorkflowIdentifier must be of type dict or Sequence[dict]. Found {type(workflow_ids)}") + + def assert_singleton(self) -> None: + if not self.is_singleton(): + raise ValueError("Exactly one workflow version must be specified") + + @staticmethod + def unwrap_identifier(identifier: str | dict) -> str | tuple[str, str]: # type: ignore[override] + if isinstance(identifier, str): + return identifier + if "workflowExternalId" in identifier and "version" in identifier: + return identifier["workflowExternalId"], identifier["version"] + raise ValueError(f"{identifier} does not contain both 'workflowExternalId' and 'version'") diff --git a/cognite/client/utils/_session.py b/cognite/client/utils/_session.py new file mode 100644 index 0000000000..85d01f82f6 --- /dev/null +++ b/cognite/client/utils/_session.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from cognite.client.credentials import OAuthClientCertificate +from cognite.client.data_classes.iam import ClientCredentials +from cognite.client.exceptions import CogniteAuthError + +if TYPE_CHECKING: + from cognite.client import CogniteClient + + +def create_session_and_return_nonce( + client: CogniteClient, api_name: str, client_credentials: dict | ClientCredentials | None = None +) -> str: + if client_credentials is None: + if isinstance(client._config.credentials, OAuthClientCertificate): + raise CogniteAuthError(f"Client certificate credentials is not supported with the {api_name}") + elif isinstance(client_credentials, dict): + client_credentials = ClientCredentials(client_credentials["client_id"], client_credentials["client_secret"]) + return client.iam.sessions.create(client_credentials).nonce diff --git a/docs/source/index.rst b/docs/source/index.rst index 7a109afb2a..20e027ff20 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -48,6 +48,7 @@ Contents transformations functions data_modeling + 
workflow_orchestration deprecated base_data_classes exceptions diff --git a/docs/source/workflow_orchestration.rst b/docs/source/workflow_orchestration.rst new file mode 100644 index 0000000000..e503051cd6 --- /dev/null +++ b/docs/source/workflow_orchestration.rst @@ -0,0 +1,71 @@ +Workflow Orchestration +====================== + +.. warning:: + Workflow Orchestration is experimental and may be subject to breaking changes in future versions without notice. + + +Workflows +------------ +Upsert Workflow +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowAPI.upsert + +Delete Workflow(s) +^^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowAPI.delete + +Retrieve Workflow +^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowAPI.retrieve + +List Workflows +^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowAPI.list + + +Workflow Versions +------------------ +Upsert Workflow Version +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowVersionAPI.upsert + +Delete Workflow Version(s) +^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowVersionAPI.delete + +Retrieve Workflow Version +^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowVersionAPI.retrieve + +List Workflow Versions +^^^^^^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowVersionAPI.list + + +Workflow Executions +-------------------- +List Workflow Executions +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.list + +Retrieve Detailed Workflow Execution +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.retrieve_detailed + +Trigger Workflow Execution +^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. 
automethod:: cognite.client._api.workflows.WorkflowExecutionAPI.trigger + +Workflow Tasks +------------------ +Update Status of Async Task +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. automethod:: cognite.client._api.workflows.WorkflowTaskAPI.update + + +Workflow Orchestration data classes +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. automodule:: cognite.client.data_classes.workflows + :members: + :show-inheritance: diff --git a/pyproject.toml b/pyproject.toml index 3b2ec26321..aeba3646df 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,8 +1,7 @@ [tool.poetry] name = "cognite-sdk" -version = "6.27.0" - +version = "6.28.0" description = "Cognite Python SDK" readme = "README.md" documentation = "https://cognite-sdk-python.readthedocs-hosted.com" diff --git a/scripts/add_capability.py b/scripts/add_capability.py index 3a6f70da99..b47c8ae211 100644 --- a/scripts/add_capability.py +++ b/scripts/add_capability.py @@ -7,13 +7,21 @@ def main(client: CogniteClient): - new_capabilities = [{"geospatialCrsAcl": {"scope": {"all": {}}, "actions": ["READ", "WRITE"]}}] + new_capabilities = [ + { + "experimentAcl": { + "actions": ["USE"], + "scope": {"experimentscope": {"experiments": ["workflowOrchestrator"]}}, + } + }, + ] source_id = "4521b63c-b914-44fe-9f86-f42b17fcb6c1" name = "integration-test-runner" groups = client.iam.groups.list() selected_group = next( - (group for group in groups if group.name == name and group.source_id == source_id and group.capabilities), None + (group for group in groups if group.name == name and group.source_id == source_id and group.capabilities), + None, ) if not selected_group: @@ -24,11 +32,17 @@ def main(client: CogniteClient): TMP_DIR.mkdir(exist_ok=True) (TMP_DIR / f"{selected_group.name}.json").write_text(json.dumps(selected_group.dump(camel_case=True), indent=4)) - available_capabilities = {next(iter(capability.keys())) for capability in selected_group.capabilities} + existing_capability_by_name = { + next(iter(capability.keys())): capability 
for capability in selected_group.capabilities + } added = [] for new_capability in new_capabilities: (capability_name,) = new_capability.keys() - if capability_name not in available_capabilities: + if capability_name not in existing_capability_by_name: + selected_group.capabilities.append(new_capability) + added.append(capability_name) + elif new_capability[capability_name] != existing_capability_by_name[capability_name]: + # Capability exists, but with different scope or actions selected_group.capabilities.append(new_capability) added.append(capability_name) else: diff --git a/tests/tests_integration/test_api/test_workflows.py b/tests/tests_integration/test_api/test_workflows.py new file mode 100644 index 0000000000..c18f717296 --- /dev/null +++ b/tests/tests_integration/test_api/test_workflows.py @@ -0,0 +1,344 @@ +from __future__ import annotations + +import time + +import pytest + +from cognite.client import CogniteClient +from cognite.client.data_classes import Function +from cognite.client.data_classes.workflows import ( + CDFTaskParameters, + FunctionTaskParameters, + TransformationTaskParameters, + Workflow, + WorkflowDefinitionUpsert, + WorkflowExecutionList, + WorkflowList, + WorkflowTask, + WorkflowUpsert, + WorkflowVersion, + WorkflowVersionList, + WorkflowVersionUpsert, +) +from cognite.client.exceptions import CogniteAPIError + + +@pytest.fixture +def workflow_list(cognite_client: CogniteClient) -> WorkflowList: + workflow1 = WorkflowUpsert( + external_id="integration_test-workflow1", + description="This is workflow for testing purposes", + ) + workflow2 = WorkflowUpsert( + external_id="integration_test-workflow2", + description="This is workflow for testing purposes", + ) + workflows = [workflow1, workflow2] + listed = cognite_client.workflows.list() + existing = listed._external_id_to_item + call_list = False + for workflow in workflows: + if workflow.external_id not in existing: + call_list = True + cognite_client.workflows.upsert(workflow) + if 
call_list: + return cognite_client.workflows.list() + return listed + + +@pytest.fixture +def workflow_version_list(cognite_client: CogniteClient) -> WorkflowVersionList: + workflow_id = "integration_test-workflow_with_versions" + version1 = WorkflowVersionUpsert( + workflow_external_id=workflow_id, + version="1", + workflow_definition=WorkflowDefinitionUpsert( + tasks=[ + WorkflowTask( + external_id=f"{workflow_id}-1-task1", + parameters=TransformationTaskParameters( + external_id="None-existing-transformation", + ), + ) + ], + description=None, + ), + ) + version2 = WorkflowVersionUpsert( + workflow_external_id=workflow_id, + version="2", + workflow_definition=WorkflowDefinitionUpsert( + tasks=[ + WorkflowTask( + external_id=f"{workflow_id}-2-task1", + parameters=CDFTaskParameters( + resource_path="/dummy/no/real/resource/path", + method="GET", + body={"limit": 1}, + ), + ) + ], + description=None, + ), + ) + listed = cognite_client.workflows.versions.list(workflow_version_ids=workflow_id) + existing = {w.version for w in listed} + call_list = False + for version in [version1, version2]: + if version.version not in existing: + call_list = True + cognite_client.workflows.versions.upsert(version) + if call_list: + return cognite_client.workflows.versions.list(workflow_version_ids=workflow_id) + return listed + + +@pytest.fixture(scope="session") +def cdf_function_add(cognite_client: CogniteClient) -> Function: + external_id = "integration_test-workflow-cdf_function_add" + add_function = cognite_client.functions.retrieve(external_id=external_id) + if add_function is not None: + return add_function + + def handle(client, data: dict): + output = data.copy() + output["sum"] = output["a"] + output["b"] + return output + + cognite_client.functions.create(name="Add", external_id=external_id, function_handle=handle) + pytest.skip("Function need to be redeployed, skipping tests that need it", allow_module_level=True) + + +@pytest.fixture(scope="session") +def 
cdf_function_multiply(cognite_client: CogniteClient) -> Function: + external_id = "integration_test-workflow-cdf_function_multiply" + multiply_function = cognite_client.functions.retrieve(external_id=external_id) + if multiply_function is not None: + return multiply_function + + def handle(client, data: dict): + output = data.copy() + output["product"] = output["a"] * output["b"] + return output + + cognite_client.functions.create(name="Multiply", external_id=external_id, function_handle=handle) + pytest.skip("Function need to be redeployed, skipping tests that need it", allow_module_level=True) + + +@pytest.fixture +def add_multiply_workflow( + cognite_client: CogniteClient, cdf_function_add: Function, cdf_function_multiply +) -> WorkflowVersion: + workflow_id = "integration_test-workflow-add_multiply" + version = WorkflowVersionUpsert( + workflow_external_id=workflow_id, + version="1", + workflow_definition=WorkflowDefinitionUpsert( + description=None, + tasks=[ + WorkflowTask( + external_id=f"{workflow_id}-1-add", + parameters=FunctionTaskParameters( + external_id=cdf_function_add.external_id, + data={"a": 1, "b": 2}, + ), + ), + WorkflowTask( + external_id=f"{workflow_id}-1-multiply", + parameters=FunctionTaskParameters( + external_id=cdf_function_multiply.external_id, + data={"a": 3, "b": 4}, + is_async_complete=True, + ), + timeout=120, + retries=2, + ), + ], + ), + ) + retrieved = cognite_client.workflows.versions.retrieve(workflow_id, version.version) + if retrieved is not None: + return retrieved + return cognite_client.workflows.versions.upsert(version) + + +@pytest.fixture +def workflow_execution_list( + cognite_client: CogniteClient, add_multiply_workflow: WorkflowVersion +) -> WorkflowExecutionList: + executions = cognite_client.workflows.executions.list(workflow_version_ids=add_multiply_workflow.as_id(), limit=5) + if executions: + return executions + # Creating at least one execution + result = cognite_client.workflows.executions.trigger( + 
add_multiply_workflow.workflow_external_id, add_multiply_workflow.version, {"a": 5, "b": 6} + ) + t0 = time.time() + while result.status != "completed": + result = cognite_client.workflows.executions.retrieve_detailed(result.id) + cognite_client.workflows.tasks.update(result.executed_tasks[1].id, "completed") + time.sleep(5) + if time.time() - t0 > 60: + raise TimeoutError("Workflow execution did not complete in time") + return cognite_client.workflows.executions.list(workflow_version_ids=add_multiply_workflow.as_id(), limit=5) + + +class TestWorkflows: + def test_upsert_delete(self, cognite_client: CogniteClient) -> None: + workflow = WorkflowUpsert( + external_id="integration_test-test_create_delete", + description="This is ephemeral workflow for testing purposes", + ) + cognite_client.workflows.delete(workflow.external_id, ignore_unknown_ids=True) + + created_workflow: Workflow | None = None + try: + created_workflow = cognite_client.workflows.upsert(workflow) + + assert created_workflow.external_id == workflow.external_id + assert created_workflow.description == workflow.description + assert created_workflow.created_time is not None + finally: + if created_workflow is not None: + cognite_client.workflows.delete(created_workflow.external_id) + + def test_delete_non_existing_raise(self, cognite_client: CogniteClient) -> None: + with pytest.raises(CogniteAPIError) as e: + cognite_client.workflows.delete("integration_test-non_existing_workflow", ignore_unknown_ids=False) + + assert "workflows were not found" in str(e.value) + + def test_delete_non_existing(self, cognite_client: CogniteClient) -> None: + cognite_client.workflows.delete("integration_test-non_existing_workflow", ignore_unknown_ids=True) + + def test_retrieve_workflow(self, cognite_client: CogniteClient, workflow_list: WorkflowList) -> None: + retrieved = cognite_client.workflows.retrieve(workflow_list[0].external_id) + + assert retrieved == workflow_list[0] + + def 
test_retrieve_non_existing_workflow(self, cognite_client: CogniteClient) -> None: + non_existing = cognite_client.workflows.retrieve("integration_test-non_existing_workflow") + + assert non_existing is None + + +class TestWorkflowVersions: + def test_upsert_delete(self, cognite_client: CogniteClient) -> None: + version = WorkflowVersionUpsert( + workflow_external_id="integration_test-workflow_versions-test_create_delete", + version="1", + workflow_definition=WorkflowDefinitionUpsert( + tasks=[ + WorkflowTask( + external_id="integration_test-workflow_definitions-test_create_delete-task1", + parameters=FunctionTaskParameters( + external_id="integration_test-workflow_definitions-test_create_delete-task1-function", + data={"a": 1, "b": 2}, + ), + ) + ], + description="This is ephemeral workflow definition for testing purposes", + ), + ) + cognite_client.workflows.versions.delete(version.as_id(), ignore_unknown_ids=True) + + created_version: WorkflowVersion | None = None + try: + created_version = cognite_client.workflows.versions.upsert(version) + + assert created_version.workflow_external_id == version.workflow_external_id + assert created_version.workflow_definition.description == version.workflow_definition.description + assert created_version.workflow_definition.hash is not None + finally: + if created_version is not None: + cognite_client.workflows.versions.delete( + created_version.as_id(), + ) + cognite_client.workflows.delete(created_version.workflow_external_id) + + def test_list_workflow_versions( + self, cognite_client: CogniteClient, workflow_version_list: WorkflowVersionList + ) -> None: + listed = cognite_client.workflows.versions.list(workflow_version_list.as_ids()) + + assert len(listed) == len(workflow_version_list) + assert listed == workflow_version_list + + def test_list_workflow_version_limit( + self, cognite_client: CogniteClient, workflow_version_list: WorkflowVersionList + ) -> None: + listed = cognite_client.workflows.versions.list(limit=1) + + 
assert len(listed) == 1 + + def test_delete_non_existing_raise(self, cognite_client: CogniteClient) -> None: + with pytest.raises(CogniteAPIError) as e: + cognite_client.workflows.versions.delete( + ("integration_test-non_existing_workflow_version", "1"), ignore_unknown_ids=False + ) + + assert "not found" in str(e.value) + + def test_delete_non_existing(self, cognite_client: CogniteClient) -> None: + cognite_client.workflows.versions.delete( + ("integration_test-non_existing_workflow_version", "1"), ignore_unknown_ids=True + ) + + def test_retrieve_workflow(self, cognite_client: CogniteClient, workflow_version_list: WorkflowVersionList) -> None: + retrieve_id = workflow_version_list[0].as_id() + + retrieved = cognite_client.workflows.versions.retrieve(*retrieve_id.as_primitive()) + + assert retrieved == workflow_version_list[0] + + def test_retrieve_non_existing_workflow(self, cognite_client: CogniteClient) -> None: + non_existing = cognite_client.workflows.versions.retrieve("integration_test-non_existing_workflow", "1") + + assert non_existing is None + + +class TestWorkflowExecutions: + def test_list_workflow_executions( + self, cognite_client: CogniteClient, workflow_execution_list: WorkflowExecutionList + ) -> None: + workflow_ids = set(w.as_workflow_id() for w in workflow_execution_list) + + listed = cognite_client.workflows.executions.list( + workflow_version_ids=list(workflow_ids), limit=len(workflow_execution_list) + ) + + assert len(listed) == len(workflow_execution_list) + assert all(w.as_workflow_id() in workflow_ids for w in listed) + + def test_retrieve_workflow_execution_detailed( + self, cognite_client: CogniteClient, workflow_execution_list: WorkflowExecutionList + ) -> None: + retrieved = cognite_client.workflows.executions.retrieve_detailed(workflow_execution_list[0].id) + + assert retrieved.as_execution().dump() == workflow_execution_list[0].dump() + assert retrieved.executed_tasks + + def test_retrieve_non_existing_workflow_execution(self, 
cognite_client: CogniteClient) -> None: + non_existing = cognite_client.workflows.executions.retrieve_detailed( + "integration_test-non_existing_workflow_execution" + ) + + assert non_existing is None + + def test_trigger_retrieve_detailed_update_update_task( + self, cognite_client: CogniteClient, add_multiply_workflow: WorkflowVersion + ) -> None: + workflow_execution = cognite_client.workflows.executions.trigger( + add_multiply_workflow.workflow_external_id, + add_multiply_workflow.version, + ) + + async_task = add_multiply_workflow.workflow_definition.tasks[1] + assert isinstance(async_task.parameters, FunctionTaskParameters) + assert async_task.parameters.is_async_complete + + workflow_execution_detailed = cognite_client.workflows.executions.retrieve_detailed(workflow_execution.id) + async_task = workflow_execution_detailed.executed_tasks[1] + + async_task = cognite_client.workflows.tasks.update(async_task.id, "completed") + assert async_task.status == "completed" diff --git a/tests/tests_unit/test_data_classes/test_workflows.py b/tests/tests_unit/test_data_classes/test_workflows.py new file mode 100644 index 0000000000..479e30db97 --- /dev/null +++ b/tests/tests_unit/test_data_classes/test_workflows.py @@ -0,0 +1,73 @@ +from typing import Any + +import pytest + +from cognite.client.data_classes.workflows import ( + CDFTaskOutput, + DynamicTaskOutput, + FunctionTaskOutput, + FunctionTaskParameters, + TransformationTaskOutput, + WorkflowIds, + WorkflowTask, + WorkflowTaskOutput, + WorkflowVersionId, +) + + +class TestWorkflowTaskOutput: + @pytest.mark.parametrize( + "output", + [ + FunctionTaskOutput(call_id=123, function_id=3456, response={"test": 1}), + DynamicTaskOutput( + dynamic_tasks=[ + WorkflowTask(external_id="abc", name="abc", parameters=FunctionTaskParameters(external_id="def")) + ] + ), + CDFTaskOutput(response={"test": 1}, status_code=200), + TransformationTaskOutput(job_id=789), + ], + ) + def test_serialization(self, output: WorkflowTaskOutput): + 
assert WorkflowTaskOutput.load_output(output.dump(camel_case=True)).dump() == output.dump() + + +class TestWorkflowId: + @pytest.mark.parametrize( + "workflow_id", + [ + WorkflowVersionId(workflow_external_id="abc"), + WorkflowVersionId(workflow_external_id="def", version="3000"), + ], + ) + def test_serialization(self, workflow_id: WorkflowVersionId): + assert WorkflowVersionId._load(workflow_id.dump(camel_case=True)).dump() == workflow_id.dump() + + +class TestWorkflowIds: + @pytest.mark.parametrize( + "resource, expected", + [ + [("abc",), WorkflowIds([WorkflowVersionId("abc")])], + [("abc", "def"), WorkflowIds([WorkflowVersionId("abc", "def")])], + [{"workflowExternalId": "abc"}, WorkflowIds([WorkflowVersionId("abc")])], + [{"workflowExternalId": "abc", "version": "def"}, WorkflowIds([WorkflowVersionId("abc", "def")])], + [WorkflowVersionId("abc"), WorkflowIds([WorkflowVersionId("abc")])], + [["abc", "def"], WorkflowIds([WorkflowVersionId("abc"), WorkflowVersionId("def")])], + [ + [WorkflowVersionId("abc"), WorkflowVersionId("def")], + WorkflowIds([WorkflowVersionId("abc"), WorkflowVersionId("def")]), + ], + [ + WorkflowIds([WorkflowVersionId("abc"), WorkflowVersionId("def")]), + WorkflowIds([WorkflowVersionId("abc"), WorkflowVersionId("def")]), + ], + [ + [("abc", "def"), ("ghi", "jkl")], + WorkflowIds([WorkflowVersionId("abc", "def"), WorkflowVersionId("ghi", "jkl")]), + ], + ], + ) + def test_load(self, resource: Any, expected: WorkflowIds): + assert WorkflowIds._load(resource) == expected diff --git a/tests/tests_unit/test_docstring_examples.py b/tests/tests_unit/test_docstring_examples.py index 9348c6fae9..bef197299e 100644 --- a/tests/tests_unit/test_docstring_examples.py +++ b/tests/tests_unit/test_docstring_examples.py @@ -19,6 +19,7 @@ sequences, three_d, time_series, + workflows, ) from cognite.client._api.data_modeling import containers, data_models, graphql, instances, spaces, views from cognite.client.testing import CogniteClientMock @@ -89,3 +90,6 
@@ def test_data_modeling(self): def test_datapoint_subscriptions(self): run_docstring_tests(datapoints_subscriptions) + + def test_workflows(self): + run_docstring_tests(workflows) diff --git a/tests/tests_unit/test_testing.py b/tests/tests_unit/test_testing.py index caccddd6e2..1c1a484532 100644 --- a/tests/tests_unit/test_testing.py +++ b/tests/tests_unit/test_testing.py @@ -3,6 +3,7 @@ import pytest from cognite.client import ClientConfig, CogniteClient +from cognite.client._api.workflows import BetaWorkflowAPIClient from cognite.client._api_client import APIClient from cognite.client.credentials import Token from cognite.client.testing import CogniteClientMock, monkeypatch_cognite_client @@ -12,11 +13,11 @@ def test_ensure_all_apis_are_available_on_cognite_mock(): mocked_apis = all_mock_children(CogniteClientMock()) available = {v.__class__ for v in mocked_apis.values()} - expected = set(all_subclasses(APIClient)) + expected = set(all_subclasses(APIClient)) - {BetaWorkflowAPIClient} # Any new APIs that have not been added to CogniteClientMock? - assert not expected.difference(available) + assert not expected.difference(available), f"Missing APIs: {expected.difference(available)}" # Any removed APIs that are still available on CogniteClientMock? - assert not available.difference(expected) + assert not available.difference(expected), f"Removed APIs: {available.difference(expected)}" def test_ensure_all_apis_use_equal_attr_paths_on_cognite_mock():