
fix streaming handling for builtin assistants #462

Merged
merged 8 commits into main from fix-streaming-handling on Aug 1, 2024
Conversation

@pmeier (Member) commented Jul 26, 2024

My patch in #425 is broken when using the Python API with asyncio.run, i.e. the regular use case. As implemented in #425, breaking out of the loop in .answer() means that the async iterator is never fully exhausted, i.e. StopAsyncIteration is never raised. As a result, the context manager that handles the HTTP request stays open longer than Chat.answer. When it is eventually cleaned up, it internally calls await request.aclose(). However, if at that point the async event loop has already been shut down, e.g. because the coroutine passed to asyncio.run has finished, you'll get an error:

Task exception was never retrieved
future: <Task finished name='Task-5' coro=<<async_generator_athrow without __name__>()> exception=RuntimeError('aclose(): asynchronous generator is already running')>
RuntimeError: aclose(): asynchronous generator is already running
Exception ignored in: <coroutine object Response.aclose at 0x73f95efdfcc0>
Traceback (most recent call last):
  File "/home/philip/.conda/envs/ragna-deploy-dev/lib/python3.9/site-packages/httpx/_models.py", line 1008, in aclose
    await self.stream.aclose()
  File "/home/philip/.conda/envs/ragna-deploy-dev/lib/python3.9/site-packages/httpx/_client.py", line 155, in aclose
    await self._stream.aclose()
  File "/home/philip/.conda/envs/ragna-deploy-dev/lib/python3.9/site-packages/httpx/_transports/default.py", line 259, in aclose
    await self._httpcore_stream.aclose()
  File "/home/philip/.conda/envs/ragna-deploy-dev/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 374, in aclose
    await self._stream.aclose()
  File "/home/philip/.conda/envs/ragna-deploy-dev/lib/python3.9/site-packages/httpcore/_synchronization.py", line 225, in __exit__
    self._anyio_shield.__exit__(exc_type, exc_value, traceback)
  File "/home/philip/.conda/envs/ragna-deploy-dev/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 406, in __exit__
    if current_task() is not self._host_task:
  File "/home/philip/.conda/envs/ragna-deploy-dev/lib/python3.9/asyncio/tasks.py", line 38, in current_task
    loop = events.get_running_loop()
RuntimeError: no running event loop
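
For illustration, here is a minimal sketch (assumed, not taken from the ragna code base) of the mechanism: breaking out of an async generator leaves it suspended, so cleanup in its finally block is deferred instead of running before .answer() returns.

import asyncio


async def stream():
    try:
        for chunk in ["Hello", " ", "world"]:
            yield chunk
    finally:
        # In the real code this is the point where httpx eventually awaits
        # request.aclose() on the still-open HTTP response.
        print("stream cleaned up")


async def answer():
    async for chunk in stream():
        break  # leave the loop early; StopAsyncIteration is never raised


asyncio.run(answer())

The cleanup only runs when the suspended generator is finalized later, at loop shutdown or during garbage collection, not inside answer() itself. In our case that finalization needed a running event loop, which asyncio.run had already torn down, hence the error above.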

To fix this, this PR introduces the same pattern for our streaming handling that httpx and httpx_sse use: a context manager creates the stream, and the stream can then be iterated over.

async with httpx_sse.aconnect_sse(
    self._client, method, url, **kwargs
) as event_source:
    await self._assert_api_call_is_success(event_source.response)
    async for sse in event_source.aiter_sse():
        yield json.loads(sse.data)

async with self._client.stream(method, url, **kwargs) as response:
    await self._assert_api_call_is_success(response)
    async for chunk in response.aiter_lines():
        yield json.loads(chunk)

Basically, we switch from

async for data in self._call_api(...):
    ...

to

async with self._call_api(...) as stream:
    async for data in stream:
        ...

With this pattern, the cleanup happens when we exit .answer(), which gets rid of the error.
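
For reference, one way to get this shape (a hedged sketch with illustrative names, simplified from the snippets above; not the exact ragna implementation) is to wrap the streaming call in contextlib.asynccontextmanager and yield an async generator over the parsed chunks:

import contextlib
import json

import httpx


class ApiWrapper:
    def __init__(self, url: str) -> None:
        self._url = url
        self._client = httpx.AsyncClient()

    @contextlib.asynccontextmanager
    async def _call_api(self, method, endpoint, **kwargs):
        async with self._client.stream(method, self._url + endpoint, **kwargs) as response:
            response.raise_for_status()

            async def chunks():
                async for line in response.aiter_lines():
                    yield json.loads(line)

            # The caller iterates chunks() while the response is still open;
            # leaving the async with block closes the response deterministically,
            # even if the caller breaks out of its loop early.
            yield chunks()

A caller then uses exactly the pattern shown above: async with wrapper._call_api(...) as stream, followed by async for data in stream.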

@nenb (Contributor) left a comment
Seems sensible to me.

Nitpick: Could you add a unit test to prevent this regression in the future?

@pmeier (Member, Author) commented Jul 31, 2024

> Could you add a unit test to prevent this regression in the future?

I thought about it, but I honestly have trouble coming up with one that is not unreasonably complex. We need a server that we can hit for HTTP streaming. And the best option we have here is to start our own.

I'll see if we can do this in a reasonable way.
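
For context, a minimal streaming server for such a test could look roughly like this (a sketch under assumed choices, FastAPI plus uvicorn serving NDJSON chunks; not necessarily the fixture that ends up in the PR):

import json

import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


@app.get("/stream")
async def stream():
    async def chunks():
        for i in range(10):
            yield json.dumps({"chunk": i}) + "\n"

    return StreamingResponse(chunks(), media_type="application/x-ndjson")


if __name__ == "__main__":
    # The test suite would start this in a background subprocess and point the
    # streaming client at it.
    uvicorn.run(app, port=8000)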

@@ -11,12 +10,7 @@
from ragna._utils import timeout_after
from ragna.deploy import Config
from tests.deploy.utils import TestAssistant


def get_available_port():
@pmeier (Member, Author):
Moved it to the generic test utils as this is no longer just needed for deploy.



@contextlib.contextmanager
def background_subprocess(*args, stdout=sys.stdout, stderr=sys.stdout, **kwargs):
@pmeier (Member, Author):
We had this before, but it was removed in #322. Probably can also be used by the UI tests, but we can do that in a follow-up.
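
For reference, such a helper usually amounts to a thin wrapper around subprocess.Popen; here is a sketch matching the signature shown above (assumed implementation, not necessarily the one in this PR):

import contextlib
import subprocess
import sys


@contextlib.contextmanager
def background_subprocess(*args, stdout=sys.stdout, stderr=sys.stdout, **kwargs):
    # Start the process, hand it to the caller, and make sure it is torn down
    # again when the with block exits, even on test failures.
    process = subprocess.Popen(args, stdout=stdout, stderr=stderr, **kwargs)
    try:
        yield process
    finally:
        process.kill()
        process.communicate()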

Comment on lines +120 to +122
@pytest.mark.parametrize("streaming_protocol", list(HttpStreamingProtocol))
def test_http_streaming_termination(streaming_server, streaming_protocol):
# Non-regression test for https://github.com/Quansight/ragna/pull/462
@pmeier (Member, Author):
@nenb This test fails when I revert my patch in this PR. See the CI runs for 0b9211e.

pmeier merged commit 19d326b into main on Aug 1, 2024
15 of 17 checks passed
pmeier deleted the fix-streaming-handling branch on Aug 1, 2024, 08:53