Skip to content

Commit

Permalink
Synchronously create Document patch message to avoid race conditions (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
philippjfr authored Dec 11, 2023
1 parent 8a02be4 commit c519597
Showing 1 changed file with 32 additions and 6 deletions.
38 changes: 32 additions & 6 deletions panel/io/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,14 @@ def wrapper(*args, **kw):
wrapper.lock = True # type: ignore
return wrapper

def dispatch_tornado(conn, events):
def dispatch_tornado(conn, events=None, msg=None):
from tornado.websocket import WebSocketHandler
socket = conn._socket
ws_conn = getattr(socket, 'ws_connection', False)
if not ws_conn or ws_conn.is_closing(): # type: ignore
return []
msg = conn.protocol.create('PATCH-DOC', events)
if msg is None:
msg = conn.protocol.create('PATCH-DOC', events)
futures = [
WebSocketHandler.write_message(socket, msg.header_json),
WebSocketHandler.write_message(socket, msg.metadata_json),
Expand All @@ -206,9 +207,10 @@ def dispatch_tornado(conn, events):
])
return futures

def dispatch_django(conn, events):
def dispatch_django(conn, events=None, msg=None):
socket = conn._socket
msg = conn.protocol.create('PATCH-DOC', events)
if msg is None:
msg = conn.protocol.create('PATCH-DOC', events)
futures = [
socket.send(text_data=msg.header_json),
socket.send(text_data=msg.metadata_json),
Expand All @@ -223,6 +225,21 @@ def dispatch_django(conn, events):
])
return futures

async def _dispatch_msgs(msgs):
from tornado.websocket import WebSocketClosedError, WebSocketHandler
for conn, msg in msgs.items():
if isinstance(conn._socket, WebSocketHandler):
futures = dispatch_tornado(conn, msg=msg)
else:
futures = dispatch_django(conn, msg=msg)
for future in futures:
try:
await future
except WebSocketClosedError:
logger.warning("Failed sending message as connection was closed")
except Exception as e:
logger.warning(f"Failed sending message due to following error: {e}")

@contextmanager
def unlocked() -> Iterator:
"""
Expand Down Expand Up @@ -297,8 +314,17 @@ async def handle_write_errors():
try:
curdoc.unhold()
except RuntimeError:
if remaining_events:
curdoc.add_next_tick_callback(partial(_dispatch_events, curdoc, remaining_events))
if not remaining_events:
return
# Create messages for remaining events
msgs = {}
for conn in connections:
if not remaining_events:
continue
# Create a protocol message for any events that cannot be immediately dispatched
msgs[conn] = conn.protocol.create('PATCH-DOC', remaining_events)
curdoc.add_next_tick_callback(partial(_dispatch_msgs, msgs))


@contextmanager
def immediate_dispatch(doc: Document | None = None):
Expand Down

0 comments on commit c519597

Please sign in to comment.