Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add subworkflow tasks to workflows #1445

Merged
merged 14 commits into from
Dec 6, 2023
Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ Changes are grouped as follows
- `Removed` for now removed features.
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.3.4] - 2023-11-22
### Added
- Support for `subworkflow` tasks in `workflows`.

## [7.3.3] - 2023-11-22
### Fixed
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.3.3"
__version__ = "7.3.4"
__api_subversion__ = "V20220125"
64 changes: 60 additions & 4 deletions cognite/client/data_classes/workflows.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import json
from abc import ABC, abstractmethod
from collections import UserList
from collections.abc import Collection
Expand Down Expand Up @@ -96,7 +97,7 @@ def as_external_ids(self) -> list[str]:


class WorkflowTaskParameters(CogniteObject, ABC):
task_type: ClassVar[Literal["function", "transformation", "cdf", "dynamic"]]
task_type: ClassVar[Literal["function", "transformation", "cdf", "dynamic", "subworkflow"]]
silvavelosa marked this conversation as resolved.
Show resolved Hide resolved

@classmethod
def load_parameters(cls, data: dict) -> WorkflowTaskParameters:
Expand All @@ -116,8 +117,12 @@ def load_parameters(cls, data: dict) -> WorkflowTaskParameters:
return CDFTaskParameters._load(parameters)
elif type_ == "dynamic":
return DynamicTaskParameters._load(parameters)
elif type == "subworkflow":
return SubworkflowTaskParameters._load(parameters)
else:
raise ValueError(f"Unknown task type: {type_}. Expected 'function', 'transformation', 'cdf, or 'dynamic'")
raise ValueError(
f"Unknown task type: {type_}. Expected 'function', 'transformation', 'cdf', 'dynamic' or 'subworkflow'"
)


class FunctionTaskParameters(WorkflowTaskParameters):
Expand Down Expand Up @@ -288,6 +293,37 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]:
}


class SubworkflowTaskParameters(WorkflowTaskParameters):
    """
    Parameters describing a workflow task of type ``subworkflow``.

    Subworkflow tasks are a convenience for workflows that are made of stages with
    dependencies between them. The ``tasks`` parameter is an array of function,
    transformation, and cdf task definitions. This array must be set statically on the
    workflow definition; if it has to be defined at runtime, use a dynamic task instead.

    Args:
        tasks (list[WorkflowTask]): The tasks belonging to the subworkflow.
    """

    task_type = "subworkflow"

    def __init__(self, tasks: list[WorkflowTask]) -> None:
        self.tasks = tasks

    @classmethod
    def _load(cls: type[Self], resource: dict | str, cognite_client: CogniteClient | None = None) -> Self:
        """Build the parameters from a dict, or from a JSON string that is decoded first.

        The task definitions are read from ``resource[cls.task_type]["tasks"]`` and each
        one is loaded through ``WorkflowTask._load``. ``cognite_client`` is accepted but
        not used here.
        """
        if isinstance(resource, str):
            resource = json.loads(resource)

        task_definitions = resource[cls.task_type]["tasks"]
        return cls(list(map(WorkflowTask._load, task_definitions)))

    def dump(self, camel_case: bool = True) -> dict[str, Any]:
        """Return ``{task_type: {"tasks": [...]}}`` with every contained task dumped using ``camel_case``."""
        dumped_tasks = [task.dump(camel_case) for task in self.tasks]
        return {self.task_type: {"tasks": dumped_tasks}}


class DynamicTaskParameters(WorkflowTaskParameters):
"""
The dynamic task parameters are used to specify a dynamic task.
Expand Down Expand Up @@ -377,7 +413,7 @@ def __init__(
self.depends_on = depends_on

@property
def type(self) -> Literal["function", "transformation", "cdf", "dynamic"]:
def type(self) -> Literal["function", "transformation", "cdf", "dynamic", "subworkflow"]:
return self.parameters.task_type

@classmethod
Expand Down Expand Up @@ -433,6 +469,8 @@ def load_output(cls, data: dict) -> WorkflowTaskOutput:
return CDFTaskOutput.load(data)
elif task_type == "dynamic":
return DynamicTaskOutput.load(data)
elif task_type == "subworkflow":
return SubworkflowTaskOutput.load(data)
else:
raise ValueError(f"Unknown task type: {task_type}")

Expand Down Expand Up @@ -539,6 +577,24 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]:
return {}


class SubworkflowTaskOutput(WorkflowTaskOutput):
    """
    Output of a subworkflow task.

    The output carries no fields: loading ignores the incoming data and dumping
    always yields an empty dict.
    """

    task_type: ClassVar[str] = "subworkflow"

    def __init__(self) -> None:
        # Nothing to store; a subworkflow task output has no attributes.
        pass

    @classmethod
    def load(cls, data: dict[str, Any]) -> SubworkflowTaskOutput:
        """Return a new, empty output instance; ``data`` is not inspected."""
        return cls()

    # NOTE(review): camel_case defaults to False here, while the sibling dump shown in the
    # surrounding diff context defaults to True. The result is unaffected; default kept as-is.
    def dump(self, camel_case: bool = False) -> dict[str, Any]:
        """Return an empty dict regardless of ``camel_case``."""
        return dict()


class WorkflowTaskExecution(CogniteObject):
"""
This class represents a task execution.
Expand Down Expand Up @@ -578,7 +634,7 @@ def __init__(
self.reason_for_incompletion = reason_for_incompletion

@property
def task_type(self) -> Literal["function", "transformation", "cdf", "dynamic"]:
def task_type(self) -> Literal["function", "transformation", "cdf", "dynamic", "subworkflow"]:
return self.input.task_type

@classmethod
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "7.3.3"
version = "7.3.4"
silvavelosa marked this conversation as resolved.
Show resolved Hide resolved
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
Expand Down
57 changes: 48 additions & 9 deletions tests/tests_integration/test_api/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from cognite.client.data_classes.workflows import (
CDFTaskParameters,
FunctionTaskParameters,
SubworkflowTaskParameters,
TransformationTaskParameters,
Workflow,
WorkflowDefinitionUpsert,
Expand Down Expand Up @@ -70,13 +71,37 @@ def workflow_version_list(cognite_client: CogniteClient) -> WorkflowVersionList:
workflow_definition=WorkflowDefinitionUpsert(
tasks=[
WorkflowTask(
external_id=f"{workflow_id}-2-task1",
external_id="subworkflow1",
parameters=SubworkflowTaskParameters(
tasks=[
WorkflowTask(
external_id="s1-task1",
parameters=CDFTaskParameters(
resource_path="/dummy/no/real/resource/path",
method="GET",
body={"limit": 1},
),
),
WorkflowTask(
external_id="s1-task2",
parameters=CDFTaskParameters(
resource_path="/dummy/no/real/resource/path",
method="GET",
body={"limit": 1},
),
),
]
),
),
WorkflowTask(
external_id="task1",
parameters=CDFTaskParameters(
resource_path="/dummy/no/real/resource/path",
method="GET",
body={"limit": 1},
),
)
depends_on=["subworkflow1"],
),
],
description=None,
),
Expand Down Expand Up @@ -106,7 +131,10 @@ def handle(client, data: dict):
return output

cognite_client.functions.create(name="Add", external_id=external_id, function_handle=handle)
pytest.skip("Function need to be redeployed, skipping tests that need it", allow_module_level=True)
pytest.skip(
"Function need to be redeployed, skipping tests that need it",
allow_module_level=True,
)
silvavelosa marked this conversation as resolved.
Show resolved Hide resolved


@pytest.fixture(scope="session")
Expand All @@ -122,7 +150,10 @@ def handle(client, data: dict):
return output

cognite_client.functions.create(name="Multiply", external_id=external_id, function_handle=handle)
pytest.skip("Function need to be redeployed, skipping tests that need it", allow_module_level=True)
pytest.skip(
"Function need to be redeployed, skipping tests that need it",
allow_module_level=True,
)


@pytest.fixture
Expand Down Expand Up @@ -171,7 +202,9 @@ def workflow_execution_list(
return executions
# Creating at least one execution
result = cognite_client.workflows.executions.trigger(
add_multiply_workflow.workflow_external_id, add_multiply_workflow.version, {"a": 5, "b": 6}
add_multiply_workflow.workflow_external_id,
add_multiply_workflow.version,
{"a": 5, "b": 6},
)
t0 = time.time()
while result.status != "completed":
Expand Down Expand Up @@ -274,14 +307,16 @@ def test_list_workflow_version_limit(
def test_delete_non_existing_raise(self, cognite_client: CogniteClient) -> None:
with pytest.raises(CogniteAPIError) as e:
cognite_client.workflows.versions.delete(
("integration_test-non_existing_workflow_version", "1"), ignore_unknown_ids=False
("integration_test-non_existing_workflow_version", "1"),
ignore_unknown_ids=False,
)

assert "not found" in str(e.value)

def test_delete_non_existing(self, cognite_client: CogniteClient) -> None:
cognite_client.workflows.versions.delete(
("integration_test-non_existing_workflow_version", "1"), ignore_unknown_ids=True
("integration_test-non_existing_workflow_version", "1"),
ignore_unknown_ids=True,
)

def test_retrieve_workflow(self, cognite_client: CogniteClient, workflow_version_list: WorkflowVersionList) -> None:
Expand All @@ -299,7 +334,9 @@ def test_retrieve_non_existing_workflow(self, cognite_client: CogniteClient) ->

class TestWorkflowExecutions:
def test_list_workflow_executions(
self, cognite_client: CogniteClient, workflow_execution_list: WorkflowExecutionList
self,
cognite_client: CogniteClient,
workflow_execution_list: WorkflowExecutionList,
) -> None:
workflow_ids = set(w.as_workflow_id() for w in workflow_execution_list)

Expand All @@ -311,7 +348,9 @@ def test_list_workflow_executions(
assert all(w.as_workflow_id() in workflow_ids for w in listed)

def test_retrieve_workflow_execution_detailed(
self, cognite_client: CogniteClient, workflow_execution_list: WorkflowExecutionList
self,
cognite_client: CogniteClient,
workflow_execution_list: WorkflowExecutionList,
) -> None:
retrieved = cognite_client.workflows.executions.retrieve_detailed(workflow_execution_list[0].id)

Expand Down