Skip to content

Commit

Permalink
fix(engine): Evaluate expressions on non-loop child workflow path (Tr…
Browse files Browse the repository at this point in the history
  • Loading branch information
daryllimyt authored Aug 16, 2024
1 parent c74236d commit 8ea2e3c
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 0 deletions.
160 changes: 160 additions & 0 deletions tests/unit/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,3 +467,163 @@ async def test_child_workflow_success(
retry_policy=retry_policies["workflow:fail_fast"],
)
assert result == expected


@pytest.mark.asyncio
async def test_child_workflow_context_passing(temporal_cluster, test_user):
# Setup
test_name = "test_child_workflow_context_passing"
wf_exec_id = generate_test_exec_id(test_name)
role = Role(
type="service",
user_id=test_user.id,
service_id="tracecat-runner",
)
ctx_role.set(role)

# Child
child_dsl = DSLInput(
**{
"entrypoint": {"expects": {}, "ref": "reshape_parent_data"},
"actions": [
{
"ref": "reshape_parent_data",
"action": "core.transform.reshape",
"args": {
"value": {
"parent_data": "${{ TRIGGER.data_from_parent }}",
},
},
"depends_on": [],
"description": "",
}
],
"config": {"enable_runtime_tests": False, "scheduler": "dynamic"},
"description": "Testing child workflow",
"inputs": {},
"returns": None,
"tests": [],
"title": "Aug 16, 2024, 13:44:37",
"triggers": [],
}
)

# Create a definition for the child workflow
async with get_async_session_context_manager() as session:
# Create the child workflow
mgmt_service = WorkflowsManagementService(session, role=role)
child_res = await mgmt_service.create_workflow_from_dsl(
child_dsl.model_dump(), skip_secret_validation=True
)
child_workflow = child_res.workflow
if not child_workflow:
raise ValueError("Child workflow not created")
_ = child_workflow.actions
child_dsl = DSLInput.from_workflow(child_workflow)

# Commit the child workflow
defn_service = WorkflowDefinitionsService(session, role=role)
await defn_service.create_workflow_definition(
workflow_id=child_workflow.id, dsl=child_dsl
)

# Parent
parent_workflow_id = "wf-00000000000000000000000000000002"
parent_dsl = DSLInput(
**{
"title": "Parent",
"description": "Test parent workflow can pass context to child",
"entrypoint": {
"ref": "parent_first_action",
},
"actions": [
{
"ref": "parent_first_action",
"action": "core.transform.reshape",
"args": {
"value": {
"reshaped_data": "${{ TRIGGER.data }}",
},
},
"depends_on": [],
"description": "",
},
{
"ref": "parent_second_action",
"action": "core.workflow.execute",
"args": {
"workflow_id": child_workflow.id,
"trigger_inputs": {
"data_from_parent": "Parent sent child ${{ ACTIONS.parent_first_action.result.reshaped_data }}", # This is the parent's trigger data
},
},
"depends_on": ["parent_first_action"],
"description": "",
},
],
"config": {
"enable_runtime_tests": False,
"scheduler": "dynamic",
},
"inputs": {},
"returns": None,
"tests": [],
"triggers": [],
}
)
run_args = DSLRunArgs(
dsl=parent_dsl,
role=role,
wf_id=parent_workflow_id,
trigger_inputs={
"data": "__EXPECTED_DATA__",
},
)

queue = os.environ["TEMPORAL__CLUSTER_QUEUE"]
client = await get_temporal_client()
async with Worker(
client,
task_queue=queue,
activities=DSLActivities.load() + [get_workflow_definition_activity],
workflows=[DSLWorkflow],
workflow_runner=new_sandbox_runner(),
):
result = await client.execute_workflow(
DSLWorkflow.run,
run_args,
id=wf_exec_id,
task_queue=queue,
retry_policy=retry_policies["workflow:fail_fast"],
)
# Parent expected
expected = {
"ACTIONS": {
"parent_first_action": {
"result": {
"reshaped_data": "__EXPECTED_DATA__",
},
"result_typename": "dict",
},
"parent_second_action": {
"result": {
"ACTIONS": {
"reshape_parent_data": {
"result": {
"parent_data": "Parent sent child __EXPECTED_DATA__"
},
"result_typename": "dict",
}
},
"INPUTS": {},
"TRIGGER": {
"data_from_parent": "Parent sent child __EXPECTED_DATA__"
},
},
"result_typename": "dict",
},
},
"INPUTS": {},
"TRIGGER": {"data": "__EXPECTED_DATA__"},
}
assert result == expected
4 changes: 4 additions & 0 deletions tracecat/dsl/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ async def execute_task(self, task: ActionStatement) -> None:
logger.info("Begin task execution", task_ref=task.ref)
# Check for a child workflow
if self._should_execute_child_workflow(task):
# NOTE: We don't support (nor recommend, unless a use case is justified) passing SECRETS to child workflows
# 1. Prepare the child workflow
logger.trace("Preparing child workflow")
child_run_args_data = await self._prepare_child_workflow(task)
Expand Down Expand Up @@ -447,6 +448,9 @@ async def execute_task(self, task: ActionStatement) -> None:
"Executing child workflow",
dsl_run_args=child_run_args,
)

args = eval_templated_object(task.args, operand=self.context)
child_run_args.trigger_inputs = args.get("trigger_inputs", {})
action_result = await self._run_child_workflow(child_run_args)
else:
# NOTE: We should check for loop iteration here.
Expand Down

0 comments on commit 8ea2e3c

Please sign in to comment.