LangChain examples for testing and sending with OTLP, plus a full chatbot demo

---

So... it turns out LangChain uses the v1beta1 prediction service client
under the hood directly, so we should probably instrument that after
all, instead of the main wrapper API.

It also has a streaming option that we should try to support as well,
and it has ainvoke() for asyncio.
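
For reference, both of the entry points mentioned above are part of LangChain's standard
Runnable interface, which `ChatVertexAI` implements. Here is a minimal sketch of the call
patterns the instrumentation would need to cover (the model name is just an example):

```python
import asyncio

from langchain_google_vertexai import ChatVertexAI

model = ChatVertexAI(model="gemini-1.5-flash")  # example model name

# Streaming: chunks arrive incrementally as the model generates them
for chunk in model.stream("Say hello"):
    print(chunk.content, end="", flush=True)


# Async invocation for asyncio callers
async def ask() -> None:
    response = await model.ainvoke("Say hello")
    print(response.content)


asyncio.run(ask())
```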
aabmass committed Jan 23, 2025
1 parent b2f6b32 commit cf3eea7
Showing 10 changed files with 2,088 additions and 1 deletion.
@@ -0,0 +1,2 @@
.venv
docker-compose.yaml
@@ -0,0 +1 @@
3.13
@@ -0,0 +1,33 @@
This sample contains part of the LangGraph chatbot demo taken from
https://python.langchain.com/docs/tutorials/chatbot, running with OTel instrumentation. It
sends traces and logs to the OTel Collector, which forwards them to GCP. Docker Compose wraps
everything up to make it easy to run.

## Running the example

I recommend running in Cloud Shell; it's super simple. You will see GenAI spans in the Trace
explorer right away. Make sure the Vertex AI and Cloud Trace APIs are enabled in the project.
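
If the APIs are not enabled yet, one way to do it (assuming you have permission on the
project) is:

```sh
# Enable the Vertex AI and Cloud Trace APIs for the current project
gcloud services enable aiplatform.googleapis.com cloudtrace.googleapis.com
```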

### Cloud Shell or GCE

```sh
git clone --branch=vertex-langgraph https://github.com/aabmass/opentelemetry-python-contrib.git
cd opentelemetry-python-contrib/instrumentation-genai/opentelemetry-instrumentation-vertexai/examples/langgraph-chatbot-demo
docker compose up --build --abort-on-container-exit
```

### Locally with Application Default Credentials

```sh
git clone --branch=vertex-langgraph https://github.com/aabmass/opentelemetry-python-contrib.git
cd opentelemetry-python-contrib/instrumentation-genai/opentelemetry-instrumentation-vertexai/examples/langgraph-chatbot-demo

# Export the credentials to `GOOGLE_APPLICATION_CREDENTIALS` environment variable so it is
# available inside the docker containers
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.config/gcloud/application_default_credentials.json
# Lets collector read mounted config
export USERID="$(id -u)"
# Specify the project ID
export GOOGLE_CLOUD_PROJECT=<your project id>
docker compose up --build --abort-on-container-exit
```
@@ -0,0 +1,245 @@
# https://python.langchain.com/docs/tutorials/chatbot

from os import environ
from typing import Sequence

from langchain_core.messages import (
    AIMessage,
    BaseMessage,
    HumanMessage,
    SystemMessage,
    trim_messages,
)
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_google_vertexai import ChatVertexAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import START, MessagesState, StateGraph
from langgraph.graph.message import add_messages
from typing_extensions import Annotated, TypedDict

from opentelemetry import trace


def main() -> None:
    model = ChatVertexAI(
        model="gemini-1.5-flash",
        project=environ.get("GOOGLE_CLOUD_PROJECT", None),
    )

    # # Define a new graph
    # workflow = StateGraph(state_schema=MessagesState)

    # # Define the function that calls the model
    # def call_model(state: MessagesState):
    #     response = model.invoke(state["messages"])
    #     return {"messages": response}

    # # Define the (single) node in the graph
    # workflow.add_edge(START, "model")
    # workflow.add_node("model", call_model)

    # # Add memory
    # memory = MemorySaver()
    # app = workflow.compile(checkpointer=memory)

    # config = {"configurable": {"thread_id": "abc123"}}

    # query = "Hi! I'm Bob."

    # input_messages = [HumanMessage(query)]
    # output = app.invoke({"messages": input_messages}, config)
    # output["messages"][
    #     -1
    # ].pretty_print()  # output contains all messages in state

    # query = "What's my name?"

    # input_messages = [HumanMessage(query)]
    # output = app.invoke({"messages": input_messages}, config)
    # output["messages"][-1].pretty_print()

    # config = {"configurable": {"thread_id": "abc234"}}

    # input_messages = [HumanMessage(query)]
    # output = app.invoke({"messages": input_messages}, config)
    # output["messages"][-1].pretty_print()

    # config = {"configurable": {"thread_id": "abc123"}}

    # input_messages = [HumanMessage(query)]
    # output = app.invoke({"messages": input_messages}, config)
    # output["messages"][-1].pretty_print()

    # prompt_template = ChatPromptTemplate.from_messages(
    #     [
    #         (
    #             "system",
    #             "You talk like a pirate. Answer all questions to the best of your ability.",
    #         ),
    #         MessagesPlaceholder(variable_name="messages"),
    #     ]
    # )

    # workflow = StateGraph(state_schema=MessagesState)

    # def call_model(state: MessagesState):
    #     # highlight-start
    #     prompt = prompt_template.invoke(state)
    #     response = model.invoke(prompt)
    #     # highlight-end
    #     return {"messages": response}

    # workflow.add_edge(START, "model")
    # workflow.add_node("model", call_model)

    # memory = MemorySaver()
    # app = workflow.compile(checkpointer=memory)

    # config = {"configurable": {"thread_id": "abc345"}}
    # query = "Hi! I'm Jim."

    # input_messages = [HumanMessage(query)]
    # output = app.invoke({"messages": input_messages}, config)
    # output["messages"][-1].pretty_print()

    # query = "What is my name?"

    # input_messages = [HumanMessage(query)]
    # output = app.invoke({"messages": input_messages}, config)
    # output["messages"][-1].pretty_print()

    prompt_template = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are a helpful assistant. Answer all questions to the best of your ability in {language}.",
            ),
            MessagesPlaceholder(variable_name="messages"),
        ]
    )

    # # highlight-next-line
    class State(TypedDict):
        # highlight-next-line
        messages: Annotated[Sequence[BaseMessage], add_messages]
        # highlight-next-line
        language: str

    # workflow = StateGraph(state_schema=State)

    # def call_model(state: State):
    #     prompt = prompt_template.invoke(state)
    #     response = model.invoke(prompt)
    #     return {"messages": [response]}

    # workflow.add_edge(START, "model")
    # workflow.add_node("model", call_model)

    # memory = MemorySaver()
    # app = workflow.compile(checkpointer=memory)

    # config = {"configurable": {"thread_id": "abc456"}}
    # query = "Hi! I'm Bob."
    # language = "Spanish"

    # input_messages = [HumanMessage(query)]
    # output = app.invoke(
    #     # highlight-next-line
    #     {"messages": input_messages, "language": language},
    #     config,
    # )
    # output["messages"][-1].pretty_print()

    # query = "What is my name?"

    # input_messages = [HumanMessage(query)]
    # output = app.invoke(
    #     {"messages": input_messages},
    #     config,
    # )
    # output["messages"][-1].pretty_print()

    trimmer = trim_messages(
        max_tokens=65,
        strategy="last",
        token_counter=model,
        include_system=True,
        allow_partial=False,
        start_on="human",
    )

    messages = [
        SystemMessage(content="you're a good assistant"),
        HumanMessage(content="hi! I'm bob"),
        AIMessage(content="hi!"),
        HumanMessage(content="I like vanilla ice cream"),
        AIMessage(content="nice"),
        HumanMessage(content="whats 2 + 2"),
        AIMessage(content="4"),
        HumanMessage(content="thanks"),
        AIMessage(content="no problem!"),
        HumanMessage(content="having fun?"),
        AIMessage(content="yes!"),
    ]

    trimmer.invoke(messages)

    workflow = StateGraph(state_schema=State)

    def call_model(state: State):
        # highlight-start
        trimmed_messages = trimmer.invoke(state["messages"])
        prompt = prompt_template.invoke(
            {"messages": trimmed_messages, "language": state["language"]}
        )
        response = model.invoke(prompt)
        # highlight-end
        return {"messages": [response]}

    workflow.add_edge(START, "model")
    workflow.add_node("model", call_model)

    memory = MemorySaver()
    app = workflow.compile(checkpointer=memory)

    config = {"configurable": {"thread_id": "abc567"}}
    query = "What is my name?"
    language = "English"

    # highlight-next-line
    input_messages = messages + [HumanMessage(query)]
    output = app.invoke(
        {"messages": input_messages, "language": language},
        config,
    )
    output["messages"][-1].pretty_print()

    config = {"configurable": {"thread_id": "abc678"}}
    query = "What math problem did I ask?"
    language = "English"

    input_messages = messages + [HumanMessage(query)]
    output = app.invoke(
        {"messages": input_messages, "language": language},
        config,
    )
    output["messages"][-1].pretty_print()

    config = {"configurable": {"thread_id": "abc789"}}
    query = "Hi I'm Todd, please tell me a joke."
    language = "English"

    input_messages = [HumanMessage(query)]
    # highlight-next-line
    for chunk, metadata in app.stream(
        {"messages": input_messages, "language": language},
        config,
        # highlight-next-line
        stream_mode="messages",
    ):
        if isinstance(chunk, AIMessage):  # Filter to just model responses
            print(chunk.content, end="|")


with trace.get_tracer(__name__).start_as_current_span("demo-root-span"):
    main()
@@ -0,0 +1,43 @@
services:
  app:
    build:
      dockerfile_inline: |
        FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim
        RUN apt-get update && apt-get install -y git
        WORKDIR app/
        COPY pyproject.toml uv.lock /app
        RUN uv sync --frozen --no-dev
        ENV PATH="/app/.venv/bin:$PATH"
        COPY . /app
        ENTRYPOINT []
        CMD ["opentelemetry-instrument", "python", "chatbot.py"]
    volumes:
      - ${GOOGLE_APPLICATION_CREDENTIALS:-/dev/null}:${GOOGLE_APPLICATION_CREDENTIALS:-/dev/null}:ro
    environment:
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
      - OTEL_SERVICE_NAME=langgraph-chatbot-demo
      - OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true
      - OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=true

      - GOOGLE_CLOUD_PROJECT
      - GOOGLE_CLOUD_QUOTA_PROJECT
      - GOOGLE_APPLICATION_CREDENTIALS
    depends_on:
      - otelcol

  otelcol:
    image: otel/opentelemetry-collector-contrib:0.118.0
    volumes:
      - ./otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml:ro
      - ${GOOGLE_APPLICATION_CREDENTIALS:-/dev/null}:${GOOGLE_APPLICATION_CREDENTIALS:-/dev/null}:ro
    environment:
      - GOOGLE_CLOUD_PROJECT
      - GOOGLE_CLOUD_QUOTA_PROJECT
      - GOOGLE_APPLICATION_CREDENTIALS
    # If the collector does not have permission to read the mounted volumes, set
    # USERID=$(id -u) to run the container as the current user
    user: $USERID

volumes:
  logs:
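
The `otel-collector-config.yaml` mounted above is another file in this commit and is not
shown here. As a rough sketch only, assuming the `googlecloud` exporter from the contrib
collector, such a config could receive OTLP from the app and forward traces and logs to GCP:

```yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

processors:
  batch:

exporters:
  googlecloud:
    project: ${env:GOOGLE_CLOUD_PROJECT}

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [googlecloud]
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [googlecloud]
```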
@@ -0,0 +1,48 @@
# pylint: skip-file
from langchain_google_vertexai import ChatVertexAI

# NOTE: OpenTelemetry Python Logs and Events APIs are in beta
from opentelemetry import _events, _logs, trace

# from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
#     OTLPLogExporter,
# )
# from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
#     OTLPSpanExporter,
# )
from opentelemetry.instrumentation.vertexai import VertexAIInstrumentor
from opentelemetry.sdk._events import EventLoggerProvider
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)

# configure tracing
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(ConsoleSpanExporter())
)

# configure logging and events
_logs.set_logger_provider(LoggerProvider())
# _logs.get_logger_provider().add_log_record_processor(
#     BatchLogRecordProcessor(OTLPLogExporter())
# )
_events.set_event_logger_provider(EventLoggerProvider())

# instrument VertexAI
VertexAIInstrumentor().instrument()


def main():
    model = ChatVertexAI(
        model="gemini-1.5-flash", temperature=0.2, max_output_tokens=20
    )
    res = model.invoke("Hello, world!")
    print(res)


if __name__ == "__main__":
    main()