Skip to content

Commit

Permalink
[CDF-19483] Workflows skip tasks (#1455)
Browse files Browse the repository at this point in the history
Adds the `onFailure` field to `WorkflowTasks`. Supports `abortWorkflow` and `skipTask`.
Behaviour: 
   - `skipTask`: for both failures and timeouts, the task is retried until the retries are exhausted; the Task is then marked as `COMPLETED_WITH_ERRORS` and the next tasks are executed.
   - `abortWorkflow`:
       - In case of a failure, retries are performed until exhausted; the task is then marked as `FAILED` and the Workflow is marked the same.
       - In case of a timeout, no retries are performed; the task is marked as `TIMED_OUT` and the Workflow is marked as `FAILED`.
  • Loading branch information
VerstraeteBert authored Oct 30, 2023
1 parent 1b20f3d commit d1f22ea
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 3 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [6.38.0] - 2023-10-30
### Added
- Support for the `onFailure` property in Workflows, allowing Tasks to be marked as optional in a Workflow.

## [6.37.0] - 2023-10-27
### Added
- Support for `type` property in `NodeApply` and `Node`.
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "6.37.0"
__version__ = "6.38.0"
__api_subversion__ = "V20220125"
9 changes: 8 additions & 1 deletion cognite/client/data_classes/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,9 @@ class WorkflowTask(CogniteResource):
description (str | None): The description of the task. Defaults to None.
retries (int): The number of retries for the task. Defaults to 3.
timeout (int): The timeout of the task in seconds. Defaults to 3600.
on_failure (Literal["abortWorkflow", "skipTask"]): The policy to handle failures and timeouts. Defaults to *abortWorkflow*.\n
* *skipTask*: For both failures and timeouts, the task will retry until the retries are exhausted. After that, the Task is marked as COMPLETED_WITH_ERRORS and the subsequent tasks are executed.\n
* *abortWorkflow*: In case of failures, retries will be performed until exhausted, after which the task is marked as FAILED and the Workflow is marked the same. In the event of a timeout, no retries are undertaken; the task is marked as TIMED_OUT and the Workflow is marked as FAILED.
depends_on (list[str] | None): The external ids of the tasks that this task depends on. Defaults to None.
"""

Expand All @@ -352,6 +355,7 @@ def __init__(
description: str | None = None,
retries: int = 3,
timeout: int = 3600,
on_failure: Literal["abortWorkflow", "skipTask"] = "abortWorkflow",
depends_on: list[str] | None = None,
) -> None:
self.external_id = external_id
Expand All @@ -360,6 +364,7 @@ def __init__(
self.description = description
self.retries = retries
self.timeout = timeout
self.on_failure = on_failure
self.depends_on = depends_on

@property
Expand All @@ -377,16 +382,18 @@ def _load(cls, resource: dict | str, cognite_client: CogniteClient | None = None
# Allow default to come from the API.
retries=resource.get("retries"), # type: ignore[arg-type]
timeout=resource.get("timeout"), # type: ignore[arg-type]
on_failure=resource["onFailure"],
depends_on=[dep["externalId"] for dep in resource.get("dependsOn", [])] or None,
)

def dump(self, camel_case: bool = False) -> dict[str, Any]:
output: dict[str, Any] = {
("externalId" if camel_case else "external_id"): self.external_id,
"externalId" if camel_case else "external_id": self.external_id,
"type": self.type,
"parameters": self.parameters.dump(camel_case),
"retries": self.retries,
"timeout": self.timeout,
"onFailure" if camel_case else "on_failure": self.on_failure,
}
if self.name:
output["name"] = self.name
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "6.37.0"
version = "6.38.0"
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
Expand Down
29 changes: 29 additions & 0 deletions tests/tests_unit/test_data_classes/data/workflow_execution.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
},
"retries": 2,
"timeout": 300,
"onFailure": "abortWorkflow",
"dependsOn": []
},
{
Expand All @@ -32,6 +33,7 @@
},
"retries": 0,
"timeout": 3600,
"onFailure": "skipTask",
"dependsOn": [
{
"externalId": "testTaskDispatcher"
Expand All @@ -49,6 +51,7 @@
},
"retries": 0,
"timeout": 3600,
"onFailure": "abortWorkflow",
"dependsOn": [
{
"externalId": "testTaskDispatcher"
Expand All @@ -69,6 +72,7 @@
},
"retries": 2,
"timeout": 300,
"onFailure": "abortWorkflow",
"dependsOn": [
{
"externalId": "testMatrixCalculation"
Expand Down Expand Up @@ -116,6 +120,7 @@
},
"retries": 2,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -137,6 +142,7 @@
},
"retries": 1,
"timeout": 3600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -153,6 +159,7 @@
},
"retries": 2,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -174,6 +181,7 @@
},
"retries": 1,
"timeout": 3600,
"onFailure": "abortWorkflow",
"type": "function"
}
],
Expand All @@ -192,6 +200,7 @@
},
"retries": 1,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -208,6 +217,7 @@
},
"retries": 1,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand Down Expand Up @@ -240,6 +250,7 @@
},
"retries": 1,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -256,6 +267,7 @@
},
"retries": 1,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -272,6 +284,7 @@
},
"retries": 1,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -288,6 +301,7 @@
},
"retries": 1,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -304,6 +318,7 @@
},
"retries": 1,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -320,6 +335,7 @@
},
"retries": 1,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
}
],
Expand Down Expand Up @@ -357,6 +373,7 @@
},
"retries": 2,
"timeout": 600,
"onFailure": "skipTask",
"type": "function"
},
{
Expand All @@ -378,6 +395,7 @@
},
"retries": 1,
"timeout": 3600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -394,6 +412,7 @@
},
"retries": 2,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -415,6 +434,7 @@
},
"retries": 1,
"timeout": 3600,
"onFailure": "abortWorkflow",
"type": "function"
}
]
Expand Down Expand Up @@ -446,6 +466,7 @@
},
"retries": 1,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -462,6 +483,7 @@
},
"retries": 1,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -478,6 +500,7 @@
},
"retries": 1,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -494,6 +517,7 @@
},
"retries": 1,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -510,6 +534,7 @@
},
"retries": 1,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -526,6 +551,7 @@
},
"retries": 1,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -542,6 +568,7 @@
},
"retries": 1,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -558,6 +585,7 @@
},
"retries": 1,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
},
{
Expand All @@ -574,6 +602,7 @@
},
"retries": 1,
"timeout": 600,
"onFailure": "abortWorkflow",
"type": "function"
}
]
Expand Down
2 changes: 2 additions & 0 deletions tests/tests_unit/test_data_classes/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,15 @@ def test_definition_parsed_correctly(self, execution_data: dict):
),
retries=2,
timeout=300,
on_failure="abortWorkflow",
),
WorkflowTask(
external_id="applicationExecution",
description="Run a collection of preprocessor and app runs concurrently",
parameters=DynamicTaskParameters(tasks="${testTaskDispatcher.output.response.testTasks}"),
retries=0,
timeout=3600,
on_failure="skipTask",
depends_on=["testTaskDispatcher"],
),
]
Expand Down

0 comments on commit d1f22ea

Please sign in to comment.