Skip to content

Commit

Permalink
Python Client: avoid thread self-join on stop()
Browse files Browse the repository at this point in the history
  • Loading branch information
kosak committed Jan 4, 2024
1 parent e860064 commit 30299ad
Showing 1 changed file with 22 additions and 2 deletions.
24 changes: 22 additions & 2 deletions py/client-ticking/src/pydeephaven_ticking/table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,10 @@ class TableListenerHandle:

_table: Table
_listener: TableListener
# Set when the user calls stop()
_cancelled: bool
# Tracks Whether we've called cancel on the FlightStreamReader
_reader_cancelled: bool
_bp: dhc.BarrageProcessor
_writer: flight.FlightStreamWriter
_reader: flight.FlightStreamReader
Expand All @@ -195,6 +198,7 @@ def __init__(self, table: Table, listener: TableListener):
self._table = table
self._listener = listener
self._cancelled = False
self._reader_cancelled = False

def start(self) -> None:
"""Subscribes to changes on the table referenced in the constructor. When changes happen, the
Expand All @@ -211,9 +215,16 @@ def start(self) -> None:

def stop(self) -> None:
"""Cancels the subscription to the table and stops the service thread. By the time this method returns, the
thread servicing the subscription will be destroyed, and the callback will no longer be invoked."""
thread servicing the subscription will be destroyed, and the callback will no longer be invoked.
This method joins the subscription servicing thread, unless stop() was called from that very thread.
This can happen if the user's callback calls stop()."""

self._cancelled = True
if threading.get_ident() == self._thread.ident:
# We are inside the callback, so just setting the 'cancelled' flag suffices.
return

self._reader_cancelled = True
self._reader.cancel()
self._thread.join()

Expand All @@ -223,16 +234,25 @@ def _process_data(self):
user-supplied callback with that TableUpdate."""

try:
while True:
while not self._cancelled:
data, metadata = self._reader.read_chunk()
ticking_update = self._bp.process_next_chunk(data.columns, metadata)
if ticking_update is not None:
table_update = TableUpdate(ticking_update)
self._listener.on_update(table_update)
except StopIteration:
pass
except Exception as e:
if not self._cancelled:
self._listener.on_error(e)

try:
if not self._reader_cancelled:
self._reader_cancelled = True
self._reader.cancel()
self._writer.close()
except Exception as e:
pass

def listen(table: Table, listener: Union[Callable, TableListener],
on_error: Union[Callable, None] = None) -> TableListenerHandle:
Expand Down

0 comments on commit 30299ad

Please sign in to comment.