Merge pull request #51 from wri/hitl
Add HITL for location
yellowcap authored Dec 18, 2024
2 parents 19aadce + 1cc8159 commit 281fe05
Showing 11 changed files with 1,471 additions and 250 deletions.
117 changes: 78 additions & 39 deletions api.py
@@ -6,6 +6,7 @@
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.responses import StreamingResponse
 
+from langgraph.types import Command
 from langfuse.callback import CallbackHandler
 from zeno.agents.maingraph.agent import graph
 from zeno.agents.maingraph.utils.state import GraphState
@@ -27,53 +28,80 @@ def pack(data):


 # Streams the response from the graph
-def event_stream(query: str, thread_id: Optional[str]=None):
-
+def event_stream(query: str, thread_id: Optional[str]=None, query_type: Optional[str]=None):
     if not thread_id:
         thread_id = uuid.uuid4()
 
-    initial_state = GraphState(question=query)
-
     config = {
         # "callbacks": [langfuse_handler],
         "configurable": {"thread_id": thread_id},
     }
 
-    for namespace, chunk in graph.stream(
-        # for data in graph.stream(
-        initial_state,
-        stream_mode="updates",
-        subgraphs=True,
-        config=config,
-    ):
+    if query_type == "human_input":
+        print(query)
+        selected_index = int(query)
+        current_state = graph.get_state(config)
+        stream = graph.stream(
+            Command(resume={
+                "action": "update",
+                "option": selected_index
+            }),
+            stream_mode="updates",
+            subgraphs=True,
+            config=config,
+        )
+    elif query_type == "query":
+        stream = graph.stream(
+            {"question": query, "route": None},
+            stream_mode="updates",
+            subgraphs=True,
+            config=config,
+        )
+    else:
+        raise ValueError(f"Invalid query type from frontend: {query_type}")
+
+    for namespace,chunk in stream:
         node_name = list(chunk.keys())[0]
-        print(f"Namespace {namespace}")
-        if not namespace:
-            continue
-        print(f"Node {node_name}")
-        if not chunk[node_name]:
-            continue
-        messages = chunk[node_name].get("messages")
-        if not messages:
-            continue
-        for msg in messages:
-            # print(msg)
-            # yield pack({
-            #     "type":
-            # })
-            if isinstance(msg, ToolMessage):
-                yield pack({
-                    "type": "tool",
-                    "tool_name": msg.name,
-                    "message": msg.content,
-                    "artifact": msg.artifact if hasattr(msg, "artifact") else None,
-                })
-            elif isinstance(msg, AIMessage):
-                if msg.content:
-                    yield pack({
-                        "type": "assistant",
-                        "message": msg.content
-                    })
+        print(f"Namespace -> {namespace}")
+        print(f"Node name -> {node_name}")
+
+        if node_name == "__interrupt__":
+            print(f"Waiting for human input")
+            interrupt_msg = chunk[node_name][0].value
+            question = interrupt_msg["question"]
+            options = interrupt_msg["options"]
+            artifact = interrupt_msg["artifact"]
+
+            print("Waiting for human input")
+            yield pack({
+                "type": "human_input",
+                "options": options,
+                "artifact": artifact,
+                "question": question
+            })
+        elif node_name == "slasher":
+            pass
+        else:
+            if not chunk[node_name]:
+                continue
+            messages = chunk[node_name].get("messages", {})
+            if not messages:
+                continue
+            for msg in messages:
+                if isinstance(msg, ToolMessage):
+                    yield pack({
+                        "type": "tool_call",
+                        "tool_name": msg.name,
+                        "content": msg.content,
+                        "artifact": msg.artifact if hasattr(msg, "artifact") else None,
+                    })
+                elif isinstance(msg, AIMessage):
+                    if msg.content:
+                        yield pack({
+                            "type": "update",
+                            "content": msg.content
+                        })
+                else:
+                    raise ValueError(f"Unknown message type: {type(msg)}")



@@ -101,8 +129,19 @@ def event_stream(query: str, thread_id: Optional[str]=None):


 @app.post("/stream")
-async def stream(query: Annotated[str, Body(embed=True)], thread_id: Optional[str]=None):
-    return StreamingResponse(event_stream(query, thread_id), media_type="application/x-ndjson")
+async def stream(
+    query: Annotated[str, Body(embed=True)],
+    thread_id: Optional[str] = Body(None),
+    query_type: Optional[str] = Body(None)):
+
+    print("\n\nPOST...\n\n")
+    print(query, thread_id, query_type)
+    print("=" * 30)
+
+    return StreamingResponse(
+        event_stream(query, thread_id, query_type),
+        media_type="application/x-ndjson"
+    )
 
 
 # Processes the query and returns the response
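
On the client side, this becomes a two-step conversation: stream NDJSON events until a "human_input" event arrives, then POST again with the chosen option index, the same thread_id, and query_type="human_input". A minimal sketch under those assumptions; only the /stream path, the body fields, and the event types come from the diff, while the base URL, the requests library, and the example query are illustrative.

# Hypothetical client for the /stream endpoint above; the request fields and
# event types mirror the diff, everything else is assumed.
import json
import uuid

import requests

BASE_URL = "http://localhost:8000"  # assumed local dev server


def run(query: str) -> None:
    # The same thread_id must be sent on the follow-up request so the
    # checkpointed graph run can be resumed.
    thread_id = str(uuid.uuid4())
    body = {"query": query, "thread_id": thread_id, "query_type": "query"}

    while True:
        resumed = False
        with requests.post(f"{BASE_URL}/stream", json=body, stream=True) as resp:
            for line in resp.iter_lines():
                if not line:
                    continue
                event = json.loads(line)
                if event["type"] == "human_input":
                    # The graph is paused on an interrupt: show the options,
                    # then answer with the selected index as the new query.
                    for index, option in enumerate(event["options"]):
                        print(index, option)
                    choice = input("Pick an option: ")
                    body = {
                        "query": choice,
                        "thread_id": thread_id,
                        "query_type": "human_input",
                    }
                    resumed = True
                    break
                print(event["type"], event.get("content"))
        if not resumed:
            break


if __name__ == "__main__":
    run("Show tree cover loss near Lisbon")  # example query, not from the PR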
155 changes: 0 additions & 155 deletions app/basic.py

This file was deleted.
