diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index f1a03e5f..0458ca43 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -6,14 +6,20 @@ on: - main # Triggers the workflow on push events to the main branch pull_request: # Triggers the workflow on pull request events for any branch - types: [ opened, synchronize, reopened ] + types: [opened, synchronize, reopened] permissions: contents: write + pull-requests: write + +concurrency: ci-${{ github.ref }} jobs: docs: runs-on: ubuntu-latest + env: + PR_PATH: pull/${{github.event.number}} + BASE_URL: https://burr.dagworks.io/pull/${{github.event.number}} steps: - uses: actions/checkout@v3 - uses: actions/setup-python@v3 @@ -24,6 +30,15 @@ jobs: run: | sphinx-build docs -b dirhtml _build echo "burr.dagworks.io" > _build/CNAME # keep the cname file which this clobbers -- todo, unhardcode + - name: Comment on PR + uses: hasura/comment-progress@v2.2.0 + if: github.ref != 'refs/heads/main' + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + repository: ${{ github.repository }} + number: ${{ github.event.number }} + id: deploy-preview + message: "Starting deployment of preview ⏳..." - name: Deploy to GitHub Pages uses: peaceiris/actions-gh-pages@v3 if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }} @@ -32,3 +47,19 @@ jobs: github_token: ${{ secrets.GITHUB_TOKEN }} publish_dir: _build/ force_orphan: true + - name: Build PR preview website + uses: peaceiris/actions-gh-pages@v3 + if: github.ref != 'refs/heads/main' + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + publish_dir: _build/ + destination_dir: ${{ env.PR_PATH }} # TODO you need to set this if you're using a custom domain. Otherwise you can remove it. + - name: Update comment + uses: hasura/comment-progress@v2.2.0 + if: github.ref != 'refs/heads/main' + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + repository: ${{ github.repository }} + number: ${{ github.event.number }} + id: deploy-preview + message: "A preview of ${{ github.event.after }} is uploaded and can be seen here:\n\n ✨ ${{ env.BASE_URL }} ✨\n\nChanges may take a few minutes to propagate. Since this is a preview of production, content with `draft: true` will not be rendered. The source is here: https://github.com/${{ github.repository }}/tree/gh-pages/${{ env.PR_PATH }}/" diff --git a/README.md b/README.md index 7cecef4e..30687999 100644 --- a/README.md +++ b/README.md @@ -144,7 +144,7 @@ but realized that it has a wide array of applications and decided to release it While Burr is stable and well-tested, we have quite a few tools/features on our roadmap! -1. Parallelism -- support for recursive "sub-agents" through an ergonomic API (not: this is already feasible, see [recursive applications](http://localhost:8000/concepts/recursion/)). +1. Parallelism -- support for recursive "sub-agents" through an ergonomic API (note: this is already feasible, see [recursive applications](https://burr.dagworks.io/concepts/recursion/)). 2. Testing & eval curation. Curating data with annotations and being able to export these annotations to create unit & integration tests. 3. Various efficiency/usability improvements for the core library (see [planned capabilities](https://burr.dagworks.io/concepts/planned-capabilities/) for more details). This includes: 1.
First-class support for retries + exception management diff --git a/docs/_static/custom.css b/docs/_static/custom.css new file mode 100644 index 00000000..27379940 --- /dev/null +++ b/docs/_static/custom.css @@ -0,0 +1,5 @@ +/* Enable line wrapping for code blocks */ +.highlight pre { + white-space: pre-wrap; + word-wrap: break-word; +} diff --git a/docs/concepts/index.rst b/docs/concepts/index.rst index 42405173..e6dcba2d 100644 --- a/docs/concepts/index.rst +++ b/docs/concepts/index.rst @@ -21,5 +21,6 @@ Overview of the concepts -- read these to get a mental model for how Burr works. state-typing hooks additional-visibility + parallelism recursion planned-capabilities diff --git a/docs/concepts/parallelism.rst b/docs/concepts/parallelism.rst new file mode 100644 index 00000000..8cec41ed --- /dev/null +++ b/docs/concepts/parallelism.rst @@ -0,0 +1,563 @@ +=========== +Parallelism +=========== + +Burr allows for sets of actions/subgraphs to run in parallel. In this section we go over the use cases and how to run them. + +General Idea/TL;DR +================== + +TL;DR +----- + +Burr enables graph-level parallelism by having a "parallel action" that delegates to multiple sub-actions/graphs that run in parallel. +While it has a lower-level API focused on tasks, Burr provides a blessed path that allows you to run multiple actions over the same +state or multiple states over the same action, and join the results together in a custom way. + +.. image:: ../_static/parallelism.png + :align: center + + + + +Overview +-------- + +Burr allows you to define parallel actions by expanding a single action into multiple individual actions or subgraphs, +executing them all, and joining the results. This is a simple map-reduce pattern. + +Currently, Burr has two separate APIs for building parallel applications -- higher level (use this first), and lower level. +Beyond that, Burr can support parallelism however you wish to run it -- see the advanced use-cases section for more details. + +Higher-level API +---------------- + +You select a set of "configurations" over which you want to run, and Burr launches all of them and then joins the results. + +This means you either: +1. Vary the state and run over the same action/subgraph (think tuning LLM parameters/inputs, running simple experiments/optimization routines, etc...) +2. Vary the action and provide the same state (think running multiple LLMs on the same input, running multiple analyses on the same data, etc...) + +Note we do not distinguish between subgraph and action -- under the hood it's all treated as a "sub-application" (more on that later). + +----------------------------------------- +Run the same action over different states +----------------------------------------- + +For case (1) (mapping states over the same action) you implement the ``MapStates`` class, which provides the following: + +1. An ``action()`` function that provides the action/subgraph to run +2. A ``states()`` function that yields a generator of states to run over +3. A ``reduce()`` function that consumes a generator of final states (from the action/subgraph), and combines them as they come in +4. The state fields from which the action reads +5. The state fields to which the action writes + + +This looks as follows -- in this case we're running the same LLM over different prompts: + + +..
code-block:: python + + from burr.core import action, state, Action, State + from burr.core.parallelism import MapStates, RunnableGraph + from typing import Callable, Generator, List + + @action(reads=["prompt"], writes=["llm_output"]) + def query_llm_action(state: State) -> State: + return state.update(llm_output=_query_my_llm(prompt=state["prompt"])) + + class TestMultiplePrompts(MapStates): + + def action(self) -> Action | Callable | RunnableGraph: + # make sure to add a name to the action + # This is not necessary for subgraphs, as actions will already have names + return query_llm_action.with_name("query_llm_action") + + def states(self, state: State) -> Generator[State, None, None]: + for prompt in [ + "What is the meaning of life?", + "What is the airspeed velocity of an unladen swallow?", + "What is the best way to cook a steak?", + ]: + yield state.update(prompt=prompt) + + + def reduce(self, states: Generator[State, None, None]) -> State: + all_llm_outputs = [] + for state in states: + all_llm_outputs.append(state["llm_output"]) + return state.update(all_llm_outputs=all_llm_outputs) + + def reads(self) -> List[str]: + return ["prompts"] + + def writes(self) -> List[str]: + return ["all_llm_outputs"] + +Then, to run it you just treat it as an action! + +.. code-block:: python + + app = ( + ApplicationBuilder() + .with_actions( + prompt_generator=generate_prompts, # not defined above, this writes to prompts + multi_prompt_test=TestMultiplePrompts(), + ).with_transitions( + ("prompt_generator", "multi_prompt_test"), + ) + .with_entrypoint("prompt_generator") + .build() + ) + +----------------------------------------- +Run different actions over the same state +----------------------------------------- + +For case (2) (mapping actions over the same state) you implement the ``MapActions`` class, which provides the following: + +1. An ``actions()`` function that yields a generator of actions to run +2. A ``state()`` function that provides the state to run over +3. A ``reduce()`` function that consumes a generator of final states (from the action/subgraph), and combines them as they come in +4. The state fields from which the action reads +5. The state fields to which the action writes + + +.. code-block:: python + + from burr.core import action, state, Action, State + from burr.core.parallelism import MapActions, RunnableGraph + from typing import Callable, Generator, List + + @action(reads=["prompt", "model"], writes=["llm_output"]) + def query_llm(state: State, model: str) -> State: + # TODO -- implement _query_my_llm to call litellm or something + return state.update(llm_output=_query_my_llm(prompt=state["prompt"], model=model)) + + class TestMultipleModels(MapActions): + + def actions(self, state: State) -> Generator[Action | Callable | RunnableGraph, None, None]: + # make sure to add a name to the action + # This is not necessary for subgraphs, as actions will already have names + for action in [ + query_llm.bind(model="gpt-4").with_name("gpt_4_answer"), + query_llm.bind(model="o1").with_name("o1_answer"), + query_llm.bind(model="claude").with_name("claude_answer"), + ]: + yield action + + def state(self, state: State) -> State: + return state.update(prompt="What is the meaning of life?") + + def reduce(self, states: Generator[State, None, None]) -> State: + all_llm_outputs = [] + for state in states: + all_llm_outputs.append(state["llm_output"]) + return state.update(all_llm_outputs=all_llm_outputs) + + def reads(self) -> List[str]: + return ["prompts"] + + def writes(self) -> List[str]: + return ["all_llm_outputs"] + + +Then, it's exactly the same as above: + +..
code-block:: python + + app = ( + ApplicationBuilder() + .with_actions( + prompt_generator=generate_prompts, # not defined above, this writes to prompts + multi_prompt_test=TestMultipleModels(), + ).with_transitions( + ("prompt_generator", "multi_prompt_test"), + ) + .with_entrypoint("prompt_generator") + .build() + ) + + +--------- +Subgraphs +--------- + +While we've been using individual actions above, we can also replace them with subgraphs (e.g. :ref:`using recursion <recursion>` inside applications). + +To do this, we use the Graph API and wrap it in a RunnableGraph: + +- The :py:class:`Graph <burr.core.graph.Graph>` API allows us to define the structure of the action +- The ``RunnableGraph`` is a wrapper that tells the framework everything else it needs to know to run the graph: + - The entrypoint of the graph + - The exit points (corresponding to ``halt_after`` in :py:meth:`run <burr.core.application.Application.run>`) + +This might look as follows -- say we have a simple subflow that takes in a raw prompt from state and returns the LLM output: + +.. code-block:: python + + from burr.core import action, state, Action, State + from burr.core.graph import GraphBuilder + from burr.core.parallelism import MapStates, RunnableGraph + from typing import Callable, Generator + + @action(reads=["prompt"], writes=["processed_prompt"]) + def process_prompt(state: State) -> State: + processed_prompt = f"The user has asked: {state['prompt']}. Please respond directly to that prompt, but only in riddles." + return state.update( + processed_prompt=processed_prompt, + ) + + @action(reads=["processed_prompt"], writes=["llm_output"]) + def query_llm(state: State) -> State: + return state.update(llm_output=_query_my_llm(prompt=state["processed_prompt"])) + + graph = ( + GraphBuilder() + .with_actions( + process_prompt=process_prompt, + query_llm=query_llm + ).with_transitions( + ("process_prompt", "query_llm") + ).build() + ) + + runnable_graph = RunnableGraph( + graph=graph, + entrypoint="process_prompt", + halt_after="query_llm" + ) + + class TestMultiplePromptsWithSubgraph(MapStates): + + def action(self) -> Action | Callable | RunnableGraph: + return runnable_graph + + def states(self, state: State) -> Generator[State, None, None]: + for prompt in [ + "What is the meaning of life?", + "What is the airspeed velocity of an unladen swallow?", + "What is the best way to cook a steak?", + ]: + yield state.update(prompt=prompt) + + ... # same as above + +In the code above, we're effectively treating the graph like an action -- due to the single ``entrypoint``/``halt_after`` pair we specified, +it can run just as the single action did above. Note this is also doable for running multiple actions over the same state. + + +-------------- +Passing inputs +-------------- + +.. note:: + + Should ``MapOverInputs`` be its own class? Or should we have ``bind_from_state(prompt="prompt_field_in_state")`` that allows you to pass it in as + state and just use the mapping capabilities? + +Each of these can (optionally) produce ``inputs`` by yielding/returning a tuple from the ``states()``/``actions()`` function. + +This is useful if you want to vary the inputs. Note this is the same as passing ``inputs=`` to ``app.run``. + + +.. code-block:: python + + from burr.core import action, state, Action, State + from burr.core.graph import GraphBuilder + from burr.core.parallelism import MapStates, RunnableGraph + from typing import Callable, Generator, Tuple + + @action(reads=["prompt"], writes=["processed_prompt"]) + def process_prompt(state: State) -> State: + processed_prompt = f"The user has asked: {state['prompt']}. Please respond directly to that prompt, but only in riddles."
+ return state.update( + processed_prompt=processed_prompt, + ) + + @action(reads=["processed_prompt"], writes=["llm_output"]) + def query_llm(state: State, model: str) -> State: + return state.update(llm_output=_query_my_llm(prompt=state["processed_prompt"], model=model)) + + graph = ( + GraphBuilder() + .with_actions( + process_prompt=process_prompt, + query_llm=query_llm + ).with_transitions( + ("process_prompt", "query_llm") + ).build() + ) + + runnable_graph = RunnableGraph( + graph=graph, + entrypoint="process_prompt", + halt_after="query_llm" + ) + + class TestMultiplePromptsWithSubgraph(MapStates): + + def action(self) -> Action | Callable | RunnableGraph: + return runnable_graph + + def states(self, state: State) -> Generator[Tuple[State, dict], None, None]: + for prompt in [ + "What is the meaning of life?", + "What is the airspeed velocity of an unladen swallow?", + "What is the best way to cook a steak?", + ]: + yield state.update(prompt=prompt), {"model": "gpt-4"} # pass in the model as an input + + ... # same as above + +---------------------- +Full Cartesian Product +---------------------- + +If you want to run all possible combinations of actions/states, you can use the ``MapActionsAndStates`` class -- this is actually the +base class for the above two classes. For this, you provide a generator of actions and a generator of states, and Burr will run all possible +combinations. + +To track which result belongs to which action/state combination, we recommend you rely on values stored in the state (see example). + +.. code-block:: python + + from burr.core import action, state, Action, State + from burr.core.parallelism import MapActionsAndStates, RunnableGraph + from typing import Callable, Generator, List + + @action(reads=["prompt", "model"], writes=["llm_output"]) + def query_llm(state: State, model: str) -> State: + # TODO -- implement _query_my_llm to call litellm or something + return state.update(llm_output=_query_my_llm(prompt=state["prompt"], model=model)) + + class TestMultipleModels(MapActionsAndStates): + + def actions(self, state: State) -> Generator[Action | Callable | RunnableGraph, None, None]: + # make sure to add a name to the action + # This is not necessary for subgraphs, as actions will already have names + for action in [ + query_llm.bind(model="gpt-4").with_name("gpt_4_answer"), + query_llm.bind(model="o1").with_name("o1_answer"), + query_llm.bind(model="claude").with_name("claude_answer"), + ]: + yield action + + def states(self, state: State) -> Generator[State, None, None]: + for prompt in [ + "What is the meaning of life?", + "What is the airspeed velocity of an unladen swallow?", + "What is the best way to cook a steak?", + ]: + yield state.update(prompt=prompt) + + def reduce(self, states: Generator[State, None, None]) -> State: + all_llm_outputs = [] + for state in states: + all_llm_outputs.append( + { + "output" : state["llm_output"], + "model" : state["model"], + "prompt" : state["prompt"], + } + ) + return state.update(all_llm_outputs=all_llm_outputs) + + def reads(self) -> List[str]: + return ["prompts"] + + def writes(self) -> List[str]: + return ["all_llm_outputs"] + + +Lower-level API +--------------- + +The approaches above compile into a set of "tasks" -- sub-applications to run. If, however, you want more control, you +can use the lower-level API to define the tasks directly. This allows you to provide any combination of actions, input, and state +to the tasks. + +For those who are curious, this is what the above APIs extend from. + +..
code-block:: python + + import hashlib + + from burr.core import action, state, ApplicationContext, State + from burr.core.parallelism import RunnableGraph, SubGraphTask, TaskBasedParallelAction + from typing import Callable, Generator, List + + @action(reads=["prompt", "model"], writes=["llm_output"]) + def query_llm(state: State, model: str) -> State: + # TODO -- implement _query_my_llm to call litellm or something + return state.update(llm_output=_query_my_llm(prompt=state["prompt"], model=model)) + + class MultipleTaskExample(TaskBasedParallelAction): + def tasks(self, state: State, context: ApplicationContext) -> Generator[SubGraphTask, None, None]: + for prompt in state["prompts"]: + for action in [ + query_llm.bind(model="gpt-4").with_name("gpt_4_answer"), + query_llm.bind(model="o1").with_name("o1_answer"), + query_llm.bind(model="claude").with_name("claude_answer"), + ]: + yield SubGraphTask( + action=action, # can be a RunnableGraph as well + state=state.update(prompt=prompt), + inputs={}, + # stable hash -- up to you to ensure uniqueness + application_id=hashlib.sha256((context.application_id + action.name + prompt).encode()).hexdigest(), + # a few other parameters we might add -- see advanced usage -- failure conditions, etc... + ) + + def reduce(self, states: Generator[State, None, None]) -> State: + all_llm_outputs = [] + for state in states: + all_llm_outputs.append( + { + "output" : state["llm_output"], + "model" : state["model"], + "prompt" : state["prompt"], + } + ) + return state.update(all_llm_outputs=all_llm_outputs) + + +Advanced Usage +-------------- + +We anticipate the above should cover most of what you want to do, but we have a host of advanced tuning capabilities. + +--------- +Execution +--------- + +To customize execution, you can pass a ``burr.Executor`` to the application, or to the actions themselves. We have a few available executors: + +- ``burr.parallelism.MultiThreadedExecutor`` -- runs the tasks in parallel using threads (default) +- ``burr.parallelism.MultiProcessExecutor`` -- runs the tasks in parallel using processes +- ``burr.parallelism.RayExecutor`` -- runs the tasks in parallel using `Ray <https://www.ray.io/>`_ +- ``burr.parallelism.DaskExecutor`` -- runs the tasks in parallel using `Dask <https://www.dask.org/>`_ + +For async, we only allow the ``burr.parallelism.AsyncExecutor`` (default), which uses ``asyncio.gather`` to run the tasks in parallel. + +You can pass this either as a global executor for the application, or specify it as part of your class. + +Specifying it as part of the application -- it will get routed as the default to all parallel actions: + +.. code-block:: python + + app = ( + ApplicationBuilder() + .with_executor(MultiThreadedExecutor(max_concurrency=10)) + .build() + ) + +Specifying it as part of the action -- this will override the global executor: + +.. code-block:: python + + class TestMultiplePrompts(MapStates): + + def action(self) -> Action | Callable | RunnableGraph: + return runnable_graph + + def executor(self) -> Executor: + return MultiThreadedExecutor(max_concurrency=10) + + ... # same as above + + +-------------------- +Persistence/Tracking +-------------------- + +By default, the trackers/persisters will be passed from the parent application to the child application. The application IDs +will form a stable hash (presuming the order is constant) to ensure that the same application ID is used for the same task every time. + +It will also utilize the same persister to load from the prior state, if that is used on the application level (see the state-persistence section). + +This enables the following: + +1.
Tracking will automatically be associated with the same application (and sub-application) when reloaded +2. If the concurrent application quits halfway through, the application will be able to pick up where it left off, as will all sub-applications + +You can disable either tracking or persistence at the sub-application level by passing ``track=False`` or ``persist=False`` to the parallel action's constructor. + +You can also customize this per parallel action by overriding the ``tracker``/``persister`` methods: + +.. code-block:: python + + class TestMultiplePrompts(MapStates): + + def action(self) -> Action | Callable | RunnableGraph: + return runnable_graph + + def tracker(self, context: ApplicationContext) -> TrackingBehavior | None: + # return "cascade" # default + # return None # no tracking + return LocalTrackingClient(...) # custom tracker + + def persister(self, context: ApplicationContext) -> Persister | None: + # return "cascade" # default + # return None # no persistence + return SQLLitePersister(...) # custom persister + + ... # same as above + +----- +Other +----- + +Things we will consider after the initial release: + +- customizing execution on a per-action basis -- likely a parameter to ``RunnableGraph`` +- Streaming -- interleaving parallel streaming actions and giving results as they come +- More examples for inter-graph communication/cancellation of one action based on the result of another +- graceful failure of sub-actions + + +Under the hood +============== + +Beneath the APIs, all this does is simplify the :ref:`recursion` API to allow for multiple actions to be run in parallel. + +- ``RunnableGraph`` objects are set as subgraphs, and recursively executed by the application, using the executor +- an ``Action`` is turned into a ``RunnableGraph`` by the framework, and executed by the executor + +In the UI, this will show up as a "child" application -- see the :ref:`recursion` section for more details. + + Advanced Use-cases +================== + +As this is all just syntactic sugar for recursion, you can use recursion directly to get more advanced capabilities. + +This involves instantiating a sub-application inside the action, and running it yourself. + + Interleaving Generators +----------------------- + +Say you want to provide an agent that reports up-to-date progress on its thoughts. For example, say you want to provide +a planning agent with a similar interface to OpenAI's o1 model. + + To do this, you would typically call :py:meth:`iterate <burr.core.application.Application.iterate>`. Now, say you wanted to run +multiple in parallel! + +While this is not built to be easy with the APIs in this section, it's very doable with the underlying recursion API. + +The basics (see the sketch after this list): + +1. Create each sub-application using the ``with_parent_context`` method +2. Run each sub-application in parallel using the executor +3. Combine the generators in parallel, yielding results as they come out
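+
+A minimal sketch of what this could look like, assuming a hypothetical ``build_planning_app`` helper that constructs each sub-application with ``with_parent_context`` as in step 1; plain threads and a queue stand in for the executor, and the ``finalize`` halt point and state fields are illustrative only:
+
+.. code-block:: python
+
+    import queue
+    import threading
+
+    from burr.core import ApplicationContext, State, action
+
+    @action(reads=["prompts"], writes=["thoughts"])
+    def interleaved_planning(state: State, __context: ApplicationContext) -> State:
+        results = queue.Queue()
+
+        def drain(index: int, app):
+            # Steps 2 & 3: run one sub-application, pushing intermediate results as they arrive
+            for act, result, _ in app.iterate(halt_after=["finalize"]):
+                results.put((index, act.name, result))
+            results.put((index, None, None))  # sentinel: this sub-application is done
+
+        # Step 1: build one sub-application per prompt, tied to the parent context
+        # (build_planning_app is a hypothetical helper -- see the note above)
+        apps = [build_planning_app(__context, prompt) for prompt in state["prompts"]]
+        threads = [threading.Thread(target=drain, args=(i, app)) for i, app in enumerate(apps)]
+        for t in threads:
+            t.start()
+
+        thoughts, remaining = [], len(apps)
+        while remaining:  # interleave results in whatever order they arrive
+            index, action_name, result = results.get()
+            if action_name is None:
+                remaining -= 1
+            else:
+                thoughts.append({"agent": index, "action": action_name, "result": result})
+        for t in threads:
+            t.join()
+        return state.update(thoughts=thoughts)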
+ +Inter-action communication +-------------------------- + +Say you have two LLMs answering the same question -- one that gives immediate results back to the user +as they come in, and another that thinks for a while to give more sophisticated results. The user then has the option to say they're happy +with the solution, or that they want to wait for more. You may want to eagerly kick off the second LLM +if you're concerned about latency -- thus if the user wants more or does not respond, the more sophisticated +LLM might come up with a solution. + +To do this, you would: + +1. Run the sub-graph consisting of the first LLM using :py:meth:`iterate <burr.core.application.Application.iterate>` +2. Simultaneously run the second LLM using :py:meth:`iterate <burr.core.application.Application.iterate>` as well +3. Join them in parallel, waiting for any user input if provided +4. Decide after every step of the first graph whether you want to cancel the second graph -- e.g. is the user satisfied? diff --git a/docs/concepts/recursion.rst b/docs/concepts/recursion.rst index 323cfeee..e27dd06f 100644 --- a/docs/concepts/recursion.rst +++ b/docs/concepts/recursion.rst @@ -1,3 +1,5 @@ +.. _recursion: + ====================== Recursive Applications ====================== diff --git a/docs/conf.py b/docs/conf.py index 375b9f09..7713e750 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -32,6 +32,10 @@ html_theme = "furo" html_static_path = ["_static"] +html_css_files = [ + "custom.css", +] + html_title = "Burr" html_theme_options = { "source_repository": "https://github.com/dagworks-inc/burr",