Skip to content

Commit

Permalink
add subworkflow tasks to workflows (#1445)
Browse files Browse the repository at this point in the history
Co-authored-by: Anders Albert <[email protected]>
Co-authored-by: Håkon V. Treider <[email protected]>
  • Loading branch information
3 people authored Dec 6, 2023
1 parent 8fec922 commit 3ae8328
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 18 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ Changes are grouped as follows
- `Removed` for now removed features.
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.5.3] - 2023-12-06
### Added
- Support for `subworkflow` tasks in `workflows`.

## [7.5.2] - 2023-12-05
### Fixed
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.5.2"
__version__ = "7.5.3"
__api_subversion__ = "V20220125"
2 changes: 2 additions & 0 deletions cognite/client/data_classes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@
DynamicTaskParameters,
FunctionTaskOutput,
FunctionTaskParameters,
SubworkflowTaskParameters,
TransformationTaskOutput,
TransformationTaskParameters,
Workflow,
Expand Down Expand Up @@ -413,6 +414,7 @@
"TransformationTaskParameters",
"CDFTaskParameters",
"DynamicTaskParameters",
"SubworkflowTaskParameters",
"FunctionTaskOutput",
"TransformationTaskOutput",
"CDFTaskOutput",
Expand Down
62 changes: 58 additions & 4 deletions cognite/client/data_classes/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,11 @@ def as_external_ids(self) -> list[str]:
return [workflow.external_id for workflow in self.data]


ValidTaskType = Literal["function", "transformation", "cdf", "dynamic", "subworkflow"]


class WorkflowTaskParameters(CogniteObject, ABC):
task_type: ClassVar[Literal["function", "transformation", "cdf", "dynamic"]]
task_type: ClassVar[ValidTaskType]

@classmethod
def load_parameters(cls, data: dict) -> WorkflowTaskParameters:
Expand All @@ -116,8 +119,10 @@ def load_parameters(cls, data: dict) -> WorkflowTaskParameters:
return CDFTaskParameters._load(parameters)
elif type_ == "dynamic":
return DynamicTaskParameters._load(parameters)
elif type_ == "subworkflow":
return SubworkflowTaskParameters._load(parameters)
else:
raise ValueError(f"Unknown task type: {type_}. Expected 'function', 'transformation', 'cdf, or 'dynamic'")
raise ValueError(f"Unknown task type: {type_}. Expected {ValidTaskType}")


class FunctionTaskParameters(WorkflowTaskParameters):
Expand Down Expand Up @@ -288,6 +293,35 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]:
}


class SubworkflowTaskParameters(WorkflowTaskParameters):
"""
The subworkflow task parameters are used to specify a subworkflow task.
When a workflow is made of stages with dependencies between them, we can use subworkflow tasks for conveniece. It takes the tasks parameter which is an array of
function, transformation, and cdf task definitions. This array needs to be statically set on the worklow definition (if it needs to be defined at runtime, use a
dynamic task).
Args:
tasks (list[WorkflowTask]): The tasks belonging to the subworkflow.
"""

task_type = "subworkflow"

def __init__(self, tasks: list[WorkflowTask]) -> None:
self.tasks = tasks

@classmethod
def _load(cls: type[Self], resource: dict, cognite_client: CogniteClient | None = None) -> Self:
subworkflow: dict[str, Any] = resource[cls.task_type]

return cls(
[WorkflowTask._load(task) for task in subworkflow["tasks"]],
)

def dump(self, camel_case: bool = True) -> dict[str, Any]:
return {self.task_type: {"tasks": [task.dump(camel_case) for task in self.tasks]}}


class DynamicTaskParameters(WorkflowTaskParameters):
"""
The dynamic task parameters are used to specify a dynamic task.
Expand Down Expand Up @@ -377,7 +411,7 @@ def __init__(
self.depends_on = depends_on

@property
def type(self) -> Literal["function", "transformation", "cdf", "dynamic"]:
def type(self) -> ValidTaskType:
return self.parameters.task_type

@classmethod
Expand Down Expand Up @@ -433,6 +467,8 @@ def load_output(cls, data: dict) -> WorkflowTaskOutput:
return CDFTaskOutput.load(data)
elif task_type == "dynamic":
return DynamicTaskOutput.load(data)
elif task_type == "subworkflow":
return SubworkflowTaskOutput.load(data)
else:
raise ValueError(f"Unknown task type: {task_type}")

Expand Down Expand Up @@ -539,6 +575,24 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]:
return {}


class SubworkflowTaskOutput(WorkflowTaskOutput):
"""
The subworkflow task output is used to specify the output of a subworkflow task.
"""

task_type: ClassVar[str] = "subworkflow"

def __init__(self) -> None:
...

@classmethod
def load(cls, data: dict[str, Any]) -> SubworkflowTaskOutput:
return cls()

def dump(self, camel_case: bool = False) -> dict[str, Any]:
return {}


class WorkflowTaskExecution(CogniteObject):
"""
This class represents a task execution.
Expand Down Expand Up @@ -578,7 +632,7 @@ def __init__(
self.reason_for_incompletion = reason_for_incompletion

@property
def task_type(self) -> Literal["function", "transformation", "cdf", "dynamic"]:
def task_type(self) -> ValidTaskType:
return self.input.task_type

@classmethod
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "7.5.2"
version = "7.5.3"
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
Expand Down
67 changes: 55 additions & 12 deletions tests/tests_integration/test_api/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from cognite.client.data_classes.workflows import (
CDFTaskParameters,
FunctionTaskParameters,
SubworkflowTaskParameters,
TransformationTaskParameters,
Workflow,
WorkflowDefinitionUpsert,
Expand Down Expand Up @@ -70,13 +71,37 @@ def workflow_version_list(cognite_client: CogniteClient) -> WorkflowVersionList:
workflow_definition=WorkflowDefinitionUpsert(
tasks=[
WorkflowTask(
external_id=f"{workflow_id}-2-task1",
external_id="subworkflow1",
parameters=SubworkflowTaskParameters(
tasks=[
WorkflowTask(
external_id="s1-task1",
parameters=CDFTaskParameters(
resource_path="/dummy/no/real/resource/path",
method="GET",
body={"limit": 1},
),
),
WorkflowTask(
external_id="s1-task2",
parameters=CDFTaskParameters(
resource_path="/dummy/no/real/resource/path",
method="GET",
body={"limit": 1},
),
),
]
),
),
WorkflowTask(
external_id="task1",
parameters=CDFTaskParameters(
resource_path="/dummy/no/real/resource/path",
method="GET",
body={"limit": 1},
),
)
depends_on=["subworkflow1"],
),
],
description=None,
),
Expand Down Expand Up @@ -122,7 +147,10 @@ def handle(client, data: dict):
return output

cognite_client.functions.create(name="Multiply", external_id=external_id, function_handle=handle)
pytest.skip("Function need to be redeployed, skipping tests that need it", allow_module_level=True)
pytest.skip(
"Function need to be redeployed, skipping tests that need it",
allow_module_level=True,
)


@pytest.fixture
Expand Down Expand Up @@ -171,7 +199,9 @@ def workflow_execution_list(
return executions
# Creating at least one execution
result = cognite_client.workflows.executions.trigger(
add_multiply_workflow.workflow_external_id, add_multiply_workflow.version, {"a": 5, "b": 6}
add_multiply_workflow.workflow_external_id,
add_multiply_workflow.version,
{"a": 5, "b": 6},
)
t0 = time.time()
while result.status != "completed":
Expand Down Expand Up @@ -230,10 +260,17 @@ def test_upsert_delete(self, cognite_client: CogniteClient) -> None:
workflow_definition=WorkflowDefinitionUpsert(
tasks=[
WorkflowTask(
external_id="integration_test-workflow_definitions-test_create_delete-task1",
parameters=FunctionTaskParameters(
external_id="integration_test-workflow_definitions-test_create_delete-task1-function",
data={"a": 1, "b": 2},
external_id="integration_test-workflow_definitions-test_create_delete-subworkflow1",
parameters=SubworkflowTaskParameters(
tasks=[
WorkflowTask(
external_id="integration_test-workflow_definitions-test_create_delete-task1",
parameters=FunctionTaskParameters(
external_id="integration_test-workflow_definitions-test_create_delete-task1-function",
data={"a": 1, "b": 2},
),
)
]
),
)
],
Expand Down Expand Up @@ -274,14 +311,16 @@ def test_list_workflow_version_limit(
def test_delete_non_existing_raise(self, cognite_client: CogniteClient) -> None:
with pytest.raises(CogniteAPIError) as e:
cognite_client.workflows.versions.delete(
("integration_test-non_existing_workflow_version", "1"), ignore_unknown_ids=False
("integration_test-non_existing_workflow_version", "1"),
ignore_unknown_ids=False,
)

assert "not found" in str(e.value)

def test_delete_non_existing(self, cognite_client: CogniteClient) -> None:
cognite_client.workflows.versions.delete(
("integration_test-non_existing_workflow_version", "1"), ignore_unknown_ids=True
("integration_test-non_existing_workflow_version", "1"),
ignore_unknown_ids=True,
)

def test_retrieve_workflow(self, cognite_client: CogniteClient, workflow_version_list: WorkflowVersionList) -> None:
Expand All @@ -299,7 +338,9 @@ def test_retrieve_non_existing_workflow(self, cognite_client: CogniteClient) ->

class TestWorkflowExecutions:
def test_list_workflow_executions(
self, cognite_client: CogniteClient, workflow_execution_list: WorkflowExecutionList
self,
cognite_client: CogniteClient,
workflow_execution_list: WorkflowExecutionList,
) -> None:
workflow_ids = set(w.as_workflow_id() for w in workflow_execution_list)

Expand All @@ -311,7 +352,9 @@ def test_list_workflow_executions(
assert all(w.as_workflow_id() in workflow_ids for w in listed)

def test_retrieve_workflow_execution_detailed(
self, cognite_client: CogniteClient, workflow_execution_list: WorkflowExecutionList
self,
cognite_client: CogniteClient,
workflow_execution_list: WorkflowExecutionList,
) -> None:
retrieved = cognite_client.workflows.executions.retrieve_detailed(workflow_execution_list[0].id)

Expand Down

0 comments on commit 3ae8328

Please sign in to comment.