From c519597247e173da168ed60ee822c909e118f9c8 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Mon, 11 Dec 2023 15:25:37 +0100 Subject: [PATCH] Synchronously create Document patch message to avoid race conditions (#6028) --- panel/io/document.py | 38 ++++++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/panel/io/document.py b/panel/io/document.py index 4cee6979b6..a741c345df 100644 --- a/panel/io/document.py +++ b/panel/io/document.py @@ -185,13 +185,14 @@ def wrapper(*args, **kw): wrapper.lock = True # type: ignore return wrapper -def dispatch_tornado(conn, events): +def dispatch_tornado(conn, events=None, msg=None): from tornado.websocket import WebSocketHandler socket = conn._socket ws_conn = getattr(socket, 'ws_connection', False) if not ws_conn or ws_conn.is_closing(): # type: ignore return [] - msg = conn.protocol.create('PATCH-DOC', events) + if msg is None: + msg = conn.protocol.create('PATCH-DOC', events) futures = [ WebSocketHandler.write_message(socket, msg.header_json), WebSocketHandler.write_message(socket, msg.metadata_json), @@ -206,9 +207,10 @@ def dispatch_tornado(conn, events): ]) return futures -def dispatch_django(conn, events): +def dispatch_django(conn, events=None, msg=None): socket = conn._socket - msg = conn.protocol.create('PATCH-DOC', events) + if msg is None: + msg = conn.protocol.create('PATCH-DOC', events) futures = [ socket.send(text_data=msg.header_json), socket.send(text_data=msg.metadata_json), @@ -223,6 +225,21 @@ def dispatch_django(conn, events): ]) return futures +async def _dispatch_msgs(msgs): + from tornado.websocket import WebSocketClosedError, WebSocketHandler + for conn, msg in msgs.items(): + if isinstance(conn._socket, WebSocketHandler): + futures = dispatch_tornado(conn, msg=msg) + else: + futures = dispatch_django(conn, msg=msg) + for future in futures: + try: + await future + except WebSocketClosedError: + logger.warning("Failed sending message as connection was closed") + except Exception as e: + logger.warning(f"Failed sending message due to following error: {e}") + @contextmanager def unlocked() -> Iterator: """ @@ -297,8 +314,17 @@ async def handle_write_errors(): try: curdoc.unhold() except RuntimeError: - if remaining_events: - curdoc.add_next_tick_callback(partial(_dispatch_events, curdoc, remaining_events)) + if not remaining_events: + return + # Create messages for remaining events + msgs = {} + for conn in connections: + if not remaining_events: + continue + # Create a protocol message for any events that cannot be immediately dispatched + msgs[conn] = conn.protocol.create('PATCH-DOC', remaining_events) + curdoc.add_next_tick_callback(partial(_dispatch_msgs, msgs)) + @contextmanager def immediate_dispatch(doc: Document | None = None):