Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a listener that only fires when all dependencies are satisfied #5571

Merged
merged 8 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ScriptApi;
Expand All @@ -33,7 +34,7 @@ public class PythonReplayListenerAdapter extends InstrumentedTableUpdateListener
implements TableSnapshotReplayer {
private static final long serialVersionUID = -8882402061960621245L;
private final PyObject pyCallable;
private final Table[] dependencies;
private final NotificationQueue.Dependency[] dependencies;

/**
* Create a Python listener.
Expand All @@ -46,15 +47,15 @@ public class PythonReplayListenerAdapter extends InstrumentedTableUpdateListener
* @param dependencies The tables that must be satisfied before this listener is executed.
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
*/
public static PythonReplayListenerAdapter create(@Nullable String description, Table source, boolean retain,
PyObject pyObjectIn, Table... dependencies) {
PyObject pyObjectIn, NotificationQueue.Dependency... dependencies) {
final UpdateGraph updateGraph = source.getUpdateGraph(dependencies);
try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
return new PythonReplayListenerAdapter(description, source, retain, pyObjectIn, dependencies);
}
}

private PythonReplayListenerAdapter(String description, Table source, boolean retain, PyObject pyObjectIn,
Table... dependencies) {
private PythonReplayListenerAdapter(@Nullable String description, Table source, boolean retain, PyObject pyObjectIn,
NotificationQueue.Dependency... dependencies) {
super(description, source, retain);
this.dependencies = dependencies;
this.pyCallable = PythonUtils.pyListenerFunc(pyObjectIn);
Expand Down
28 changes: 15 additions & 13 deletions py/server/deephaven/table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,13 +330,14 @@ def __init__(self, t: Table, listener: Union[Callable, TableListener], descripti
listener (Union[Callable, TableListener]): listener for table changes
description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry
description, default is None
dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution. A
refreshing table is considered to be satisfied if all updates to the table have been processed in the current
update graph cycle. A static table is always considered to be satisfied. If a specified table is refreshing,
it must belong to the same update graph as the table being listened to. Default is None.
dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution.
A refreshing table is considered to be satisfied if all possible updates to the table have been processed
in the current update graph cycle. A static table is always considered to be satisfied. If a specified
table is refreshing, it must belong to the same update graph as the table being listened to. Default is
None.

Dependencies are used to ensure that the listener can safely access them during its execution, such as reading
the data from the tables or even performing table operations on them.
Dependencies are used to ensure that the listener can safely access them during its execution, such as
reading the data from the tables or even performing certain table operations on them.
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved

Raises:
DHError
Expand Down Expand Up @@ -412,13 +413,14 @@ def listen(t: Table, listener: Union[Callable, TableListener], description: str
description, default is None
do_replay (bool): whether to replay the initial snapshot of the table, default is False
replay_lock (str): the lock type used during replay, default is 'shared', can also be 'exclusive'
dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution. A
refreshing table is considered to be satisfied if all updates to the table have been processed in the current
update graph cycle. A static table is always considered to be satisfied. If a specified table is refreshing,
it must belong to the same update graph as the table being listened to. Default is None.

Dependencies are used to ensure that the listener can safely access them during its execution, such as reading
the data from the tables or even performing table operations on them.
dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution.
A refreshing table is considered to be satisfied if all possible updates to the table have been processed
in the current update graph cycle. A static table is always considered to be satisfied. If a specified
table is refreshing, it must belong to the same update graph as the table being listened to. Default is
None.

Dependencies are used to ensure that the listener can safely access them during its execution, such as
reading the data from the tables or even performing certain table operations on them.
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved

Returns:
a TableListenerHandle
Expand Down
Loading