diff --git a/examples/hamilton-integration/README.md b/examples/hamilton-integration/README.md
new file mode 100644
index 00000000..58e15ac7
--- /dev/null
+++ b/examples/hamilton-integration/README.md
@@ -0,0 +1,8 @@
+# Modular RAG with Burr and Hamilton
+
+This examples shows the "2-layer" approach to building RAG and LLM agents using Burr and Hamilton.
+
+You will find:
+
+- `notebook.ipynb` contains a guide on how to build a modular RAG application. It details how a typicaly project evolves and how Burr and Hamilton can help you achieve the desired modularity.
+- `application.py` and `actions/` contain the code from the final application version showed in the notebook.
diff --git a/examples/hamilton-integration/__init__.py b/examples/hamilton-integration/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/examples/hamilton-integration/actions/__init__.py b/examples/hamilton-integration/actions/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/examples/hamilton-integration/actions/ask_question.py b/examples/hamilton-integration/actions/ask_question.py
new file mode 100644
index 00000000..b33931eb
--- /dev/null
+++ b/examples/hamilton-integration/actions/ask_question.py
@@ -0,0 +1,32 @@
+import lancedb
+import openai
+
+
+def relevant_chunks(user_query: str) -> list[dict]:
+ chunks_table = lancedb.connect("./blogs").open_table("chunks")
+ search_results = (
+ chunks_table.search(user_query).select(["text", "url", "position"]).limit(3).to_list()
+ )
+ return search_results
+
+
+def system_prompt(relevant_chunks: list[dict]) -> str:
+ relevant_content = "\n".join([c["text"] for c in relevant_chunks])
+ return (
+ "Answer the user's questions based on the provided blog post content. "
+ "Answer in a concise and helpful manner, and tell the user "
+ "if you don't know the answer or you're unsure.\n\n"
+ f"BLOG CONTENT:\n{relevant_content}"
+ )
+
+
+def llm_answer(system_prompt: str, user_query: str) -> str:
+ client = openai.OpenAI()
+ response = client.chat.completions.create(
+ model="gpt-4o-mini",
+ messages=[
+ {"role": "system", "content": system_prompt},
+ {"role": "user", "content": user_query},
+ ],
+ )
+ return response.choices[0].message.content
diff --git a/examples/hamilton-integration/actions/ingest_blog.py b/examples/hamilton-integration/actions/ingest_blog.py
new file mode 100644
index 00000000..eb9a0c8b
--- /dev/null
+++ b/examples/hamilton-integration/actions/ingest_blog.py
@@ -0,0 +1,54 @@
+import re
+
+import lancedb
+import requests
+from bs4 import BeautifulSoup
+from lancedb.embeddings import get_registry
+from lancedb.pydantic import LanceModel, Vector
+
+embedding_model = get_registry().get("openai").create()
+
+
+class TextDocument(LanceModel):
+ """Simple data structure to hold a piece of text associated with a url."""
+
+ url: str
+ position: int
+ text: str = embedding_model.SourceField()
+ vector: Vector(dim=embedding_model.ndims()) = embedding_model.VectorField()
+
+
+def html_content(blog_post_url: str) -> str:
+ return requests.get(blog_post_url).text
+
+
+def parsed_text(html_content: str) -> str:
+ soup = BeautifulSoup(html_content, "html.parser")
+ return soup.get_text(separator=" ", strip=True)
+
+
+def sentences(parsed_text: str) -> list[str]:
+ return [sentence.strip() for sentence in re.split(r"[.!?]+", parsed_text) if sentence.strip()]
+
+
+def overlapping_chunks(
+ sentences: list[str], window: int = 5, stride: int = 3, min_window_size: int = 2
+) -> list[str]:
+ overlapping_chunks = []
+ n_chunks = len(sentences)
+ for start_i in range(0, n_chunks, stride):
+ if (start_i + window <= n_chunks) or (n_chunks - start_i >= min_window_size):
+ overlapping_chunks.append(
+ " ".join(sentences[start_i : min(start_i + window, n_chunks)])
+ )
+ return overlapping_chunks
+
+
+def embed_chunks(overlapping_chunks: list[str], blog_post_url: str) -> dict:
+ # embed and store the chunks using LanceDB
+ con = lancedb.connect("./blogs")
+ table = con.create_table("chunks", exist_ok=True, schema=TextDocument)
+ table.add(
+ [{"text": c, "url": blog_post_url, "position": i} for i, c in enumerate(overlapping_chunks)]
+ )
+ return {"n_chunks_embedded": len(overlapping_chunks)}
diff --git a/examples/hamilton-integration/application.py b/examples/hamilton-integration/application.py
new file mode 100644
index 00000000..8f697d27
--- /dev/null
+++ b/examples/hamilton-integration/application.py
@@ -0,0 +1,54 @@
+from hamilton.driver import Builder, Driver
+
+from burr.core import ApplicationBuilder, State, action
+
+
+@action(reads=[], writes=[])
+def ingest_blog(state: State, blog_post_url: str, dr: Driver) -> State:
+ """Download a blog post and parse it"""
+ dr.execute(["embed_chunks"], inputs={"blog_post_url": blog_post_url})
+ return state
+
+
+@action(reads=[], writes=["llm_answer"])
+def ask_question(state: State, user_query: str, dr: Driver) -> State:
+ """Reply to the user's query using the blog's content."""
+ results = dr.execute(["llm_answer"], inputs={"user_query": user_query})
+ return state.update(llm_answer=results["llm_answer"])
+
+
+if __name__ == "__main__":
+ # renames to avoid name conflicts with the @action functions
+ from actions import ask_question as ask_module
+ from actions import ingest_blog as ingest_module
+ from hamilton.plugins.h_opentelemetry import OpenTelemetryTracer
+ from opentelemetry.instrumentation.lancedb import LanceInstrumentor
+ from opentelemetry.instrumentation.openai import OpenAIInstrumentor
+
+ OpenAIInstrumentor().instrument()
+ LanceInstrumentor().instrument()
+
+ dr = (
+ Builder()
+ .with_modules(ingest_module, ask_module)
+ .with_adapters(OpenTelemetryTracer())
+ .build()
+ )
+
+ app = (
+ ApplicationBuilder()
+ .with_actions(ingest_blog.bind(dr=dr), ask_question.bind(dr=dr))
+ .with_transitions(("ingest_blog", "ask_question"))
+ .with_entrypoint("ingest_blog")
+ .with_tracker(project="modular-rag", use_otel_tracing=True)
+ .build()
+ )
+
+ action_name, results, state = app.run(
+ halt_after=["ask_question"],
+ inputs={
+ "blog_post_url": "https://blog.dagworks.io/p/from-blog-to-bot-build-a-rag-app",
+ "user_query": "What do you need to monitor in a RAG app?",
+ },
+ )
+ print(state["llm_answer"])
diff --git a/examples/hamilton-integration/burr_ui_app_v2.png b/examples/hamilton-integration/burr_ui_app_v2.png
new file mode 100644
index 00000000..a407a8b9
Binary files /dev/null and b/examples/hamilton-integration/burr_ui_app_v2.png differ
diff --git a/examples/hamilton-integration/burr_ui_app_v3.png b/examples/hamilton-integration/burr_ui_app_v3.png
new file mode 100644
index 00000000..ab4f0ffa
Binary files /dev/null and b/examples/hamilton-integration/burr_ui_app_v3.png differ
diff --git a/examples/hamilton-integration/hamilton_ui.png b/examples/hamilton-integration/hamilton_ui.png
new file mode 100644
index 00000000..f06713a3
Binary files /dev/null and b/examples/hamilton-integration/hamilton_ui.png differ
diff --git a/examples/hamilton-integration/notebook.ipynb b/examples/hamilton-integration/notebook.ipynb
new file mode 100644
index 00000000..f7f02164
--- /dev/null
+++ b/examples/hamilton-integration/notebook.ipynb
@@ -0,0 +1,1635 @@
+{
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "/home/tjean/projects/dagworks/burr/.venv/bin/python: No module named pip\n",
+ "Note: you may need to restart the kernel to use updated packages.\n"
+ ]
+ }
+ ],
+ "source": [
+ "# execute this cell to install dependencies in the current environment\n",
+ "# useful when using Google Colab notebooks\n",
+ "%pip install sf-hamilton[visualization] requests openai lancedb burr[start,opentelemetry] pydantic pyarrow opentelemetry-instrumentation-openai opentelemetry-instrumentation-lancedb"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Use the 2-layer approach for a maintainable RAG system [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/dagworks-inc/burr/blob/main/examples/hamilton-integration/notebook.ipynb) [![GitHub badge](https://img.shields.io/badge/github-view_source-2b3137?logo=github)](https://github.com/dagworks-inc/burr/blob/main/examples/hamilton-integration/notebook.ipynb)\n",
+ "\n",
+ "Ready-made solutions can get you started with GenAI, but building reliable product features with retrieval augmented generation (RAG) and LLM agents inevitably required custom code. This post shares the 2-layer approach to build a maintainable RAG application that will evolve with your needs. To illustrate these ideas, we will show how a typical RAG project might evolve.\n",
+ "\n",
+ "The 2-layer approach is about separating the high-level logic of your application from its implementation details. In Burr, the separation is represented by the `Application` (high-level) and the `action` (low-level). Over the lifetime of your product, you will iterate on both at different times.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# execute to load the Burr and Hamilton extensions\n",
+ "%load_ext burr.integrations.notebook\n",
+ "%load_ext hamilton.plugins.jupyter_magic"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# set your OpenAI API key\n",
+ "import os\n",
+ "os.environ[\"OPENAI_API_KEY\"] = \"...\""
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## V1: A simple LLM pipeline\n",
+ "### Layer 2: Define the `Application` logic\n",
+ "\n",
+ "Maybe counter-inuitively, you should start with the high-level logic, the layer 2, to determine how your application should behave. For instance:\n",
+ "\n",
+ "1. actions: what are the things your application can do?\n",
+ "2. transitions & conditions: when should an action be performed?\n",
+ "3. inputs, results, & state: what data is required to execute an action or to decide which action to perform?\n",
+ "\n",
+ "Skipping these initial questions usually leads to refactoring and slows down development.\n",
+ "\n",
+ "In the next snippet, we build a simple application that ingests a blog post and allows to ask questions over its content. We outline our application logic without writing the body of the `@action` functions (level 2) and build it with the `ApplicationBuilder` to view the graph."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "execution_count": 5,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "from burr.core import action, State, ApplicationBuilder\n",
+ "\n",
+ "# `reads` and `writes` specify what data is read/written via the `State`.\n",
+ "@action(reads=[], writes=[\"blog_content\"])\n",
+ "def ingest_blog(state: State, blog_post_url: str) -> State:\n",
+ " \"\"\"Download a blog post and parse it\"\"\"\n",
+ " blog_content = ...\n",
+ " return state.update(blog_content=blog_content)\n",
+ "\n",
+ "\n",
+ "@action(reads=[\"blog_content\", \"history\"], writes=[\"history\"])\n",
+ "def ask_question(state: State, user_query: str) -> State:\n",
+ " \"\"\"Reply to the user's query using the blog's content.\"\"\"\n",
+ " history = state[\"history\"]\n",
+ " blog_content = state[\"blog_content\"]\n",
+ " response = ...\n",
+ " return state.append(history=response)\n",
+ "\n",
+ "\n",
+ "# the `ApplicationBuilder` receives the actions,\n",
+ "# specifies the transitions, and sets the starting action.\n",
+ "(\n",
+ " ApplicationBuilder()\n",
+ " .with_actions(ingest_blog, ask_question)\n",
+ " .with_transitions((\"ingest_blog\", \"ask_question\"))\n",
+ " .with_entrypoint(\"ingest_blog\")\n",
+ " .build()\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "This outline sets \"contracts\" for the role and input/output of each actions. As you implement individual actions, you might decide to merge/split actions, modify state variables, etc. Then, you can revisit and update your application outline."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Layer 1: Implement individual `@action`\n",
+ "\n",
+ "At the start of our project, with might come up with this simple `Application`:\n",
+ "1. `ingest_blog` downloads an HTML page, parses it into a plain text.\n",
+ "2. `ask_question` prompts an OpenAI LLM with the full blog in the prompt and the user query.\n",
+ "\n",
+ "By using \"plain\" Python libraries instead of LLM frameworks, it will be easier to find bugs and modify our code as our application needs evolve."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import requests\n",
+ "import openai\n",
+ "from bs4 import BeautifulSoup\n",
+ "from burr.core import action, State, ApplicationBuilder\n",
+ "\n",
+ "\n",
+ "@action(reads=[], writes=[\"blog_content\"])\n",
+ "def ingest_blog_v1(state: State, blog_post_url: str) -> State:\n",
+ " \"\"\"Download a blog post and parse it\"\"\"\n",
+ " html_content = requests.get(blog_post_url).text\n",
+ " soup = BeautifulSoup(html_content, \"html.parser\")\n",
+ " blog_content = soup.get_text(separator=\" \", strip=True)\n",
+ " return state.update(blog_content=blog_content)\n",
+ "\n",
+ "\n",
+ "@action(reads=[\"blog_content\"], writes=[\"llm_answer\"])\n",
+ "def ask_question_v1(state: State, user_query: str) -> State:\n",
+ " \"\"\"Reply to the user's query using the blog's content.\"\"\"\n",
+ " blog_content = state[\"blog_content\"]\n",
+ "\n",
+ " system_prompt = (\n",
+ " \"Answer the user's questions based on the provided blog post content. \"\n",
+ " \"Answer in a concise and helpful manner, and tell the user \"\n",
+ " \"if you don't know the answer or you're unsure.\\n\\n\"\n",
+ " f\"BLOG CONTENT:\\n{blog_content}\"\n",
+ " )\n",
+ "\n",
+ " client = openai.OpenAI()\n",
+ " response = client.chat.completions.create(\n",
+ " model=\"gpt-4o-mini\",\n",
+ " messages=[\n",
+ " {\"role\": \"system\", \"content\": system_prompt},\n",
+ " {\"role\": \"user\", \"content\": user_query}\n",
+ " ],\n",
+ " )\n",
+ " llm_answer = response.choices[0].message.content\n",
+ " return state.update(llm_answer=llm_answer)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "execution_count": 7,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "app_v1 = (\n",
+ " ApplicationBuilder()\n",
+ " .with_actions(ingest_blog=ingest_blog_v1, ask_question=ask_question_v1)\n",
+ " .with_transitions((\"ingest_blog\", \"ask_question\"))\n",
+ " .with_entrypoint(\"ingest_blog\")\n",
+ " .build()\n",
+ ")\n",
+ "app_v1"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "In a RAG application, you need to monitor:\n",
+ "\n",
+ "1. **User Behavior Patterns** - Analyze frequent topics and difficult queries.\n",
+ "2. **Application Performance** - Track aspects like guardrails, LLM randomness, and user ratings.\n",
+ "3. **System Performance** - Measure latency, throughput, resource usage, and API calls.\n",
+ "4. **System Errors** - Identify bugs in the code or interactions between code and data.\n",
+ "\n",
+ "Using tools like OpenTelemetry can help gather and export this telemetry data effectively.\n"
+ ]
+ }
+ ],
+ "source": [
+ "action_name, results, state = app_v1.run(\n",
+ " halt_after=[\"ask_question\"],\n",
+ " inputs={\n",
+ " \"blog_post_url\": \"https://blog.dagworks.io/p/from-blog-to-bot-build-a-rag-app\",\n",
+ " \"user_query\": \"What do you need to monitor in a RAG app?\"\n",
+ " }\n",
+ ")\n",
+ "print(state[\"llm_answer\"])"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## V2: Let's use RAG\n",
+ "\n",
+ "While `V1` simply ingested the blog and prompted the LLM, `V2` will add RAG capabilities.\n",
+ "\n",
+ "We have to ask ourselves: are we making changes to layer 1 or 2? For now, we'll focus on modifying the actions (layer 1) rather than the application logic (layer 2)."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Layer 1: Adding RAG\n",
+ "In the next snippet, you'll notice:\n",
+ "- we define a `TextDocument` model to create the schema of our LanceDB table. It specifies to embed the `text` field using the OpenAI model.\n",
+ "- because text chunks are stored on disk, we don't need to pass them via the `Application` state."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "/home/tjean/projects/dagworks/burr/.venv/lib/python3.11/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n",
+ " from .autonotebook import tqdm as notebook_tqdm\n"
+ ]
+ }
+ ],
+ "source": [
+ "import re\n",
+ "import requests\n",
+ "import openai\n",
+ "import lancedb\n",
+ "from bs4 import BeautifulSoup\n",
+ "from burr.core import action, State, ApplicationBuilder\n",
+ "from lancedb.pydantic import LanceModel, Vector\n",
+ "from lancedb.embeddings import get_registry\n",
+ "\n",
+ "\n",
+ "embedding_model = get_registry().get(\"openai\").create()\n",
+ "\n",
+ "class TextDocument(LanceModel):\n",
+ " \"\"\"Simple data structure to hold a piece of text associated with a url.\"\"\"\n",
+ " url: str\n",
+ " position: int\n",
+ " text: str = embedding_model.SourceField()\n",
+ " vector: Vector(dim=embedding_model.ndims()) = embedding_model.VectorField()\n",
+ "\n",
+ "\n",
+ "@action(reads=[], writes=[])\n",
+ "def ingest_blog_v2(state: State, blog_post_url: str) -> State:\n",
+ " \"\"\"Download a blog post and parse it\"\"\"\n",
+ " # get the blog post as plain text\n",
+ " html_content = requests.get(blog_post_url).text\n",
+ " soup = BeautifulSoup(html_content, \"html.parser\")\n",
+ " blog_content = soup.get_text(separator=\" \", strip=True)\n",
+ " # split the text by sentence\n",
+ " SENTENCE_ENDINGS = r\"[.!?]+\"\n",
+ "\n",
+ " chunks = []\n",
+ " for sentence in re.split(SENTENCE_ENDINGS, blog_content):\n",
+ " sentence = sentence.strip()\n",
+ " if sentence:\n",
+ " chunks.append(sentence)\n",
+ "\n",
+ " # join sentences to create larger overlapping chunks\n",
+ " window = 5\n",
+ " stride = 3\n",
+ " min_window_size = 2\n",
+ "\n",
+ " overlapping_chunks = []\n",
+ " n_chunks = len(chunks)\n",
+ " for start_i in range(0, n_chunks, stride):\n",
+ " if (start_i + window <= n_chunks) or (n_chunks - start_i >= min_window_size):\n",
+ " overlapping_chunks.append(\" \".join(chunks[start_i : min(start_i + window, n_chunks)]))\n",
+ "\n",
+ " # embed and store the chunks using LanceDB\n",
+ " con = lancedb.connect(\"./blogs\")\n",
+ " table = con.create_table(\"chunks\", exist_ok=True, schema=TextDocument)\n",
+ " table.add([\n",
+ " {\"text\": c, \"url\": blog_post_url, \"position\": i}\n",
+ " for i, c in enumerate(overlapping_chunks)\n",
+ " ])\n",
+ " \n",
+ " return state\n",
+ "\n",
+ "\n",
+ "@action(reads=[], writes=[\"llm_answer\"])\n",
+ "def ask_question_v2(state: State, user_query: str) -> State:\n",
+ " \"\"\"Reply to the user's query using the blog's content.\"\"\"\n",
+ " # retrieve the most relevant chunks\n",
+ " chunks_table = lancedb.connect(\"./blogs\").open_table(\"chunks\")\n",
+ " search_results = (\n",
+ " chunks_table\n",
+ " .search(user_query)\n",
+ " .select([\"text\", \"url\", \"position\"])\n",
+ " .limit(3)\n",
+ " .to_list()\n",
+ " )\n",
+ " relevant_content = \"\\n\".join([r[\"text\"] for r in search_results])\n",
+ "\n",
+ " # prompt the LLM with the relevant content\n",
+ " system_prompt = (\n",
+ " \"Answer the user's questions based on the provided blog post content. \"\n",
+ " f\"BLOG CONTENT:\\n{relevant_content}\"\n",
+ " )\n",
+ "\n",
+ " client = openai.OpenAI()\n",
+ " response = client.chat.completions.create(\n",
+ " model=\"gpt-4o-mini\",\n",
+ " messages=[\n",
+ " {\"role\": \"system\", \"content\": system_prompt},\n",
+ " {\"role\": \"user\", \"content\": user_query}\n",
+ " ],\n",
+ " )\n",
+ " llm_answer = response.choices[0].message.content\n",
+ " return state.update(llm_answer=llm_answer)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Layer 2: Tracking and observing your application\n",
+ "\n",
+ "A keep requirement to improve your application is being able to observe and track its behavior. Burr makes it simple to add a tracker to your application, which is compatible with OpenTelemetry.\n",
+ "\n",
+ "> NOTE. Some lines are commented out because OpenTelemetry doesn't work properly in notebooks. See `application.py` for an example"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "execution_count": 10,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "# \"instrumenting\" patches the libraries to log OpenTelemetry data\n",
+ "# from opentelemetry.instrumentation.lancedb import LanceInstrumentor\n",
+ "# from opentelemetry.instrumentation.openai import OpenAIInstrumentor\n",
+ "\n",
+ "# OpenAIInstrumentor().instrument()\n",
+ "# LanceInstrumentor().instrument()\n",
+ "\n",
+ "app_v2 = (\n",
+ " ApplicationBuilder()\n",
+ " .with_actions(ingest_blog=ingest_blog_v2, ask_question=ask_question_v2)\n",
+ " .with_transitions((\"ingest_blog\", \"ask_question\"))\n",
+ " .with_entrypoint(\"ingest_blog\")\n",
+ " # enable the tracker and enable OpenTelemetry via the argument\n",
+ " .with_tracker(project=\"modular-rag\", use_otel_tracing=True)\n",
+ " .build()\n",
+ ")\n",
+ "app_v2"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "In a RAG application, you need to monitor the following aspects:\n",
+ "\n",
+ "1. **User Behavior Patterns**: This includes frequent topics and difficult queries that users engage with.\n",
+ "2. **Application Performance**: You should track metrics such as guardrails, LLM randomness, and user ratings.\n",
+ "3. **System Performance**: Monitor system metrics like latency, throughput, resource usage, and API calls.\n",
+ "4. **System Errors**: Keep an eye on bugs that may arise from code or the interaction between code and data.\n"
+ ]
+ }
+ ],
+ "source": [
+ "action_name, results, state = app_v2.run(\n",
+ " halt_after=[\"ask_question\"],\n",
+ " inputs={\n",
+ " \"blog_post_url\": \"https://blog.dagworks.io/p/from-blog-to-bot-build-a-rag-app\",\n",
+ " \"user_query\": \"What do you need to monitor in a RAG app?\"\n",
+ " }\n",
+ ")\n",
+ "print(state[\"llm_answer\"])"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Burr UI: http://127.0.0.1:7241\n"
+ ]
+ }
+ ],
+ "source": [
+ "# execute cell to launch the UI\n",
+ "%burr_ui"
+ ]
+ },
+ {
+ "attachments": {
+ "image.png": {
+ "image/png": ""
+ }
+ },
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "By navigating to the `modular-rag` project, you can see logged runs. With OpenTelemetry support, we can inspect the LanceDB operations and the OpenAI calls. For instance, this allows us to see which were the relevant chunks retrieved and passed to the LLM.\n",
+ "\n",
+ "![image.png](burr_ui_app_v2.png)\n",
+ "\n",
+ "The Burr UI has other useful features:\n",
+ "- view token usage\n",
+ "- annotate state values and logged attributes\n",
+ "- create test fixtures from application state\n",
+ "- and more "
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## V3: Keeping your code modular\n",
+ "\n",
+ "You probably noticed that actions functions in `V2` started to be long and do several things. It would be a good time to refactor them into smaller functions for easier development and maintenance. Again, those would be improvement at the layer 1."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Layer 1: Modular actions\n",
+ "\n",
+ "In th next snippets, we refactor actions using Hamilton, a lightweight library to structure data transformations as directed acyclic graphs (DAGs). Hamilton uses the function and parameter names to infer the dependencies between functions and the graph structure.\n",
+ "\n",
+ "The next cell reimplements the `ingest_blog` action from `V2`. It uses `%%cell_to_module` from the Hamilton notebook extension to define a DAG in a single cell and view it ([see tutorial](https://github.com/DAGWorks-Inc/hamilton/blob/main/examples/jupyter_notebook_magic/example.ipynb))."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 13,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "%%cell_to_module ingest_blog --display\n",
+ "import re\n",
+ "import requests\n",
+ "import lancedb\n",
+ "from bs4 import BeautifulSoup\n",
+ "from lancedb.pydantic import LanceModel, Vector\n",
+ "from lancedb.embeddings import get_registry\n",
+ "\n",
+ "\n",
+ "embedding_model = get_registry().get(\"openai\").create()\n",
+ "\n",
+ "class TextDocument(LanceModel):\n",
+ " \"\"\"Simple data structure to hold a piece of text associated with a url.\"\"\"\n",
+ " url: str\n",
+ " position: int\n",
+ " text: str = embedding_model.SourceField()\n",
+ " vector: Vector(dim=embedding_model.ndims()) = embedding_model.VectorField()\n",
+ "\n",
+ "\n",
+ "def html_content(blog_post_url: str) -> str:\n",
+ " return requests.get(blog_post_url).text\n",
+ "\n",
+ "\n",
+ "def parsed_text(html_content: str) -> str:\n",
+ " soup = BeautifulSoup(html_content, \"html.parser\")\n",
+ " return soup.get_text(separator=\" \", strip=True)\n",
+ " \n",
+ "\n",
+ "def sentences(parsed_text: str) -> list[str]:\n",
+ " return [\n",
+ " sentence.strip()\n",
+ " for sentence in re.split(r\"[.!?]+\", parsed_text)\n",
+ " if sentence.strip()\n",
+ " ]\n",
+ "\n",
+ "\n",
+ "def overlapping_chunks(\n",
+ " sentences: list[str],\n",
+ " window: int = 5,\n",
+ " stride: int = 3,\n",
+ " min_window_size: int = 2\n",
+ ") -> list[str]:\n",
+ " overlapping_chunks = []\n",
+ " n_chunks = len(sentences)\n",
+ " for start_i in range(0, n_chunks, stride):\n",
+ " if (start_i + window <= n_chunks) or (n_chunks - start_i >= min_window_size):\n",
+ " overlapping_chunks.append(\" \".join(sentences[start_i : min(start_i + window, n_chunks)]))\n",
+ " return overlapping_chunks\n",
+ "\n",
+ "\n",
+ "def embed_chunks(overlapping_chunks: list[str], blog_post_url: str) -> dict:\n",
+ " # embed and store the chunks using LanceDB\n",
+ " con = lancedb.connect(\"./blogs\")\n",
+ " table = con.create_table(\"chunks\", exist_ok=True, schema=TextDocument)\n",
+ " table.add([\n",
+ " {\"text\": c, \"url\": blog_post_url, \"position\": i}\n",
+ " for i, c in enumerate(overlapping_chunks)\n",
+ " ])\n",
+ " return {\"n_chunks_embedded\": len(overlapping_chunks)}"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "While this may seem trivial, these simple functions are easier to test, debug, and modify. It also allows us to develop and test actions outside of Burr. Hamilton has a great caching feature that can speed up development significantly when iterating over your application.\n",
+ "\n",
+ "Here, we create a `Driver` from the `ingest_blog` module we defined and enable caching."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 14,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from hamilton.driver import Builder\n",
+ "\n",
+ "ingest_dr = Builder().with_modules(ingest_blog).with_cache().build()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Then, we request the `sentences` node to inspect results. This way, the `embed_chunks` node won't be executed and we won't spend time/credits on computing embeddings during development"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 15,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "We evaluated and completed proof-of-concepts with the top five commercial and open-source providers in the market\n"
+ ]
+ }
+ ],
+ "source": [
+ "results = ingest_dr.execute(\n",
+ " [\"sentences\"],\n",
+ " inputs={\"blog_post_url\": \"https://blog.dagworks.io/p/building-a-better-feature-platform\"},\n",
+ ")\n",
+ "print(results[\"sentences\"][17])"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Next, we implement the `ask_question` action with Hamilton.\n",
+ "\n",
+ "We encourage considering prompt as code, which allows us to commit and version our prompt with the rest of our code. For instance, the function `system_prompt()` helps understand what information goes into the prompt and an additional docstring can add context to it ([related blog](https://blog.dagworks.io/p/llmops-production-prompt-engineering))."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 16,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "%%cell_to_module ask_question --display\n",
+ "import openai\n",
+ "import lancedb\n",
+ "\n",
+ "def relevant_chunks(user_query: str) -> list[dict]:\n",
+ " chunks_table = lancedb.connect(\"./blogs\").open_table(\"chunks\")\n",
+ " search_results = (\n",
+ " chunks_table\n",
+ " .search(user_query)\n",
+ " .select([\"text\", \"url\", \"position\"])\n",
+ " .limit(3)\n",
+ " .to_list()\n",
+ " )\n",
+ " return search_results\n",
+ "\n",
+ "\n",
+ "def system_prompt(relevant_chunks: list[dict]) -> str:\n",
+ " relevant_content = \"\\n\".join([c[\"text\"] for c in relevant_chunks])\n",
+ " return (\n",
+ " \"Answer the user's questions based on the provided blog post content. \"\n",
+ " \"Answer in a concise and helpful manner, and tell the user \"\n",
+ " \"if you don't know the answer or you're unsure.\\n\\n\"\n",
+ " f\"BLOG CONTENT:\\n{relevant_content}\"\n",
+ " )\n",
+ "\n",
+ "def llm_answer(system_prompt: str, user_query: str) -> str:\n",
+ " client = openai.OpenAI()\n",
+ " response = client.chat.completions.create(\n",
+ " model=\"gpt-4o-mini\",\n",
+ " messages=[\n",
+ " {\"role\": \"system\", \"content\": system_prompt},\n",
+ " {\"role\": \"user\", \"content\": user_query}\n",
+ " ],\n",
+ " )\n",
+ " return response.choices[0].message.content"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Layer 2: Using Hamilton with Burr\n",
+ "\n",
+ "Since Burr is not opinionated about the implementation of `@action` functions, you can simply call Hamilton from it. We add the `OpenTelemetryTracer` to the Hamilton `Driver` to get tracing in Burr UI.\n",
+ "\n",
+ "You'll see that the Layer 2 becomes much lighter and only the high-level logic remains."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 17,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from hamilton.driver import Builder\n",
+ "from hamilton.plugins.h_opentelemetry import OpenTelemetryTracer\n",
+ "from burr.core import action, State, ApplicationBuilder\n",
+ "\n",
+ "# these imports will only work if you executed the cells that\n",
+ "# defines them using the `%%cell_to_module` magic \n",
+ "import ingest_blog\n",
+ "import ask_question\n",
+ "\n",
+ "\n",
+ "@action(reads=[], writes=[])\n",
+ "def ingest_blog_v3(state: State, blog_post_url: str) -> State:\n",
+ " \"\"\"Download a blog post and parse it\"\"\"\n",
+ " dr = (\n",
+ " Builder()\n",
+ " .with_modules(ingest_blog)\n",
+ " .with_adapters(OpenTelemetryTracer())\n",
+ " .build()\n",
+ " )\n",
+ " dr.execute([\"embed_chunks\"], inputs={\"blog_post_url\": blog_post_url}) \n",
+ " return state\n",
+ "\n",
+ "\n",
+ "@action(reads=[], writes=[\"llm_answer\"])\n",
+ "def ask_question_v3(state: State, user_query: str) -> State:\n",
+ " \"\"\"Reply to the user's query using the blog's content.\"\"\"\n",
+ " dr = (\n",
+ " Builder()\n",
+ " .with_modules(ask_question)\n",
+ " .with_adapters(OpenTelemetryTracer())\n",
+ " .build()\n",
+ " )\n",
+ " results = dr.execute([\"llm_answer\"], inputs={\"user_query\": user_query}) \n",
+ " return state.update(llm_answer=results[\"llm_answer\"])"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 18,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "execution_count": 18,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "app_v3 = (\n",
+ " ApplicationBuilder()\n",
+ " .with_actions(ingest_blog=ingest_blog_v3, ask_question=ask_question_v3)\n",
+ " .with_transitions((\"ingest_blog\", \"ask_question\"))\n",
+ " .with_entrypoint(\"ingest_blog\")\n",
+ " .with_tracker(project=\"modular-rag\", use_otel_tracing=True)\n",
+ " .build()\n",
+ ")\n",
+ "app_v3"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 19,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "This is trying to return without having computed a single action -- we'll end up just returning some Nones. This means that nothing was executed (E.G. that the state machine had nowhere to go). Either fix the state machine orthe halt conditions, or both... Halt conditions are: halt_before=[], halt_after=['ask_question'].Note that this is considered undefined behavior -- if you get here, you should fix!\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "In a RAG application, you need to monitor the following aspects:\n",
+ "\n",
+ "1. **User Behavior Patterns**: This includes frequent topics and difficult queries that users engage with.\n",
+ "2. **Application Performance**: You should track metrics such as guardrails, LLM randomness, and user ratings.\n",
+ "3. **System Performance**: Monitor system metrics like latency, throughput, resource usage, and API calls.\n",
+ "4. **System Errors**: Keep an eye on bugs that may arise from code or the interaction between code and data.\n"
+ ]
+ }
+ ],
+ "source": [
+ "action_name, results, state = app_v2.run(\n",
+ " halt_after=[\"ask_question\"],\n",
+ " inputs={\n",
+ " \"blog_post_url\": \"https://blog.dagworks.io/p/from-blog-to-bot-build-a-rag-app\",\n",
+ " \"user_query\": \"What do you need to monitor in a RAG app?\"\n",
+ " }\n",
+ ")\n",
+ "print(state[\"llm_answer\"])"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The biggest benefit of Burr + Hamilton is the unbeatable observability you get. The Burr UI will show a granular breakdown of the operations.\n",
+ "\n",
+ "![image.png](burr_ui_app_v3.png)\n",
+ "\n",
+ "Hamilton even has its dedicated [Hamilton UI](https://hamilton.dagworks.io/en/latest/hamilton-ui/ui/) that tracks execution, catalogs data transformations, and provides in-depth introspection. We're looking to integrate the two further together!\n",
+ "\n",
+ "![image.png](hamilton_ui.png)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Actually, two tricks can help simplify the code:\n",
+ "- Build a single `Driver` from both action modules\n",
+ "- Pass the `Driver` as an `@action` input using `.bind()` instead of building the `Driver` each time"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 20,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "execution_count": 20,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "dr = Builder().with_modules(ingest_blog, ask_question).with_adapters(OpenTelemetryTracer()).build()\n",
+ "dr"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 21,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "execution_count": 21,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "from hamilton.driver import Driver, Builder\n",
+ "from burr.core import action, State, ApplicationBuilder\n",
+ "\n",
+ "\n",
+ "@action(reads=[], writes=[])\n",
+ "def ingest_blog_v3_2(state: State, blog_post_url: str, dr: Driver) -> State:\n",
+ " \"\"\"Download a blog post and parse it\"\"\"\n",
+ " dr.execute([\"embed_chunks\"], inputs={\"blog_post_url\": blog_post_url}) \n",
+ " return state\n",
+ "\n",
+ "\n",
+ "@action(reads=[], writes=[\"llm_answer\"])\n",
+ "def ask_question_v3_2(state: State, user_query: str, dr: Driver) -> State:\n",
+ " \"\"\"Reply to the user's query using the blog's content.\"\"\"\n",
+ " results = dr.execute([\"llm_answer\"], inputs={\"user_query\": user_query}) \n",
+ " return state.update(llm_answer=results[\"llm_answer\"])\n",
+ "\n",
+ "\n",
+ "app_v3_2 = (\n",
+ " ApplicationBuilder()\n",
+ " .with_actions(\n",
+ " ingest_blog=ingest_blog_v3_2.bind(dr=dr),\n",
+ " ask_question=ask_question_v3_2.bind(dr=dr)\n",
+ " )\n",
+ " .with_transitions((\"ingest_blog\", \"ask_question\"))\n",
+ " .with_entrypoint(\"ingest_blog\")\n",
+ " .build()\n",
+ ")\n",
+ "app_v3_2"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Conclusion\n",
+ "\n",
+ "We presented the 2-layer approach to building a RAG application, which separates the high-level logic from the implementation of individual actions.\n",
+ "\n",
+ "The key lesson is that you should adopt frameworks incrementally. A tool shouldn't lock you in and limit the evolution of your application. Adopting Burr from the start helps you develop in a principled way, and adding Hamilton as the complexity increases helps create a maintainable application. "
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": ".venv",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.11.9"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/examples/hamilton-integration/requirements.txt b/examples/hamilton-integration/requirements.txt
new file mode 100644
index 00000000..ae56b1db
--- /dev/null
+++ b/examples/hamilton-integration/requirements.txt
@@ -0,0 +1,9 @@
+burr[start,opentelemetry]
+lancedb
+openai
+opentelemetry-instrumentation-lancedb
+opentelemetry-instrumentation-openai
+pyarrow
+pydantic
+requests
+sf-hamilton[visualization]
diff --git a/examples/hamilton-integration/statemachine.png b/examples/hamilton-integration/statemachine.png
new file mode 100644
index 00000000..6fafb597
Binary files /dev/null and b/examples/hamilton-integration/statemachine.png differ