Skip to content

Commit

Permalink
Merge pull request #2679 from langchain-ai/nc/9dec/imperative-generator
Browse files Browse the repository at this point in the history
lib: imperative api: Generators use yield to publish stream_mode=custom events
  • Loading branch information
nfcampos authored Dec 9, 2024
2 parents 3f1bdb9 + e0a0958 commit a403e80
Showing 1 changed file with 26 additions and 3 deletions.
29 changes: 26 additions & 3 deletions libs/langgraph/langgraph/func/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import concurrent
import concurrent.futures
import inspect
import types
from functools import partial, update_wrapper
from typing import (
Expand All @@ -24,7 +25,7 @@
from langgraph.pregel.read import PregelNode
from langgraph.pregel.write import ChannelWrite, ChannelWriteEntry
from langgraph.store.base import BaseStore
from langgraph.types import RetryPolicy
from langgraph.types import RetryPolicy, StreamMode, StreamWriter

P = ParamSpec("P")
P1 = TypeVar("P1")
Expand Down Expand Up @@ -76,10 +77,32 @@ def entrypoint(
store: Optional[BaseStore] = None,
) -> Callable[[types.FunctionType], Pregel]:
def _imp(func: types.FunctionType) -> Pregel:
if inspect.isgeneratorfunction(func):

def gen_wrapper(*args: Any, writer: StreamWriter, **kwargs: Any) -> Any:
for chunk in func(*args, **kwargs):
writer(chunk)

bound = get_runnable_for_func(gen_wrapper)
stream_mode: StreamMode = "custom"
elif inspect.isasyncgenfunction(func):

async def agen_wrapper(
*args: Any, writer: StreamWriter, **kwargs: Any
) -> Any:
async for chunk in func(*args, **kwargs):
writer(chunk)

bound = get_runnable_for_func(agen_wrapper)
stream_mode = "custom"
else:
bound = get_runnable_for_func(func)
stream_mode = "updates"

return Pregel(
nodes={
func.__name__: PregelNode(
bound=get_runnable_for_func(func),
bound=bound,
triggers=[START],
channels=[START],
writers=[ChannelWrite([ChannelWriteEntry(END)], tags=[TAG_HIDDEN])],
Expand All @@ -89,7 +112,7 @@ def _imp(func: types.FunctionType) -> Pregel:
input_channels=START,
output_channels=END,
stream_channels=END,
stream_mode="updates",
stream_mode=stream_mode,
checkpointer=checkpointer,
store=store,
)
Expand Down

0 comments on commit a403e80

Please sign in to comment.