Skip to content

Commit

Permalink
fix: Update map reduce logic
Browse files Browse the repository at this point in the history
  • Loading branch information
whiterabbit1983 committed Aug 22, 2024
1 parent 30814b5 commit a84cfcd
Showing 1 changed file with 13 additions and 5 deletions.
18 changes: 13 additions & 5 deletions agents-api/agents_api/workflows/task_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from datetime import timedelta
from typing import Any

from pydantic import RootModel
from simpleeval import simple_eval
from temporalio import workflow
from temporalio.exceptions import ApplicationError
Expand All @@ -29,6 +30,7 @@
TransitionTarget,
WaitForInputStep,
Workflow,
WorkflowStep,
YieldStep,
)
from ..common.protocol.tasks import (
Expand Down Expand Up @@ -66,6 +68,8 @@
# IfElseWorkflowStep: task_steps.if_else_step,
}

GenericStep = RootModel[WorkflowStep]


@workflow.defn
class TaskExecutionWorkflow:
Expand Down Expand Up @@ -260,14 +264,18 @@ async def transition(**kwargs) -> None:
args=foreach_args,
)

case MapReduceStep(reduce=reduce, initial=initial), StepOutcome(
output=items
):
case MapReduceStep(
map=map_defn, reduce=reduce, initial=initial
), StepOutcome(output=items):
for i, item in enumerate(items):
workflow_name = f"`{context.cursor.workflow}`[{context.cursor.step}].mapreduce[{i}]"
map_reduce_task = execution_input.task.model_copy()
# TODO: set steps
map_reduce_task.workflows = [Workflow(name=workflow_name, steps=[])]
defn_dict = map_defn.model_dump()
defn_dict.pop("over")
step_defn = GenericStep(**defn_dict)
map_reduce_task.workflows = [
Workflow(name=workflow_name, steps=[step_defn])
]

# Create a new execution input
map_reduce_execution_input = execution_input.model_copy()
Expand Down

0 comments on commit a84cfcd

Please sign in to comment.