Skip to content

Commit

Permalink
Data Workflows: add subworkflow reference possibility
Browse files Browse the repository at this point in the history
  • Loading branch information
VerstraeteBert committed Sep 2, 2024
1 parent b53ada3 commit 2d185a3
Showing 1 changed file with 40 additions and 33 deletions.
73 changes: 40 additions & 33 deletions cognite/client/data_classes/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,57 +314,64 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]:
("cdfRequest" if camel_case else "cdf_request"): output,
}


class SubworkflowTaskParameters(WorkflowTaskParameters):
"""
The subworkflow task parameters are used to specify a subworkflow task.
When a workflow is made of stages with dependencies between them, we can use subworkflow tasks for conveniece. It takes the tasks parameter which is an array of
function, transformation, and cdf task definitions. This array needs to be statically set on the worklow definition (if it needs to be defined at runtime, use a
dynamic task).
When a workflow is made of stages with dependencies between them, we can use subworkflow tasks for convenience.
It takes either:
- a list of tasks that is statically defined on the workflow definition
- a reference to another workflow which will be loaded in at workflow start time (defined by an external ID and version).
Args:
tasks (list[WorkflowTask]): The tasks belonging to the subworkflow.
tasks (list[WorkflowTask], optional): The tasks belonging to the subworkflow.
workflow_external_id (str, optional): The external ID of the referenced workflow.
version (str, optional): The version of the referenced workflow.
"""

task_type = "subworkflow"

@classmethod
def _load(cls: type[Self], resource: dict, cognite_client: CogniteClient | None = None) -> Self:
subworkflow: dict[str, Any] = resource[cls.task_type]

if subworkflow.get("tasks") is not None:
return SubworkflowTaskTasksListParameters._load(resource, cognite_client)
def __init__(self, tasks: list[WorkflowTask] = None, workflow_external_id: str = None, version: str = None) -> None:
if tasks:
self.tasks = tasks
self.workflow_external_id = None
self.version = None
elif workflow_external_id and version:
self.workflow_external_id = workflow_external_id
self.version = version
self.tasks = None
else:
return SubworkflowTaskReferenceParameters._load(resource, cognite_client)


class SubworkflowTaskReferenceParameters(SubworkflowTaskParameters):
def __init__(self, workflow_external_id: str, workflow_version: str) -> None:
self.workflow_external_id = workflow_external_id
self.workflow_version = workflow_version
raise ValueError("Either 'tasks' or both 'workflow_external_id' and 'version' must be provided.")

@classmethod
def _load(cls: type[Self], resource: dict, cognite_client: CogniteClient | None = None) -> Self:
subworkflow: dict[str, Any] = resource[cls.task_type]

return cls(
workflow_external_id = subworkflow["workflowExternalId"],
version = subworkflow["version"],
)


class SubworkflowTaskTasksListParameters(SubworkflowTaskParameters):
def __init__(self, tasks: list[WorkflowTask]) -> None:
self.tasks = tasks
if "tasks" in subworkflow:
return cls(
tasks=[WorkflowTask._load(task) for task in subworkflow["tasks"]],
)
elif "workflowExternalId" in subworkflow and "version" in subworkflow:
return cls(
workflow_external_id=subworkflow["workflowExternalId"],
version=subworkflow["version"]
)
else:
raise ValueError("Invalid subworkflow parameters provided.")

@classmethod
def _load(cls: type[Self], resource: dict, cognite_client: CogniteClient | None = None) -> Self:
subworkflow: dict[str, Any] = resource[cls.task_type]
def dump(self, camel_case: bool = True) -> dict[str, Any]:
if self.tasks is not None:
return {self.task_type: {"tasks": [task.dump(camel_case) for task in self.tasks]}}
elif self.workflow_external_id and self.version:
return {
self.task_type: {
"workflowExternalId": self.workflow_external_id,
"version": self.version,
}
}
else:
raise ValueError("Either 'tasks' or both 'workflow_external_id' and 'version' must be provided.")

return cls(
[WorkflowTask._load(task) for task in subworkflow["tasks"]],
)

class DynamicTaskParameters(WorkflowTaskParameters):
"""
Expand Down

0 comments on commit 2d185a3

Please sign in to comment.