Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a listener that only fires when all dependencies are satisfied #5571

Merged
merged 8 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.integrations.python;

import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.TableUpdateImpl;
Expand All @@ -11,9 +12,15 @@
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ScriptApi;
import org.jpy.PyObject;

import javax.annotation.Nullable;
import java.util.Arrays;


/**
* A Deephaven table listener which passes update events to a Python listener object. The listener can also replay the
Expand All @@ -27,48 +34,31 @@ public class PythonReplayListenerAdapter extends InstrumentedTableUpdateListener
implements TableSnapshotReplayer {
private static final long serialVersionUID = -8882402061960621245L;
private final PyObject pyCallable;
private final NotificationQueue.Dependency[] dependencies;

/**
* Create a Python listener.
*
* No description for this listener will be provided. A hard reference to this listener will be maintained to
* prevent garbage collection. See {@link #PythonReplayListenerAdapter(String, Table, boolean, PyObject)} if you do
* not want to prevent garbage collection of this listener.
*
* @param source The source table to which this listener will subscribe.
* @param pyObjectIn Python listener object.
*/
public PythonReplayListenerAdapter(Table source, PyObject pyObjectIn) {
this(null, source, true, pyObjectIn);
}

/**
* Create a Python listener.
*
* A hard reference to this listener will be maintained to prevent garbage collection. See
* {@link #PythonReplayListenerAdapter(String, Table, boolean, PyObject)} if you do not want to prevent garbage
* collection of this listener.
*
* @param description A description for the UpdatePerformanceTracker to append to its entry description.
* @param description A description for the UpdatePerformanceTracker to append to its entry description, may be
* null.
* @param source The source table to which this listener will subscribe.
* @param retain Whether a hard reference to this listener should be maintained to prevent it from being collected.
* @param pyObjectIn Python listener object.
* @param dependencies The tables that must be satisfied before this listener is executed.
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
*/
public PythonReplayListenerAdapter(String description, Table source, PyObject pyObjectIn) {
this(description, source, true, pyObjectIn);
public static PythonReplayListenerAdapter create(@Nullable String description, Table source, boolean retain,
PyObject pyObjectIn, NotificationQueue.Dependency... dependencies) {
final UpdateGraph updateGraph = source.getUpdateGraph(dependencies);
try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
return new PythonReplayListenerAdapter(description, source, retain, pyObjectIn, dependencies);
}
}

/**
* Create a Python listener.
*
* @param description A description for the UpdatePerformanceTracker to append to its entry description.
* @param source The source table to which this listener will subscribe.
* @param retain Whether a hard reference to this listener should be maintained to prevent it from being collected.
* @param pyObjectIn Python listener object.
*/
public PythonReplayListenerAdapter(String description, Table source, boolean retain,
PyObject pyObjectIn) {
private PythonReplayListenerAdapter(@Nullable String description, Table source, boolean retain, PyObject pyObjectIn,
NotificationQueue.Dependency... dependencies) {
super(description, source, retain);
pyCallable = PythonUtils.pyListenerFunc(pyObjectIn);
this.dependencies = dependencies;
this.pyCallable = PythonUtils.pyListenerFunc(pyObjectIn);
}

@Override
Expand All @@ -87,4 +77,10 @@ public void onUpdate(final TableUpdate update) {
final boolean isReplay = false;
pyCallable.call("__call__", update, isReplay);
}

@Override
public boolean canExecute(final long step) {
return super.canExecute(step)
&& (dependencies.length == 0 || Arrays.stream(dependencies).allMatch(t -> t.satisfied(step)));
}
}
101 changes: 61 additions & 40 deletions py/server/deephaven/table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from abc import ABC, abstractmethod
from functools import wraps
from inspect import signature
from typing import Callable, Union, List, Generator, Dict, Optional, Literal
from typing import Callable, Union, List, Generator, Dict, Optional, Literal, Sequence

import jpy
import numpy
Expand Down Expand Up @@ -61,6 +61,8 @@ def _changes_to_numpy(table: Table, cols: Union[str, List[str]], row_set, chunk_


class TableUpdate(JObjectWrapper):
"""A TableUpdate object represents a table update event. It contains the added, removed, and modified rows in the
table. """
j_object_type = _JTableUpdate

def __init__(self, table: Table, j_table_update: jpy.JType):
Expand Down Expand Up @@ -306,39 +308,12 @@ def _wrap_listener_obj(t: Table, listener: TableListener):
return listener


def listen(t: Table, listener: Union[Callable, TableListener], description: str = None, do_replay: bool = False,
replay_lock: Literal["shared", "exclusive"] = "shared"):
"""This is a convenience function that creates a TableListenerHandle object and immediately starts it to listen
for table updates.

The function returns the created TableListenerHandle object whose 'stop' method can be called to stop listening.
If it goes out of scope and is garbage collected, the listener will stop receiving any table updates.

Args:
t (Table): table to listen to
listener (Union[Callable, TableListener]): listener for table changes
description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry
description, default is None
do_replay (bool): whether to replay the initial snapshot of the table, default is False
replay_lock (str): the lock type used during replay, default is 'shared', can also be 'exclusive'

Returns:
a TableListenerHandle

Raises:
DHError
"""
table_listener_handle = TableListenerHandle(t=t, listener=listener, description=description)
table_listener_handle.start(do_replay=do_replay, replay_lock=replay_lock)
return table_listener_handle


class TableListenerHandle(JObjectWrapper):
class TableListenerHandle:
"""A handle to manage a table listener's lifecycle."""
j_object_type = _JPythonReplayListenerAdapter

def __init__(self, t: Table, listener: Union[Callable, TableListener], description: str = None):
"""Creates a new table listener handle.
def __init__(self, t: Table, listener: Union[Callable, TableListener], description: str = None,
dependencies: Union[Table, Sequence[Table]] = None):
"""Creates a new table listener handle with dependencies.

Table change events are processed by 'listener', which can be either
(1) a callable (e.g. function) or
Expand All @@ -355,20 +330,33 @@ def __init__(self, t: Table, listener: Union[Callable, TableListener], descripti
listener (Union[Callable, TableListener]): listener for table changes
description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry
description, default is None
dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution.
A refreshing table is considered to be satisfied if all possible updates to the table have been processed
in the current update graph cycle. A static table is always considered to be satisfied. If a specified
table is refreshing, it must belong to the same update graph as the table being listened to. Default is
None.

Dependencies are used to ensure that the listener can safely access them during its execution, such as
reading the data from the tables or even performing certain table operations on them.
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved

Raises:
ValueError
DHError
"""
self.t = t
self.description = description
self.dependencies = to_sequence(dependencies)

if callable(listener):
listener_wrapped = _wrap_listener_func(t, listener)
self.listener_wrapped = _wrap_listener_func(t, listener)
elif isinstance(listener, TableListener):
listener_wrapped = _wrap_listener_obj(t, listener)
self.listener_wrapped = _wrap_listener_obj(t, listener)
else:
raise ValueError("listener is neither callable nor TableListener object")
self.listener = _JPythonReplayListenerAdapter(description, t.j_table, False, listener_wrapped)
raise DHError(message="listener is neither callable nor TableListener object")

try:
self.listener = _JPythonReplayListenerAdapter.create(description, t.j_table, False, self.listener_wrapped, self.dependencies)
except Exception as e:
raise DHError(e, "failed to create a table listener.") from e
self.started = False

def start(self, do_replay: bool = False, replay_lock: Literal["shared", "exclusive"] = "shared") -> None:
Expand Down Expand Up @@ -408,6 +396,39 @@ def stop(self) -> None:
self.t.j_table.removeUpdateListener(self.listener)
self.started = False

@property
def j_object(self) -> jpy.JType:
return self.listener

def listen(t: Table, listener: Union[Callable, TableListener], description: str = None, do_replay: bool = False,
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
replay_lock: Literal["shared", "exclusive"] = "shared", dependencies: Union[Table, Sequence[Table]] = None)\
-> TableListenerHandle:
"""This is a convenience function that creates a TableListenerHandle object and immediately starts it to listen
for table updates.

The function returns the created TableListenerHandle object whose 'stop' method can be called to stop listening.
If it goes out of scope and is garbage collected, the listener will stop receiving any table updates.

Args:
t (Table): table to listen to
listener (Union[Callable, TableListener]): listener for table changes
description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry
description, default is None
do_replay (bool): whether to replay the initial snapshot of the table, default is False
replay_lock (str): the lock type used during replay, default is 'shared', can also be 'exclusive'
dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution.
A refreshing table is considered to be satisfied if all possible updates to the table have been processed
in the current update graph cycle. A static table is always considered to be satisfied. If a specified
table is refreshing, it must belong to the same update graph as the table being listened to. Default is
None.

Dependencies are used to ensure that the listener can safely access them during its execution, such as
reading the data from the tables or even performing certain table operations on them.
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved

Returns:
a TableListenerHandle

Raises:
DHError
"""
table_listener_handle = TableListenerHandle(t=t, dependencies=dependencies, listener=listener,
description=description)
table_listener_handle.start(do_replay=do_replay, replay_lock=replay_lock)
return table_listener_handle
Loading
Loading