Server Sent Events #20

Closed
tomchristie opened this issue Jul 12, 2018 · 32 comments
Labels
feature New feature or request

Comments

@tomchristie
Member

Helpers for sending SSE event streams over HTTP connections.

Related resources:

@tomchristie tomchristie added the feature New feature or request label Dec 18, 2018
@kesavkolla

Any update on this? It would be really helpful for building realtime applications

@tomchristie
Member Author

Not yet - tho I'd be happy to help guide anyone who's interested in taking on a pull request for it.

@Kamforka
Contributor

I'm trying to build a simple working example of server-sent events in starlette, without luck. If you could check what's wrong with my logic, I could refine it and then raise a PR for a new EventResponse class.

This is my code:

from asyncio.queues import Queue

import uvicorn
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse, StreamingResponse


class SSE:
    def __init__(self, data, event=None, event_id=None, retry=None):
        self.data = data
        self.event = event
        self.id = event_id
        self.retry = retry

    def encode(self):
        message = f"data: {self.data}"
        if self.event is not None:
            message += f"\nevent: {self.event}"
        if self.id is not None:
            message += f"\nid: {self.id}"
        if self.retry is not None:
            message += f"\nretry: {self.retry}"
        message += "\r\n\r\n"
        return message.encode("utf-8")


app = Starlette(debug=True)
app.queues = []


@app.route("/subscribe", methods=["GET"])
async def subscribe(request: Request):
    async def event_publisher():
        while True:
            event = await queue.get()
            yield event.encode()

    queue = Queue()
    app.queues.append(queue)

    headers = {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }

    return StreamingResponse(content=event_publisher(), headers=headers)


@app.route("/publish", methods=["POST"])
async def publish(request: Request):
    payload = await request.json()
    data = payload["data"]
    for queue in app.queues:
        event = SSE(data)
        await queue.put(event)

    return JSONResponse({"message": "ok"})


if __name__ == "__main__":
    uvicorn.run("__main__:app", host="0.0.0.0", port=4321, reload=True)

Obviously it's a naive implementation at the moment, but the main problem is that whenever I publish a new event, it isn't broadcast to the subscribers. When debugging I can see that the event is added to the queue and that the generator fetches it from the queue, but I never see it streamed to the client.
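
For reference, here is a minimal sketch of the event framing on its own, independent of Starlette. The helper name format_sse is made up for this example. It differs from the encode() method above in one respect: a payload containing newlines is split into one data: line per line, which is how the event stream format expects multi-line data to be sent, and the event ends with a blank line.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None,
               event_id: Optional[str] = None,
               retry: Optional[int] = None) -> bytes:
    """Serialize one event in text/event-stream framing (hypothetical helper)."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if retry is not None:
        lines.append(f"retry: {retry}")
    # A payload containing newlines must be sent as several "data:" lines;
    # the client joins them back together with "\n".
    for line in data.splitlines() or [""]:
        lines.append(f"data: {line}")
    # A blank line terminates the event.
    return ("\n".join(lines) + "\n\n").encode("utf-8")
```

With this sketch, format_sse("a\nb", event="tick") produces an event: line followed by two data: lines, so a newline inside the payload cannot end the event early.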

@Kamforka
Contributor

Oh, to be honest it seems like it's working, I tried it first from Firefox and it tries to download the stream by default as a file, but with Chrome it works just fine.

@jacopofar

Interesting, @Kamforka do you have the frontend code as well? It should definitely be supported on Firefox

@Kamforka
Contributor

Kamforka commented Feb 14, 2020

@jacopofar There is no code, usually I just navigate to the url localhost:4321/subscribe and on Chrome it starts listening to the event stream and displays the messages published by the backend.
Seems like Firefox lacks this feature.

This is what it looks like in Chrome (pretty convenient for debugging):
[image: the event stream displayed in Chrome]

@Kamforka
Contributor

Kamforka commented Feb 14, 2020

So at the moment I have created these POC classes to enable starlette to send server-sent events:

class SSE:
    def __init__(self, data, event=None, event_id=None, retry=None):
        self.data = data
        self.event = event
        self.id = event_id
        self.retry = retry

    def encode(self, charset="utf-8"):
        message = f"data: {self.data}"
        if self.event is not None:
            message += f"\nevent: {self.event}"
        if self.id is not None:
            message += f"\nid: {self.id}"
        if self.retry is not None:
            message += f"\nretry: {self.retry}"
        message += "\r\n\r\n"
        return message.encode(charset)

class EventSourceResponse(StreamingResponse):
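    def __init__(
        self, content, headers=None, media_type=None, status_code=200, background=None,
    ):
        # Avoid a mutable default argument; the SSE headers below take
        # precedence over any caller-supplied values for the same keys.
        default_headers = {
            **(headers or {}),
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }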

        super().__init__(
            content=content,
            status_code=status_code,
            headers=default_headers,
            media_type=media_type,
            background=background,
        )

    async def __call__(self, scope, receive, send) -> None:
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )
        async for event in self.body_iterator:
            if not isinstance(event, SSE):
                raise Exception("Event source body must be an SSE instance")
            await send(
                {
                    "type": "http.response.body",
                    "body": event.encode(self.charset),
                    "more_body": True,
                }
            )
        await send({"type": "http.response.body", "body": b"", "more_body": False})

        if self.background is not None:
            await self.background()

It works just fine, but I identified two pain points:

  1. If the client disconnects, the response object just keeps hanging around and streaming events to a request that is already gone. However, this one can be solved (more or less) in the generator logic:
async def event_publisher():
    while True:
        if not await request.is_disconnected():
            try:
                event = await asyncio.wait_for(queue.get(), 1.0)
            except asyncio.TimeoutError:
                continue
            yield event
        else:
            return
  2. When one wants to shut down the uvicorn worker while events are still being streamed, the shutdown process first gets stuck at the "Waiting for connections to close." stage, and if the client disconnects in the meantime it gets stuck again at the "Waiting for background tasks to complete." stage. (It seems like uvicorn just won't kill streamed responses unless it's forced to, which is really annoying during reload, because you have to press Ctrl + C every time a file changes.)

Any thoughts on the most idiomatic way to overcome these issues within starlette, or maybe uvicorn?
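
The polling workaround from the first point can be tried without a server. The sketch below is an assumption-laden stand-in: poll_events and the fake is_disconnected coroutine are hypothetical names, and the latter only imitates what request.is_disconnected() would report.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def poll_events(
    queue: asyncio.Queue,
    is_disconnected: Callable[[], Awaitable[bool]],
    poll_interval: float = 1.0,
) -> AsyncIterator[object]:
    """Yield queued events until is_disconnected() reports True."""
    while not await is_disconnected():
        try:
            event = await asyncio.wait_for(queue.get(), poll_interval)
        except asyncio.TimeoutError:
            continue  # nothing published yet; re-check the connection
        yield event


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    gone = False

    async def is_disconnected() -> bool:  # stand-in for request.is_disconnected
        return gone

    await queue.put("one")
    await queue.put("two")
    seen = []
    async for event in poll_events(queue, is_disconnected, poll_interval=0.01):
        seen.append(event)
        if len(seen) == 2:
            gone = True  # simulate the client going away
    return seen


result = asyncio.run(demo())
```

The trade-off of this approach is latency: a disconnect is noticed only at the next poll, so the generator can linger for up to poll_interval after the client has gone.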

@jacopofar

Ah, I didn't know that it was possible to see them in Chrome by just visiting the address.
I extended the code with a minimal JS to react to the events, and can confirm it works on Firefox.
Here the gist: https://gist.github.com/jacopofar/b328948c018dc360da8471a930140c06

MDN reports that it is supported everywhere except IE and Edge (the new Blink-based Edge probably will support it).

I really like it, seems much easier to manage than websockets
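
For anyone who wants to check the stream without a browser, here is a simplified sketch of what a client does with the incoming lines. It is not the code from the gist; parse_sse is a hypothetical helper that covers only the basic rules (blank line dispatches an event, lines starting with a colon are comments, multiple data: lines are joined with newlines).

```python
from typing import Dict, Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per event from decoded lines (simplified sketch)."""
    event: Dict[str, str] = {}
    data = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the event collected so far,
            # but only if at least one data line was seen.
            if data:
                event["data"] = "\n".join(data)
                yield event
            event, data = {}, []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space is stripped
        if field == "data":
            data.append(value)
        else:
            event[field] = value


events = list(parse_sse([
    "event: tick\n", "data: a\n", "data: b\n", "\n",
    ": ping\n", "data: c\n", "\n",
]))
```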

@tomchristie
Member Author

So, I think we ought to change StreamingResponse so that it listens in the background for disconnects, and breaks out of the iteration when it occurs.

Something along these lines...

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        disconnected = False

        async def listen_for_disconnect():
            while True:
                message = await receive()
                if message['type'] == 'http.disconnect':
                    disconnected = True
                    break

        task = asyncio.create_task(listen_for_disconnect())
        try:
            await send(
                {
                    "type": "http.response.start",
                    "status": self.status_code,
                    "headers": self.raw_headers,
                }
            )
            async for chunk in self.body_iterator:
                if not isinstance(chunk, bytes):
                    chunk = chunk.encode(self.charset)
                await send({"type": "http.response.body", "body": chunk, "more_body": True})
                if disconnected:
                    break

            if not disconnected:
                await send({"type": "http.response.body", "body": b"", "more_body": False})
        finally:
            if task.done():
                task.result()
            else:
                task.cancel()

        if self.background is not None:
            await self.background()

@Kamforka
Contributor

@tomchristie I think the above implementation would still block until a new value is being yielded from the body_iterator, and only after that could check the value of the disconnected flag.

Also the nested listen_for_disconnect doesn't have access to the disconnected flag defined in the scope of the __call__ method.

I think we should somehow cancel the async for when the listen_for_disconnect receives the 'http.disconnect' event. I'll try to look into the possibilities for that.

While it should solve the problem with disconnected clients I'm still not sure that it will solve the hang issue of the server shutdown process, what do you think?
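
To illustrate the scoping point: assigning disconnected = True inside a nested function creates a new local variable unless the name is declared nonlocal. One way around it, sketched here under the assumption that receive is any awaitable callable returning ASGI-style messages, is to share an asyncio.Event instead of a bool. The function name run_until_disconnect is made up for this example.

```python
import asyncio


async def run_until_disconnect(receive) -> str:
    disconnected = asyncio.Event()

    async def listen_for_disconnect() -> None:
        while True:
            message = await receive()
            if message["type"] == "http.disconnect":
                # Mutating the Event needs no nonlocal declaration, unlike
                # rebinding a plain bool defined in the enclosing function.
                disconnected.set()
                return

    listener = asyncio.create_task(listen_for_disconnect())
    try:
        await asyncio.wait_for(disconnected.wait(), timeout=1.0)
        return "disconnected"
    except asyncio.TimeoutError:
        return "timeout"
    finally:
        listener.cancel()  # no-op if the listener already finished


async def demo() -> str:
    messages = asyncio.Queue()  # stand-in for the server's receive channel
    await messages.put({"type": "http.request", "body": b"", "more_body": False})
    await messages.put({"type": "http.disconnect"})
    return await run_until_disconnect(messages.get)


result = asyncio.run(demo())
```

An Event also gives the streaming side something to await, which a plain flag does not; that is relevant to the first point about the loop blocking until the next value is yielded.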

@tomchristie
Member Author

I think the above implementation would still block until a new value is being yielded from the body_iterator, and only after that could check the value of the disconnected flag.

Sure - tweakable by pushing the streaming into a stream_response task, and cancelling if required. Probably something similar to this...

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
    async def stream_response():
        nonlocal self, send

        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )
        async for chunk in self.body_iterator:
            if not isinstance(chunk, bytes):
                chunk = chunk.encode(self.charset)
            await send({"type": "http.response.body", "body": chunk, "more_body": True})

        await send({"type": "http.response.body", "body": b"", "more_body": False})

    async def listen_for_disconnect(task):
        nonlocal self, receive

        while True:
            message = await receive()
            if message['type'] == 'http.disconnect':
                if not task.done():
                    task.cancel()
                break

    stream_task = asyncio.create_task(stream_response())
    disconnect_task = asyncio.create_task(listen_for_disconnect(stream_task))
    await stream_task
    disconnect_task.result() if disconnect_task.done() else disconnect_task.cancel()
    stream_task.result()

    if self.background is not None:
        await self.background()

While it should solve the problem with disconnected clients I'm still not sure that it will solve the hang issue of the server shutdown process, what do you think?

I've not looked into it - depends if uvicorn (or daphne/hypercorn) sends disconnected events during shutdown, right?

@Kamforka
Contributor

Kamforka commented Feb 17, 2020

Wow I like this one! I reworked it a bit though:

    async def stream_response(self, send):
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )
        async for chunk in self.body_iterator:
            if not isinstance(chunk, bytes):
                chunk = chunk.encode(self.charset)
            await send({"type": "http.response.body", "body": chunk, "more_body": True})

        await send({"type": "http.response.body", "body": b"", "more_body": False})

    async def listen_for_disconnect(self, receive):
        while True:
            message = await receive()
            if message["type"] == "http.disconnect":
                break

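    async def __call__(self, scope, receive, send):
        # asyncio.wait() needs Task objects; passing bare coroutines was
        # deprecated in Python 3.8 and removed in 3.11.
        tasks = [
            asyncio.create_task(self.stream_response(send)),
            asyncio.create_task(self.listen_for_disconnect(receive)),
        ]
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)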

        for task in pending:
            task.cancel()

        if self.background is not None:
            await self.background()

Tested and works. Thoughts?
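
The race between streaming and the disconnect listener can be tried without a server. In this sketch the two coroutines are stand-ins (stream pretends to send chunks, wait_for_disconnect pretends to receive http.disconnect after a delay); only the asyncio.wait pattern itself is taken from the code above.

```python
import asyncio


async def stream(chunks, sent):
    for chunk in chunks:
        await asyncio.sleep(0.01)  # pretend to wait for the next event
        sent.append(chunk)


async def wait_for_disconnect(delay):
    await asyncio.sleep(delay)  # stand-in for receiving http.disconnect


async def race(chunks, disconnect_after):
    sent = []
    tasks = {
        asyncio.create_task(stream(chunks, sent)),
        asyncio.create_task(wait_for_disconnect(disconnect_after)),
    }
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Let the cancelled tasks finish unwinding before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    return sent


finished = asyncio.run(race(["a", "b"], disconnect_after=1.0))
cut_short = asyncio.run(race(["a"] * 100, disconnect_after=0.03))
```

In the first run the stream completes and the listener is cancelled; in the second the simulated disconnect wins and the stream is cancelled partway through.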

@tomchristie
Member Author

That's a nice implementation, yup.

My one other concern here would be cases where we might inadvertently end up with multiple readers listening for the disconnect event. For example, the HTTP base middleware returns a StreamingResponse... https://github.com/encode/starlette/blob/master/starlette/middleware/base.py#L58 ...which I assume would break our proposals here, since we'd end up with more than one task listening on the receive, with only one of them ending up with the message.
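
The concern can be modelled with a plain queue. This sketch assumes that receive() pops messages from a single queue, so each message is delivered to exactly one waiter; whether a real server behaves this way depends on how it implements receive, so treat it as an illustration of the worry rather than a statement about any particular server.

```python
import asyncio


async def reader(name, receive, got):
    message = await receive()
    got.append((name, message["type"]))


async def demo():
    channel = asyncio.Queue()  # assumption: receive() pops from one queue
    got = []
    readers = [
        asyncio.create_task(reader("middleware", channel.get, got)),
        asyncio.create_task(reader("response", channel.get, got)),
    ]
    await channel.put({"type": "http.disconnect"})
    # Give both readers a short window to pick up the single message.
    done, pending = await asyncio.wait(readers, timeout=0.05)
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return got, len(pending)


got, still_waiting = asyncio.run(demo())
```

Under that assumption, one reader receives the disconnect message and the other is still waiting when the timeout expires.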

@Kamforka
Contributor

@tomchristie I will try adding a middleware like that to my setup and check what happens.

Also I found another caveat specific to event source subscriptions. With the above implementation there is no way to check when a response was cancelled.

What do you think about implementing a callback, e.g. on_disconnect, for the StreamingResponse so one could pass clean-up logic there?

I mean something like this:

return StreamingResponse(content=event_publisher(), on_disconnect=lambda: app.subscriptions.remove(queue))

And then the listener logic could call it when it receives a disconnect:

if message["type"] == "http.disconnect":
    self.on_disconnect()
    break

@tomchristie
Member Author

Also I found another caveat specific to event source subscriptions. With the above implementation there is no way to check when a response was cancelled.

You'll get a cancelled exception raised within the streaming code. The sensible thing to do here would be to use with context blocks or try ... finally: statements, which will end up executing the clean-up code.

@Kamforka
Contributor

Kamforka commented Feb 17, 2020

@tomchristie however, that would require adding __aenter__ and __aexit__ to StreamingResponse, right?

And then you could do something like this:

async def hello(request):
    async with StreamingResponse() as resp:
        while True:
            await resp.send(data)
            await asyncio.sleep(1)
    return resp

So in this case you could try-catch the exception when the disconnect cancels, right?

Or maybe I overthink something? Cuz my problem here is that I need to do the cleanup logic from the view function and not inside the response object.

@tomchristie
Member Author

however that require to add __aenter__ and __aexit__ to StreamingResponse right?

The async iterator that gets passed to the response instance will have the exception raised there.
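
A small standalone sketch of that behaviour, with no Starlette involved: when the task that is iterating an async generator gets cancelled while the generator is suspended on an await, the cancellation surfaces inside the generator, so a try ... finally there runs. The names events and consume are made up for the example.

```python
import asyncio


async def events(queue, log):
    try:
        while True:
            yield await queue.get()
    finally:
        # Runs when the consuming task is cancelled mid-await.
        log.append("cleanup")


async def consume(queue, log):
    async for item in events(queue, log):
        log.append(item)


async def main():
    queue = asyncio.Queue()
    log = []
    task = asyncio.create_task(consume(queue, log))
    await queue.put("first")
    await asyncio.sleep(0.01)  # let the consumer process the item
    task.cancel()              # simulate the response being cancelled
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log


result = asyncio.run(main())
```

One thing to keep in mind when looking for the exception: since Python 3.8, asyncio.CancelledError derives from BaseException, so an except Exception: block will not catch it, while finally and context managers still run.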

@Kamforka
Contributor

Hmm, you sure about that?
I don't get any exceptions in my iterator when I run the code I posted previously.

@tomchristie
Member Author

Which python version are you running?

@Kamforka
Contributor

3.7.x and 3.8.x

@Kamforka
Contributor

That's a nice implementation, yup.

My one other concern here would be cases where we might inadvertently end up with multiple readers listening for the disconnect event. For example, the HTTP base middleware returns a StreamingResponse... https://github.com/encode/starlette/blob/master/starlette/middleware/base.py#L58 ...which I assume would break our proposals here, since we'd end up with more than one task listening on the receive, with only one of them ending up with the message.

I tested the proposal using the CustomHeaderMiddleware and it works just fine.
It seems like both the middleware's streaming response, and the event source's streaming response got the message from receive.

@Kamforka
Contributor

Kamforka commented Feb 18, 2020

@tomchristie

Also I found another caveat specific to event source subscriptions. With the above implementation there is no way to check when a response was cancelled.

You'll get a cancelled exception raised within the streaming code. The sensible thing to do here would be to use with context blocks or try ... finally: statements, which will end up executing the clean-up code.

I think I found a legitimate solution for the cleanup as well using background tasks.
E.g.:

@app.route("/subscribe", methods=["GET"])
async def subscribe(request: Request):
    async def remove_subscriptions():
        app.subscriptions.remove(queue)

    async def event_iterator():
        while True:
            # yielding events here
            yield await queue.get()

    queue = Queue()
    app.subscriptions.add(queue)

    return EventSourceResponse(
        content=event_iterator(), background=BackgroundTask(remove_subscriptions)
    )

Since background tasks are executed whenever the response is disconnected or finished it kinda feels appropriate to do cleanups with them.
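
As an alternative sketch, the same clean-up can live next to the subscription itself, using the try ... finally approach mentioned earlier in the thread. Everything here is hypothetical (the Subscriptions class, its subscribe and publish methods, and event_iterator); it uses only the standard library and is not an API of Starlette.

```python
import asyncio
from contextlib import contextmanager


class Subscriptions:
    """Hypothetical registry of per-client queues."""

    def __init__(self):
        self.queues = set()

    @contextmanager
    def subscribe(self):
        queue = asyncio.Queue()
        self.queues.add(queue)
        try:
            yield queue
        finally:
            # Runs on normal exit, on errors, and on cancellation.
            self.queues.discard(queue)

    def publish(self, event):
        for queue in self.queues:
            queue.put_nowait(event)


async def event_iterator(subs):
    with subs.subscribe() as queue:
        while True:
            yield await queue.get()


async def demo():
    subs = Subscriptions()
    received = []

    async def client():
        async for event in event_iterator(subs):
            received.append(event)

    task = asyncio.create_task(client())
    await asyncio.sleep(0.01)  # let the client subscribe
    during = len(subs.queues)
    subs.publish("hello")
    await asyncio.sleep(0.01)  # let the client receive the event
    task.cancel()              # simulate the disconnect
    await asyncio.gather(task, return_exceptions=True)
    return during, received, len(subs.queues)


result = asyncio.run(demo())
```

The design difference is where the clean-up is attached: the background task belongs to the response, while the context manager belongs to the generator and runs whenever the generator is torn down.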

@tomchristie
Member Author

So at the moment I think I'd like to see any implementation here tackled as a third party package, as per comment on #757.

I'm trying to keep Starlette's scope down to a minimum, and having an SSEResponse maintained independently would be really helpful here.

@Kamforka
Contributor

So basically there was an already worked-out PR for this all along?

@Kamforka
Contributor

Btw I think that so far we discussed an alternative implementation for the StreamingResponse, therefore most of the brainstorming that we had here should directly go to starlette, don't you think?

@tomchristie
Member Author

Potentially. Let's wait and see what any pull request here looks like, then we'd be in a better position to take a call on it.

@paxcodes
Contributor

paxcodes commented Oct 5, 2020

There is a third-party package that implements SSE for starlette: https://github.com/sysid/sse-starlette

@jacopofar

It says Caveat: SSE streaming does not work in combination with GZipMiddleware., is it because of #919 ?

@tomchristie
Member Author

@jacopofar No, it's not specific to Starlette - it's a constraint of how SSE works.

You could potentially do something like compress the content of the individual messages themselves if they were large enough for that to matter, but you can't compress the stream itself. (It wouldn't be a valid SSE response if you did, since you'd be scrambling the framing that indicates "here's a new message".)
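
A rough sketch of per-message compression, assuming the client is prepared to decode it: the payload is deflated and base64-encoded so that it still fits on a single data: line, while the framing around it stays plain text. The helper names compress_payload and decompress_payload are made up for this example.

```python
import base64
import zlib


def compress_payload(text: str) -> str:
    """Deflate and base64-encode a payload so it fits on one data: line."""
    return base64.b64encode(zlib.compress(text.encode("utf-8"))).decode("ascii")


def decompress_payload(encoded: str) -> str:
    """Reverse compress_payload on the receiving side."""
    return zlib.decompress(base64.b64decode(encoded)).decode("utf-8")


payload = "temperature=21.5;" * 200
encoded = compress_payload(payload)
frame = f"data: {encoded}\n\n"  # the framing itself stays uncompressed
```

Base64 adds roughly a third on top of the compressed size, so this is only worthwhile for payloads that are large and compress well.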

@transfluxus

Hi, it would be nice if the GZipMiddleware, or maybe middlewares in general, accepted a set of routes (strings or regex patterns) to ignore.
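
Until something like that exists, one possible workaround is a small ASGI wrapper that bypasses a middleware for selected paths. The sketch below is hypothetical (SkipPaths is not part of Starlette) and is exercised with stand-in apps so it runs on the standard library alone.

```python
import asyncio
import re


class SkipPaths:
    """Apply `middleware` to `app`, except for paths matching `patterns`.

    Hypothetical wrapper; not part of Starlette.
    """

    def __init__(self, app, middleware, patterns):
        self.plain = app
        self.wrapped = middleware(app)
        self.patterns = [re.compile(p) for p in patterns]

    async def __call__(self, scope, receive, send):
        path = scope.get("path", "")
        if scope["type"] == "http" and any(p.match(path) for p in self.patterns):
            await self.plain(scope, receive, send)  # bypass the middleware
        else:
            await self.wrapped(scope, receive, send)


def tagging_middleware(app):
    # Stand-in for a real middleware: it only records that it ran.
    async def wrapped(scope, receive, send):
        scope["touched"] = True
        await app(scope, receive, send)
    return wrapped


async def endpoint(scope, receive, send):
    scope["handled"] = True


async def call(path):
    scope = {"type": "http", "path": path}
    app = SkipPaths(endpoint, tagging_middleware, [r"^/subscribe$"])
    await app(scope, None, None)
    return scope.get("touched", False)


results = asyncio.run(call("/subscribe")), asyncio.run(call("/publish"))
```

In a real application the middleware argument would presumably be something like lambda app: GZipMiddleware(app), with the SSE route listed in patterns; that combination is untested here.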

@Kludex
Member

Kludex commented Sep 18, 2021

I guess the summary here is:

  • Starlette will not implement SSE internally.
  • sse-starlette is the official recommendation to use SSE with Starlette.

@Kludex
Member

Kludex commented Sep 18, 2021

As per @tomchristie 's reply on #51 (comment) 3 months ago, I'm closing this.
