From 53ec7c41b2bd4261ba3790f417724a420132d863 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Wed, 27 Nov 2024 17:31:21 -0800 Subject: [PATCH 1/2] Revert "sdk-py: Fix SSE parsing to split lines only \n \r \r\n per SSE spec" This reverts commit dc09b134007c2a8c054b4db6ff09cf779366ebd0. --- libs/sdk-py/langgraph_sdk/client.py | 5 +- libs/sdk-py/langgraph_sdk/sse.py | 106 ---------------------------- 2 files changed, 2 insertions(+), 109 deletions(-) delete mode 100644 libs/sdk-py/langgraph_sdk/sse.py diff --git a/libs/sdk-py/langgraph_sdk/client.py b/libs/sdk-py/langgraph_sdk/client.py index a018bb3b6e..6a3bb6c9c9 100644 --- a/libs/sdk-py/langgraph_sdk/client.py +++ b/libs/sdk-py/langgraph_sdk/client.py @@ -59,7 +59,6 @@ ThreadStatus, ThreadUpdateStateResponse, ) -from langgraph_sdk.sse import EventSource logger = logging.getLogger(__name__) @@ -293,7 +292,7 @@ async def stream( else: logger.error(f"Error from langgraph-api: {body}", exc_info=e) raise e - async for event in EventSource(sse.response).aiter_sse(): + async for event in sse.aiter_sse(): yield StreamPart( event.event, orjson.loads(event.data) if event.data else None ) @@ -2427,7 +2426,7 @@ def stream( else: logger.error(f"Error from langgraph-api: {body}", exc_info=e) raise e - for event in EventSource(sse.response).iter_sse(): + for event in sse.iter_sse(): yield StreamPart( event.event, orjson.loads(event.data) if event.data else None ) diff --git a/libs/sdk-py/langgraph_sdk/sse.py b/libs/sdk-py/langgraph_sdk/sse.py deleted file mode 100644 index 8b019e4a63..0000000000 --- a/libs/sdk-py/langgraph_sdk/sse.py +++ /dev/null @@ -1,106 +0,0 @@ -"""Adapted from httpx_sse to split lines on \n, \r, \r\n per the SSE spec.""" - -import io -from typing import AsyncIterator, Iterator - -import httpx -import httpx_sse -import httpx_sse._decoders - - -class BytesLineDecoder: - """ - Handles incrementally reading lines from text. - - Has the same behaviour as the stdllib bytes splitlines, - but handling the input iteratively. - """ - - def __init__(self) -> None: - self.buffer = io.BytesIO() - self.trailing_cr: bool = False - - def decode(self, text: bytes) -> list[bytes]: - # See https://docs.python.org/3/glossary.html#term-universal-newlines - NEWLINE_CHARS = b"\n\r" - - # We always push a trailing `\r` into the next decode iteration. - if self.trailing_cr: - text = b"\r" + text - self.trailing_cr = False - if text.endswith(b"\r"): - self.trailing_cr = True - text = text[:-1] - - if not text: - # NOTE: the edge case input of empty text doesn't occur in practice, - # because other httpx internals filter out this value - return [] # pragma: no cover - - trailing_newline = text[-1] in NEWLINE_CHARS - lines = text.splitlines() - - if len(lines) == 1 and not trailing_newline: - # No new lines, buffer the input and continue. - self.buffer.write(lines[0]) - return [] - - if self.buffer: - # Include any existing buffer in the first portion of the - # splitlines result. - lines = [self.buffer.getvalue() + lines[0]] + lines[1:] - self.buffer.truncate(0) - - if not trailing_newline: - # If the last segment of splitlines is not newline terminated, - # then drop it from our output and start a new buffer. - self.buffer.write(lines.pop()) - - return lines - - def flush(self) -> list[bytes]: - if not self.buffer and not self.trailing_cr: - return [] - - lines = [self.buffer.getvalue()] if self.buffer else [] - self.buffer.truncate(0) - self.trailing_cr = False - return lines - - -async def aiter_lines_raw(response: httpx.Response) -> AsyncIterator[bytes]: - decoder = BytesLineDecoder() - async for chunk in response.aiter_bytes(): - for line in decoder.decode(chunk): - yield line - for line in decoder.flush(): - yield line - - -def iter_lines_raw(response: httpx.Response) -> Iterator[bytes]: - decoder = BytesLineDecoder() - for chunk in response.iter_bytes(): - for line in decoder.decode(chunk): - yield line - for line in decoder.flush(): - yield line - - -class EventSource(httpx_sse.EventSource): - async def aiter_sse(self) -> AsyncIterator[httpx_sse.ServerSentEvent]: - self._check_content_type() - decoder = httpx_sse._decoders.SSEDecoder() - async for line in aiter_lines_raw(self._response): - line = line.rstrip(b"\n") - sse = decoder.decode(line.decode()) - if sse is not None: - yield sse - - def iter_sse(self) -> Iterator[httpx_sse.ServerSentEvent]: - self._check_content_type() - decoder = httpx_sse._decoders.SSEDecoder() - for line in iter_lines_raw(self._response): - line = line.rstrip(b"\n") - sse = decoder.decode(line.decode()) - if sse is not None: - yield sse From 4576a259dd92850de369be041c66173622dfce92 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Wed, 27 Nov 2024 17:32:07 -0800 Subject: [PATCH 2/2] sdk-py 0.1.39 --- libs/sdk-py/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/sdk-py/pyproject.toml b/libs/sdk-py/pyproject.toml index 735954803e..7c8af51b28 100644 --- a/libs/sdk-py/pyproject.toml +++ b/libs/sdk-py/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "langgraph-sdk" -version = "0.1.38" +version = "0.1.39" description = "SDK for interacting with LangGraph API" authors = [] license = "MIT"