From 8100c66357c5a1bdfc18c8d5376ceed9fd4309ff Mon Sep 17 00:00:00 2001
From: elijahbenizzy
Date: Mon, 18 Nov 2024 21:27:19 -0800
Subject: [PATCH] Adds parallelism example - WIP but we want to get out before
 the Show HN tomorrow

---
 examples/parallelism/README.md      |   7 +
 examples/parallelism/notebook.ipynb | 632 ++++++++++++++++++++++++++++
 examples/validate_examples.py       |   1 +
 3 files changed, 640 insertions(+)
 create mode 100644 examples/parallelism/README.md
 create mode 100644 examples/parallelism/notebook.ipynb

diff --git a/examples/parallelism/README.md b/examples/parallelism/README.md
new file mode 100644
index 00000000..87cb8972
--- /dev/null
+++ b/examples/parallelism/README.md
@@ -0,0 +1,7 @@
+# Parallelism
+
+In this example we go over Burr's parallelism capabilities. It is based on [the parallelism documentation](https://burr.dagworks.io/concepts/parallelism/), demonstrating the `MapStates` capability.
+
+See [the notebook](./notebook.ipynb) for the full example.
+
+You can follow along with this [Loom video](https://www.loom.com/share/a871abe84f974a499d7fb5bae42c35ae).
diff --git a/examples/parallelism/notebook.ipynb b/examples/parallelism/notebook.ipynb
new file mode 100644
index 00000000..b727bf64
--- /dev/null
+++ b/examples/parallelism/notebook.ipynb
@@ -0,0 +1,632 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "7a4c338b-90dc-4866-9cc9-5c9af38f9537",
+   "metadata": {},
+   "source": [
+    "# Parallelism\n",
+    "\n",
+    "In this notebook we go over how to run parallel sub-applications, using one of the examples from [the documentation](https://burr.dagworks.io/concepts/parallelism/).\n",
+    "\n",
+    "We will write a simple Burr application that compares multiple models on the same prompt, working through three approaches to show how Burr's parallelism tooling simplifies the task:\n",
+    "\n",
+    "1. Running the queries manually in a thread pool\n",
+    "2. Creating recursive sub-applications and wiring tracking through with Burr's [recursive tracking capabilities](https://burr.dagworks.io/concepts/recursion/)\n",
+    "3. Using the map/reduce capabilities in Burr's [parallelism tooling](https://burr.dagworks.io/concepts/parallelism/)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "10459bdd-2154-43c6-84d2-7bee4825ab9b",
+   "metadata": {},
+   "source": [
+    "# Imports"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 15,
+   "id": "e95aa6fe-1eaa-490d-b7fd-ff956306bcc5",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import pprint\n",
+    "from concurrent.futures import ThreadPoolExecutor\n",
+    "from typing import Any, Callable, Dict, Generator, List\n",
+    "\n",
+    "import openai\n",
+    "from IPython.display import IFrame\n",
+    "\n",
+    "from burr.core import action, State, ApplicationBuilder, ApplicationContext\n",
+    "from burr.core.parallelism import MapStates"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 16,
+   "id": "e59f1418-68f8-47c2-8034-4b928211acbe",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "The burr.integrations.notebook extension is already loaded. To reload it, use:\n",
+      "  %reload_ext burr.integrations.notebook\n",
+      "Burr UI already running at http://127.0.0.1:7241\n"
+     ]
+    }
+   ],
+   "source": [
+    "%load_ext burr.integrations.notebook\n",
+    "%burr_ui"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "948a51e7-75e8-4d47-a23f-c24a89e8be60",
+   "metadata": {},
+   "source": [
+    "# Helper/Shared Functions\n",
+    "\n",
+    "In this section we'll set up the application -- creating our action functions as well as a few helper functions we'll want to use.\n",
+    "\n",
+    "The graph will have a simple (3-node) structure:\n",
+    "\n",
+    "1. `process_input` -- pulls data in from the user at runtime\n",
+    "2. `query_multiple_models` -- runs the different models in parallel\n",
+    "3. `join_outputs` -- writes a map of model -> result\n",
+    "\n",
+    "We will first define nodes (1) and (3), then redefine (2) according to each of the approaches above."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 17,
+   "id": "0998576a-c8cb-45a2-94ce-d8ce326e3be7",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Note: this notebook only exercises the chat models below -- the davinci\n",
+    "# completion models are listed for validation but would need the completions\n",
+    "# (rather than chat) API to actually run.\n",
+    "MODELS = [\n",
+    "    \"gpt-4\",\n",
+    "    \"gpt-4-turbo\",\n",
+    "    \"gpt-3.5-turbo\",\n",
+    "    \"gpt-3.5-turbo-16k\",\n",
+    "    \"text-davinci-003\",\n",
+    "    \"text-davinci-002\",\n",
+    "]\n",
+    "\n",
+    "\n",
+    "def _query_llm(model: str, prompt: str) -> str:\n",
+    "    \"\"\"Simple function to query our LLM -- we'll only support OpenAI for now.\"\"\"\n",
+    "    if model not in MODELS:\n",
+    "        raise ValueError(f\"Model '{model}' is not an OpenAI-supported model.\")\n",
+    "    client = openai.Client()\n",
+    "    response = client.chat.completions.create(\n",
+    "        model=model,\n",
+    "        messages=[{\"role\": \"user\", \"content\": prompt}]\n",
+    "    )\n",
+    "    return response.choices[0].message.content\n",
+    "\n",
+    "\n",
+    "@action(reads=[], writes=[\"prompt\", \"models\"])\n",
+    "def process_input(state: State, prompt: str, models: List[str]) -> State:\n",
+    "    \"\"\"First node in our graph -- takes the prompt input and writes it to state.\"\"\"\n",
+    "    return state.update(prompt=prompt, models=models)\n",
+    "\n",
+    "\n",
+    "@action(reads=[\"responses\", \"models\"], writes=[\"all_responses\"])\n",
+    "def join_outputs(state: State) -> State:\n",
+    "    \"\"\"Final node -- joins the per-model responses into a single dictionary.\n",
+    "\n",
+    "    In every approach below `responses` is a dict keyed by model, so we look\n",
+    "    each model up directly (zipping over the dict would iterate its keys).\n",
+    "    \"\"\"\n",
+    "    joined_results = {}\n",
+    "    for model in state[\"models\"]:\n",
+    "        joined_results[model] = state[\"responses\"][model]\n",
+    "    return state.update(all_responses=joined_results)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "971e9a62-543a-4866-854c-60bbe89039ec",
+   "metadata": {},
+   "source": [
+    "# Approach (1) -- doing this manually\n",
+    "\n",
+    "Nothing stops you from running multiple queries inside the same action. Here we define the middle action to do the fan-out itself -- it launches the queries on a thread pool and waits for the results.\n",
+    "\n",
+    "This is the \"manual\" approach -- how you might achieve parallelism without any help from Burr. We will:\n",
+    "\n",
+    "1. Submit one query per model to a thread pool\n",
+    "2. Wait for all the queries to complete\n",
+    "3. Join the results into state\n",
+    "\n",
+    "This approach is useful as it is simple and looks like standard Python code. That said, it lacks visibility -- you have no way to see what the individual LLM calls are doing.\n",
+    "\n",
+    "First, we'll define the middle action. Next, we'll define the application. Then we can run!\n",
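+    "\n",
+    "The core of the middle action is a standard thread-pool fan-out. A rough sketch (assuming a `query_model` helper like the one defined in the next cell):\n",
+    "\n",
+    "```python\n",
+    "with ThreadPoolExecutor() as executor:\n",
+    "    # one future per model, keyed back to the model that produced it\n",
+    "    futures = {executor.submit(query_model, model): model for model in models}\n",
+    "    # .result() blocks until each query finishes\n",
+    "    results = {futures[future]: future.result() for future in futures}\n",
+    "```"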
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 18,
+   "id": "27ee761e-f7e8-4da7-82f2-349653cc6a7e",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "[graph: process_input -> query_multiple_models -> join_outputs, with inputs `prompt` and `models` feeding process_input]"
+      ]
+     },
+     "execution_count": 18,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "@action(reads=[\"models\", \"prompt\"], writes=[\"responses\"])\n",
+    "def query_multiple_models(state: State) -> State:\n",
+    "    \"\"\"Query multiple models in parallel and store the results in the state.\"\"\"\n",
+    "    models: List[str] = state[\"models\"]\n",
+    "    prompt: str = state[\"prompt\"]\n",
+    "\n",
+    "    def query_model(model: str) -> str:\n",
+    "        return _query_llm(model, prompt)\n",
+    "\n",
+    "    with ThreadPoolExecutor() as executor:\n",
+    "        futures = {executor.submit(query_model, model): model for model in models}\n",
+    "        results = {}\n",
+    "        for future in futures:\n",
+    "            model = futures[future]\n",
+    "            results[model] = future.result()\n",
+    "\n",
+    "    return state.update(responses=results)\n",
+    "\n",
+    "\n",
+    "app = (\n",
+    "    ApplicationBuilder()\n",
+    "    .with_actions(\n",
+    "        process_input,\n",
+    "        query_multiple_models,\n",
+    "        join_outputs\n",
+    "    )\n",
+    "    .with_entrypoint(\"process_input\")\n",
+    "    .with_tracker(project=\"demo_parallelism\")\n",
+    "    .with_transitions(\n",
+    "        (\"process_input\", \"query_multiple_models\"),\n",
+    "        (\"query_multiple_models\", \"join_outputs\")\n",
+    "    )\n",
+    "    .build()\n",
+    ")\n",
+    "app"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 19,
+   "id": "ab7e1041-fd7e-438e-8a78-aa468830e2f6",
+   "metadata": {
+    "scrolled": true
+   },
+   "outputs": [],
+   "source": [
+    "action_, _, state = app.run(\n",
+    "    inputs={\"prompt\": \"what is the meaning of life?\", \"models\": [\"gpt-4\", \"gpt-4-turbo\", \"gpt-3.5-turbo\"]},\n",
+    "    halt_after=[\"join_outputs\"]\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 20,
+   "id": "c99f7a75-1410-4aa6-b60d-16a592c3ae5f",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "[IFrame: Burr UI view of the demo_parallelism run]"
+      ]
+     },
+     "execution_count": 20,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "IFrame(f'http://localhost:7241/project/demo_parallelism/null/{app.uid}', width=\"100%\", height=\"700px\")"
+   ]
+  },
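+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "print-approach-1-results",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Quick sanity check: the run above halted after `join_outputs`, which\n",
+    "# writes the model -> response map to `all_responses`.\n",
+    "pprint.pprint(state[\"all_responses\"])"
+   ]
+  },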
+  {
+   "cell_type": "markdown",
+   "id": "becfab1b-f89d-49a0-b1af-2f01359bfefa",
+   "metadata": {},
+   "source": [
+    "# Approach (2) -- using recursion\n",
+    "\n",
+    "Burr allows you to wire tracking information through to the UI so you can visualize sub-applications. For this, we will represent each LLM call as its own (single-node) application. While this capability is built for more complex sub-application shapes, it works just as well for a single-node app.\n",
+    "\n",
+    "Note that `app.run(...)` returns an `(action, result, state)` tuple, so when we collect each future below we unpack the sub-application's final state and read the `response` field from it."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "14a89b97-ae8f-4ae7-b1fc-a91182d5658d",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "@action(reads=[\"model\", \"prompt\"], writes=[\"response\"])\n",
+    "def inner_query_model(state: State) -> State:\n",
+    "    return state.update(response=_query_llm(state[\"model\"], state[\"prompt\"]))\n",
+    "\n",
+    "\n",
+    "@action(reads=[\"models\", \"prompt\"], writes=[\"responses\"])\n",
+    "def query_multiple_models(state: State, __context: ApplicationContext) -> State:\n",
+    "    \"\"\"Query multiple models in parallel, one sub-application per model.\"\"\"\n",
+    "    apps = []\n",
+    "    for model in state[\"models\"]:\n",
+    "        sub_app = (\n",
+    "            ApplicationBuilder()\n",
+    "            .with_actions(query_model=inner_query_model)\n",
+    "            .with_state(model=model, prompt=state[\"prompt\"])\n",
+    "            .with_tracker(project=\"demo_parallelism\")\n",
+    "            .with_spawning_parent(\n",
+    "                app_id=__context.app_id,\n",
+    "                sequence_id=__context.sequence_id\n",
+    "            )\n",
+    "            .with_entrypoint(\"query_model\")\n",
+    "            .build()\n",
+    "        )\n",
+    "        apps.append((model, sub_app))\n",
+    "\n",
+    "    with ThreadPoolExecutor() as executor:\n",
+    "        futures = {executor.submit(app.run, halt_after=[\"query_model\"]): model for model, app in apps}\n",
+    "        results = {}\n",
+    "        for future in futures:\n",
+    "            model = futures[future]\n",
+    "            # app.run returns an (action, result, state) tuple -- pull the\n",
+    "            # response out of the sub-application's final state\n",
+    "            _, _, sub_state = future.result()\n",
+    "            results[model] = sub_state[\"response\"]\n",
+    "\n",
+    "    return state.update(responses=results)\n",
+    "\n",
+    "\n",
+    "app_recursion = (\n",
+    "    ApplicationBuilder()\n",
+    "    .with_actions(\n",
+    "        process_input,\n",
+    "        query_multiple_models,\n",
+    "        join_outputs\n",
+    "    )\n",
+    "    .with_entrypoint(\"process_input\")\n",
+    "    .with_tracker(project=\"demo_parallelism\")\n",
+    "    .with_transitions(\n",
+    "        (\"process_input\", \"query_multiple_models\"),\n",
+    "        (\"query_multiple_models\", \"join_outputs\")\n",
+    "    )\n",
+    "    .build()\n",
+    ")\n",
+    "app_recursion"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "ff28d579-084f-463f-9761-b9e85a6d1460",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "action_, _, state = app_recursion.run(\n",
+    "    inputs={\"prompt\": \"what is the meaning of life?\", \"models\": [\"gpt-4\", \"gpt-4-turbo\", \"gpt-3.5-turbo\"]},\n",
+    "    halt_after=[\"join_outputs\"]\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "adb02469-9dd6-404e-8a9f-0e50fe21123f",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "IFrame(f'http://localhost:7241/project/demo_parallelism/null/{app_recursion.uid}', width=\"100%\", height=\"700px\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "f8c09d92-045a-4285-840a-f0099fe8b7cf",
+   "metadata": {},
+   "source": [
+    "# Approach (3) -- using parallel constructs\n",
+    "\n",
+    "We can use Burr's parallelism constructs to make this even easier. We subclass `MapStates`, which maps an action over a set of states -- here, one state per model in the `models` field. Burr takes care of creating the sub-applications, running them in parallel, and tracking them.\n",
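+    "\n",
+    "The interface we need to implement looks roughly like this (a sketch -- the full implementation follows in the next cell):\n",
+    "\n",
+    "```python\n",
+    "class RunOverMultiplePrompts(MapStates):\n",
+    "    def states(self, state, context, inputs):\n",
+    "        ...  # fan out: yield one sub-state per model\n",
+    "\n",
+    "    def action(self, state, inputs):\n",
+    "        ...  # the action to run over each sub-state\n",
+    "\n",
+    "    def reduce(self, state, results):\n",
+    "        ...  # fan in: join the sub-states back into one state\n",
+    "```\n",
+    "\n",
+    "plus `reads`/`writes` properties declaring which state fields it touches."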
" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "ce90216e-5470-4bdb-9d32-c9b7c3e64eb3", + "metadata": {}, + "outputs": [ + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "process_input\n", + "\n", + "process_input\n", + "\n", + "\n", + "\n", + "query_multiple_models\n", + "\n", + "query_multiple_models\n", + "\n", + "\n", + "\n", + "process_input->query_multiple_models\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "input__models\n", + "\n", + "input: models\n", + "\n", + "\n", + "\n", + "input__models->process_input\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "input__prompt\n", + "\n", + "input: prompt\n", + "\n", + "\n", + "\n", + "input__prompt->process_input\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "join_outputs\n", + "\n", + "join_outputs\n", + "\n", + "\n", + "\n", + "query_multiple_models->join_outputs\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 21, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "@action(reads=[\"model\", \"prompt\"], writes=[\"response\"])\n", + "def inner_query_model(state: State) -> State:\n", + " return state.update(response=_query_llm(state[\"model\"], state[\"prompt\"]))\n", + " \n", + "class RunOverMultiplePrompts(MapStates):\n", + " def states(\n", + " self, state: State, context: ApplicationContext, inputs: Dict[str, Any]\n", + " ) -> Generator[State, None, None]:\n", + " for model in state[\"models\"]:\n", + " yield state.update(model=model)\n", + "\n", + " def action(self, state: State, inputs: Dict[str, Any]) -> Callable:\n", + " return inner_query_model\n", + "\n", + " def reduce(self, state: State, results: Generator[State, None, None]) -> State:\n", + " state = state.update(results=[])\n", + " responses = {model: output_state[\"response\"] for model, output_state in zip(state[\"models\"], results)}\n", + " return state.update(responses=responses)\n", + " \n", + " @property\n", + " def reads(self) -> List[str]:\n", + " return [\"models\", \"prompt\"]\n", + "\n", + " @property\n", + " def writes(self) -> List[str]:\n", + " return [\"responses\"]\n", + "\n", + "app_parallelism = (\n", + " ApplicationBuilder()\n", + " .with_actions(\n", + " process_input,\n", + " join_outputs,\n", + " query_multiple_models=RunOverMultiplePrompts(),\n", + " ).with_entrypoint(\"process_input\")\n", + " .with_tracker(project=\"demo_parallelism\")\n", + " .with_transitions(\n", + " (\"process_input\", \"query_multiple_models\"),\n", + " (\"query_multiple_models\", \"join_outputs\")\n", + " )\n", + " .build()\n", + ")\n", + "app_parallelism" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "816cf0d7-1f89-4145-a5e3-c2878cec12e1", + "metadata": {}, + "outputs": [], + "source": [ + "action_, _, state = app_parallelism.run(\n", + " inputs={\"prompt\": \"what is the meaning of life?\", \"models\" : [\"gpt-4\", \"gpt-4-turbo\", \"gpt-3.5-turbo\"]},\n", + " halt_after=[\"join_outputs\"]\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "id": "7ece3113-cf60-47b1-b375-735070c70b67", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + " \n", + " " + ], + "text/plain": [ + "" + ] + }, + "execution_count": 23, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "IFrame(f'http://localhost:7241/project/demo_parallelism/null/{app_parallelism.uid}', width=\"100%\", height=\"700px\")" + ] + }, + { + "cell_type": "code", + 
"execution_count": null, + "id": "c5e31849-5dd6-4e6c-9fd5-0ca3c3e11abe", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.0" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/validate_examples.py b/examples/validate_examples.py index 342cee9c..f46bf41a 100644 --- a/examples/validate_examples.py +++ b/examples/validate_examples.py @@ -25,6 +25,7 @@ "templates", "deployment", "talks", + "parallelism", # TODO - remove this shortly ]