Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Clean up how objects are managed when they are fetched #4909

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public Promise<JsPartitionedTable> refetch() {
this.columns = JsObject.freeze(columns);
this.keyColumns = JsObject.freeze(keyColumns);

return w.getExportedObjects()[0].fetch();
return w.getExportedObjects()[0].fetch(true);
}).then(result -> {
keys = (JsTable) result;
// TODO(deephaven-core#3604) in case of a new session, we should do a full refetch
Expand Down Expand Up @@ -308,6 +308,8 @@ public Promise<JsTable> getKeyTable() {
* will not affect tables in use.
*/
public void close() {
widget.close();

if (keys != null) {
keys.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,14 +759,68 @@ public Promise<JsTable> getTable(JsVariableDefinition varDef, @Nullable Boolean
});
}

/**
* Export a table from the provided ticket
mofojed marked this conversation as resolved.
Show resolved Hide resolved
*
* @param ticket Ticket of the table to export
* @param fetchSummary Description for the fetch. Used for logging and errors.
* @return Promise for the table
*/
public Promise<JsTable> getTable(TypedTicket ticket, String fetchSummary) {
mofojed marked this conversation as resolved.
Show resolved Hide resolved
return Callbacks.<ExportedTableCreationResponse, Object>grpcUnaryPromise(c -> {
tableServiceClient().getExportedTableCreationResponse(ticket.getTicket(),
metadata(),
c::apply);
}).then(etcr -> {
ClientTableState cts = newStateFromUnsolicitedTable(etcr, fetchSummary);
JsTable table = new JsTable(this, cts);
return Promise.resolve(table);
});
}

/**
* Get an object from it's ticket
mofojed marked this conversation as resolved.
Show resolved Hide resolved
*
* @param typedTicket Ticket to get the object for
* @return The object requested
*/
public Promise<?> getObject(TypedTicket typedTicket) {
if (JsVariableType.TABLE.equalsIgnoreCase(typedTicket.getType())) {
return getTable(typedTicket, "get a table");
} else if (JsVariableType.FIGURE.equalsIgnoreCase(typedTicket.getType())) {
return getFigure(typedTicket);
} else if (JsVariableType.PANDAS.equalsIgnoreCase(typedTicket.getType())) {
return getPandasTable(typedTicket);
} else if (JsVariableType.PARTITIONEDTABLE.equalsIgnoreCase(typedTicket.getType())) {
return getPartitionedTable(typedTicket);
} else if (JsVariableType.HIERARCHICALTABLE.equalsIgnoreCase(typedTicket.getType())) {
return getHierarchicalTable(typedTicket);
} else {
if (JsVariableType.TABLEMAP.equalsIgnoreCase(typedTicket.getType())) {
JsLog.warn(
"TableMap is now known as PartitionedTable, fetching as a plain widget. To fetch as a PartitionedTable use that as the type.");
}
if (JsVariableType.TREETABLE.equalsIgnoreCase(typedTicket.getType())) {
JsLog.warn(
"TreeTable is now HierarchicalTable, fetching as a plain widget. To fetch as a HierarchicalTable use that as this type.");
}
return getWidget(typedTicket, true);
}
}

/**
* Retrieve an object from it's definition
mofojed marked this conversation as resolved.
Show resolved Hide resolved
*
* @param definition Variable definition to get
* @return The object requested
*/
public Promise<?> getObject(JsVariableDefinition definition) {
if (JsVariableType.TABLE.equalsIgnoreCase(definition.getType())) {
return getTable(definition, null);
} else if (JsVariableType.FIGURE.equalsIgnoreCase(definition.getType())) {
return getFigure(definition);
} else if (JsVariableType.PANDAS.equalsIgnoreCase(definition.getType())) {
return getWidget(definition)
.then(widget -> widget.getExportedObjects()[0].fetch());
return getPandasTable(definition);
} else if (JsVariableType.PARTITIONEDTABLE.equalsIgnoreCase(definition.getType())) {
return getPartitionedTable(definition);
} else if (JsVariableType.HIERARCHICALTABLE.equalsIgnoreCase(definition.getType())) {
Expand Down Expand Up @@ -907,38 +961,84 @@ private TicketAndPromise<?> exportScopeTicket(JsVariableDefinition varDef) {

public Promise<JsPartitionedTable> getPartitionedTable(JsVariableDefinition varDef) {
return whenServerReady("get a partitioned table")
.then(server -> getWidget(varDef))
.then(ignore -> getWidget(varDef, false))
.then(widget -> new JsPartitionedTable(this, widget).refetch());
}

public Promise<JsTreeTable> getTreeTable(JsVariableDefinition varDef) {
return getWidget(varDef).then(w -> Promise.resolve(new JsTreeTable(this, w)));
public Promise<JsPartitionedTable> getPartitionedTable(TypedTicket ticket) {
return whenServerReady("get a partitioned table")
.then(ignore -> new JsPartitionedTable(this, new JsWidget(this, ticket)).refetch());
}

public Promise<JsTreeTable> getHierarchicalTable(JsVariableDefinition varDef) {
return getWidget(varDef).then(w -> Promise.resolve(new JsTreeTable(this, w)));
return whenServerReady("get a hierarchical table")
.then(ignore -> getWidget(varDef))
.then(widget -> Promise.resolve(new JsTreeTable(this, widget)));
}

public Promise<JsTreeTable> getHierarchicalTable(TypedTicket typedTicket) {
return whenServerReady("get a hierarchical table")
.then(ignore -> getWidget(typedTicket, true))
.then(widget -> Promise.resolve(new JsTreeTable(this, widget)));
}

private JsFigure.FigureFetch getWidgetFigureFetch(Promise<JsWidget> widgetPromise) {
return c -> widgetPromise.then(widget -> {
FetchObjectResponse legacyResponse = new FetchObjectResponse();
legacyResponse.setData(widget.getDataAsU8());
legacyResponse.setType(widget.getType());
legacyResponse.setTypedExportIdsList(
Arrays.stream(widget.getExportedObjects()).map(JsWidgetExportedObject::takeTicket)
.toArray(TypedTicket[]::new));
c.apply(null, legacyResponse);
widget.close();
return null;
}, error -> {
c.apply(error, null);
return null;
});
}

public Promise<JsFigure> getFigure(JsVariableDefinition varDef) {
if (!varDef.getType().equalsIgnoreCase("Figure")) {
throw new IllegalArgumentException("Can't load as a figure: " + varDef.getType());
}
private Promise<JsFigure> getFigure(Promise<JsWidget> widgetPromise) {
return whenServerReady("get a figure")
.then(server -> new JsFigure(this,
c -> {
getWidget(varDef).then(widget -> {
FetchObjectResponse legacyResponse = new FetchObjectResponse();
legacyResponse.setData(widget.getDataAsU8());
legacyResponse.setType(widget.getType());
legacyResponse.setTypedExportIdsList(Arrays.stream(widget.getExportedObjects())
.map(JsWidgetExportedObject::typedTicket).toArray(TypedTicket[]::new));
c.apply(null, legacyResponse);
return null;
}, error -> {
c.apply(error, null);
return null;
});
}).refetch());
.then(server -> new JsFigure(this, getWidgetFigureFetch(widgetPromise)).refetch());

}

public Promise<JsFigure> getFigure(JsVariableDefinition varDef) {
return getFigure(getWidget(varDef));
}

public Promise<JsFigure> getFigure(TypedTicket ticket) {
return getFigure(getWidget(ticket, true));
}

/**
* Gets a Pandas Table from the Pandas table widget provided. Closes the widget when done.
*
* @param widget Pandas table widget
* @return Table representing the Pandas data frame
*/
private Promise<JsTable> getPandasTable(JsWidget widget) {
TypedTicket typedTicket = widget.getExportedObjects()[0].takeTicket();

// We don't need to keep the original widget open, we can just close it right away
widget.close();

return getTable(typedTicket, "table for pandas").then(table -> {
// TODO(deephaven-core#3604) if using a new session don't attempt a reconnect
// never attempt a reconnect, since we might have a different widget schema entirely
table.addEventListener(JsTable.EVENT_DISCONNECT, ignore -> table.close());
return Promise.resolve(table);
});
mofojed marked this conversation as resolved.
Show resolved Hide resolved
}

public Promise<JsTable> getPandasTable(JsVariableDefinition definition) {
return getWidget(definition).then(this::getPandasTable);
}

public Promise<JsTable> getPandasTable(TypedTicket ticket) {
return getWidget(ticket, true).then(this::getPandasTable);
}

private TypedTicket createTypedTicket(JsVariableDefinition varDef) {
Expand All @@ -948,19 +1048,53 @@ private TypedTicket createTypedTicket(JsVariableDefinition varDef) {
return typedTicket;
}


/**
* Retrieve a widget using a variable definition. Creates a new export ticket to reference the variable.
* Intentionally races the ticket export with the variable export to parallelize the requests.
*
* @param varDef Variable definition to get the widget for
* @return Promise of the widget
*/
public Promise<JsWidget> getWidget(JsVariableDefinition varDef) {
return getWidget(varDef, true);
}

/**
* Retrieve a widget using a variable definition. Creates a new export ticket to reference the variable.
* Intentionally races the ticket export with the variable export to parallelize the requests.
*
* @param varDef Variable definition to get the widget for
* @param refetch Whether to refetch the widget before resolving or not
* @return Promise of the widget
*/
public Promise<JsWidget> getWidget(JsVariableDefinition varDef, boolean refetch) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of "refetch" could we use something like "export" or "re-export"? Most of the time "Fetch" means to get the object from the server (usually by name), but this isn't doing that again

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The refetch in this case is triggering widget.refetch(), which was already named... but essentially is restarting the stream and fetching the data. I think the reexport name probably doesn't fit here either, and refetch makes more sense (or JsWidget.refetch should be renamed?). Thoughts?

return exportScopeTicket(varDef)
.race(ticket -> {
TypedTicket typedTicket = new TypedTicket();
typedTicket.setType(varDef.getType());
typedTicket.setTicket(ticket);
return getWidget(typedTicket);
return getWidget(typedTicket, refetch);
}).promise();
}

public Promise<JsWidget> getWidget(TypedTicket typedTicket) {
/**
* Retrieve a widget using a ticket. Does not create a new ticket.
*
* @param typedTicket Ticket to fetch
* @param refetch Whether to refetch the widget. If false, you must ensure you call {@JsWidget#refetch()} before
mofojed marked this conversation as resolved.
Show resolved Hide resolved
* using the widget.
* @return Promise of the widget
*/
public Promise<JsWidget> getWidget(TypedTicket typedTicket, boolean refetch) {
mofojed marked this conversation as resolved.
Show resolved Hide resolved
return whenServerReady("get a widget")
.then(response -> new JsWidget(this, typedTicket).refetch());
.then(response -> Promise.resolve(new JsWidget(this, typedTicket)))
.then(widget -> {
if (refetch) {
return widget.refetch();
}
return Promise.resolve(widget);
});
mofojed marked this conversation as resolved.
Show resolved Hide resolved
}

public void registerSimpleReconnectable(HasLifecycle figure) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -979,8 +979,7 @@ public void close() {

connection.unregisterSimpleReconnectable(this);

// Presently it is never necessary to release widget tickets, since they can't be export tickets.
// connection.releaseTicket(widget.getTicket());
connection.releaseTicket(widget.getTicket());
mofojed marked this conversation as resolved.
Show resolved Hide resolved

if (filteredTable != null) {
filteredTable.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,26 @@ private void closeStream() {
hasFetched = false;
}

/**
* Also closes all the exported objects that the widget still owns
mofojed marked this conversation as resolved.
Show resolved Hide resolved
*/
private void closeExportedObjects() {
for (int i = 0; i < exportedObjects.length; i++) {
JsWidgetExportedObject exportedObject = exportedObjects.getAt(i);
if (exportedObject.isOwner()) {
exportedObject.close();
}
}
}

/**
* Ends the client connection to the server.
*/
@JsMethod
public void close() {
suppressEvents();
closeStream();
closeExportedObjects();
mofojed marked this conversation as resolved.
Show resolved Hide resolved
connection.releaseTicket(getTicket());
}

Expand All @@ -102,7 +115,6 @@ public Promise<JsWidget> refetch() {

messageStream = streamFactory.get();
messageStream.onData(res -> {

JsArray<JsWidgetExportedObject> responseObjects = res.getData().getExportedReferencesList()
.map((p0, p1, p2) -> new JsWidgetExportedObject(connection, p0));
if (!hasFetched) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,12 @@
import com.vertispan.tsdefs.annotations.TsInterface;
import com.vertispan.tsdefs.annotations.TsName;
import elemental2.promise.Promise;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.table_pb.ExportedTableCreationResponse;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.ticket_pb.TypedTicket;
import io.deephaven.web.client.api.Callbacks;
import io.deephaven.web.client.api.JsTable;
import io.deephaven.web.client.api.ServerObject;
import io.deephaven.web.client.api.WorkerConnection;
import io.deephaven.web.client.api.console.JsVariableDefinition;
import io.deephaven.web.client.api.console.JsVariableType;
import io.deephaven.web.client.state.ClientTableState;
import jsinterop.annotations.JsMethod;
import jsinterop.annotations.JsOptional;
import jsinterop.annotations.JsProperty;

/**
Expand All @@ -29,6 +25,9 @@ public class JsWidgetExportedObject implements ServerObject {

private final TypedTicket ticket;

/** Whether this exported object instance is the owner of the ticket or not. */
private boolean isOwner = true;

public JsWidgetExportedObject(WorkerConnection connection, TypedTicket ticket) {
this.connection = connection;
this.ticket = ticket;
Expand All @@ -47,24 +46,46 @@ public TypedTicket typedTicket() {
return typedTicket;
}

@JsMethod
public boolean isOwner() {
return isOwner;
}

public Promise<?> fetch() {
if (getType().equals(JsVariableType.TABLE)) {
return Callbacks.<ExportedTableCreationResponse, Object>grpcUnaryPromise(c -> {
connection.tableServiceClient().getExportedTableCreationResponse(ticket.getTicket(),
connection.metadata(),
c::apply);
}).then(etcr -> {
ClientTableState cts = connection.newStateFromUnsolicitedTable(etcr, "table for widget");
JsTable table = new JsTable(connection, cts);
// never attempt a reconnect, since we might have a different widget schema entirely
table.addEventListener(JsTable.EVENT_DISCONNECT, ignore -> table.close());
return Promise.resolve(table);
});
} else {
return this.connection.getObject(
new JsVariableDefinition(ticket.getType(), null, ticket.getTicket().getTicket_asB64(), null));
return fetch(false);
}

private void verifyIsOwner() {
if (!isOwner) {
throw new IllegalStateException("Exported object is no longer the owner of this ticket.");
}
}

/**
* Takes ownership of this exported object. Returns the ticket for this object.
*/
public TypedTicket takeTicket() {
verifyIsOwner();
isOwner = false;
return typedTicket();
}

/**
* Fetches the object from the server.
*
* @param takeOwnership Whether to take ownership of the object. Defaults to false.
*
mofojed marked this conversation as resolved.
Show resolved Hide resolved
* @return a promise that resolves to the object
*/
@JsMethod
public Promise<?> fetch(@JsOptional Boolean takeOwnership) {
if (takeOwnership == null) {
takeOwnership = false;
}
if (takeOwnership) {
return this.connection.getObject(takeTicket());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this means that raced calls to fetch(true) and fetch(false) are going to intermittently fail - probably need some clearer docs on this and other general usage of ownership.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was debating just having a separate method take instead of making it a boolean.
Really you should only be calling one or the other (true or false) depending on how you're managing the object, I wouldn't expect you'd want to call both.
In any case, I can add a verifyIsOwner() here in the false case, and throw if it's been called after a fetch(false) so it reduces a race that way, but you could still race fetch(false) then fetch(true) I suppose.

}
return this.connection.getObject(
new JsVariableDefinition(ticket.getType(), null, ticket.getTicket().getTicket_asB64(), null));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised this works - VariableDefinition is meant for scope tickets...

Will think on it more, but I think I'd prefer we decompose this (or provide an interface and second impl) to let this be reexported-and-fetched.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, that was already there. Looks like I added this back when implementing JsWidget: #1888 (comment)
I may have misinterpreted that comment then.
Should be able to refactor the WorkerConnection#getObject methods; the only part of the JsVariableDefinition that is actually used is the type and the id used in exportScopeTicket

}

/**
Expand All @@ -73,6 +94,7 @@ public Promise<?> fetch() {
*/
@JsMethod
public void close() {
verifyIsOwner();
connection.releaseTicket(ticket.getTicket());
}
}
Loading
Loading