Skip to content

Commit

Permalink
Better fire-and-forget WS closure
Browse files Browse the repository at this point in the history
This introduces more problems than what it solves, since now if the
application wants to exit, it may orphan this and cause a lot of
logspam.

This change now closes the websocket in the background task manager so
that it is not orphaned upon process exit.
  • Loading branch information
lhchavez committed Aug 14, 2024
1 parent b4f084d commit 980e4cc
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
6 changes: 3 additions & 3 deletions replit_river/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ async def _handle_messages_from_ws(
continue
except OutOfOrderMessageException:
logger.exception("Out of order message, closing connection")
await ws_wrapper.close()
await ws_wrapper.close(self._task_manager)
return
except InvalidMessageException:
logger.exception("Got invalid transport message, closing session")
Expand All @@ -227,7 +227,7 @@ async def replace_with_new_websocket(
old_ws_id = old_wrapper.ws.id
if new_ws.id != old_ws_id:
self._reset_session_close_countdown()
await old_wrapper.close()
await old_wrapper.close(self._task_manager)
self._ws_wrapper = WebsocketWrapper(new_ws)
await self._send_buffered_messages(new_ws)
# Server will call serve itself.
Expand Down Expand Up @@ -445,7 +445,7 @@ async def close_websocket(
# Already closed.
if not await ws_wrapper.is_open():
return
await ws_wrapper.close()
await ws_wrapper.close(self._task_manager)
if should_retry and self._retry_connection_callback:
self._task_manager.create_task(self._retry_connection_callback())

Expand Down
22 changes: 17 additions & 5 deletions replit_river/websocket_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from websockets import WebSocketCommonProtocol

from replit_river.task_manager import BackgroundTaskManager

logger = logging.getLogger(__name__)


Expand All @@ -24,12 +26,22 @@ async def is_open(self) -> bool:
async with self.ws_lock:
return self.ws_state == WsState.OPEN

async def close(self) -> None:
async def close(self, tm: BackgroundTaskManager) -> None:
async with self.ws_lock:
if self.ws_state == WsState.OPEN:
self.ws_state = WsState.CLOSING
task = asyncio.create_task(self.ws.close())
task.add_done_callback(
lambda _: logger.debug("old websocket %s closed.", self.ws.id)
)

# Here we schedule the closing of the WebSocket into a background task,
# because it can take an arbitrarily long time to perform (since it
# waits for the close message). This is normally fast, but in the face
# of network disconnects, it can take a couple of minutes for the
# kernel to realize that the packets are being lost, and we want to
# avoid blocking the reconnect for that to happen.
async def _close_ws(ws: WebSocketCommonProtocol) -> None:
try:
await ws.close()
finally:
logger.debug("old websocket %s closed.", ws.id)

tm.create_task(_close_ws(self.ws))
self.ws_state = WsState.CLOSED

0 comments on commit 980e4cc

Please sign in to comment.