Skip to content

Commit

Permalink
Clean up test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
jmao-denver committed Jun 7, 2024
1 parent 17469a0 commit 92c54e4
Showing 1 changed file with 40 additions and 17 deletions.
57 changes: 40 additions & 17 deletions py/server/tests/test_table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,12 @@ def listener_func(update, is_replay):
has_added=True, has_removed=False, has_modified=True)


def test_listener_with_deps_obj(self):
table_update_recorder = TableUpdateRecorder(self.test_table)
def test_listener_obj_with_deps(self):
dep_table = time_table("PT00:00:05").update("X = i % 11")
ec = get_exec_ctx()


with self.subTest("with deps"):
with self.subTest("view op on deps"):
table_update_recorder = TableUpdateRecorder(self.test_table)
j_arrays = []

class ListenerClass(TableListener):
Expand All @@ -219,29 +218,52 @@ def on_update(self, update, is_replay):
has_added=True, has_removed=True, has_modified=False)
self.assertTrue(all([len(ja) > 0 for ja in j_arrays]))

with self.subTest("with deps, error"):
with (self.subTest("update/group_by op on deps")):
table_update_recorder = TableUpdateRecorder(self.test_table)
j_arrays = []

class ListenerClass(TableListener):
def on_update(self, update, is_replay):
table_update_recorder.record(update, is_replay)
with ec:
try:
t2 = dep_table.view(["Y = i % 8"]).group_by("X")
j_arrays.append(_JColumnVectors.of(t2.j_table, "Y").copyToArray())
except Exception as e:
pass
t2 = dep_table.update(["Y = i % 8"]).group_by("X")
j_arrays.append(_JColumnVectors.of(t2.j_table, "Y").copyToArray())

listener = ListenerClass()
self.test_table.await_update()
table_listener_handle = listen(self.test_table, listener, dependencies=dep_table)
ensure_ugp_cycles(table_update_recorder, cycles=3)
table_listener_handle.stop()

self.check_update_recorder(table_update_recorder=table_update_recorder, cols="X", has_replay=False,
has_added=True, has_removed=True, has_modified=False)
self.assertTrue(len(j_arrays) == 0)
self.assertTrue(all([len(ja) > 0 for ja in j_arrays]))

with (self.subTest("join op on deps")):
table_update_recorder = TableUpdateRecorder(self.test_table)
j_arrays = []
dep_table_2 = time_table("PT00:00:05").update("X = i % 11")

class ListenerClass(TableListener):
def on_update(self, update, is_replay):
table_update_recorder.record(update, is_replay)
with ec:
t2 = dep_table.update(["Y = i % 8"]).group_by("X").join(dep_table_2, on="X",
joins="Ts2=Timestamp")
j_arrays.append(_JColumnVectors.of(t2.j_table, "Y").copyToArray())

listener = ListenerClass()
self.test_table.await_update()
table_listener_handle = listen(self.test_table, listener, dependencies=[dep_table, dep_table_2])
ensure_ugp_cycles(table_update_recorder, cycles=3)
table_listener_handle.stop()

self.check_update_recorder(table_update_recorder=table_update_recorder, cols="X", has_replay=False,
has_added=True, has_removed=True, has_modified=False)
self.assertTrue(all([len(ja) > 0 for ja in j_arrays]))

def test_listener_with_deps_func(self):

def test_listener_func_with_deps(self):
cols = [
bool_col(name="Boolean", data=[True, False]),
string_col(name="String", data=["foo", "bar"]),
Expand All @@ -256,10 +278,10 @@ def listener_func(update, is_replay):
try:
dep_table.add(t)
except Exception as e:
self.assertIn("Attempted to make a blocking input table edit from a listener or notification. This is unsupported", str(e))
self.assertIn("Attempted to make a blocking input table edit", str(e))
pass

with self.subTest("with deps"):
with self.subTest("do_replay=False"):
table_update_recorder = TableUpdateRecorder(self.test_table)
table_listener_handle = TableListenerHandle(self.test_table, listener_func, dependencies=dep_table)
table_listener_handle.start(do_replay=False)
Expand All @@ -269,14 +291,15 @@ def listener_func(update, is_replay):
has_modified=False)
self.assertEqual(dep_table.size, 0)

with self.subTest("with deps, replay_lock='exclusive'"):
with self.subTest("do_replay=True, replay_lock='exclusive'"):
table_update_recorder = TableUpdateRecorder(self.test_table)
table_listener_handle.start(do_replay=True, replay_lock="exclusive")
ensure_ugp_cycles(table_update_recorder, cycles=3)
table_listener_handle.stop()
self.check_update_recorder(table_update_recorder, has_replay=True, has_added=True, has_removed=True, has_modified=False)

with self.subTest("with deps, replay_lock='shared'"):
raise unittest.SkipTest("This test will dead lock, waiting for the resolution of https://github.com/deephaven/deephaven-core/issues/5585")
with self.subTest("do_replay=True, replay_lock='shared'"):
table_update_recorder = TableUpdateRecorder(self.test_table)
table_listener_handle.start(do_replay=True, replay_lock="shared") # noqa
ensure_ugp_cycles(table_update_recorder, cycles=3)
table_listener_handle.stop()
Expand Down

0 comments on commit 92c54e4

Please sign in to comment.