From 301ec1579205bead599b397650fc38401eec9363 Mon Sep 17 00:00:00 2001 From: elijahbenizzy Date: Tue, 22 Oct 2024 15:03:38 -0700 Subject: [PATCH] Implemets parallelism capabilities according to spec --- burr/core/application.py | 44 +- burr/core/graph.py | 5 +- burr/core/parallelism.py | 706 +++++++++++++ burr/core/state.py | 7 + docs/concepts/parallelism.rst | 2 + docs/reference/index.rst | 1 + docs/reference/integrations/index.rst | 1 + docs/reference/parallelism.rst | 27 + .../recursive/application_parallel_feature.py | 231 +++++ examples/recursive/statemachine.png | Bin 21716 -> 21885 bytes tests/core/test_parallelism.py | 945 ++++++++++++++++++ 11 files changed, 1964 insertions(+), 5 deletions(-) create mode 100644 burr/core/parallelism.py create mode 100644 docs/reference/parallelism.rst create mode 100644 examples/recursive/application_parallel_feature.py create mode 100644 tests/core/test_parallelism.py diff --git a/burr/core/application.py b/burr/core/application.py index 30c92b9a..f9aa54a2 100644 --- a/burr/core/application.py +++ b/burr/core/application.py @@ -7,6 +7,7 @@ import logging import pprint import uuid +from concurrent.futures import Executor, ThreadPoolExecutor from contextlib import AbstractContextManager from typing import ( TYPE_CHECKING, @@ -497,6 +498,7 @@ class ApplicationContext(AbstractContextManager): partition_key: Optional[str] sequence_id: Optional[int] tracker: Optional["TrackingClient"] + parallel_executor_factory: Callable[[], Executor] @staticmethod def get() -> Optional["ApplicationContext"]: @@ -683,6 +685,10 @@ def post_run_step( StreamResultType = TypeVar("StreamResultType", bound=Union[dict, Any]) +def _create_default_executor() -> Executor: + return ThreadPoolExecutor() + + class Application(Generic[ApplicationStateType]): def __init__( self, @@ -697,6 +703,7 @@ def __init__( fork_parent_pointer: Optional[burr_types.ParentPointer] = None, spawning_parent_pointer: Optional[burr_types.ParentPointer] = None, tracker: Optional["TrackingClient"] = None, + parallel_executor_factory: Optional[Executor] = None, ): """Instantiates an Application. This is an internal API -- use the builder! @@ -731,6 +738,11 @@ def __init__( self._set_sequence_id(sequence_id) self._builder = builder self._parent_pointer = fork_parent_pointer + self._parallel_executor_factory = ( + parallel_executor_factory + if parallel_executor_factory is not None + else _create_default_executor + ) self._dependency_factory = { "__tracer": functools.partial( visibility.tracing.TracerFactory, @@ -780,6 +792,7 @@ def _context_factory(self, action: Action, sequence_id: int) -> ApplicationConte tracker=self._tracker, partition_key=self._partition_key, sequence_id=sequence_id, + parallel_executor_factory=self._parallel_executor_factory, ) def _step( @@ -862,7 +875,7 @@ def _process_inputs(self, inputs: Dict[str, Any], action: Action) -> Dict[str, A BASE_ERROR_MESSAGE + f"Inputs starting with a double underscore ({starting_with_double_underscore}) " f"are reserved for internal use/injected inputs." - "Please do not use keys" + "Please do not directly pass keys starting with a double underscore." 
) inputs = inputs.copy() processed_inputs = {} @@ -1922,6 +1935,7 @@ def __init__(self): self.graph_builder = None self.prebuilt_graph = None self.typing_system = None + self._parallel_executor_factory = None def with_identifiers( self, app_id: str = None, partition_key: str = None, sequence_id: int = None @@ -2015,6 +2029,33 @@ def with_graph(self, graph: Graph) -> "ApplicationBuilder[StateType]": self.prebuilt_graph = graph return self + def with_parallel_executor(self, executor_factory: lambda: Executor): + """Assigns a default executor to be used for recursive/parallel sub-actions. This effectively allows + for executing multiple Burr apps in parallel. See https://burr.dagworks.io/pull/concepts/parallelism/ + for more details. + + This will default to a simple threadpool executor, meaning that you will be bound by the number of threads + your computer can handle. If you want to use a more advanced executor, you can pass it in here -- any subclass + of concurrent.futures.Executor will work. + + If you specify executors for specific tasks, this will default to that. + + Note that, if you are using asyncio, you cannot specify an executor. It will default to using + asyncio.gather with asyncio's event loop. + + :param executor: + :return: + """ + if self._parallel_executor_factory is not None: + raise ValueError( + BASE_ERROR_MESSAGE + + "You have already set an executor. You cannot set multiple executors. Current executor is:" + f"{self._parallel_executor_factory}" + ) + + self._parallel_executor_factory = executor_factory + return self + def _ensure_no_prebuilt_graph(self): if self.prebuilt_graph is not None: raise ValueError( @@ -2365,4 +2406,5 @@ def build(self) -> Application[StateType]: if self.spawn_from_app_id is not None else None ), + parallel_executor_factory=self._parallel_executor_factory, ) diff --git a/burr/core/graph.py b/burr/core/graph.py index ff70a4b1..61bc4961 100644 --- a/burr/core/graph.py +++ b/burr/core/graph.py @@ -31,7 +31,6 @@ def _validate_actions(actions: Optional[List[Action]]): def _validate_transitions( transitions: Optional[List[Tuple[str, str, Condition]]], actions: Set[str] ): - assert_set(transitions, "_transitions", "with_transitions") exhausted = {} # items for which we have seen a default transition for from_, to, condition in transitions: if from_ not in actions: @@ -235,7 +234,7 @@ class GraphBuilder: def __init__(self): """Initializes the graph builder.""" - self.transitions: Optional[List[Tuple[str, str, Condition]]] = None + self.transitions: Optional[List[Tuple[str, str, Condition]]] = [] self.actions: Optional[List[Action]] = None def with_actions( @@ -283,8 +282,6 @@ def with_transitions( :param transitions: Transitions to add :return: The application builder for future chaining. 
""" - if self.transitions is None: - self.transitions = [] for transition in transitions: from_, to_, *conditions = transition if len(conditions) > 0: diff --git a/burr/core/parallelism.py b/burr/core/parallelism.py new file mode 100644 index 00000000..47d5f071 --- /dev/null +++ b/burr/core/parallelism.py @@ -0,0 +1,706 @@ +import abc +import asyncio +import dataclasses +import inspect +from typing import Any, AsyncGenerator, Callable, Dict, Generator, List, Tuple, TypeVar, Union + +from burr.core import Action, Application, ApplicationBuilder, ApplicationContext, Graph, State +from burr.core.action import SingleStepAction +from burr.core.graph import GraphBuilder + +SubgraphType = Union[Action, Callable, "RunnableGraph"] + + +@dataclasses.dataclass +class RunnableGraph: + """Contains a graph with information it needs to run. + This is a bit more than a graph -- we have entrypoints + halt_after points. + This is the core element of a recursive action -- your recursive generators can yield these + (as well as actions/functions, which both get turned into single-node graphs...) + """ + + graph: Graph + entrypoint: str + halt_after: List[str] + + @staticmethod + def create(from_: SubgraphType) -> "RunnableGraph": + """Creates a RunnableGraph from a callable/action. This will create a single-node runnable graph, + so we can wrap it up in a task. + + :param from_: Callable or Action to wrap + :return: RunnableGraph + """ + if isinstance(from_, RunnableGraph): + return from_ + if isinstance(from_, Action): + assert ( + from_.name is not None + ), "Action must have a name to be run, internal error, reach out to devs" + graph = GraphBuilder().with_actions(from_).build() + (action,) = graph.actions + return RunnableGraph(graph=graph, entrypoint=action.name, halt_after=[action.name]) + + +@dataclasses.dataclass +class SubGraphTask: + """Task to run a subgraph. Has runtime-spefici information, like inputs, state, and + the application ID. This is the lower-level component -- the user will only directly interact + with this if they use the TaskBasedParallelAction interface, which produces a generator of these. + """ + + graph: RunnableGraph + inputs: Dict[str, Any] + state: State + application_id: str + + def _create_app(self, parent_context: ApplicationContext) -> Application: + return ( + ApplicationBuilder() + .with_graph(self.graph.graph) + .with_entrypoint(self.graph.entrypoint) + .with_state(self.state) + .with_spawning_parent( + app_id=parent_context.app_id, + sequence_id=parent_context.sequence_id, + partition_key=parent_context.partition_key, + ) + .with_tracker(parent_context.tracker.copy()) # We have to copy + # TODO -- handle persistence... 
+ .with_identifiers( + app_id=self.application_id, + partition_key=parent_context.partition_key, # cascade the partition key + ) + .build() + ) + + def run( + self, + parent_context: ApplicationContext, + ) -> State: + """Runs the task -- this simply executes it b y instantiating a sub-application""" + app = self._create_app(parent_context) + action, result, state = app.run( + halt_after=self.graph.halt_after, + inputs={key: value for key, value in self.inputs.items() if not key.startswith("__")}, + ) + return state + + async def arun(self, parent_context: ApplicationContext): + app = self._create_app(parent_context) + action, result, state = await app.arun( + halt_after=self.graph.halt_after, + inputs={key: value for key, value in self.inputs.items() if not key.startswith("__")}, + ) + return state + + +def _stable_app_id_hash(app_id: str, child_key: str) -> str: + """Gives a stable hash for an application. Given the parent app_id and a child key, + this will give a hash that will be stable across runs. + + :param app_id: + :param additional_key: + :return: + """ + ... + + +class TaskBasedParallelAction(SingleStepAction): + """The base class for actions that run a set of tasks in parallel and reduce the results. + This is more power-user mode -- if you need fine-grained control over the set of tasks + your parallel action utilizes, then this is for you. If not, you'll want to see: + + - :py:class:`MapActionsAndStates` -- a cartesian product of actions/states + - :py:class:`MapActions` -- a map of actions over a single state + - :py:class:`MapStates` -- a map of a single action over multiple states + + If you're unfamiliar about where to start, you'll want to see the docs on :ref:`parallelism `. + + This is responsible for two things: + + 1. Creating a set of tasks to run in parallel + 2. Reducing the results of those tasks into a single state for the action to return. + + The following example shows how to call a set of prompts over a set of different models in parallel and return the result. + + .. code-block:: python + + from burr.core import action, state, ApplicationContext + from burr.core.parallelism import MapStates, RunnableGraph + from typing import Callable, Generator, List + + @action(reads=["prompt", "model"], writes=["llm_output"]) + def query_llm(state: State, model: str) -> State: + # TODO -- implement _query_my_llm to call litellm or something + return state.update(llm_output=_query_my_llm(prompt=state["prompt"], model=model)) + + class MultipleTaskExample(TaskBasedParallelAction): + def tasks(state: State, context: ApplicationContext) -> Generator[SubGraphTask, None, None]: + for prompt in state["prompts"]: + for action in [ + query_llm.bind(model="gpt-4").with_name("gpt_4_answer"), + query_llm.bind(model="o1").with_name("o1_answer"), + query_llm.bind(model="claude").with_name("claude_answer"), + ] + yield SubGraphTask( + action=action, # can be a RunnableGraph as well + state=state.update(prompt=prompt), + inputs={}, + # stable hash -- up to you to ensure uniqueness + application_id=hashlib.sha256(context.application_id + action.name + prompt).hexdigest(), + # a few other parameters we might add -- see advanced usage -- failure conditions, etc... 
+ ) + + def reduce(self, states: Generator[State, None, None]) -> State: + all_llm_outputs = [] + for state in states: + all_llm_outputs.append( + { + "output" : state["llm_output"], + "model" : state["model"], + "prompt" : state["prompt"], + } + ) + return state.update(all_llm_outputs=all_llm_outputs) + """ + + def __init__(self): + super().__init__() + + def run_and_update(self, state: State, **run_kwargs) -> Tuple[dict, State]: + """Runs and updates. This is not user-facing, so do not override it. + This runs all actions in parallel (using the supplied executor, from the context), + and then reduces the results. + + :param state: Input state + :param run_kwargs: Additional inputs (runtime inputs) + :return: The results, updated state tuple. The results are empty, but we may add more in the future. + """ + + def _run_and_update(): + context: ApplicationContext = run_kwargs.get("__context") + if context is None: + raise ValueError("This action requires a context to run") + state_without_internals = state.wipe( + delete=[item for item in state.keys() if item.startswith("__")] + ) + task_generator = self.tasks(state_without_internals, context, run_kwargs) + + def execute_task(task): + return task.run(run_kwargs["__context"]) + + with context.parallel_executor_factory() as executor: + # Directly map the generator to the executor + results = list(executor.map(execute_task, task_generator)) + + def state_generator() -> Generator[Any, None, None]: + yield from results + + return {}, self.reduce(state_without_internals, state_generator()) + + async def _arun_and_update(): + context: ApplicationContext = run_kwargs.get("__context") + if context is None: + raise ValueError("This action requires a context to run") + state_without_internals = state.wipe( + delete=[item for item in state.keys() if item.startswith("__")] + ) + task_generator = self.tasks(state_without_internals, context, run_kwargs) + + # TODO -- run in parallel + async def state_generator(): + """This makes it easier on the user -- if they don't have an async generator we can still exhause it + This way we run through all of the task generators. These correspond to the task generation capabilities above (the map*/task generation stuff) + """ + if inspect.isasyncgen(task_generator): + coroutines = [task.arun(context) async for task in task_generator] + else: + coroutines = [task.arun(context) for task in task_generator] + results = await asyncio.gather(*coroutines) + # TODO -- yield in order... + for result in results: + yield result + + return {}, await self.reduce(state_without_internals, state_generator()) + + if self.is_async(): + return _arun_and_update() # type: ignore + return _run_and_update() + + def is_async(self) -> bool: + """This says whether or not the action is async. Note you have to override this if you have async tasks + and want to use asyncio.gather on them. Otherwise leave this blank. + + :return: Whether or not the action is async + """ + return False + + @property + def inputs(self) -> Union[list[str], tuple[list[str], list[str]]]: + """Inputs from this -- if you want to override you'll want to call super() + first so you get these inputs. + + :return: the list of inputs that will populate kwargs. + """ + return ["__context"] # TODO -- add any additional input + + @abc.abstractmethod + def tasks( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> Generator[SubGraphTask, None, None]: + """Creates all tasks that this action will run, given the state/inputs. 
+ This produces a generator of SubGraphTasks that will be run in parallel. + + :param state: State prior to action's execution + :param context: Context for the action + :yield: SubGraphTasks to run + """ + pass + + @abc.abstractmethod + def reduce(self, state: State, states: Generator[State, None, None]) -> State: + """Reduces the states from the tasks into a single state. + + :param states: State outputs from the subtasks + :return: Reduced state + """ + pass + + @property + @abc.abstractmethod + def writes(self) -> list[str]: + pass + + @property + @abc.abstractmethod + def reads(self) -> list[str]: + pass + + +class MapActionsAndStates(TaskBasedParallelAction): + """Base class to run a cartesian product of actions x states. + + For example, if you want to run the following: + + - n prompts + - m models + + This will make it easy to do. If you need fine-grained control, you can use the :py:class:`TaskBasedParallelAction`, + which allows you to specify the tasks individually. If you just want to vary actions/states (and not both), use + :py:class:`MapActions` or :py:class:`MapStates` implementations. + + The following shows how to run a set of prompts over a set of models in parallel and return the results. + + .. code-block:: python + + from burr.core import action, state + from burr.core.parallelism import MapActionsAndStates, RunnableGraph + from typing import Callable, Generator, List + + @action(reads=["prompt", "model"], writes=["llm_output"]) + def query_llm(state: State, model: str) -> State: + # TODO -- implement _query_my_llm to call litellm or something + return state.update(llm_output=_query_my_llm(prompt=state["prompt"], model=model)) + + class TestModelsOverPrompts(MapActionsAndStates): + + def actions(self, state: State) -> Generator[Action | Callable | RunnableGraph, None, None]: + # make sure to add a name to the action + # This is not necessary for subgraphs, as actions will already have names + for action in [ + query_llm.bind(model="gpt-4").with_name("gpt_4_answer"), + query_llm.bind(model="o1").with_name("o1_answer"), + query_llm.bind(model="claude").with_name("claude_answer"), + ] + yield action + + def states(self, state: State) -> Generator[State, None, None]: + for prompt in [ + "What is the meaning of life?", + "What is the airspeed velocity of an unladen swallow?", + "What is the best way to cook a steak?", + ]: + yield state.update(prompt=prompt) + + def reduce(self, states: Generator[State, None, None]) -> State: + all_llm_outputs = [] + for state in states: + all_llm_outputs.append( + { + "output" : state["llm_output"], + "model" : state["model"], + "prompt" : state["prompt"], + } + ) + return state.update(all_llm_outputs=all_llm_outputs) + + def reads() -> List[str]: + return ["prompts"] + + def writes() -> List[str]: + return ["all_llm_outputs"] + + """ + + @abc.abstractmethod + def actions( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> Generator[SubgraphType, None, None]: + """Yields actions to run in parallel. These will be merged with the states as a cartesian product. + + :param state: Input state at the time of running the "parent" action. + :param inputs: Runtime Inputs to the action + :return: Generator of actions to run + """ + pass + + @abc.abstractmethod + def states( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> Generator[State, None, None]: + """Yields states to run in parallel. These will be merged with the actions as a cartesian product. 
+ + :param state: Input state at the time of running the "parent" action. + :param context: Context for the action + :param inputs: Runtime Inputs to the action + :return: Generator of states to run + """ + pass + + def tasks( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> Generator[SubGraphTask, None, None]: + """Takes the cartesian product of actions and states, creating tasks for each. + + :param state: Input state at the time of running the "parent" action. + :param context: Context for the action + :param inputs: Runtime Inputs to the action + :return: Generator of tasks to run + """ + for i, action in enumerate(self.actions(state, context, inputs)): + for j, state in enumerate(self.states(state, context, inputs)): + key = f"{i}-{j}" # this is a stable hash for now but will not handle caching + # TODO -- allow for custom hashes that will indicate stability (user is responsible) + yield SubGraphTask( + graph=RunnableGraph.create(action), + inputs=inputs, + state=state, + application_id=_stable_app_id_hash(context.app_id, key), + ) + + @abc.abstractmethod + def reduce(self, state: State, states: Generator[State, None, None]) -> State: + """Reduces the states from the tasks into a single state. + + :param states: State outputs from the subtasks + :return: Reduced state + """ + pass + + +class MapActions(MapActionsAndStates, abc.ABC): + """Base class to run a set of actions over the same state. Actions can be functions (decorated with @action), + action objects, or subdags implemented as :py:class:`RunnableGraph` objects. With this, you can do the following: + + 1. Specify the actions to run + 2. Specify the state to run the actions over + 3. Reduce the results into a single state + + This is useful, for example, to run different LLMs over the same set of prompts, + + Here is an example (with some pseudocode) of doing just that: + + .. code-block:: python + + from burr.core import action, state + from burr.core.parallelism import MapActions, RunnableGraph + from typing import Callable, Generator, List + + @action(reads=["prompt", "model"], writes=["llm_output"]) + def query_llm(state: State, model: str) -> State: + # TODO -- implement _query_my_llm to call litellm or something + return state.update(llm_output=_query_my_llm(prompt=state["prompt"], model=model)) + + class TestMultipleModels(MapActions): + + def actions(self, state: State) -> Generator[Action | Callable | RunnableGraph, None, None]: + # Make sure to add a name to the action if you use bind() with a function, + # note that these can be different actions, functions, etc... + # in this case we're using `.bind()` to create multiple actions, but we can use some mix of + # subgraphs, functions, action objects, etc... 
+ for action in [ + query_llm.bind(model="gpt-4").with_name("gpt_4_answer"), + query_llm.bind(model="o1").with_name("o1_answer"), + query_llm.bind(model="claude").with_name("claude_answer"), + ] + yield action + + def state(self, state: State) -> State: + return state.update(prompt="What is the meaning of life?") + + def reduce(self, states: Generator[State, None, None]) -> State: + all_llm_outputs = [] + for state in states: + all_llm_outputs.append(state["llm_output"]) + return state.update(all_llm_outputs=all_llm_outputs) + + def reads() -> List[str]: + return ["prompt"] # we're just running this on a single prompt, for multiple actions + + def writes() -> List[str]: + return ["all_llm_outputs"] + + """ + + @abc.abstractmethod + def actions( + self, state: State, inputs: Dict[str, Any], context: ApplicationContext + ) -> Generator[SubgraphType, None, None]: + """Gives all actions to map over, given the state/inputs. + + :param state: State at the time of running the action + :param inputs: Runtime Inputs to the action + :param context: Context for the action + :return: Generator of actions to run + """ + + @abc.abstractmethod + def state(self, state: State, inputs: Dict[str, Any]): + """Gives the state for each of the actions + + :param state: State at the time of running the action + :param inputs: Runtime inputs to the action + :return: State for the action + """ + pass + + def states( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> Generator[State, None, None]: + """Just converts the state into a generator of 1, so we can use the superclass. This is internal.""" + yield self.state(state, inputs) + + @abc.abstractmethod + def reduce(self, state: State, states: Generator[State, None, None]) -> State: + """Reduces the task's results into a single state. Runs through all outputs + and combines them together, to form the final state for the action. + + :param states: State outputs from the subtasks + :return: Reduced state + """ + pass + + +class MapStates(MapActionsAndStates, abc.ABC): + """Base class to run a single action over a set of states. States are given as + updates (manipulations) of the action's input state, specified by the `states` + generator. + + With this, you can do the following: + + 1. Specify the states to run + 2. Specify the action to run over all the states + 3. Reduce the results into a single state + + This is useful, for example, to run different prompts over the same LLM, + + Here is an example (with some pseudocode) of doing just that: + + .. 
code-block:: python + + from burr.core import action, state + from burr.core.parallelism import MapStates, RunnableGraph + from typing import Callable, Generator, List + + @action(reads=["prompt"], writes=["llm_output"]) + def query_llm(state: State) -> State: + return state.update(llm_output=_query_my_llm(prompt=state["prompt"])) + + class TestMultiplePrompts(MapStates): + + def action(self) -> Action | Callable | RunnableGraph: + # make sure to add a name to the action + # This is not necessary for subgraphs, as actions will already have names + return query_llm.with_name("query_llm") + + def states(self, state: State) -> Generator[State, None, None]: + # You could easily have a list_prompts upstream action that writes to "prompts" in state + # And loop through those + # This hardcodes for simplicity + for prompt in [ + "What is the meaning of life?", + "What is the airspeed velocity of an unladen swallow?", + "What is the best way to cook a steak?", + ]: + yield state.update(prompt=prompt) + + + def reduce(self, states: Generator[State, None, None]) -> State: + all_llm_outputs = [] + for state in states: + all_llm_outputs.append(state["llm_output"]) + return state.update(all_llm_outputs=all_llm_outputs) + + def reads() -> List[str]: + return ["prompts"] + + def writes() -> List[str]: + return ["all_llm_outputs"] + """ + + @abc.abstractmethod + def states( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> Generator[State, None, None]: + """Generates all states to map over, given the state and inputs. + Each state will be an update to the input state. + + For instance, you may want to take an input state that has a list field, and expand it + into a set of states, each with a different value from the list. + + For example: + + .. code-block:: python + + def states(self, state: State, context: ApplicationContext, inputs: Dict[str, Any]) -> Generator[State, None, None]: + for item in state["multiple_fields"]: + yield state.update(individual_field=item) + + :param state: Initial state + :param context: Context for the action + :param inputs: Runtime inputs to the action + :return: Generator of states to run + """ + pass + + @abc.abstractmethod + def action(self, state: State, inputs: Dict[str, Any]) -> SubgraphType: + """The single action to apply to each state. + This can be a function (decorated with `@action`, action object, or subdag). + + :param state: State to run the action over + :param inputs: Runtime inputs to the action + :return: Action to run + """ + pass + + def actions( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> Generator[SubgraphType, None, None]: + """Maps the action over each state generated by the `states` method. 
+ Internally used, do not implement.""" + yield self.action(state, inputs) + + @abc.abstractmethod + def reduce(self, state: State, results: Generator[State, None, None]) -> State: + """Reduces the task's results + + :param results: + :return: + """ + pass + + +GenType = TypeVar("GenType") +ReturnType = TypeVar("ReturnType") + +SyncOrAsyncGenerator = Union[Generator[GenType, None, None], AsyncGenerator[GenType, None]] +SyncOrAsyncGeneratorOrItemOrList = Union[SyncOrAsyncGenerator[GenType], List[GenType], GenType] + + +class PassThroughMapActionsAndStates(MapActionsAndStates): + def __init__( + self, + action: Union[ + SubgraphType, + List[SubgraphType], + Callable[ + [State, ApplicationContext, Dict[str, Any]], SyncOrAsyncGenerator[SubgraphType] + ], + ], + state: Callable[[State, ApplicationContext, Dict[str, Any]], SyncOrAsyncGenerator[State]], + reducer: Callable[[State, SyncOrAsyncGenerator[State]], State], + reads: List[str], + writes: List[str], + inputs: List[str], + ): + super().__init__() + self._action_or_generator = action + self._state_or_generator = state + self._reducer = reducer + self._reads = reads + self._writes = writes + self._inputs = inputs + + def actions( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> Generator[SubgraphType, None, None]: + if isinstance(self._action_or_generator, list): + for action in self._action_or_generator: + yield action + return + if isinstance(self._action_or_generator, SubgraphType): + yield self._action_or_generator + else: + gen = self._action_or_generator(state, context, inputs) + if inspect.isasyncgen(gen): + + async def gen(): + async for item in self._action_or_generator(state, context, inputs): + yield item + + return gen() + else: + yield from self._action_or_generator(state, context, inputs) + + def states( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> Generator[State, None, None]: + gen = self._state_or_generator(state, context, inputs) + if isinstance(gen, State): + yield gen + if inspect.isasyncgen(gen): + + async def gen(): + async for item in self._state_or_generator(state, context, inputs): + yield item + + return gen() + else: + yield from gen + + def reduce(self, state: State, states: SyncOrAsyncGenerator[State]) -> State: + return self._reducer(state, states) + + @property + def writes(self) -> list[str]: + return self._writes + + @property + def reads(self) -> list[str]: + return self._reads + + +def map_reduce_action( + # action: Optional[SubgraphType]=None, + action: Union[ + SubgraphType, + List[SubgraphType], + Callable[ + [State, ApplicationContext, Dict[str, Any]], + SyncOrAsyncGeneratorOrItemOrList[SubgraphType], + ], + ], + state: Callable[ + [State, ApplicationContext, Dict[str, Any]], SyncOrAsyncGeneratorOrItemOrList[State] + ], + reducer: Callable[[State, SyncOrAsyncGenerator[State]], State], + reads: List[str], + writes: List[str], + inputs: List[str], +): + """Experimental API for creating a map-reduce action easily. We'll be improving this.""" + return PassThroughMapActionsAndStates( + action=action, state=state, reducer=reducer, reads=reads, writes=writes, inputs=inputs + ) diff --git a/burr/core/state.py b/burr/core/state.py index addad13e..85e89a8d 100644 --- a/burr/core/state.py +++ b/burr/core/state.py @@ -428,6 +428,13 @@ def subset(self, *keys: str, ignore_missing: bool = True) -> "State[StateType]": ) def __getitem__(self, __k: str) -> Any: + if __k not in self._state: + raise KeyError( + f"Key \"{__k}\" not found in state. 
Keys state knows about are: {[key for key in self._state.keys() if not key.startswith('__')]}. " + "If you hit this within the context of an application, you want to " + "(a) ensure that an upstream action has produced this state/it is set as an initial state value and " + "(b) ensure that your action declares this as a read key." + ) return self._state[__k] def __len__(self) -> int: diff --git a/docs/concepts/parallelism.rst b/docs/concepts/parallelism.rst index 476904c8..e30791ab 100644 --- a/docs/concepts/parallelism.rst +++ b/docs/concepts/parallelism.rst @@ -1,3 +1,5 @@ +.. _parallelism: + =========== Parallelism =========== diff --git a/docs/reference/index.rst b/docs/reference/index.rst index f6e96eec..1deeb28e 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -21,6 +21,7 @@ need functionality that is not publicly exposed, please open an issue and we can tracking visibility lifecycle + parallelism typing integrations/index telemetry diff --git a/docs/reference/integrations/index.rst b/docs/reference/integrations/index.rst index 0d15cda1..08ae18af 100644 --- a/docs/reference/integrations/index.rst +++ b/docs/reference/integrations/index.rst @@ -13,3 +13,4 @@ Integrations -- we will be adding more traceloop langchain pydantic + haystack diff --git a/docs/reference/parallelism.rst b/docs/reference/parallelism.rst new file mode 100644 index 00000000..a21ec51e --- /dev/null +++ b/docs/reference/parallelism.rst @@ -0,0 +1,27 @@ +.. _parallelismref: + +=========== +Parallelism +=========== + +Tools to make sub-actions/sub-graphs easier to work with. Read the docs on :ref:`parallelism` for more information. + +.. autoclass:: burr.core.parallelism.RunnableGraph + :members: + +.. autoclass:: burr.core.parallelism.SubGraphTask + :members: + +.. autoclass:: burr.core.parallelism.TaskBasedParallelAction + :members: + +.. autoclass:: burr.core.parallelism.MapActionsAndStates + :members: + +.. autoclass:: burr.core.parallelism.MapActions + :members: + +.. autoclass:: burr.core.parallelism.MapStates + :members: + +.. automethod:: burr.core.parallelism.map_reduce_action diff --git a/examples/recursive/application_parallel_feature.py b/examples/recursive/application_parallel_feature.py new file mode 100644 index 00000000..0ce4e52a --- /dev/null +++ b/examples/recursive/application_parallel_feature.py @@ -0,0 +1,231 @@ +from concurrent.futures import ProcessPoolExecutor +from typing import Any, Callable, Dict, Generator, List, Tuple, Union + +import openai + +from burr.core import Action, Application, ApplicationBuilder, Condition, State, action +from burr.core.application import ApplicationContext +from burr.core.graph import GraphBuilder +from burr.core.parallelism import MapStates, RunnableGraph + + +# full agent +def _query_llm(prompt: str) -> str: + """Simple wrapper around the OpenAI API.""" + client = openai.Client() + return ( + client.chat.completions.create( + model="gpt-4o", + messages=[ + {"role": "system", "content": "You are a helpful assistant"}, + {"role": "user", "content": prompt}, + ], + ) + .choices[0] + .message.content + ) + + +@action( + reads=["feedback", "current_draft", "poem_type", "prompt"], + writes=["current_draft", "draft_history", "num_drafts"], +) +def write(state: State) -> Tuple[dict, State]: + """Writes a draft of a poem.""" + poem_subject = state["prompt"] + poem_type = state["poem_type"] + current_draft = state.get("current_draft") + feedback = state.get("feedback") + + parts = [ + f'You are an AI poet. 
Create a {poem_type} poem on the following subject: "{poem_subject}". ' + "It is absolutely imperative that you respond with only the poem and no other text." + ] + + if current_draft: + parts.append(f'Here is the current draft of the poem: "{current_draft}".') + + if feedback: + parts.append(f'Please incorporate the following feedback: "{feedback}".') + + parts.append( + f"Ensure the poem is creative, adheres to the style of a {poem_type}, and improves upon the previous draft." + ) + + prompt = "\n".join(parts) + + draft = _query_llm(prompt) + + return {"draft": draft}, state.update( + current_draft=draft, + draft_history=state.get("draft_history", []) + [draft], + ).increment(num_drafts=1) + + +@action(reads=["current_draft", "poem_type", "prompt"], writes=["feedback"]) +def edit(state: State) -> Tuple[dict, State]: + """Edits a draft of a poem, providing feedback""" + poem_subject = state["prompt"] + poem_type = state["poem_type"] + current_draft = state["current_draft"] + + prompt = f""" + You are an AI poetry critic. Review the following {poem_type} poem based on the subject: "{poem_subject}". + Here is the current draft of the poem: "{current_draft}". + Provide detailed feedback to improve the poem. If the poem is already excellent and needs no changes, simply respond with an empty string. + """ + + feedback = _query_llm(prompt) + + return {"feedback": feedback}, state.update(feedback=feedback) + + +@action(reads=["current_draft"], writes=["final_draft"]) +def final_draft(state: State) -> Tuple[dict, State]: + return {"final_draft": state["current_draft"]}, state.update(final_draft=state["current_draft"]) + + +# +# +# def _create_sub_application( +# max_num_drafts: int, +# spawning_application_context: ApplicationContext, +# poem_type: str, +# prompt: str, +# ) -> Application: +# """Utility to create sub-application -- note""" +# out = ( +# ApplicationBuilder() +# .with_actions( +# edit, +# write, +# final_draft, +# ) +# .with_transitions( +# ("write", "edit", Condition.expr(f"num_drafts < {max_num_drafts}")), +# ("write", "final_draft"), +# ("edit", "final_draft", Condition.expr("len(feedback) == 0")), +# ("edit", "write"), +# ) +# .with_tracker(spawning_application_context.tracker.copy()) # remember to do `copy()` here! 
+# .with_spawning_parent( +# spawning_application_context.app_id, +# spawning_application_context.sequence_id, +# spawning_application_context.partition_key, +# ) +# .with_entrypoint("write") +# .with_state( +# current_draft=None, +# poem_type=poem_type, +# prompt=prompt, +# feedback=None, +# ) +# .build() +# ) +# return out + +sub_application_graph = ( + GraphBuilder() + .with_actions( + edit, + write, + final_draft, + ) + .with_transitions( + ("write", "edit", Condition.expr("num_drafts < max_num_drafts")), + ("write", "final_draft"), + ("edit", "final_draft", Condition.expr("len(feedback) == 0")), + ("edit", "write"), + ) + .build() +) + + +# full agent +@action( + reads=[], + writes=[ + "max_drafts", + "poem_types", + "poem_subject", + ], +) +def user_input( + state: State, max_drafts: int, poem_types: List[str], poem_subject: str +) -> Tuple[dict, State]: + """Collects user input for the poem generation process.""" + return { + "max_drafts": max_drafts, + "poem_types": poem_types, + "poem_subject": poem_subject, + }, state.update(max_drafts=max_drafts, poem_types=poem_types, poem_subject=poem_subject) + + +class GenerateAllPoems(MapStates): + def states( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> Generator[State, None, None]: + for poem_type in state["poem_types"]: + yield state.update(poem_type=poem_type, prompt=state["poem_subject"], max_num_drafts=2) + + def action( + self, state: State, inputs: Dict[str, Any] + ) -> Union[Action, Callable, RunnableGraph]: + return RunnableGraph(sub_application_graph, entrypoint="write", halt_after=["final_draft"]) + + def reduce(self, state: State, results: Generator[State, None, None]) -> State: + new_state = state + for output_state in results: + new_state = new_state.append(proposals=output_state["final_draft"]) + return new_state + + @property + def writes(self) -> list[str]: + return ["proposals"] + + @property + def reads(self) -> list[str]: + return ["max_drafts", "poem_types", "poem_subject"] + + +@action(reads=["proposals", "prompts"], writes=["final_results"]) +def final_results(state: State) -> Tuple[dict, State]: + # joins them into a string + proposals = state["proposals"] + final_results = "\n\n".join( + [f"{poem_type}:\n{proposal}" for poem_type, proposal in zip(state["poem_types"], proposals)] + ) + return {"final_results": final_results}, state.update(final_results=final_results) + + +def application() -> Application: + return ( + ApplicationBuilder() + .with_actions(user_input, final_results, generate_all_poems=GenerateAllPoems()) + .with_transitions( + ("user_input", "generate_all_poems"), + ("generate_all_poems", "final_results"), + ) + .with_tracker(project="demo:parallelism_poem_generation") + .with_entrypoint("user_input") + .with_parallel_executor(ProcessPoolExecutor(max_workers=-1)) + .build() + ) + + +if __name__ == "__main__": + app = application() + app.visualize(output_file_path="statemachine", format="png") + app.run( + halt_after=["final_results"], + inputs={ + "max_drafts": 2, + "poem_types": [ + "sonnet", + "limerick", + "haiku", + "acrostic", + ], + "poem_subject": "state machines", + }, + ) diff --git a/examples/recursive/statemachine.png b/examples/recursive/statemachine.png index 0856bd5c7a55711b88cd99326d79e0559056b654..9f7e4dad74b499d843b10ef63d70b19a13fcc5dd 100644 GIT binary patch literal 21885 zcmcG$cT`kevp(2>1VN%8L4q_QISC>X6&i_xAPAC!5(FgYsN^gelpG`}Q3NDO76cJQ zK#(kg*4*Cu{5KYEaAADURk+3yWB&|z#M=ghgTs+YY~Nk*Jy|=CHz|r 
[binary image data for examples/recursive/statemachine.png elided]
z#4_wj!$2JCoKjxZ)|xqJp?^1IO3c)$m-c_D#OE$9WKINjuLc~X#*7UWvxrLKiLMSR zZN*#lukP{TcNrYkY3b;MpC(5PGj0p>Wy)TK>Y=~HP|ojAS?8>R57BvLX`80_EAexV+rNFRqudHkzOVt45-+x6DD(*=8k{K~=vL1Soj zGF9@zLPWu~Nx?!N0^z(dZhR&%$bb#DTxmi?o$_Y3b435x`?rC#|1mKS@Wbn0$kqQ-rQ~;bU`nKuYV5c_mc)*F}K8b84+S86-U#8(+pMV zCx0A86&<3bI4p~=$MMt>0YZObQjFf7SNpDU0JHd-SeAdg|2s8dYE^3ormyo~KLPFa zvCtR7`P1W-GF&DmbIm(}0*=mdXNNkU>p!3VpD2jefJ}TX*lsRcd}8RYy*woo6&wvu zMtSR^_lO`yr7r>{uZtAN4x$$4jQ5pzMr)}K=;<&K*wLxcDXiF*6FPf+=S7pCPA?&( z|2JU4Ei370*^DjiUR7xC%ftdZ6FP*m<%Kz5r`FN8WdZ_17>Iqw9nrA5@}?Rq1?t@i zk^TRI8L-T%gkE*%jb}(#$IZwJ@BEr3K_KGrrYuHZ1mKR=V404~-0sS{bhkGZ&yj(F z=WLGuq$%-BSlrj{8O+bt@8m|^PB0*qV6Da(3i3}2y5Rqbr6ocfm#-Nni~htM>cN?N z{a3}p(TV_JG%JjEaaO?)$AtVk`i&BVv_!+B>rj#gMG#PSRkf7X5@Ia=kart2J*h{s z@xqb3+}!Ub;?Dd$=|I8Q6~zlvF7O4j31` z@0KRxcU#sJb{L91f|T_QYPtUjc#wN}Bc=r{BI{8YL5#sFmL0K}yUkb#ASMqUJdB=i z4Uw*@5m-Ba(QLzskIBj!D*OmO4sDpq5VbxO)3S$j4)_1aX)ce@{IdI{(6Y!obN1L* zqT;d1`cXWI?#w?AMLK7WiHJV0OF1(gc|?r3Cc8<2kOxBJp*q~7sQT8+V(~M-_@1EOn#s&f40-ieYNLcTbfvwC6Rk$~ zmQ@gw85U>X|7>_n)_XDj7_GTcUPs}Ihj?K8(Wm4{#S9GZ|IDDgw0HS-eNY9<=yrh` zB!pnVB8um9EZ4>c!ik*Ok1z6#e-+?a2|q=bc97ul9jpF3Mp`+bbocIMQc-$nodqcp zKUAYTp}m^UL;u|xUk{HmiT@TK<5Xk50Pk||N5tf0l|k2xmF;8uxz3RU$;B@Kk7(b# zakiWIGU;_PBXm*piRWNr48z=sk1(&5bDB1y-Z z&20(opC8RRK~j0Ec=P{?oEi|XOCoFQ--|kCOItZY5QV_jfP z68|?3qdEIw@X*@EQfInYVq4{{U#-0y5$I{&i<;`jv2o}*pgIU%Tb1-8H4oNVGe6}N z5ddGa9fe`7_}YI}o9dBYf-&KyvjG`L)_-U{9xAYXcm-i3`bdh8UhcX11>RlTgqN>S z-}_P_G(FdX!_4n3@8r5uT6;s-A*S)cv`Lgxthnbs-?&XSYsM^iF8bb}D!$4em;-c) zZrl&x?s4Q)P0Dt?$i1{G=G+=*AD{vVXuQUSVlk0d_lw8G{jv78Z_{SOTDEJ;&3PGL z&Ta;ny98F%cqhDh4sC<&Sr?IL%^$aOv}KZ$4Y~Grf9mP?m+3mffs#?{2et?VxeNOD z0^mR|nEzWYa^Wt1FDTyZqN1q%@^Zx=sY!1lw{U8*%vi5SCrfFZ6Nrhj)4bMUQu%R# z{n^hDl+khZ<2@E@zE2D2_@vt)CLX`=D$_=_G5bWjL9V&0J7(9j(^ z`V@KVo9+1SfZvj!U~$N|kD~$3BW-^t7SRNKoj(1+(y{?hSJ&jx!LGO7D-?bh6b0Ni zd3mbGTO^0ifA$Y3DU*?{|Mh2hNlMzcE=aA**y1Hsm!?R~+>}AN+Cm*LALNag0^rgP1vwL2UrZ(e*;4*|+_UP{JD}cmw;$Or1 ze`kqQCj`y&ROS2|ReeWVfzeJCB9R;BcW=S00tKVfbuJ>l4C%1^I7Un04(O6YAN?4@ z)}5Z!E<5x+Oa$oD`4KQfAU{oN4hJv@;RK&v`@h~cQ0s2LzcEB)PjzOroJEXMPY5;< zn}uc!&pm^HOdfVUXha+E@t#dx5L2CEG@kOmlERNyM>mAEx+rH-)e?@&8RPM{3uU;f zDxXZ?!})8!zd8I;b%ZaV{wvYY)@&j_Dt=&NL;?-h5NPEBPwS+Ob|o8pq>RI+cOT-Ivsb_zrs)@a zJr!22S9Db8Z8}pi!J&nmJ?W=Z|4>;Lt^vpj%W{EdJ2)_1M<|1dc_l-d(6QF<5{%lo z^RT>YmG+LFqU>oa4EF8+XyWNGPK^u4q6@7^65Y`a)XAN*lc(roX^np&lW)-XbvPnq zT~@9Ps8hn1b}|bKV`jbPn;zawmnga5hhJGhq4AnWq3Q|y>84V)Yc%vBp1}Fo&oGAM zFxG$$cNV5O9PfX>503oC(f4zf!sQ>nVJ7SU+_%b!SyGUMkXEd98OCuOh6Uh|j4DH1 zfizUUuQKQ7AJX6SxPm^t;^ie__9aruM+#?$=K~a$Y#bcTKY!|>0ZU1V;C}!X)Kvzl zw=k53F8ParKIwMs936#cW->9#1@O3&!_dZd;Kqcv!QjGd`_9pKj#7e}D7qxwh?n!f zH0!@>nOvdjMpTVy>34|;cLQ#uW?yKOWO8@%rvUfR_&_T|6%kUuG};)g`~PfXpr7lhKj;fCpQ(x%AM(?;d*bqyP^~5+VV! 
zp-|}t&_+Na*K=9e;YG{q6rAuP$<0DlXn3QB)s)r$|9o^@3x_!V{bLwRf+InZaF(^y z3g(w!KB%uyKflz9GAa}r|AO$VFEBZkk)NX5Y|2J7!{HP81qCRWLxzLW=dSB8c||Sd zRzRQ^fs!z2c(M_J(uY%4FJ8O|%Fkb7Z->F-{vtzus&}L$ET!OU+0Yme_Z&F4|TlMldT?ySyPo zL_`EbE%(#Zk*zMUa;=4Y9=^R36Y0L2H8_9-G0|63LSKzbNa!V;ZIfewV<60&oE`Ek zBqZ&+u=M$BFb&Vh$Vf%Mx3hz8yo8a$?T5i`NSK22a>DTe-;J3W1sKeNSyZ89b>w^} zdN&4O*ghIi9uFnK?ONkUmX_Hw1}rRw+VExfvjZ>A;a8(vA|@q;`P%Xsxbc6zppz;4 z3jR1oI5!wXpbTul24+cN+Abfa?S>u1CVrE}bG1ykAxLU`_Sc5cgc4@6HC0GRTyW7V zmI6lxVBGCarCJ@hHoIFzo%fKRa~A zh-H=*){a71Tl;E+nwsBE8iqCBVSkVR*@))~&4o1Yy(vvNG9VFfR=uLiGK;KbVYlpq*;G_yq~&l(hucc} z1F2Yv+fVLNvd(S2)ekp0ah{YOa6z_8y4mZrlBfQ-<>q!>$aODS$jRm1M6Mp+W_xkt z!`aWL#xRc!XDjmNP^dZd{gyhn=6b1j11)Gw`0~`ao$kv_m zTk5T^V6^V(+J@(Q86$tgK`?rdRUT3 z;|#~*Q-*dV$8(r<}%52LNGxaGY;&8-~;o~fK|~ac2Z&0s((4&HLk%lG@UlMcpm!lBV}Og zC#kW_e6wpz2zmG&0W)kYv$nA3*yL1Hb9rMvhJ@l(Kd0Pvb>sj2O5)&*zpwG};uYDn z6E1{r)!)|FQL@>e-!^gXIo{tBo#MOb=3Mefe7{YD%(VpHFANb9))p2z*Iv|4Z5~bR zLa6@r<1CKzO&d>h$nUOM-Y~8|-QClctSUO&5ni7xjXxvtU{6i;HXAE}h#9jg?lP(#kYhjrEKsF!mmrMr7U6;v}|Zh7!i$J=dJq(Sur z^``HqlHiI#Y=Oj$QUTGPk@Qk`u+I z8htuJB74yO_|We6d!+}?xZwdKEu!wLx&(Qk!xoljj=$&8X{->am(CnT7;6&oBG={Y5p@;hZZdoR#w>igQFSw8pob z?d{}$y0P_4rmjQ!FE<(Edhb`6x~>M3?fr;3w(AG;>rOPSZgK7Hy>UP6hql-Io=Lxu z&Zxdbgb-L8xqkAeOu2P@AS!Q0+;w(a;clMa_pe{%_Bzjg!ia*$lXkm`^T(T3aGu_N zz{lY(-B}jVZ>RBBO1?+VLVs}&XvN<>$>CVY&_T{iqxXCgPNpAW>Vl8<= zd*uaUy2n%9y9TS5d`whyXGXsGYT5bfgf?c*3*$*YxS#rVZfx4~{vlXPto;1bCVCHg zh^E)yKv7|#)8-SAs!31niBw^p)vJ^E@MBmk762^Osu3R9hTQCIPMbVW*}3Ja_mf~S|)1;=~?0s$z<%LSl8 z(=f6#&#?HtH~D$XCQhaw<=CMEatyiAd$AD^em= zm}m9s7k$rWvjOn&JF^)O-G=$Irc+#8Osn=Cc=W|-ghxg>EQg1u=LN5`GS8G9cXM~Y z;GbV$5S_a9AT;bc0QwAhgtxx@j{Am<#H+eDE9QT|vqJ_j?5-HjoWAt_PuXWL|0`Lp zWIhqO`sj+3$W`p@ZyCHl^+f`Lf_dx9)C-ynoegpbGGMc`RjONVgUs*8wb(YP0kEI{HOaDic0kHj##pdY;(k?64 z<;*7{0=gn4a#cHZEFGKGO8eXIxS&LxK7Ne-Kkszfy!9G2A)sb3o7b+PxVZR&_ZH5X zNuOK7*}H8ETYvbDKDUIk=67G~5)^U`y&f0>Ku*?K0BSdA$jS607o6+xzP%hjbl|db zm3KZ7k@HlfM6O~Dnlz<)4IffhEMfgOUvccv0ah(u$moPPYWVn4keA2N%e<;n9_}6< z3>ftUc{$lkeSA1+yS8)u@Ie;8_a+Nwzeel!9l1U-itxxb1O|uj(Hqm)zW#f1v$NT^ za~pFeJp({gbY}prYuTFOq9Ue0F_P`;zvqjTkC-^@erovnUn&N*>eUBe<-&RVvVCj0 z=Mxb*Pq+3&u43LkzKonSmFZ7D#&bje34mWfAcMwEz^g_LrX`Q$p}RZt+0KIwpZoUs z1kM!{@ZQVMllu8m0NmW%xxIG+{U1%nVzJ=u{Ehk*C8gZ7f`w>=f$+XwIeMeB3bd#T#lsu!i#fL$~B*e z$axmq`4y)k;xgBVhZEWP4iX<8WjYD~*_mfZ+qHw5wQAF_S@W{(60$STkdt+mx}lAo zHg|q*4r#k~QjnKR<7- zYic<;N#&5ATe@eLzyGDzY>7zaqar0@Kxo5;q^BP>9m8%Ny54O%LZxyt{RjXJ8ZNRO zTwvpxZ|EG``LY(ki%8|5A|+x#R8$n9p^aFzc%kVi;$#^oPq1a(x7^#i_g@bdkxEBJ zO2mL_)v7Uc=nz&dnonlNY147Ui9UH}HX*?w^ytyUbQBSB0u?C{Lq?1kLCv5b=1!Yp zI*K^Sk+fe}{?S}sc;R`xyu3`u5fLZQhMXb>Em*LC#Kc5inZJZ@e`MCISt?N? 
diff --git a/tests/core/test_parallelism.py b/tests/core/test_parallelism.py
new file mode 100644
index 00000000..b3c84fb0
--- /dev/null
+++ b/tests/core/test_parallelism.py
@@ -0,0 +1,945 @@
+import asyncio
+import dataclasses
+import datetime
+from random import random
+from typing import Any, AsyncGenerator, Callable, Dict, Generator, List, Optional, Union
+
+import pytest
+
+from burr.common import types as burr_types
+from burr.core import (
+    Action,
+    ApplicationBuilder,
+    ApplicationContext,
+    ApplicationGraph,
+    State,
+    action,
+)
+from burr.core.action import Input, Result
+from burr.core.graph import GraphBuilder
+from burr.core.parallelism import (
+    MapActions,
+    MapActionsAndStates,
+    MapStates,
+    RunnableGraph,
+    SubGraphTask,
+    TaskBasedParallelAction,
+    map_reduce_action,
+)
+from burr.tracking.base import SyncTrackingClient
+from burr.visibility import ActionSpan
+
+old_action = action
+
+
+async def sleep_random():
+    await asyncio.sleep(random())
+
+
+# Single action/callable subgraph
+@action(reads=["input_number", "number_to_add"], writes=["output_number"])
+def simple_single_fn_subgraph(
+    state: State, additional_number: int = 1, identifying_number: int = 1000
+) -> State:
+    return state.update(
+        output_number=state["input_number"]
+        + state["number_to_add"]
+        + additional_number
+        + identifying_number
+    )
+
+
+# Single action/callable subgraph
+@action(reads=["input_number", "number_to_add"], writes=["output_number"])
+async def simple_single_fn_subgraph_async(
+    state: State, additional_number: int = 1, identifying_number: int = 1000
+) -> State:
+    await sleep_random()
+    return state.update(
+        output_number=state["input_number"]
+        + state["number_to_add"]
+        + additional_number
+        + identifying_number
+    )
+
+
+class ClassBasedAction(Action):
+    def __init__(self, identifying_number: int, name: str = "class_based_action"):
+        super().__init__()
+        self._name = name
+        self.identifying_number = identifying_number
+
+    @property
+    def reads(self) -> list[str]:
+        return ["input_number", "number_to_add"]
+
+    def run(self, state: State, **run_kwargs) -> dict:
+        return {
+            "output_number": state["input_number"]
+            + state["number_to_add"]
+            + run_kwargs.get("additional_number", 1)
+            + self.identifying_number
+        }
+
+    @property
+    def writes(self) -> list[str]:
+        return ["output_number"]
+
+    def update(self, result: dict, state: State) -> State:
+        return state.update(**result)
+
+
+class ClassBasedActionAsync(ClassBasedAction):
+    async def run(self, state: State, **run_kwargs) -> dict:
+        await sleep_random()
+        return super().run(state, **run_kwargs)
+
+
+@action(reads=["input_number"], writes=["current_number"])
+def entry_action_for_subgraph(state: State) -> State:
+    return state.update(current_number=state["input_number"])
+
+
+@action(reads=["current_number", "number_to_add"], writes=["current_number"])
+def add_number_to_add(state: State) -> State:
+    return state.update(current_number=state["current_number"] + state["number_to_add"])
+
+
+@action(reads=["current_number"],
writes=["current_number"]) +def add_additional_number_to_add( + state: State, additional_number: int = 1, identifying_number: int = 3000 +) -> State: + return state.update( + current_number=state["current_number"] + additional_number + identifying_number + ) # 1000 is the one that marks this as different + + +@action(reads=["current_number"], writes=["output_number"]) +def final_result(state: State) -> State: + return state.update(output_number=state["current_number"]) + + +@action(reads=["input_number"], writes=["current_number"]) +async def entry_action_for_subgraph_async(state: State) -> State: + await sleep_random() + return entry_action_for_subgraph(state) + + +@action(reads=["current_number", "number_to_add"], writes=["current_number"]) +async def add_number_to_add_async(state: State) -> State: + await sleep_random() + return add_number_to_add(state) + + +@action(reads=["current_number"], writes=["current_number"]) +async def add_additional_number_to_add_async( + state: State, additional_number: int = 1, identifying_number: int = 3000 +) -> State: + await sleep_random() + return add_additional_number_to_add( + state, additional_number=additional_number, identifying_number=identifying_number + ) # 1000 is the one that marks this as different + + +@action(reads=["current_number"], writes=["output_number"]) +async def final_result_async(state: State) -> State: + await sleep_random() + return final_result(state) + + +SubGraphType = Union[Action, Callable, RunnableGraph] + + +def create_full_subgraph(identifying_number: int = 0) -> SubGraphType: + return RunnableGraph( + graph=( + GraphBuilder() + .with_actions( + entry_action_for_subgraph, + add_number_to_add, + add_additional_number_to_add.bind(identifying_number=identifying_number), + final_result, + ) + .with_transitions( + ("entry_action_for_subgraph", "add_number_to_add"), + ("add_number_to_add", "add_additional_number_to_add"), + ("add_additional_number_to_add", "final_result"), + ) + .build() + ), + entrypoint="entry_action_for_subgraph", + halt_after=["final_result"], + ) + + +def create_full_subgraph_async(identifying_number: int = 0) -> SubGraphType: + return RunnableGraph( + graph=GraphBuilder() + .with_actions( + entry_action_for_subgraph=entry_action_for_subgraph_async, + add_number_to_add=add_number_to_add_async, + add_additional_number_to_add=add_additional_number_to_add_async.bind( + identifying_number=identifying_number + ), + final_result=final_result_async, + ) + .with_transitions( + ("entry_action_for_subgraph", "add_number_to_add"), + ("add_number_to_add", "add_additional_number_to_add"), + ("add_additional_number_to_add", "final_result"), + ) + .build(), + entrypoint="entry_action_for_subgraph", + halt_after=["final_result"], + ) + + +FULL_SUBGRAPH: SubGraphType = create_full_subgraph(identifying_number=3000) +FULL_SUBGRAPH_ASYNC: SubGraphType = create_full_subgraph_async(identifying_number=3000) + + +@dataclasses.dataclass +class RecursiveActionTracked: + state_before: Optional[State] + state_after: Optional[State] + action: Action + app_id: str + partition_key: str + sequence_id: int + children: List["RecursiveActionTracked"] = dataclasses.field(default_factory=list) + + +class RecursiveActionTracker(SyncTrackingClient): + """Simple test tracking client for a recursive action""" + + def __init__(self, events: List[RecursiveActionTracked]): + self.events = events + + def copy(self): + """Quick way to copy from the current state. 
This assumes linearity (which is true in this case, as parallelism is delegated)""" + if self.events: + current_event = self.events[-1] + if current_event.state_after is not None: + raise ValueError("Don't copy if you're not in the middle of an event") + return RecursiveActionTracker(current_event.children) + raise ValueError("Don't copy if you're not in the middle of an event") + + def post_application_create( + self, + *, + app_id: str, + partition_key: Optional[str], + state: "State", + application_graph: "ApplicationGraph", + parent_pointer: Optional[burr_types.ParentPointer], + spawning_parent_pointer: Optional[burr_types.ParentPointer], + **future_kwargs: Any, + ): + pass + + def pre_run_step( + self, + *, + app_id: str, + partition_key: str, + sequence_id: int, + state: "State", + action: "Action", + inputs: Dict[str, Any], + **future_kwargs: Any, + ): + self.events.append( + RecursiveActionTracked( + state_before=state, + state_after=None, + action=action, + app_id=app_id, + partition_key=partition_key, + sequence_id=sequence_id, + ) + ) + + def post_run_step( + self, + *, + app_id: str, + partition_key: str, + sequence_id: int, + state: "State", + action: "Action", + result: Optional[Dict[str, Any]], + exception: Exception, + **future_kwargs: Any, + ): + self.events[-1].state_after = state + + def pre_start_span( + self, + *, + action: str, + action_sequence_id: int, + span: "ActionSpan", + span_dependencies: list[str], + app_id: str, + partition_key: Optional[str], + **future_kwargs: Any, + ): + pass + + def post_end_span( + self, + *, + action: str, + action_sequence_id: int, + span: "ActionSpan", + span_dependencies: list[str], + app_id: str, + partition_key: Optional[str], + **future_kwargs: Any, + ): + pass + + def do_log_attributes( + self, + *, + attributes: Dict[str, Any], + action: str, + action_sequence_id: int, + span: Optional["ActionSpan"], + tags: dict, + app_id: str, + partition_key: Optional[str], + **future_kwargs: Any, + ): + pass + + def pre_start_stream( + self, + *, + action: str, + sequence_id: int, + app_id: str, + partition_key: Optional[str], + **future_kwargs: Any, + ): + pass + + def post_stream_item( + self, + *, + item: Any, + item_index: int, + stream_initialize_time: datetime.datetime, + first_stream_item_start_time: datetime.datetime, + action: str, + sequence_id: int, + app_id: str, + partition_key: Optional[str], + **future_kwargs: Any, + ): + pass + + def post_end_stream( + self, + *, + action: str, + sequence_id: int, + app_id: str, + partition_key: Optional[str], + **future_kwargs: Any, + ): + pass + + +def _group_events_by_app_id( + events: List[RecursiveActionTracked], +) -> Dict[str, List[RecursiveActionTracked]]: + grouped_events = {} + for event in events: + if event.app_id not in grouped_events: + grouped_events[event.app_id] = [] + grouped_events[event.app_id].append(event) + return grouped_events + + +def test_e2e_map_actions_sync_subgraph(): + """Tests map actions over multiple action types (runnable graph, function, action class...)""" + + class MapActionsAllApproaches(MapActions): + def actions( + self, state: State, inputs: Dict[str, Any], context: ApplicationContext + ) -> Generator[Union[Action, Callable, RunnableGraph], None, None]: + for graph_ in [ + simple_single_fn_subgraph.bind(identifying_number=1000), + ClassBasedAction(2000), + create_full_subgraph(3000), + ]: + yield graph_ + + def state(self, state: State, inputs: Dict[str, Any]): + return state.update(input_number=state["input_number_in_state"], number_to_add=10) + + 
def reduce(self, state: State, states: Generator[State, None, None]) -> State:
+            # TODO -- ensure that states is in the correct order...
+            # Or decide to key it?
+            new_state = state
+            for output_state in states:
+                new_state = new_state.append(output_numbers_in_state=output_state["output_number"])
+            return new_state
+
+        @property
+        def writes(self) -> list[str]:
+            return ["output_numbers_in_state"]
+
+        @property
+        def reads(self) -> list[str]:
+            return ["input_number_in_state"]
+
+    app = (
+        ApplicationBuilder()
+        .with_actions(
+            initial_action=Input("input_number_in_state"),
+            map_action=MapActionsAllApproaches(),
+            final_action=Result("output_numbers_in_state"),
+        )
+        .with_transitions(("initial_action", "map_action"), ("map_action", "final_action"))
+        .with_entrypoint("initial_action")
+        .with_tracker(RecursiveActionTracker(events := []))
+        .build()
+    )
+    action, result, state = app.run(
+        halt_after=["final_action"], inputs={"input_number_in_state": 100}
+    )
+    assert state["output_numbers_in_state"] == [1111, 2111, 3111]  # ensure order is correct
+    assert len(events) == 3  # three parent actions
+    _, map_event, __ = events
+    grouped_events = _group_events_by_app_id(map_event.children)
+    assert len(grouped_events) == 3  # three unique App IDs, one for each launched subgraph
+
+
+async def test_e2e_map_actions_async_subgraph():
+    """Tests map actions over multiple action types (runnable graph, function, action class...)"""
+
+    class MapActionsAllApproachesAsync(MapActions):
+        def actions(
+            self, state: State, inputs: Dict[str, Any], context: ApplicationContext
+        ) -> Generator[Union[Action, Callable, RunnableGraph], None, None]:
+            for graph_ in [
+                simple_single_fn_subgraph_async.bind(identifying_number=1000),
+                ClassBasedActionAsync(2000),
+                create_full_subgraph_async(3000),
+            ]:
+                yield graph_
+
+        def is_async(self) -> bool:
+            return True
+
+        def state(self, state: State, inputs: Dict[str, Any]):
+            return state.update(input_number=state["input_number_in_state"], number_to_add=10)
+
+        async def reduce(self, state: State, states: AsyncGenerator[State, None]) -> State:
+            # TODO -- ensure that states is in the correct order...
+            # Or decide to key it?
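+            # `states` is an async generator of each sub-application's final state (one per
+            # sub-action yielded by actions() above); fold their output_number values back in.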
+ new_state = state + async for output_state in states: + new_state = new_state.append(output_numbers_in_state=output_state["output_number"]) + return new_state + + @property + def writes(self) -> list[str]: + return ["output_numbers_in_state"] + + @property + def reads(self) -> list[str]: + return ["input_number_in_state"] + + app = ( + ApplicationBuilder() + .with_actions( + initial_action=Input("input_number_in_state"), + map_action=MapActionsAllApproachesAsync(), + final_action=Result("output_numbers_in_state"), + ) + .with_transitions(("initial_action", "map_action"), ("map_action", "final_action")) + .with_entrypoint("initial_action") + .with_tracker(RecursiveActionTracker(events := [])) + .build() + ) + action, result, state = await app.arun( + halt_after=["final_action"], inputs={"input_number_in_state": 100} + ) + assert state["output_numbers_in_state"] == [1111, 2111, 3111] # ensure order correct + assert len(events) == 3 # three parent actions + _, map_event, __ = events + grouped_events = _group_events_by_app_id(map_event.children) + assert len(grouped_events) == 3 # three unique App IDs, one for each launching subgraph + + +@pytest.mark.parametrize( + "action", + [ + simple_single_fn_subgraph.bind(identifying_number=0), + ClassBasedAction(0), + create_full_subgraph(0), + ], +) +def test_e2e_map_states_sync_subgraph(action: SubGraphType): + """Tests the map states action with a subgraph that is run in parallel. + Collatz conjecture over different starting points""" + + class MapStatesSync(MapStates): + def states( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> Generator[State, None, None]: + for input_number in state["input_numbers_in_state"]: + yield state.update(input_number=input_number, number_to_add=10) + + def action( + self, state: State, inputs: Dict[str, Any] + ) -> Union[Action, Callable, RunnableGraph]: + return action + + def is_async(self) -> bool: + return False + + def reduce(self, state: State, states: Generator[State, None, None]) -> State: + # TODO -- ensure that states is in the correct order... + # Or decide to key it? + new_state = state + for output_state in states: + new_state = new_state.append(output_numbers_in_state=output_state["output_number"]) + return new_state + + @property + def writes(self) -> list[str]: + return ["output_numbers_in_state"] + + @property + def reads(self) -> list[str]: + return ["input_numbers_in_state"] + + app = ( + ApplicationBuilder() + .with_actions( + initial_action=Input("input_numbers_in_state"), + map_action=MapStatesSync(), + final_action=Result("output_numbers_in_state"), + ) + .with_transitions(("initial_action", "map_action"), ("map_action", "final_action")) + .with_entrypoint("initial_action") + .with_tracker(RecursiveActionTracker(events := [])) + .build() + ) + action, result, state = app.run( + halt_after=["final_action"], inputs={"input_numbers_in_state": [100, 200, 300]} + ) + assert state["output_numbers_in_state"] == [111, 211, 311] # ensure order correct + assert len(events) == 3 + _, map_event, __ = events + grouped_events = _group_events_by_app_id(map_event.children) + assert len(grouped_events) == 3 + + +@pytest.mark.parametrize( + "action", + [ + simple_single_fn_subgraph_async.bind(identifying_number=0), + ClassBasedActionAsync(0), + create_full_subgraph_async(0), + ], +) +async def test_e2e_map_states_async_subgraph(action: SubGraphType): + """Tests the map states action with a subgraph that is run in parallel. 
+ Collatz conjecture over different starting points""" + + class MapStatesAsync(MapStates): + def states( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> Generator[State, None, None]: + for input_number in state["input_numbers_in_state"]: + yield state.update(input_number=input_number, number_to_add=10) + + def action( + self, state: State, inputs: Dict[str, Any] + ) -> Union[Action, Callable, RunnableGraph]: + return action + + def is_async(self) -> bool: + return True + + async def reduce(self, state: State, states: AsyncGenerator[State, None]) -> State: + # TODO -- ensure that states is in the correct order... + # Or decide to key it? + new_state = state + async for output_state in states: + new_state = new_state.append(output_numbers_in_state=output_state["output_number"]) + return new_state + + @property + def writes(self) -> list[str]: + return ["output_numbers_in_state"] + + @property + def reads(self) -> list[str]: + return ["input_numbers_in_state"] + + app = ( + ApplicationBuilder() + .with_actions( + initial_action=Input("input_numbers_in_state"), + map_action=MapStatesAsync(), + final_action=Result("output_numbers_in_state"), + ) + .with_transitions(("initial_action", "map_action"), ("map_action", "final_action")) + .with_entrypoint("initial_action") + .with_tracker(RecursiveActionTracker(events := [])) + .build() + ) + action, result, state = await app.arun( + halt_after=["final_action"], inputs={"input_numbers_in_state": [100, 200, 300]} + ) + assert state["output_numbers_in_state"] == [111, 211, 311] # ensure order correct + assert len(events) == 3 + _, map_event, __ = events + grouped_events = _group_events_by_app_id(map_event.children) + assert len(grouped_events) == 3 + + +def test_e2e_map_actions_and_states_sync(): + """Tests the map states action with a subgraph that is run in parallel. + Collatz conjecture over different starting points""" + + class MapStatesAsync(MapActionsAndStates): + def actions( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> Generator[Union[Action, Callable, RunnableGraph], None, None]: + for graph_ in [ + simple_single_fn_subgraph.bind(identifying_number=1000), + ClassBasedAction(2000), + create_full_subgraph(3000), + ]: + yield graph_ + + def states( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> Generator[State, None, None]: + for input_number in state["input_numbers_in_state"]: + yield state.update(input_number=input_number, number_to_add=10) + + def is_async(self) -> bool: + return False + + def reduce(self, state: State, states: Generator[State, None, None]) -> State: + # TODO -- ensure that states is in the correct order... + # Or decide to key it? 
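+            # MapActionsAndStates runs every (action, state) combination, so this fold sees
+            # nine sub-application states (3 sub-actions x 3 starting states).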
+ new_state = state + for output_state in states: + new_state = new_state.append(output_numbers_in_state=output_state["output_number"]) + return new_state + + @property + def writes(self) -> list[str]: + return ["output_numbers_in_state"] + + @property + def reads(self) -> list[str]: + return ["input_numbers_in_state"] + + app = ( + ApplicationBuilder() + .with_actions( + initial_action=Input("input_numbers_in_state"), + map_action=MapStatesAsync(), + final_action=Result("output_numbers_in_state"), + ) + .with_transitions(("initial_action", "map_action"), ("map_action", "final_action")) + .with_entrypoint("initial_action") + .with_tracker(RecursiveActionTracker(events := [])) + .build() + ) + action, result, state = app.run( + halt_after=["final_action"], inputs={"input_numbers_in_state": [100, 200, 300]} + ) + assert state["output_numbers_in_state"] == [ + 1111, + 1211, + 1311, + 2111, + 2211, + 2311, + 3111, + 3211, + 3311, + ] + assert len(events) == 3 + _, map_event, __ = events + grouped_events = _group_events_by_app_id(map_event.children) + assert len(grouped_events) == 9 # cartesian product of 3 actions and 3 states + + +async def test_e2e_map_actions_and_states_async(): + """Tests the map states action with a subgraph that is run in parallel. + Collatz conjecture over different starting points""" + + class MapStatesAsync(MapActionsAndStates): + def actions( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> Generator[Union[Action, Callable, RunnableGraph], None, None]: + for graph_ in [ + simple_single_fn_subgraph_async.bind(identifying_number=1000), + ClassBasedActionAsync(2000), + create_full_subgraph_async(3000), + ]: + yield graph_ + + def states( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> AsyncGenerator[State, None]: + for input_number in state["input_numbers_in_state"]: + yield state.update(input_number=input_number, number_to_add=10) + + def is_async(self) -> bool: + return True + + async def reduce(self, state: State, states: AsyncGenerator[State, None]) -> State: + # TODO -- ensure that states is in the correct order... + # Or decide to key it? + new_state = state + async for output_state in states: + new_state = new_state.append(output_numbers_in_state=output_state["output_number"]) + return new_state + + @property + def writes(self) -> list[str]: + return ["output_numbers_in_state"] + + @property + def reads(self) -> list[str]: + return ["input_numbers_in_state"] + + app = ( + ApplicationBuilder() + .with_actions( + initial_action=Input("input_numbers_in_state"), + map_action=MapStatesAsync(), + final_action=Result("output_numbers_in_state"), + ) + .with_transitions(("initial_action", "map_action"), ("map_action", "final_action")) + .with_entrypoint("initial_action") + .with_tracker(RecursiveActionTracker(events := [])) + .build() + ) + action, result, state = await app.arun( + halt_after=["final_action"], inputs={"input_numbers_in_state": [100, 200, 300]} + ) + assert state["output_numbers_in_state"] == [ + 1111, + 1211, + 1311, + 2111, + 2211, + 2311, + 3111, + 3211, + 3311, + ] + assert len(events) == 3 + _, map_event, __ = events + grouped_events = _group_events_by_app_id(map_event.children) + assert len(grouped_events) == 9 # cartesian product of 3 actions and 3 states + + +def test_task_level_API_e2e_sync(): + """Tests the map states action with a subgraph that is run in parallel. 
+ Collatz conjecture over different starting points""" + + class TaskBasedAction(TaskBasedParallelAction): + def tasks( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> Generator[SubGraphTask, None, None]: + for j, action in enumerate( + [ + simple_single_fn_subgraph.bind(identifying_number=1000), + ClassBasedAction(2000), + create_full_subgraph(3000), + ] + ): + for i, input_number in enumerate(state["input_numbers_in_state"]): + yield SubGraphTask( + graph=RunnableGraph.create(action), + inputs={}, + state=state.update(input_number=input_number, number_to_add=10), + application_id=f"{i}_{j}", + ) + + def reduce(self, state: State, states: Generator[State, None, None]) -> State: + # TODO -- ensure that states is in the correct order... + # Or decide to key it? + new_state = state + for output_state in states: + new_state = new_state.append(output_numbers_in_state=output_state["output_number"]) + return new_state + + @property + def writes(self) -> list[str]: + return ["output_numbers_in_state"] + + @property + def reads(self) -> list[str]: + return ["input_numbers_in_state"] + + app = ( + ApplicationBuilder() + .with_actions( + initial_action=Input("input_numbers_in_state"), + map_action=TaskBasedAction(), + final_action=Result("output_numbers_in_state"), + ) + .with_transitions(("initial_action", "map_action"), ("map_action", "final_action")) + .with_entrypoint("initial_action") + .with_tracker(RecursiveActionTracker(events := [])) + .build() + ) + action, result, state = app.run( + halt_after=["final_action"], inputs={"input_numbers_in_state": [100, 200, 300]} + ) + assert state["output_numbers_in_state"] == [ + 1111, + 1211, + 1311, + 2111, + 2211, + 2311, + 3111, + 3211, + 3311, + ] + assert len(events) == 3 + _, map_event, __ = events + grouped_events = _group_events_by_app_id(map_event.children) + assert len(grouped_events) == 9 # cartesian product of 3 actions and 3 states + + +async def test_task_level_API_e2e_async(): + """Tests the map states action with a subgraph that is run in parallel. + Collatz conjecture over different starting points""" + + class TaskBasedActionAsync(TaskBasedParallelAction): + async def tasks( + self, state: State, context: ApplicationContext, inputs: Dict[str, Any] + ) -> AsyncGenerator[SubGraphTask, None]: + for j, action in enumerate( + [ + simple_single_fn_subgraph.bind(identifying_number=1000), + ClassBasedAction(2000), + create_full_subgraph(3000), + ] + ): + for i, input_number in enumerate(state["input_numbers_in_state"]): + yield SubGraphTask( + graph=RunnableGraph.create(action), + inputs={}, + state=state.update(input_number=input_number, number_to_add=10), + application_id=f"{i}_{j}", + ) + + async def reduce(self, state: State, states: AsyncGenerator[State, None]) -> State: + # TODO -- ensure that states is in the correct order... + # Or decide to key it? 
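+            # Each state here comes from one SubGraphTask yielded above; every task was given
+            # an explicit application_id (f"{i}_{j}"), so results could also be keyed per task.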
+ new_state = state + async for output_state in states: + new_state = new_state.append(output_numbers_in_state=output_state["output_number"]) + return new_state + + @property + def writes(self) -> list[str]: + return ["output_numbers_in_state"] + + @property + def reads(self) -> list[str]: + return ["input_numbers_in_state"] + + def is_async(self) -> bool: + return True + + app = ( + ApplicationBuilder() + .with_actions( + initial_action=Input("input_numbers_in_state"), + map_action=TaskBasedActionAsync(), + final_action=Result("output_numbers_in_state"), + ) + .with_transitions(("initial_action", "map_action"), ("map_action", "final_action")) + .with_entrypoint("initial_action") + .with_tracker(RecursiveActionTracker(events := [])) + .build() + ) + action, result, state = await app.arun( + halt_after=["final_action"], inputs={"input_numbers_in_state": [100, 200, 300]} + ) + assert state["output_numbers_in_state"] == [ + 1111, + 1211, + 1311, + 2111, + 2211, + 2311, + 3111, + 3211, + 3311, + ] + assert len(events) == 3 + _, map_event, __ = events + grouped_events = _group_events_by_app_id(map_event.children) + assert len(grouped_events) == 9 # cartesian product of 3 actions and 3 states + + +def test_map_reduce_function_e2e(): + mre = map_reduce_action( + action=[ + simple_single_fn_subgraph.bind(identifying_number=1000), + ClassBasedAction(2000), + create_full_subgraph(3000), + ], + reads=["input_numbers_in_state"], + writes=["output_numbers_in_state"], + state=lambda state, context, inputs: ( + state.update(input_number=input_number, number_to_add=10) + for input_number in state["input_numbers_in_state"] + ), + inputs=[], + reducer=lambda state, states: state.extend( + output_numbers_in_state=[output_state["output_number"] for output_state in states] + ), + ) + + app = ( + ApplicationBuilder() + .with_actions( + initial_action=Input("input_numbers_in_state"), + map_action=mre, + final_action=Result("output_numbers_in_state"), + ) + .with_transitions(("initial_action", "map_action"), ("map_action", "final_action")) + .with_entrypoint("initial_action") + .with_tracker(RecursiveActionTracker(events := [])) + .build() + ) + action, result, state = app.run( + halt_after=["final_action"], inputs={"input_numbers_in_state": [100, 200, 300]} + ) + assert state["output_numbers_in_state"] == [ + 1111, + 1211, + 1311, + 2111, + 2211, + 2311, + 3111, + 3211, + 3311, + ] + assert len(events) == 3 + _, map_event, __ = events + grouped_events = _group_events_by_app_id(map_event.children) + assert len(grouped_events) == 9 # cartesian product of 3 actions and 3 states
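+
+
+# --- Illustrative sketch (not part of the original test suite) ---------------------------
+# The tests above all rely on the default executor for the parallel sub-applications. The
+# sketch below shows how a caller might supply their own executor through the builder's
+# with_parallel_executor hook added in this patch; the function name and max_workers=2 are
+# illustrative assumptions, while the map_reduce_action wiring mirrors
+# test_map_reduce_function_e2e above.
+def example_map_reduce_with_custom_executor():
+    from concurrent.futures import ThreadPoolExecutor
+
+    mre = map_reduce_action(
+        action=[
+            simple_single_fn_subgraph.bind(identifying_number=1000),
+            ClassBasedAction(2000),
+            create_full_subgraph(3000),
+        ],
+        reads=["input_numbers_in_state"],
+        writes=["output_numbers_in_state"],
+        state=lambda state, context, inputs: (
+            state.update(input_number=input_number, number_to_add=10)
+            for input_number in state["input_numbers_in_state"]
+        ),
+        inputs=[],
+        reducer=lambda state, states: state.extend(
+            output_numbers_in_state=[output_state["output_number"] for output_state in states]
+        ),
+    )
+    app = (
+        ApplicationBuilder()
+        .with_actions(
+            initial_action=Input("input_numbers_in_state"),
+            map_action=mre,
+            final_action=Result("output_numbers_in_state"),
+        )
+        .with_transitions(("initial_action", "map_action"), ("map_action", "final_action"))
+        .with_entrypoint("initial_action")
+        # A zero-argument factory is passed; the framework calls it to build the executor
+        # used for the parallel sub-actions.
+        .with_parallel_executor(lambda: ThreadPoolExecutor(max_workers=2))
+        .build()
+    )
+    _, _, state = app.run(
+        halt_after=["final_action"], inputs={"input_numbers_in_state": [100, 200, 300]}
+    )
+    return state["output_numbers_in_state"]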