Skip to content

Commit

Permalink
Workflows: dynamic task fixes (#1395)
Browse files Browse the repository at this point in the history
Co-authored-by: Dmytro Donukis <[email protected]>
Co-authored-by: Dmytro Donukis <[email protected]>
  • Loading branch information
3 people authored Oct 4, 2023
1 parent 39c2228 commit 72b474c
Show file tree
Hide file tree
Showing 7 changed files with 1,211 additions and 65 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [6.28.5] - 2023-10-03
### Fixed
- Bugfix for serialization of Workflows' `DynamicTaskParameters` during `workflows.versions.upsert` and `workflows.executions.retrieve_detailed`

## [6.28.4] - 2023-10-03
### Fixed
- Overload data_set/create for improved type safety
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_api/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ def upsert(self, version: WorkflowVersionUpsert, mode: Literal["replace"] = "rep
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import WorkflowVersionUpsert, WorkflowDefinitionUpsert, WorkflowTask, FunctionTaskParameters
>>> c = CogniteClient()
>>> new_version =WorkflowVersionUpsert(
>>> new_version = WorkflowVersionUpsert(
... workflow_external_id="my_workflow",
... version="1",
... workflow_definition=WorkflowDefinitionUpsert(
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "6.28.4"
__version__ = "6.28.5"
__api_subversion__ = "V20220125"
143 changes: 91 additions & 52 deletions cognite/client/data_classes/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def as_external_ids(self) -> list[str]:


class WorkflowTaskParameters(CogniteResource, ABC):
task_type: ClassVar[str]
task_type: ClassVar[Literal["function", "transformation", "cdf", "dynamic"]]

@classmethod
def load_parameters(cls, data: dict) -> WorkflowTaskParameters:
Expand Down Expand Up @@ -124,7 +124,7 @@ class FunctionTaskParameters(WorkflowTaskParameters):
Args:
external_id (str): The external ID of the function to be called.
data (dict | str | None): The data to be passed to the function. Defaults to None. The data can be used to specify the input to the function from previous tasks or the workflow input. See the tip below for more information.
is_async_complete (bool): Whether the function is asynchronous. Defaults to False.
is_async_complete (bool | None): Whether the function is asynchronous. Defaults to None, which the API will interpret as False.
If a function is asynchronous, you need to call the client.workflows.tasks.update() endpoint to update the status of the task,
whereas synchronous tasks update their status automatically.
Expand All @@ -136,6 +136,7 @@ class FunctionTaskParameters(WorkflowTaskParameters):
- `${workflow.input}`: The workflow input.
- `${<taskExternalId>.output}`: The output of the task with the given external id.
- `${<taskExternalId>.input}`: The input of the task with the given external id.
- `${<taskExternalId>.input.someKey}`: A specific key within the input of the task with the given external id.
For example, if you have a workflow containing two tasks, and the external_id of the first task is `task1` then,
you can specify the data for the second task as follows:
Expand All @@ -154,13 +155,13 @@ class FunctionTaskParameters(WorkflowTaskParameters):
... )
"""

task_type: ClassVar[str] = "function"
task_type = "function"

def __init__(
self,
external_id: str,
data: dict | str | None = None,
is_async_complete: bool = False,
is_async_complete: bool | None = None,
) -> None:
self.external_id = external_id
self.data = data
Expand All @@ -174,8 +175,7 @@ def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None
return cls(
external_id=function["externalId"],
data=function.get("data"),
# Allow default to come from the API.
is_async_complete=resource.get("isAsyncComplete"), # type: ignore[arg-type]
is_async_complete=resource.get("isAsyncComplete") or resource.get("asyncComplete"),
)

def dump(self, camel_case: bool = False) -> dict[str, Any]:
Expand All @@ -187,8 +187,9 @@ def dump(self, camel_case: bool = False) -> dict[str, Any]:

output: dict[str, Any] = {
"function": function,
"isAsyncComplete": self.is_async_complete,
}
if self.is_async_complete is not None:
output[("isAsyncComplete" if camel_case else "is_async_complete")] = self.is_async_complete
return output


Expand All @@ -201,7 +202,7 @@ class TransformationTaskParameters(WorkflowTaskParameters):
"""

task_type: ClassVar[str] = "transformation"
task_type = "transformation"

def __init__(self, external_id: str) -> None:
self.external_id = external_id
Expand Down Expand Up @@ -244,7 +245,7 @@ class CDFTaskParameters(WorkflowTaskParameters):
"""

task_type: ClassVar[str] = "cdf"
task_type = "cdf"

def __init__(
self,
Expand Down Expand Up @@ -272,29 +273,59 @@ def _load(cls: type[Self], resource: dict | str, cognite_client: CogniteClient |
def dump(self, camel_case: bool = False) -> dict[str, Any]:
output = super().dump(camel_case)
return {
("cdfRequest" if camel_case else "cdfRequest"): output,
("cdfRequest" if camel_case else "cdf_request"): output,
}


class DynamicTaskParameters(WorkflowTaskParameters):
"""
The dynamic task parameters are used to specify a dynamic task.
When the tasks and their order of execution are determined at runtime, we use dynamic tasks. It takes the tasks parameter,
which is an array of function, transformation, and cdf task definitions.
This array should then be generated and returned by a previous step in the workflow, for instance,
When the tasks and their order of execution are determined at runtime, we use dynamic tasks. A dynamic task takes the tasks parameter, which is a reference to
an array of function, transformation, and cdf task definitions. This array should be generated and returned by a previous step in the workflow, for instance,
a Cognite Function task.
Args:
dynamic (list[WorkflowTask] | str): The dynamic task to be called. The dynamic task is a string that is evaluated
during the workflow's execution.
Tip:
You can reference data from other tasks or the workflow. You do this by following the format
`${prefix.jsonPath}` in the expression. Some valid options are:
- `${workflow.input}`: The workflow input.
- `${<taskExternalId>.output}`: The output of the task with the given external id.
- `${<taskExternalId>.input}`: The input of the task with the given external id.
- `${<taskExternalId>.input.someKey}`: A specific key within the input of the task with the given external id.
Args:
tasks (list[WorkflowTask] | str): The tasks to be dynamically executed. This is either a reference string that is evaluated
during the workflow's execution, or the resolved list of tasks. When calling Version Upsert, the tasks parameter must be a reference string.
When calling Execution details, the tasks parameter will be a list of WorkflowTask objects.
"""

task_type: ClassVar[str] = "dynamic"
task_type = "dynamic"

def __init__(self, tasks: list[WorkflowTask] | str) -> None:
self.tasks = tasks

@classmethod
def _load(cls: type[Self], resource: dict | str, cognite_client: CogniteClient | None = None) -> Self:
resource = json.loads(resource) if isinstance(resource, str) else resource

dynamic: dict[str, Any] = resource[cls.task_type]

def __init__(self, dynamic: list[WorkflowTask] | str) -> None:
self.dynamic = dynamic
# can either be a reference string (i.e., in case of WorkflowDefinitions)
if isinstance(dynamic["tasks"], str):
return cls(dynamic["tasks"])

# or can be resolved to a list of Tasks (i.e., during or after execution)
return cls(
[WorkflowTask._load(task) for task in dynamic["tasks"]],
)

def dump(self, camel_case: bool = False) -> dict[str, Any]:
return {
self.task_type: {
"tasks": self.tasks if isinstance(self.tasks, str) else [task.dump(camel_case) for task in self.tasks]
}
}


class WorkflowTask(CogniteResource):
Expand Down Expand Up @@ -331,6 +362,10 @@ def __init__(
self.timeout = timeout
self.depends_on = depends_on

@property
def type(self) -> Literal["function", "transformation", "cdf", "dynamic"]:
return self.parameters.task_type

@classmethod
def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None) -> WorkflowTask:
resource = json.loads(resource) if isinstance(resource, str) else resource
Expand All @@ -342,15 +377,13 @@ def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None
# Allow default to come from the API.
retries=resource.get("retries"), # type: ignore[arg-type]
timeout=resource.get("timeout"), # type: ignore[arg-type]
depends_on=[dep["externalId"] for dep in resource.get("depends_on", [])] or None,
depends_on=[dep["externalId"] for dep in resource.get("dependsOn", [])] or None,
)

def dump(self, camel_case: bool = False) -> dict[str, Any]:
type_ = self.parameters.task_type

output: dict[str, Any] = {
("externalId" if camel_case else "external_id"): self.external_id,
"type": type_,
"type": self.type,
"parameters": self.parameters.dump(camel_case),
"retries": self.retries,
"timeout": self.timeout,
Expand Down Expand Up @@ -418,12 +451,9 @@ def load(cls, data: dict[str, Any]) -> FunctionTaskOutput:

def dump(self, camel_case: bool = False) -> dict[str, Any]:
return {
"output": {
("callId" if camel_case else "call_id"): self.call_id,
("functionId" if camel_case else "function_id"): self.function_id,
"response": self.response,
},
("taskType" if camel_case else "task_type"): self.task_type,
("callId" if camel_case else "call_id"): self.call_id,
("functionId" if camel_case else "function_id"): self.function_id,
"response": self.response,
}


Expand All @@ -446,10 +476,7 @@ def load(cls, data: dict[str, Any]) -> TransformationTaskOutput:
return cls(output["jobId"])

def dump(self, camel_case: bool = False) -> dict[str, Any]:
return {
"output": {("jobId" if camel_case else "job_id"): self.job_id},
("taskType" if camel_case else "task_type"): self.task_type,
}
return {("jobId" if camel_case else "job_id"): self.job_id}


class CDFTaskOutput(WorkflowTaskOutput):
Expand All @@ -474,37 +501,27 @@ def load(cls, data: dict[str, Any]) -> CDFTaskOutput:

def dump(self, camel_case: bool = False) -> dict[str, Any]:
return {
"output": {
"response": self.response,
("statusCode" if camel_case else "status_code"): self.status_code,
},
("taskType" if camel_case else "task_type"): self.task_type,
"response": self.response,
("statusCode" if camel_case else "status_code"): self.status_code,
}


class DynamicTaskOutput(WorkflowTaskOutput):
"""
The dynamic task output is used to specify the output of a dynamic task.
Args:
dynamic_tasks (list[WorkflowTask]): The dynamic tasks to be created on the fly.
"""

task_type: ClassVar[str] = "dynamic"

def __init__(self, dynamic_tasks: list[WorkflowTask]) -> None:
self.dynamic_tasks = dynamic_tasks
def __init__(self) -> None:
...

@classmethod
def load(cls, data: dict[str, Any]) -> DynamicTaskOutput:
output = data["output"]
return cls([WorkflowTask._load(task) for task in output["dynamicTasks"]])
return cls()

def dump(self, camel_case: bool = False) -> dict[str, Any]:
return {
"output": {"dynamicTasks": [task.dump(camel_case) for task in self.dynamic_tasks]},
("taskType" if camel_case else "task_type"): self.task_type,
}
return {}


class WorkflowTaskExecution(CogniteResource):
Expand All @@ -521,7 +538,6 @@ class WorkflowTaskExecution(CogniteResource):
start_time (int | None): The start time of the task execution. Unix timestamp in milliseconds. Defaults to None.
end_time (int | None): The end time of the task execution. Unix timestamp in milliseconds. Defaults to None.
reason_for_incompletion (str | None): Provides the reason if the workflow did not complete successfully. Defaults to None.
"""

def __init__(
Expand All @@ -546,6 +562,10 @@ def __init__(
self.end_time = end_time
self.reason_for_incompletion = reason_for_incompletion

@property
def task_type(self) -> Literal["function", "transformation", "cdf", "dynamic"]:
return self.input.task_type

@classmethod
def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None) -> WorkflowTaskExecution:
resource = json.loads(resource) if isinstance(resource, str) else resource
Expand All @@ -563,8 +583,15 @@ def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None

def dump(self, camel_case: bool = False) -> dict[str, Any]:
output: dict[str, Any] = super().dump(camel_case)
task_type_key = "taskType" if camel_case else "task_type"
output[task_type_key] = self.output.task_type
output["input"] = self.input.dump(camel_case)
output["status"] = self.status.upper()
output[("taskType" if camel_case else "task_type")] = self.task_type
# API uses isAsyncComplete and asyncComplete inconsistently:
if self.task_type == "function":
if (is_async_complete := output["input"].get("isAsyncComplete")) is not None:
output["input"]["asyncComplete"] = is_async_complete
del output["input"]["isAsyncComplete"]

output["output"] = self.output.dump(camel_case)
return output

Expand Down Expand Up @@ -814,6 +841,8 @@ class WorkflowExecutionDetailed(WorkflowExecution):
start_time (int | None): The start time of the workflow execution. Unix timestamp in milliseconds. Defaults to None.
end_time (int | None): The end time of the workflow execution. Unix timestamp in milliseconds. Defaults to None.
reason_for_incompletion (str | None): Provides the reason if the workflow did not complete successfully. Defaults to None.
input (dict | None): Input arguments the workflow was triggered with.
metadata (dict | None): Metadata set when the workflow was triggered.
"""

def __init__(
Expand All @@ -828,12 +857,16 @@ def __init__(
start_time: int | None = None,
end_time: int | None = None,
reason_for_incompletion: str | None = None,
input: dict | None = None,
metadata: dict | None = None,
) -> None:
super().__init__(
id, workflow_external_id, status, created_time, version, start_time, end_time, reason_for_incompletion
)
self.workflow_definition = workflow_definition
self.executed_tasks = executed_tasks
self.input = input
self.metadata = metadata

@classmethod
def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None) -> WorkflowExecutionDetailed:
Expand All @@ -852,6 +885,8 @@ def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None
reason_for_incompletion=resource.get("reasonForIncompletion"),
workflow_definition=WorkflowDefinition._load(resource["workflowDefinition"]),
executed_tasks=[WorkflowTaskExecution._load(task) for task in resource["executedTasks"]],
input=resource.get("input"),
metadata=resource.get("metadata"),
)

def dump(self, camel_case: bool = False) -> dict[str, Any]:
Expand All @@ -862,6 +897,10 @@ def dump(self, camel_case: bool = False) -> dict[str, Any]:
output[("executedTasks" if camel_case else "executed_tasks")] = [
task.dump(camel_case) for task in self.executed_tasks
]
if self.input:
output["input"] = self.input
if self.metadata:
output["metadata"] = self.metadata
return output

def as_execution(self) -> WorkflowExecution:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "6.28.4"
version = "6.28.5"
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
Expand Down
Loading

0 comments on commit 72b474c

Please sign in to comment.