Skip to content

Commit

Permalink
Review points
Browse files Browse the repository at this point in the history
  • Loading branch information
devinrsmith committed Nov 6, 2023
1 parent b90fa27 commit 45d1e6f
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.client.impl;

import io.deephaven.client.impl.ExportRequest.Listener;
import io.deephaven.client.impl.ExportStates.State;
import io.deephaven.qst.table.TableSpec;

import java.util.Objects;
Expand Down Expand Up @@ -61,6 +62,10 @@ ExportStates exportStates() {
return state.exportStates();
}

State state() {
return state;
}

/**
* The table spec.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -143,6 +142,8 @@ private ExportServiceRequest exportRequestImpl(ExportsRequest requests) {
};
}
return new ExportServiceRequest() {
boolean sent;
boolean closed;

@Override
public List<Export> exports() {
Expand All @@ -151,27 +152,62 @@ public List<Export> exports() {

@Override
public void send() {
if (closed || sent) {
return;
}
sent = true;
// After the user has called send, all handling of state needs to be handled by the respective
// io.deephaven.client.impl.ExportRequest.listener
send.run();

}

@Override
public void close() {
lock.unlock();
if (closed) {
return;
}
closed = true;
try {
if (!sent) {
cleanupUnsent();
}
} finally {
lock.unlock();
}
}

private void cleanupUnsent() {
for (Export result : results) {
final State state = result.state();
if (newStates.containsKey(state.exportId())) {
// On brand new states that we didn't even send, we can simply remove them. We aren't
// leaking anything, but we have incremented our export id creator state.
removeImpl(state);
continue;
}
result.release();
}
}
};
}

private void release(State state) {

private void remove(State state) {
lock.lock();
try {
if (!exports.remove(state.table(), state)) {
throw new IllegalStateException("Unable to remove state");
}
removeImpl(state);
} finally {
lock.unlock();
}
}

private void removeImpl(State state) {
if (!exports.remove(state.table(), state)) {
throw new IllegalStateException("Unable to remove state");
}
}

private Optional<State> lookup(TableSpec table) {
return Optional.ofNullable(exports.get(table));
}
Expand Down Expand Up @@ -259,7 +295,7 @@ synchronized void release(Export export) {
throw new IllegalStateException("Unable to remove child");
}
if (children.isEmpty()) {
ExportStates.this.release(this);
ExportStates.this.remove(this);
released = true;
sessionStub.release(
ReleaseRequest.newBuilder().setId(ExportTicketHelper.wrapExportIdInTicket(exportId)).build(),
Expand Down

0 comments on commit 45d1e6f

Please sign in to comment.