feat(api): add messages endpoint with streaming helpers (#286)
stainless-bot authored Dec 19, 2023
1 parent 0c55c84 commit c464b87
Showing 43 changed files with 3,291 additions and 198 deletions.
2 changes: 1 addition & 1 deletion .stats.yml
@@ -1 +1 @@
-configured_endpoints: 1
+configured_endpoints: 2
38 changes: 38 additions & 0 deletions README.md
@@ -170,6 +170,44 @@ async for completion in stream:
print(completion.completion, end="", flush=True)
```

### Streaming Helpers

This library provides several conveniences for streaming messages, for example:

```py
import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

async def main() -> None:
async with client.beta.messages.stream(
max_tokens=1024,
messages=[
{
"role": "user",
"content": "Say hello there!",
}
],
model="claude-2.1",
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)
print()

# you can still get the accumulated final message outside of
# the context manager, as long as the entire stream was consumed
# inside of the context manager
accumulated = await stream.get_final_message()
print(accumulated.model_dump_json(indent=2))

asyncio.run(main())
```

Streaming with `client.beta.messages.stream(...)` exposes [various helpers for your convenience](helpers.md), including event handlers and accumulation.

Alternatively, you can use `client.beta.messages.create(..., stream=True)`, which only returns an async iterable of the events in the stream and thus uses less memory (it does not build up a final message object for you).
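
For example, a minimal sketch of that lower-level approach (handling only the text deltas; the event shapes are the `MessageStreamEvent` types listed in `api.md`):

```py
import asyncio

from anthropic import AsyncAnthropic

client = AsyncAnthropic()


async def main() -> None:
    stream = await client.beta.messages.create(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "Say hello there!",
            }
        ],
        model="claude-2.1",
        stream=True,
    )
    async for event in stream:
        # text arrives on `content_block_delta` events; other event types are skipped here
        if event.type == "content_block_delta":
            print(event.delta.text, end="", flush=True)


asyncio.run(main())
```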

## Token counting

You can estimate billing for a given request with the `client.count_tokens()` method.
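
A minimal sketch with the synchronous client:

```py
from anthropic import Anthropic

client = Anthropic()

count = client.count_tokens("Say hello there!")
print(count)  # the number of tokens in the given string
```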
28 changes: 28 additions & 0 deletions api.md
Expand Up @@ -15,3 +15,31 @@ from anthropic.types import Completion
Methods:

- <code title="post /v1/complete">client.completions.<a href="./src/anthropic/resources/completions.py">create</a>(\*\*<a href="src/anthropic/types/completion_create_params.py">params</a>) -> <a href="./src/anthropic/types/completion.py">Completion</a></code>

# Beta

## Messages

Types:

```python
from anthropic.types.beta import (
ContentBlock,
ContentBlockDeltaEvent,
ContentBlockStartEvent,
ContentBlockStopEvent,
Message,
MessageDeltaEvent,
MessageParam,
MessageStartEvent,
MessageStopEvent,
MessageStreamEvent,
TextBlock,
TextDelta,
)
```

Methods:

- <code title="post /v1/messages">client.beta.messages.<a href="./src/anthropic/resources/beta/messages.py">create</a>(\*\*<a href="src/anthropic/types/beta/message_create_params.py">params</a>) -> <a href="./src/anthropic/types/beta/message.py">Message</a></code>
- <code>client.beta.messages.<a href="./src/anthropic/resources/beta/messages.py">stream</a>(\*args) -> MessageStreamManager[MessageStream] | MessageStreamManager[MessageStreamT]</code>
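
As a quick orientation, a minimal non-streaming call to the new endpoint might look like this (a sketch based on the `create` method listed above):

```python
from anthropic import Anthropic

client = Anthropic()

message = client.beta.messages.create(
    max_tokens=1024,
    messages=[{"role": "user", "content": "Say hello there!"}],
    model="claude-2.1",
)
# message.content is a list of ContentBlock objects -- currently a single TextBlock
print(message.content[0].text)
```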
30 changes: 30 additions & 0 deletions examples/messages_stream.py
@@ -0,0 +1,30 @@
import asyncio

from anthropic import AsyncAnthropic

client = AsyncAnthropic()


async def main() -> None:
async with client.beta.messages.stream(
max_tokens=1024,
messages=[
{
"role": "user",
"content": "Say hello there!",
}
],
model="claude-2.1",
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)
print()

# you can still get the accumulated final message outside of
# the context manager, as long as the entire stream was consumed
# inside of the context manager
accumulated = await stream.get_final_message()
print("accumulated message: ", accumulated.model_dump_json(indent=2))


asyncio.run(main())
33 changes: 33 additions & 0 deletions examples/messages_stream_handler.py
@@ -0,0 +1,33 @@
import asyncio
from typing_extensions import override

from anthropic import AsyncAnthropic
from anthropic.types.beta import MessageStreamEvent
from anthropic.lib.streaming import AsyncMessageStream

client = AsyncAnthropic()


class MyStream(AsyncMessageStream):
@override
async def on_stream_event(self, event: MessageStreamEvent) -> None:
print("on_event fired with:", event)


async def main() -> None:
async with client.beta.messages.stream(
max_tokens=1024,
messages=[
{
"role": "user",
"content": "Say hello there!",
}
],
model="claude-2.1",
event_handler=MyStream,
) as stream:
accumulated = await stream.get_final_message()
print("accumulated message: ", accumulated.model_dump_json(indent=2))


asyncio.run(main())
103 changes: 103 additions & 0 deletions helpers.md
@@ -0,0 +1,103 @@
# Message Helpers

## Streaming Responses

```python
async with client.beta.messages.stream(
max_tokens=1024,
messages=[
{
"role": "user",
"content": "Say hello there!",
}
],
model="claude-2.1",
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)
print()
```

`client.beta.messages.stream()` returns a `MessageStreamManager`, a context manager that yields a `MessageStream`, which is iterable, emits events, and accumulates messages.

Alternatively, you can use `client.beta.messages.create(..., stream=True)`, which returns an
iterable of the events in the stream and uses less memory (most notably, it does not accumulate a final message
object for you).

The stream will be cancelled when the context manager exits, but you can also close it prematurely by calling `stream.close()`.

See an example of streaming helpers in action in [`examples/messages_stream.py`](examples/messages_stream.py) and of defining custom event handlers in [`examples/messages_stream_handler.py`](examples/messages_stream_handler.py).

> [!NOTE]
> The synchronous client has the same interface, just without `async`/`await`.
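
For instance, a synchronous version of the streaming example above (a sketch; only the client class and the `async`/`await` keywords change):

```py
from anthropic import Anthropic

client = Anthropic()

with client.beta.messages.stream(
    max_tokens=1024,
    messages=[
        {
            "role": "user",
            "content": "Say hello there!",
        }
    ],
    model="claude-2.1",
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    print()
```
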
### Lenses

#### `.text_stream`

Provides an iterator over just the text deltas in the stream:

```py
async for text in stream.text_stream:
print(text, end="", flush=True)
print()
```

### Events

#### `await on_stream_event(event: MessageStreamEvent)`

The event is fired when an event is received from the API.

#### `await on_message(message: Message)`

The event is fired when a full `Message` object has been accumulated. This corresponds to the `message_stop` SSE.

#### `await on_content_block(content_block: ContentBlock)`

The event is fired when a full `ContentBlock` object has been accumulated. This corresponds to the `content_block_stop` SSE.

#### `await on_text(text: str, snapshot: str)`

The event is fired when a `text` `ContentBlock` is being accumulated. The first argument is the text delta and the second is the current accumulated text, for example:

```py
on_text('Hello', 'Hello')
on_text(' there', 'Hello there')
on_text('!', 'Hello there!')
```

This corresponds to the `content_block_delta` SSE.
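
For example, a handler that watches the running snapshot (a sketch in the style of `examples/messages_stream_handler.py`; pass the class via the `event_handler` argument to `stream(...)`):

```py
from typing_extensions import override

from anthropic.lib.streaming import AsyncMessageStream


class TextPrinter(AsyncMessageStream):
    @override
    async def on_text(self, text: str, snapshot: str) -> None:
        # `text` is the new delta, `snapshot` is everything accumulated so far
        print(f"delta={text!r} snapshot={snapshot!r}")
```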

#### `await on_exception(exception: Exception)`

The event is fired when an exception is encountered while streaming the response.

#### `await on_timeout()`

The event is fired when the request times out.

#### `await on_end()`

The last event fired in the stream.

### Methods

#### `await .close()`

Aborts the request.

#### `await .until_done()`

Blocks until the stream has been read to completion.

#### `await .get_final_message()`

Blocks until the stream has been read to completion and returns the accumulated `Message` object.

#### `await .get_final_text()`

> [!NOTE]
> Currently, the API will only ever return one content block.

Blocks until the stream has been read to completion and returns all `text` content blocks concatenated together.
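
Putting a few of these together, a sketch that reads the stream to completion and then collects the accumulated text:

```py
async with client.beta.messages.stream(
    max_tokens=1024,
    messages=[{"role": "user", "content": "Say hello there!"}],
    model="claude-2.1",
) as stream:
    await stream.until_done()  # block until the stream has been fully read
    final_text = await stream.get_final_text()
print(final_text)
```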
2 changes: 2 additions & 0 deletions src/anthropic/__init__.py
@@ -69,6 +69,8 @@
"AI_PROMPT",
]

from .lib.streaming import *

_setup_logging()

# Update the __module__ attribute for exported symbols so that
6 changes: 6 additions & 0 deletions src/anthropic/_client.py
@@ -52,6 +52,7 @@

class Anthropic(SyncAPIClient):
completions: resources.Completions
beta: resources.Beta
with_raw_response: AnthropicWithRawResponse

# client options
@@ -126,6 +127,7 @@ def __init__(
self._default_stream_cls = Stream

self.completions = resources.Completions(self)
self.beta = resources.Beta(self)
self.with_raw_response = AnthropicWithRawResponse(self)

@property
@@ -303,6 +305,7 @@ def _make_status_error(

class AsyncAnthropic(AsyncAPIClient):
completions: resources.AsyncCompletions
beta: resources.AsyncBeta
with_raw_response: AsyncAnthropicWithRawResponse

# client options
@@ -377,6 +380,7 @@ def __init__(
self._default_stream_cls = AsyncStream

self.completions = resources.AsyncCompletions(self)
self.beta = resources.AsyncBeta(self)
self.with_raw_response = AsyncAnthropicWithRawResponse(self)

@property
@@ -555,11 +559,13 @@ def _make_status_error(
class AnthropicWithRawResponse:
def __init__(self, client: Anthropic) -> None:
self.completions = resources.CompletionsWithRawResponse(client.completions)
self.beta = resources.BetaWithRawResponse(client.beta)


class AsyncAnthropicWithRawResponse:
def __init__(self, client: AsyncAnthropic) -> None:
self.completions = resources.AsyncCompletionsWithRawResponse(client.completions)
self.beta = resources.AsyncBetaWithRawResponse(client.beta)


Client = Anthropic
17 changes: 9 additions & 8 deletions src/anthropic/_response.py
@@ -5,12 +5,12 @@
import datetime
import functools
from typing import TYPE_CHECKING, Any, Union, Generic, TypeVar, Callable, cast
-from typing_extensions import Awaitable, ParamSpec, get_args, override, get_origin
+from typing_extensions import Awaitable, ParamSpec, override, get_origin

import httpx

from ._types import NoneType, UnknownResponse, BinaryResponseContent
-from ._utils import is_given
+from ._utils import is_given, extract_type_var_from_base
from ._models import BaseModel, is_basemodel
from ._constants import RAW_RESPONSE_HEADER
from ._exceptions import APIResponseValidationError
@@ -221,12 +221,13 @@ def __init__(self) -> None:


def _extract_stream_chunk_type(stream_cls: type) -> type:
-    args = get_args(stream_cls)
-    if not args:
-        raise TypeError(
-            f"Expected stream_cls to have been given a generic type argument, e.g. Stream[Foo] but received {stream_cls}",
-        )
-    return cast(type, args[0])
+    from ._base_client import Stream, AsyncStream
+
+    return extract_type_var_from_base(
+        stream_cls,
+        index=0,
+        generic_bases=cast("tuple[type, ...]", (Stream, AsyncStream)),
+    )


def to_raw_response_wrapper(func: Callable[P, R]) -> Callable[P, APIResponse[R]]:
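
For context on this refactor: `typing.get_args` only sees type arguments written at the use site, so it works for a parameterized alias like `Stream[Foo]` but returns nothing for a subclass that fixes the argument; walking the generic bases, as `extract_type_var_from_base` does, handles both. A rough, self-contained illustration (the class names here are illustrative, not from this diff):

```py
from typing import Generic, TypeVar, get_args

T = TypeVar("T")


class Stream(Generic[T]):
    ...


class TextStream(Stream[str]):
    ...


print(get_args(Stream[str]))                   # (str,) -- a parameterized alias keeps its argument
print(get_args(TextStream))                    # ()     -- a plain subclass erases it
print(get_args(TextStream.__orig_bases__[0]))  # (str,) -- but the generic base still records it
```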
