From de313bb54e47441025dfb1c731ac91b8014bcdf5 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Wed, 25 Oct 2023 18:43:30 -0500 Subject: [PATCH] Don't leak the viewport subscription when the viewport data is read (#4420) Also closes other streams after the client is closed. Fixes #4410 --- .../io/deephaven/web/client/api/JsTable.java | 12 ++-- .../web/client/api/WorkerConnection.java | 67 +++++++++++++------ .../TableViewportSubscription.java | 2 +- 3 files changed, 54 insertions(+), 27 deletions(-) diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/JsTable.java b/web/client-api/src/main/java/io/deephaven/web/client/api/JsTable.java index 082619007f1..398c2915a3e 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/JsTable.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/JsTable.java @@ -741,10 +741,11 @@ public void setInternalViewport(double firstRow, double lastRow, Column[] column /** * Gets the currently visible viewport. If the current set of operations has not yet resulted in data, it will not - * resolve until that data is ready. + * resolve until that data is ready. If this table is closed before the promise resolves, it will be rejected - to + * separate the lifespan of this promise from the table itself, call + * {@link TableViewportSubscription#getViewportData()} on the result from {@link #setViewport(double, double)}. * * @return Promise of {@link TableData} - * */ @JsMethod public Promise getViewportData() { @@ -752,7 +753,7 @@ public Promise getViewportData() { if (subscription == null) { return Promise.reject("No viewport currently set"); } - return subscription.getViewportData(); + return subscription.getInternalViewportData(); } public Promise getInternalViewportData() { @@ -760,7 +761,7 @@ public Promise getInternalViewportData() { final ClientTableState active = state(); active.onRunning(state -> { if (currentViewportData == null) { - // no viewport data received yet; let's setup a one-shot UPDATED event listener + // no viewport data received yet; let's set up a one-shot UPDATED event listener addEventListenerOneShot(EVENT_UPDATED, ignored -> promise.succeed(currentViewportData)); } else { promise.succeed(currentViewportData); @@ -1961,8 +1962,7 @@ public void setState(final ClientTableState state) { && existingSubscription.getStatus() != TableViewportSubscription.Status.DONE) { JsLog.debug("closing old viewport", state(), existingSubscription.state()); // with the replacement state successfully running, we can shut down the old viewport (unless - // something - // external retained it) + // something external retained it) existingSubscription.internalClose(); } } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java b/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java index dfecd01bdae..b7eff98e4b4 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java @@ -52,6 +52,7 @@ import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.ReleaseRequest; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.TerminationNotificationRequest; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb_service.SessionServiceClient; +import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb_service.UnaryResponse; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.storage_pb_service.StorageServiceClient; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.table_pb.ApplyPreviewColumnsRequest; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.table_pb.EmptyTableRequest; @@ -179,7 +180,6 @@ private enum State { private List> onOpen = new ArrayList<>(); private State state; - private double killTimerCancelation; private SessionServiceClient sessionServiceClient; private TableServiceClient tableServiceClient; private ConsoleServiceClient consoleServiceClient; @@ -209,6 +209,8 @@ private enum State { private JsConsumer recordLog = pastLogs::add; private ResponseStreamWrapper logStream; + private UnaryResponse terminationStream; + private final JsSet> fieldUpdatesCallback = new JsSet<>(); private Map knownFields = new HashMap<>(); private ResponseStreamWrapper fieldsChangeUpdateStream; @@ -386,6 +388,9 @@ private boolean checkStatus(ResponseStreamWrapper.ServiceError fail) { } public boolean checkStatus(ResponseStreamWrapper.Status status) { + if (state == State.Disconnected) { + return false; + } if (status.isOk()) { // success, ignore return true; @@ -523,26 +528,31 @@ private Promise authUpdate() { } private void subscribeToTerminationNotification() { - sessionServiceClient.terminationNotification(new TerminationNotificationRequest(), metadata(), - (fail, success) -> { - if (fail != null) { - // Errors are treated like connection issues, won't signal any shutdown - if (checkStatus((ResponseStreamWrapper.ServiceError) fail)) { - // restart the termination notification - subscribeToTerminationNotification(); - } else { - info.notifyConnectionError(Js.cast(fail)); - connectionLost(); - } - return; - } - assert success != null; + terminationStream = + sessionServiceClient.terminationNotification(new TerminationNotificationRequest(), metadata(), + (fail, success) -> { + if (state == State.Disconnected) { + // already disconnected, no need to respond + return; + } + if (fail != null) { + // Errors are treated like connection issues, won't signal any shutdown + if (checkStatus((ResponseStreamWrapper.ServiceError) fail)) { + // restart the termination notification + subscribeToTerminationNotification(); + } else { + info.notifyConnectionError(Js.cast(fail)); + connectionLost(); + } + return; + } + assert success != null; - // welp; the server is gone -- let everyone know - connectionLost(); + // welp; the server is gone -- let everyone know + connectionLost(); - info.notifyServerShutdown(success); - }); + info.notifyServerShutdown(success); + }); } // @Override @@ -644,9 +654,26 @@ public void forceClose() { // explicitly mark as disconnected so reconnect isn't attempted state = State.Disconnected; + // forcibly clean up the log stream and its listeners + if (logStream != null) { + logStream.cancel(); + logStream = null; + } + pastLogs.clear(); + logCallbacks.clear(); + + // Stop server streams, will not reconnect + if (terminationStream != null) { + terminationStream.cancel(); + terminationStream = null; + } + if (exportNotifications != null) { + exportNotifications.cancel(); + exportNotifications = null; + } newSessionReconnect.disconnected(); - DomGlobal.clearTimeout(killTimerCancelation); + DomGlobal.clearTimeout(scheduledAuthUpdate); } public void setSessionTimeoutMs(double sessionTimeoutMs) { diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/TableViewportSubscription.java b/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/TableViewportSubscription.java index 809f5ec6e1d..043ec165e5f 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/TableViewportSubscription.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/subscription/TableViewportSubscription.java @@ -238,7 +238,7 @@ public void close() { * forwarding events and optionally close the underlying table/subscription. */ public void internalClose() { - // indicate that the base table shouldn't get events any more, even if it this is still retained elsewhere + // indicate that the base table shouldn't get events anymore, even if it is still retained elsewhere originalActive = false; if (retained || status == Status.DONE) {