diff --git a/examples/rag-lancedb-ingestion/README.md b/examples/rag-lancedb-ingestion/README.md
new file mode 100644
index 00000000..65098ef2
--- /dev/null
+++ b/examples/rag-lancedb-ingestion/README.md
@@ -0,0 +1,22 @@
+# Burr RAG with LanceDB and dlt document ingestion
+
+This example shows how to build a chatbot with RAG over Substack blogs (or any RSS feed) stored in LanceDB.
+
+![burr ui](burr-ui.gif)
+
+> Burr UI brings a new level of observability to your RAG application via OpenTelemetry
+
+Burr + [LanceDB](https://lancedb.github.io/lancedb/) make a powerful but lightweight combo for building retrieval-augmented generation (RAG) agents. Burr lets you define complex agents in a way that is easy to understand and debug. It also provides all the right features to help you productionize agents, including monitoring, storing interactions, streaming, and a fully-featured open-source UI.
+
+LanceDB makes it easy to swap embedding providers and hides this concern from the Burr application layer. For this example, we'll use [OpenAI](https://github.com/openai/openai-python) for embedding and response generation.
+
+By leveraging the [Burr integration with OpenTelemetry](https://blog.dagworks.io/p/building-generative-ai-agent-based), we get full visibility into the OpenAI API requests/responses and the LanceDB operations for free.
+
+To ingest data, we use [dlt and its LanceDB integration](https://dlthub.com/devel/dlt-ecosystem/destinations/lancedb), which makes it very simple to query, embed, and store blogs from the web into LanceDB tables.
+
+## Content
+
+- `notebook.ipynb` contains a tutorial
+- `application.py` has the `burr` code for the chatbot
+- `ingestion.py` has the `dlt` code for document ingestion
+- `utils.py` contains utility functions to set up `OpenTelemetry` instrumentation and environment variables
diff --git a/examples/rag-lancedb-ingestion/__init__.py b/examples/rag-lancedb-ingestion/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/examples/rag-lancedb-ingestion/application.py b/examples/rag-lancedb-ingestion/application.py
new file mode 100644
index 00000000..2b693000
--- /dev/null
+++ b/examples/rag-lancedb-ingestion/application.py
@@ -0,0 +1,105 @@
+import os
+import textwrap
+
+import lancedb
+import openai
+
+from burr.core import Application, ApplicationBuilder, State, action
+from burr.lifecycle import PostRunStepHook
+
+
+@action(reads=[], writes=["relevant_chunks", "chat_history"])
+def relevant_chunk_retrieval(
+    state: State,
+    user_query: str,
+    lancedb_con: lancedb.DBConnection,
+) -> State:
+    """Search LanceDB with the user query and return the top 4 results"""
+    text_chunks_table = lancedb_con.open_table("dagworks___contexts")
+    search_results = (
+        text_chunks_table.search(user_query).select(["text", "id__"]).limit(4).to_list()
+    )
+
+    return state.update(relevant_chunks=search_results).append(chat_history=user_query)
+
+
+@action(reads=["chat_history", "relevant_chunks"], writes=["chat_history"])
+def bot_turn(state: State, llm_client: openai.OpenAI) -> State:
+    """Collect relevant chunks and produce a response to the user query"""
+    user_query = state["chat_history"][-1]
+    relevant_chunks = state["relevant_chunks"]
+
+    system_prompt = textwrap.dedent(
+        """You are a conversational agent designed to discuss and provide \
+        insights about various blog posts. Your task is to engage users in \
+        meaningful conversations based on the content of the blog articles they mention.
+ """ + ) + joined_chunks = " ".join([c["text"] for c in relevant_chunks]) + user_prompt = "BLOGS CONTENT\n" + joined_chunks + "\nUSER QUERY\n" + user_query + + response = llm_client.chat.completions.create( + model="gpt-4o-mini", + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ], + ) + bot_answer = response.choices[0].message.content + + return state.append(chat_history=bot_answer) + + +class PrintBotAnswer(PostRunStepHook): + """Hook to print the bot's answer""" + + def post_run_step(self, *, state, action, **future_kwargs): + if action.name == "bot_turn": + print("\nšŸ¤–: ", state["chat_history"][-1]) + + +def build_application() -> Application: + """Create the Burr `Application`. This is responsible for instantiating the + OpenAI client and the LanceDB connection + """ + llm_client = openai.OpenAI() + lancedb_con = lancedb.connect(os.environ["DESTINATION__LANCEDB__CREDENTIALS__URI"]) + + return ( + ApplicationBuilder() + .with_actions( + relevant_chunk_retrieval.bind(lancedb_con=lancedb_con), + bot_turn.bind(llm_client=llm_client), + ) + .with_transitions( + ("relevant_chunk_retrieval", "bot_turn"), + ("bot_turn", "relevant_chunk_retrieval"), + ) + .with_entrypoint("relevant_chunk_retrieval") + .with_tracker("local", project="substack-rag", use_otel_tracing=True) + .with_hooks(PrintBotAnswer()) + .build() + ) + + +if __name__ == "__main__": + import utils + + utils.set_environment_variables() # set environment variables for LanceDB + utils.instrument() # register the OpenTelemetry instrumentation + + # build the Burr `Application` + app = build_application() + app.visualize("statemachine.png") + + # Launch the Burr application in a `while` loop + print("\n## Lauching RAG application ##") + while True: + user_query = input("\nAsk something or type `quit/q` to exit: ") + if user_query.lower() in ["quit", "q"]: + break + + _, _, _ = app.run( + halt_after=["bot_turn"], + inputs={"user_query": user_query}, + ) diff --git a/examples/rag-lancedb-ingestion/burr-ui.gif b/examples/rag-lancedb-ingestion/burr-ui.gif new file mode 100644 index 00000000..206df702 Binary files /dev/null and b/examples/rag-lancedb-ingestion/burr-ui.gif differ diff --git a/examples/rag-lancedb-ingestion/ingestion.py b/examples/rag-lancedb-ingestion/ingestion.py new file mode 100644 index 00000000..ae71abb7 --- /dev/null +++ b/examples/rag-lancedb-ingestion/ingestion.py @@ -0,0 +1,112 @@ +import re +from typing import Generator + +import dlt +import feedparser +import requests +import utils +from bs4 import BeautifulSoup + + +def split_text(text): + """Split text on punction (., !, ?).""" + sentence_endings = r"[.!?]+" + for sentence in re.split(sentence_endings, text): + sentence = sentence.strip() + if sentence: + yield sentence + + +def contextualize(chunks: list[str], window=5, stride=3, min_window_size=2): + """Rolling window operation to join consecutive sentences into larger chunks.""" + n_chunks = len(chunks) + for start_i in range(0, n_chunks, stride): + if (start_i + window <= n_chunks) or (n_chunks - start_i >= min_window_size): + yield " ".join(chunks[start_i : min(start_i + window, n_chunks)]) + + +@dlt.resource(name="substack", write_disposition="merge", primary_key="id") +def rss_entries(substack_url: str) -> Generator: + """Substack blog entries retrieved from a RSS feed""" + FIELDS_TO_EXCLUDE = [ + "published_parsed", + "title_detail", + "summary_detail", + "author_detail", + "guidislink", + "authors", + "links", + ] + + r = 
requests.get(f"{substack_url}/feed") + rss_feed = feedparser.parse(r.content) + for entry in rss_feed["entries"]: + for field in FIELDS_TO_EXCLUDE: + entry.pop(field) + + yield entry + + +@dlt.transformer(primary_key="id") +def parsed_html(rss_entry: dict): + """Parse the HTML from the RSS entry""" + soup = BeautifulSoup(rss_entry["content"][0]["value"], "html.parser") + parsed_text = soup.get_text(separator=" ", strip=True) + yield {"id": rss_entry["id"], "text": parsed_text} + + +@dlt.transformer(primary_key="chunk_id") +def chunks(parsed_html: dict) -> list[dict]: + """Chunk text""" + return [ + dict( + document_id=parsed_html["id"], + chunk_id=idx, + text=text, + ) + for idx, text in enumerate(split_text(parsed_html["text"])) + ] + + +# order is important for reduce / rolling step +# default to order of the batch or specifying sorting key +@dlt.transformer(primary_key="context_id") +def contexts(chunks: list[dict]) -> Generator: + """Assemble consecutive chunks into larger context windows""" + # first handle the m-to-n relationship + # set of foreign keys (i.e., "chunk_id") + chunk_id_set = set(chunk["chunk_id"] for chunk in chunks) + context_id = utils.hash_set(chunk_id_set) + + # create a table only containing the keys + for chunk_id in chunk_id_set: + yield dlt.mark.with_table_name( + {"chunk_id": chunk_id, "context_id": context_id}, + "chunks_to_contexts_keys", + ) + + # main transformation logic + for contextualized in contextualize([chunk["text"] for chunk in chunks]): + yield dlt.mark.with_table_name( + {"context_id": context_id, "text": contextualized}, "contexts" + ) + + +if __name__ == "__main__": + import dlt + from dlt.destinations.adapters import lancedb_adapter + + utils.set_environment_variables() + + pipeline = dlt.pipeline( + pipeline_name="substack-blog", destination="lancedb", dataset_name="dagworks" + ) + + blog_url = "https://blog.dagworks.io/" + + full_entries = lancedb_adapter(rss_entries(blog_url), embed="summary") + chunked_entries = rss_entries(blog_url) | parsed_html | chunks + contextualized_chunks = lancedb_adapter(chunked_entries | contexts, embed="text") + + load_info = pipeline.run([full_entries, chunked_entries, contextualized_chunks]) + print(load_info) diff --git a/examples/rag-lancedb-ingestion/notebook.ipynb b/examples/rag-lancedb-ingestion/notebook.ipynb new file mode 100644 index 00000000..6a070772 --- /dev/null +++ b/examples/rag-lancedb-ingestion/notebook.ipynb @@ -0,0 +1,703 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Burr RAG with LanceDB and dlt document ingestion\n", + "\n", + "This example shows how to build a chatbot with RAG over Substack blogs (or any RSS feed) stored into LanceDB. \n", + "\n", + "The stack includes:\n", + "\n", + "- Burr: define your RAG logic \n", + "- LanceDB: store and retrieve documents using vector search\n", + "- dlt: ingest unstructured web pages and store them as structured documents\n", + "- OpenAI: embed documents for vector search and generate answers using LLMs\n", + "- OpenTelemetry: automatically track telemetry from Burr, LanceDB, and OpenAI in a unified way" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. `Substack -> LanceDB` ingestion with `dlt`\n", + "\n", + "To ingest data, we use [dlt and its LanceDB integration](https://dlthub.com/devel/dlt-ecosystem/destinations/lancedb), which makes it very simple to query, embed, and store blogs from the web into LanceDB tables. 
" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 1.1 Text processing\n", + "\n", + "First, we define simple functions to split long text strings into sentences and a way to assemble sentences into larger context windows." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import re\n", + "\n", + "def split_text(text):\n", + " \"\"\"Split text on punction (., !, ?).\"\"\"\n", + " sentence_endings = r'[.!?]+'\n", + " for sentence in re.split(sentence_endings, text):\n", + " sentence = sentence.strip()\n", + " if sentence:\n", + " yield sentence\n", + "\n", + "\n", + "def contextualize(chunks: list[str], window=5, stride=3, min_window_size=2):\n", + " \"\"\"Rolling window operation to join consecutive sentences into larger chunks.\"\"\"\n", + " n_chunks = len(chunks)\n", + " for start_i in range(0, n_chunks, stride):\n", + " if (start_i + window <= n_chunks) or (n_chunks - start_i >= min_window_size):\n", + " yield \" \".join(chunks[start_i : min(start_i + window, n_chunks)])\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 1.2 Define `dlt` resources\n", + "\n", + "To use `dlt`, you author `Resource` objects that generate data using the `@dlt.resource` decorator. In this case, we create a resource that pulls an RSS feed from a Substack blog URL using the `requests` and `feedparser` libraries. Then, we iterate over RSS entries and yield them as dictionaries." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "from typing import Generator\n", + "\n", + "import requests\n", + "import feedparser\n", + "import dlt\n", + "\n", + "@dlt.resource(name=\"substack\", write_disposition=\"merge\", primary_key=\"id\")\n", + "def rss_entries(substack_url: str) -> Generator:\n", + " \"\"\"Substack blog entries retrieved from a RSS feed\"\"\"\n", + " FIELDS_TO_EXCLUDE = [\n", + " \"published_parsed\",\n", + " \"title_detail\",\n", + " \"summary_detail\",\n", + " \"author_detail\",\n", + " \"guidislink\",\n", + " \"authors\",\n", + " \"links\"\n", + " ]\n", + "\n", + " r = requests.get(f\"{substack_url}/feed\")\n", + " rss_feed = feedparser.parse(r.content)\n", + " for entry in rss_feed[\"entries\"]:\n", + " for field in FIELDS_TO_EXCLUDE:\n", + " entry.pop(field)\n", + "\n", + " yield entry" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can then use `@dlt.transformer` to define operations on the values returned by `Resource` objects. In this case, we define three transformations that we'll chain:\n", + "\n", + "1. Parse HTML into a string stripped of tags\n", + "2. Chunk the text string by splitting it into sentences\n", + "3. Join sentence chunks into larger \"context windows\" via a rolling operation.\n", + "\n", + "We use a custom trick to map and store the relationship between HTML pages, sentence chunks, and context windows ([learn more](https://github.com/dlt-hub/dlt/issues/1699))." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "from bs4 import BeautifulSoup\n", + "import utils\n", + "\n", + "\n", + "@dlt.transformer(primary_key=\"id\")\n", + "def parsed_html(rss_entry: dict):\n", + " \"\"\"Parse the HTML from the RSS entry\"\"\"\n", + " soup = BeautifulSoup(rss_entry[\"content\"][0][\"value\"], \"html.parser\")\n", + " parsed_text = soup.get_text(separator=\" \", strip=True)\n", + " yield {\"id\": rss_entry[\"id\"], \"text\": parsed_text}\n", + "\n", + "\n", + "@dlt.transformer(primary_key=\"chunk_id\")\n", + "def chunks(parsed_html: dict) -> list[dict]:\n", + " \"\"\"Chunk text\"\"\"\n", + " return [\n", + " dict(\n", + " document_id=parsed_html[\"id\"],\n", + " chunk_id=idx,\n", + " text=text,\n", + " )\n", + " for idx, text in enumerate(split_text(parsed_html[\"text\"]))\n", + " ]\n", + "\n", + "# order is important for reduce / rolling step\n", + "# default to order of the batch or specifying sorting key\n", + "@dlt.transformer(primary_key=\"context_id\")\n", + "def contexts(chunks: list[dict]) -> Generator:\n", + " \"\"\"Assemble consecutive chunks into larger context windows\"\"\"\n", + " # first handle the m-to-n relationship\n", + " # set of foreign keys (i.e., \"chunk_id\")\n", + " chunk_id_set = set(chunk[\"chunk_id\"] for chunk in chunks)\n", + " context_id = utils.hash_set(chunk_id_set)\n", + " \n", + " # create a table only containing the keys\n", + " for chunk_id in chunk_id_set :\n", + " yield dlt.mark.with_table_name(\n", + " {\"chunk_id\": chunk_id, \"context_id\": context_id},\n", + " \"chunks_to_contexts_keys\",\n", + " ) \n", + " \n", + " # main transformation logic\n", + " for contextualized in contextualize([chunk[\"text\"] for chunk in chunks]):\n", + " yield dlt.mark.with_table_name(\n", + " {\"context_id\": context_id, \"text\": contextualized},\n", + " \"contexts\"\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 1.3 Execute the pipeline\n", + "\n", + "Before ingesting data, we need to the configuration for the `dlt` destination (LanceDB in our case). We specify which OpenAI model we want to use for text embedding and store our API key.\n", + "\n", + "`dlt` provides [multiple ways to do so](https://dlthub.com/devel/general-usage/credentials), but using the `os` module is simply the most convenient for this tutorial." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "# set your OpenAI API key\n", + "openai_api_key = ...\n", + "\n", + "# this environment variable isn't needed by dlt, but we'll use it later\n", + "os.environ[\"OPENAI_API_KEY\"] = openai_api_key\n", + "\n", + "os.environ[\"DESTINATION__LANCEDB__EMBEDDING_MODEL_PROVIDER\"] = \"openai\"\n", + "os.environ[\"DESTINATION__LANCEDB__EMBEDDING_MODEL\"] = \"text-embedding-3-small\"\n", + "\n", + "os.environ[\"DESTINATION__LANCEDB__CREDENTIALS__URI\"] = \".lancedb\"\n", + "os.environ[\"DESTINATION__LANCEDB__CREDENTIALS__EMBEDDING_MODEL_PROVIDER_API_KEY\"] = openai_api_key" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, we combine our Substack blog `Resource` with the different text processing `Transformer` using the pipe operator `|`. We also use the `lancedb_adapter` by `dlt` which allows to specify which field will be embed with the OpenAI embedding service." 
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from dlt.destinations.adapters import lancedb_adapter\n",
+    "import dlt.destinations.impl.lancedb.models\n",
+    "\n",
+    "blog_url = \"https://blog.dagworks.io/\"\n",
+    "\n",
+    "full_entries = lancedb_adapter(rss_entries(blog_url), embed=\"summary\")\n",
+    "chunked_entries = rss_entries(blog_url) | parsed_html | chunks\n",
+    "contextualized_chunks = lancedb_adapter(chunked_entries | contexts, embed=\"text\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "First, we create the `Pipeline` object and minimally specify a `pipeline_name` and `destination`. This won't execute any code or ingest any data.\n",
+    "\n",
+    "Then, calling `pipeline.run()` with the `Resource` and `Transformer` objects will launch the ingestion job and return a `LoadInfo` object detailing the results of the job."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "pipeline = dlt.pipeline(\n",
+    "    pipeline_name=\"substack-blog\",\n",
+    "    destination=\"lancedb\",\n",
+    "    dataset_name=\"dagworks\"\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "Pipeline substack-blog load step completed in 3.02 seconds\n",
+      "1 load package(s) were loaded to destination LanceDB and into dataset dagworks\n",
+      "The LanceDB destination used location to store data\n",
+      "Load package 1724941288.3716326 is LOADED and contains no failed jobs\n"
+     ]
+    }
+   ],
+   "source": [
+    "load_info = pipeline.run([full_entries, chunked_entries, contextualized_chunks])\n",
+    "print(load_info)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## 2. Burr RAG with LanceDB memory\n",
+    "Burr allows you to define an `Application` by specifying a set of actions and valid transitions between them. This approach lets you define complex agents in a way that is easy to understand and debug.\n",
+    "\n",
+    "Burr solves many of the challenges of productionizing agents, including monitoring, storing interactions, streaming, and more, and comes with a rich open-source UI for observability."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### 2.1 Define `@action`\n",
+    "\n",
+    "First, we define the actions the agent can take with the `@action` decorator. The function must take a `State` object as its first argument and return a `State` object. The decorator specifies which `State` fields can be read from and written to.\n",
+    "\n",
+    "The next cell contains two actions:\n",
+    "- `relevant_chunk_retrieval()` reads from the LanceDB table `dagworks___contexts` that was generated by `dlt` and retrieves the top 4 rows most similar to the `user_query` string. It writes the search results to the `relevant_chunks` state field and appends the user input to the `chat_history` state field.\n",
+    "- `bot_turn()` reads the `chat_history` and the `relevant_chunks` from state, combines the text into a prompt, and sends a request to OpenAI. The LLM's response is appended to the `chat_history` state field."
+ ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "import textwrap\n", + "\n", + "import openai\n", + "import lancedb\n", + "\n", + "from burr.core import State, action\n", + "\n", + "\n", + "@action(reads=[], writes=[\"relevant_chunks\", \"chat_history\"])\n", + "def relevant_chunk_retrieval(\n", + " state: State,\n", + " user_query: str,\n", + " lancedb_con: lancedb.DBConnection,\n", + ") -> State:\n", + " \"\"\"Search LanceDB with the user query and return the top 4 results\"\"\"\n", + " # this is a table generated by `dlt`\n", + " text_chunks_table = lancedb_con.open_table(\"dagworks___contexts\")\n", + "\n", + " search_results = (\n", + " text_chunks_table\n", + " .search(user_query) # this automatically embed the query does vector search\n", + " .select([\"text\", \"id__\"]) # retrieve the `text` and `id__` columns\n", + " .limit(4) # get the top 4 rows\n", + " .to_list()\n", + " )\n", + "\n", + " return state.update(relevant_chunks=search_results).append(chat_history=user_query)\n", + "\n", + "\n", + "@action(reads=[\"chat_history\", \"relevant_chunks\"], writes=[\"chat_history\"])\n", + "def bot_turn(state: State, llm_client: openai.OpenAI) -> State:\n", + " \"\"\"Collect relevant chunks and produce a response to the user query\"\"\"\n", + " user_query = state[\"chat_history\"][-1]\n", + " relevant_chunks = state[\"relevant_chunks\"]\n", + "\n", + " # create system and user prompts\n", + " system_prompt = textwrap.dedent(\n", + " \"\"\"You are a conversational agent designed to discuss and provide \\\n", + " insights about various blog posts. Your task is to engage users in \\\n", + " meaningful conversations based on the content of the blog articles they mention.\n", + " \"\"\"\n", + " )\n", + " joined_chunks = ' '.join([c[\"text\"] for c in relevant_chunks])\n", + " user_prompt = \"BLOGS CONTENT\\n\" + joined_chunks + \"\\nUSER QUERY\\n\" + user_query\n", + "\n", + " # query the OpenAI API\n", + " response = llm_client.chat.completions.create(\n", + " model=\"gpt-4o-mini\",\n", + " messages=[\n", + " {\"role\": \"system\", \"content\": system_prompt},\n", + " {\"role\": \"user\", \"content\": user_prompt}\n", + " ]\n", + " )\n", + " bot_answer = response.choices[0].message.content\n", + "\n", + " return state.append(chat_history=bot_answer)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 2. Assemble the `Application`\n", + "To build a Burr `Application`, you need to pass it actions and define valid transitions as tuples `(from, to)`. The application must also define an `entrypoint` from where to begin execution. Then, we can visualize the graph of possible states and actions.\n", + "\n", + "First, let's see the simplest `Application` definition." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "%3\n", + "\n", + "\n", + "\n", + "relevant_chunk_retrieval\n", + "\n", + "relevant_chunk_retrieval\n", + "\n", + "\n", + "\n", + "bot_turn\n", + "\n", + "bot_turn\n", + "\n", + "\n", + "\n", + "relevant_chunk_retrieval->bot_turn\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "input__user_query\n", + "\n", + "input: user_query\n", + "\n", + "\n", + "\n", + "input__user_query->relevant_chunk_retrieval\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "input__lancedb_con\n", + "\n", + "input: lancedb_con\n", + "\n", + "\n", + "\n", + "input__lancedb_con->relevant_chunk_retrieval\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "bot_turn->relevant_chunk_retrieval\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "input__llm_client\n", + "\n", + "input: llm_client\n", + "\n", + "\n", + "\n", + "input__llm_client->bot_turn\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from burr.core import ApplicationBuilder\n", + "\n", + "application = (\n", + " ApplicationBuilder()\n", + " .with_actions(relevant_chunk_retrieval, bot_turn)\n", + " .with_transitions(\n", + " (\"relevant_chunk_retrieval\", \"bot_turn\"),\n", + " (\"bot_turn\", \"relevant_chunk_retrieval\"),\n", + " )\n", + " .with_entrypoint(\"relevant_chunk_retrieval\")\n", + " .build()\n", + ")\n", + "application.visualize()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The `ApplicationBuilder` patterns allows you to add all the features you need for production-readiness without modifying the logic of your agent. In the next few cells we'll add:\n", + "\n", + "- a hook to display the bot replies\n", + "- tracking and storing execution metadata\n", + "- add OpenTelemetry support\n", + "\n", + "To quickly develop an interactive experience in the terminal, we can add a `Hook` that will run after each `step` (i.e., `action`). It will check if the name of the previously completed action is equal to `bot_turn`. If it's the case, the hook prints the most recent message from the state's `chat_history`, in other words, the bot's reply." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "from burr.lifecycle import PostRunStepHook\n", + "\n", + "class PrintBotAnswer(PostRunStepHook):\n", + " \"\"\"Hook to print the bot's answer\"\"\"\n", + " def post_run_step(self, *, state, action, **future_kwargs):\n", + " if action.name == \"bot_turn\":\n", + " print(\"\\nšŸ¤–: \", state[\"chat_history\"][-1])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This time, we specify via `.with_hooks()` to use the `PrintBotAnswer` hook and via `.with_tracker(..., use_otel_tracing=True)` to track execution and activate OpenTelemetry support. By importing the `opentelemetry.instrumentation` packages for `openai` and `lancedb` and using their `Instrumentor`, we'll be able to track more execution metadata.\n", + "\n", + "Also, we create the `OpenAI` client and the `LanceDBConnection` and bind to specific action parameter using the `.bind()` parameter. Notice how it changes the visualization slightly." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "%3\n", + "\n", + "\n", + "\n", + "relevant_chunk_retrieval\n", + "\n", + "relevant_chunk_retrieval\n", + "\n", + "\n", + "\n", + "bot_turn\n", + "\n", + "bot_turn\n", + "\n", + "\n", + "\n", + "relevant_chunk_retrieval->bot_turn\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "input__user_query\n", + "\n", + "input: user_query\n", + "\n", + "\n", + "\n", + "input__user_query->relevant_chunk_retrieval\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "bot_turn->relevant_chunk_retrieval\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from burr.core import ApplicationBuilder\n", + "from opentelemetry.instrumentation.openai import OpenAIInstrumentor\n", + "from opentelemetry.instrumentation.lancedb import LanceInstrumentor\n", + "\n", + "LanceInstrumentor().instrument()\n", + "OpenAIInstrumentor().instrument()\n", + "\n", + "llm_client = openai.OpenAI()\n", + "lancedb_con = lancedb.connect(os.environ[\"DESTINATION__LANCEDB__CREDENTIALS__URI\"])\n", + "\n", + "application = (\n", + " ApplicationBuilder()\n", + " .with_actions(\n", + " relevant_chunk_retrieval.bind(lancedb_con=lancedb_con),\n", + " bot_turn.bind(llm_client=llm_client),\n", + " )\n", + " .with_transitions(\n", + " (\"relevant_chunk_retrieval\", \"bot_turn\"),\n", + " (\"bot_turn\", \"relevant_chunk_retrieval\"),\n", + " )\n", + " .with_entrypoint(\"relevant_chunk_retrieval\")\n", + " .with_tracker(\"local\", project=\"substack-rag\", use_otel_tracing=True)\n", + " .with_hooks(PrintBotAnswer())\n", + " .build()\n", + ")\n", + "application.visualize()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 3. Launch the `Application`\n", + "Finally, we start the application in a `while` loop, allowing it to run until we exit by inputting `quit` or `q`. We use `application.run()` and specify to halt after the action `bot_turn` to wait for the user's input." + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "## Lauching RAG application ##\n", + "\n", + "šŸ¤–: Burr can be incredibly helpful if you're developing applications that require state management and debugging capabilities. Here are a few ways it can assist you:\n", + "\n", + "1. **State-Driven Logic**: Burr allows you to structure your application around actions that depend on the state, giving you fine control over the flow of your application. This is particularly useful for complex applications where the next step depends on the current state.\n", + "\n", + "2. **Easy Debugging**: With Burr, you can gain visibility into the decisions your application makes. If something goes wrong, you can rewind to a previous state and inspect what happened leading up to the issue. This makes identifying bugs much easier.\n", + "\n", + "3. **Forking Applications**: The ability to fork your application from any point in time allows you to create different branches of your application state, which can be useful for testing alternate paths or scenarios.\n", + "\n", + "4. **Integration with Other Frameworks**: Burr is designed to complement other frameworks, such as Hamilton. 
This makes it versatile and allows you to integrate it into your existing projects seamlessly.\n", + "\n", + "5. **User Input Handling**: You can define how your application interacts with user inputs, enabling the creation of interactive applications that respond dynamically to user actions.\n", + "\n", + "6. **Simplified Action Declarations**: By defining actions as functions or objects, you can keep your code organized and easy to follow. This can lead to more maintainable code, especially in large projects.\n", + "\n", + "If you're working on applications that require robust state management, dynamic responses, and efficient debugging, Burr could be a fantastic addition to your toolkit. Is there a specific type of application you have in mind that you'd like to develop with Burr?\n" + ] + } + ], + "source": [ + "# Launch the Burr application in a `while` loop\n", + "print(\"\\n## Lauching RAG application ##\")\n", + "user_query = input(\"\\nAsk something or type `quit/q` to exit: \")\n", + "\n", + "while True:\n", + " if user_query.lower() in [\"quit\", \"q\"]:\n", + " break\n", + "\n", + " _, _, _ = application.run(\n", + " halt_after=[\"bot_turn\"],\n", + " inputs={\"user_query\": user_query},\n", + " )\n", + " user_query = input(\"\\nAsk something or type `quit/q` to exit: \")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# 3. Explore Burr UI\n", + "After using the `Application`, use the command `burr` to launch the Burr UI.\n", + "\n", + "Burr UI allows you to:\n", + "- explore past executions and current ones in real-time\n", + "- see the application move through states\n", + "- view code code failures\n", + "- inspect tracked attributes (LLM prompts and responses, vector database calls, token counts)\n", + "- one-click to create test fixtures from specific states\n", + "- and more!\n", + "\n", + "![](burr-ui.gif)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/examples/rag-lancedb-ingestion/requirements.txt b/examples/rag-lancedb-ingestion/requirements.txt new file mode 100644 index 00000000..372151b9 --- /dev/null +++ b/examples/rag-lancedb-ingestion/requirements.txt @@ -0,0 +1,8 @@ +beautifulsoup4 +burr[start] +dlt[lancedb] +feedparser +lancedb +openai +opentelemetry-instrumentation-lancedb +opentelemetry-instrumentation-openai diff --git a/examples/rag-lancedb-ingestion/statemachine.png b/examples/rag-lancedb-ingestion/statemachine.png new file mode 100644 index 00000000..88305a6e Binary files /dev/null and b/examples/rag-lancedb-ingestion/statemachine.png differ diff --git a/examples/rag-lancedb-ingestion/utils.py b/examples/rag-lancedb-ingestion/utils.py new file mode 100644 index 00000000..ece96683 --- /dev/null +++ b/examples/rag-lancedb-ingestion/utils.py @@ -0,0 +1,54 @@ +import base64 +import getpass +import hashlib +import os + + +def set_environment_variables(): + import dlt.destinations.impl.lancedb.models # noqa + + if os.environ.get("OPENAI_API_KEY") is None: + os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter OPENAI_API_KEY: ") + + os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL_PROVIDER"] = "openai" + 
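+    # dlt resolves DESTINATION__LANCEDB__* environment variables as configuration
+    # for its LanceDB destination: the embedding provider/model used to embed text
+    # and the credentials (URI, provider API key) used to connect to the database.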
os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL"] = "text-embedding-3-small" + + os.environ["DESTINATION__LANCEDB__CREDENTIALS__URI"] = ".lancedb" + os.environ["DESTINATION__LANCEDB__CREDENTIALS__EMBEDDING_MODEL_PROVIDER_API_KEY"] = os.environ[ + "OPENAI_API_KEY" + ] + + +def instrument(): + from opentelemetry.instrumentation.lancedb import LanceInstrumentor + from opentelemetry.instrumentation.openai import OpenAIInstrumentor + + LanceInstrumentor().instrument() + OpenAIInstrumentor().instrument() + + +def _compact_hash(digest: bytes) -> str: + """Compact the hash to a string that's safe to pass around.""" + return base64.urlsafe_b64encode(digest).decode() + + +def hash_primitive(obj, *args, **kwargs) -> str: + """Convert the primitive to a string and hash it""" + hash_object = hashlib.md5(str(obj).encode()) + return _compact_hash(hash_object.digest()) + + +def hash_set(obj, *args, **kwargs) -> str: + """Hash each element of the set, then sort hashes, and + create a hash of hashes. + For the same objects in the set, the hashes will be the + same. + """ + hashes = (hash_primitive(elem) for elem in obj) + sorted_hashes = sorted(hashes) + + hash_object = hashlib.sha224() + for hash in sorted_hashes: + hash_object.update(hash.encode()) + + return _compact_hash(hash_object.digest())