Skip to content

Commit

Permalink
Review points
Browse files Browse the repository at this point in the history
  • Loading branch information
devinrsmith committed Nov 6, 2023
1 parent ef3e4c8 commit 09a0b13
Show file tree
Hide file tree
Showing 14 changed files with 189 additions and 136 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.client;

import io.deephaven.api.TableOperations;
import io.deephaven.client.impl.TableHandle;
import io.deephaven.client.impl.TableHandle.TableHandleException;
import io.deephaven.qst.TableCreationLogic;
import io.deephaven.qst.TableCreator;
import org.junit.Test;

import java.time.Duration;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;

public class SessionBatchMixinTest extends DeephavenSessionTestBase {

@Test
public void noMixin() throws InterruptedException {
try (final TableHandle ignored = session
.batch(false)
.executeLogic((TableCreationLogic) SessionBatchMixinTest::thisIsTheMethodName)) {
failBecauseExceptionWasNotThrown(TableHandleException.class);
} catch (TableHandleException e) {
assertThat(Stream.of(e.getStackTrace()).map(StackTraceElement::getMethodName))
.doesNotContain("thisIsTheMethodName");
}
}

@Test
public void yesMixin() throws InterruptedException {
try (final TableHandle ignored = session
.batch(true)
.executeLogic((TableCreationLogic) SessionBatchMixinTest::thisIsTheMethodName)) {
failBecauseExceptionWasNotThrown(TableHandleException.class);
} catch (TableHandleException e) {
assertThat(Stream.of(e.getStackTrace()).map(StackTraceElement::getMethodName))
.contains("thisIsTheMethodName");
}
}

static <T extends TableOperations<T, T>> T thisIsTheMethodName(TableCreator<T> creation) {
T t1 = creation.timeTable(Duration.ofSeconds(1)).view("I=i");
T t2 = t1.where("I % 3 == 0").tail(3);
T t3 = t1.where("I % 5 == 0").tail(5);
T t4 = t1.where("This is not a good filter").tail(6);
return creation.merge(t2, t3, t4);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import org.junit.Test;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -25,70 +27,68 @@ public class SessionTableServicesTest extends DeephavenSessionTestBase {
public static final boolean STATEFUL = true;

@Test
public void sessionIsNotStateful() throws TableHandleException, InterruptedException {
checkState(session, session, NOT_STATEFUL);
public void sessionIsNotStateful()
throws TableHandleException, InterruptedException, ExecutionException, TimeoutException {
isNotStateful(() -> session);
isNotStatefulAsync(() -> session);
}

@Test
public void distinctSerialManagerIsNotStateful() throws TableHandleException, InterruptedException {
checkState(session.serial(), session.serial(), NOT_STATEFUL);
public void sessionSerialIsNotStateful() throws TableHandleException, InterruptedException {
isNotStateful(() -> session.serial());
}

@Test
public void distinctBatchManagerIsNotStateful() throws TableHandleException, InterruptedException {
checkState(session.batch(), session.batch(), NOT_STATEFUL);
public void sessionBatchIsNotStateful() throws TableHandleException, InterruptedException {
isNotStateful(() -> session.batch());
}

@Test
public void distinctTableServicesIsNotStateful() throws TableHandleException, InterruptedException {
checkState(session.newStatefulTableService(), session.newStatefulTableService(), NOT_STATEFUL);
public void sessionNewTsIsStateful()
throws TableHandleException, InterruptedException, ExecutionException, TimeoutException {
final TableService stateful = session.newStatefulTableService();
isStateful(() -> stateful);
isStatefulAsync(() -> stateful);
}

@Test
public void distinctTableServiceAsyncsIsNotStateful()
throws InterruptedException, ExecutionException, TimeoutException {
checkAsyncState(session.newStatefulTableService(), session.newStatefulTableService(), NOT_STATEFUL);
}

// this is currently broken; serial clients *can't* reliably execute the same non-trivial TableSpec DAG
@Ignore
@Test
public void singleSerialManagerIsStateful() throws TableHandleException, InterruptedException {
final TableHandleManager manager = session.serial();
checkState(manager, manager, STATEFUL);
public void sessionNewTsSerialIsStateful() throws TableHandleException, InterruptedException {
final TableService stateful = session.newStatefulTableService();
isStateful(stateful::serial);
}

@Test
public void singleBatchManagerNotIsStateful() throws TableHandleException, InterruptedException {
final TableHandleManager manager = session.batch();
checkState(manager, manager, NOT_STATEFUL);
public void sessionNewTsBatchIsStateful() throws TableHandleException, InterruptedException {
final TableService stateful = session.newStatefulTableService();
isStateful(stateful::batch);
}

@Test
public void newStatefulTableServiceIsStateful() throws TableHandleException, InterruptedException {
final TableService ts = session.newStatefulTableService();
checkState(ts, ts, STATEFUL);
static void isStateful(Supplier<TableHandleManager> manager) throws TableHandleException, InterruptedException {
final TableHandleManager instance1 = manager.get();
final TableHandleManager instance2 = manager.get();
checkState(instance1, instance2, STATEFUL);
}

@Test
public void newStatefulTableServiceAsyncIsStateful()
static void isStatefulAsync(Supplier<TableService> manager)
throws InterruptedException, ExecutionException, TimeoutException {
final TableService ts = session.newStatefulTableService();
checkAsyncState(ts, ts, STATEFUL);
final TableService instance1 = manager.get();
final TableService instance2 = manager.get();
checkAsyncState(instance1, instance2, STATEFUL);
checkExecuteAsyncStateAtSameTime2(instance1);
}

@Test
public void newStatefulTableServiceBatchIsStateful() throws TableHandleException, InterruptedException {
final TableService ts = session.newStatefulTableService();
checkState(ts.batch(), ts.batch(), STATEFUL);
static void isNotStateful(Supplier<TableHandleManager> manager) throws TableHandleException, InterruptedException {
final TableHandleManager singleInstance = manager.get();
checkState(singleInstance, singleInstance, NOT_STATEFUL);
}

// this is currently broken; serial clients *can't* reliably execute the same non-trivial TableSpec DAG
@Ignore
@Test
public void newStatefulTableServiceSerialIsStateful() throws TableHandleException, InterruptedException {
final TableService ts = session.newStatefulTableService();
checkState(ts.serial(), ts.serial(), STATEFUL);
static void isNotStatefulAsync(Supplier<TableService> manager)
throws InterruptedException, ExecutionException, TimeoutException {
final TableService instance1 = manager.get();
final TableService instance2 = manager.get();
checkAsyncState(instance1, instance2, NOT_STATEFUL);
checkExecuteAsyncStateAtSameTime2(instance1);
}

static void checkState(TableHandleManager m1, TableHandleManager m2, boolean expectEquals)
Expand Down Expand Up @@ -168,6 +168,22 @@ private static void checkExecuteAsyncStateAtSameTime(TableService a1, TableServi
}
}

private static void checkExecuteAsyncStateAtSameTime2(TableService a1)
throws InterruptedException, ExecutionException, TimeoutException {
final TableSpec q = tableSpec();
final List<? extends TableHandleFuture> f = a1.executeAsync(List.of(q, q));
try (
final TableHandle h1 = f.get(0).getOrCancel(Duration.ofSeconds(555));
final TableHandle h2 = f.get(1).getOrCancel(Duration.ofSeconds(555))) {
assertThat(h1.exportId().toString().equals(h2.exportId().toString())).isEqualTo(true);
try (
final TableHandle h3 = h1.updateView("K=ii");
final TableHandle h4 = h2.updateView("K=ii")) {
assertThat(h3.exportId().toString().equals(h4.exportId().toString())).isEqualTo(true);
}
}
}

private static TableSpec tableSpec() {
final TableSpec t1 = TableSpec.empty(99).updateView("I=ii");
final TableSpec t2 = TableSpec.empty(99).updateView("I=ii", "J=ii");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ public class TableServiceAsyncTest extends DeephavenSessionTestBase {
public void longChainAsyncExportOnlyLast() throws ExecutionException, InterruptedException, TimeoutException {
final List<TableSpec> longChain = createLongChain();
final TableSpec longChainLast = longChain.get(longChain.size() - 1);
try (final TableHandle handle = get(session.newStatefulTableService().executeAsync(longChainLast))) {
try (final TableHandle handle = get(session.executeAsync(longChainLast))) {
checkSucceeded(handle);
}
}

@Test(timeout = 20000)
public void longChainAsyncExportAll() throws ExecutionException, InterruptedException, TimeoutException {
final List<TableSpec> longChain = createLongChain();
final List<? extends TableHandleFuture> futures = session.newStatefulTableService().executeAsync(longChain);
final List<? extends TableHandleFuture> futures = session.executeAsync(longChain);
try {
for (final TableHandleFuture future : futures) {
try (final TableHandle handle = get(future)) {
Expand All @@ -51,7 +51,7 @@ public void longChainAsyncExportAll() throws ExecutionException, InterruptedExce
public void longChainAsyncExportAllCancelAllButLast()
throws ExecutionException, InterruptedException, TimeoutException {
final List<TableSpec> longChain = createLongChain();
final List<? extends TableHandleFuture> futures = session.newStatefulTableService().executeAsync(longChain);
final List<? extends TableHandleFuture> futures = session.executeAsync(longChain);
// Cancel or close all but the last one
TableService.TableHandleFuture.cancelOrClose(futures.subList(0, futures.size() - 1), true);
try (final TableHandle lastHandle = get(futures.get(futures.size() - 1))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ enum Type {
protected void execute(Session session) throws Exception {
final Filter filter = type == Type.AND ? Filter.and(Filter.from(filters)) : Filter.or(Filter.from(filters));
final TableSpec filtered = ticket.ticketId().table().where(filter);
try (final TableHandle handle = session.newStatefulTableService().executeAsync(filtered).getOrCancel()) {
try (final TableHandle handle = session.executeAsync(filtered).getOrCancel()) {
session.publish("filter_table_results", handle).get();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.deephaven.client.impl.Session;
import io.deephaven.client.impl.TableHandle;
import io.deephaven.client.impl.TableHandleManager;
import io.deephaven.client.impl.TableService;
import io.deephaven.qst.table.TableSpec;
import picocli.CommandLine;
import picocli.CommandLine.ArgGroup;
Expand All @@ -32,11 +33,12 @@ protected void execute(Session session) throws Exception {
final TableSpec r = TableSpec.empty(10).select("R=random()");
final TableSpec rPlusOne = r.view("PlusOne=R + 1");
final TableSpec rMinusOne = r.view("PlusOne=R - 1");
final TableService statefulTableService = session.newStatefulTableService();
final TableHandleManager manager = mode == null
? session.newStatefulTableService()
? statefulTableService
: mode.batch
? session.batch()
: session.serial();
? statefulTableService.batch()
: statefulTableService.serial();
// noinspection unused
try (
final TableHandle hPlusOne = manager.execute(rPlusOne);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -261,7 +261,7 @@ class State {
State(TableSpec table, int exportId) {
this.table = Objects.requireNonNull(table);
this.exportId = exportId;
this.children = new CopyOnWriteArraySet<>();
this.children = new LinkedHashSet<>();
}

Session session() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

final class FutureHelper {


static <T> T getOrCancel(Future<T> future) throws InterruptedException, ExecutionException {
try {
return future.get();
Expand Down Expand Up @@ -41,25 +41,46 @@ static <T> T getOrCancel(Future<T> future, Duration timeout)
}
}

static <T> void cancelOrConsume(Iterable<? extends Future<T>> futures, BiConsumer<T, ExecutionException> consumer,
boolean mayInterruptIfRunning) {
@FunctionalInterface
interface FutureConsumer<T> {
void accept(T result, ExecutionException e, CancellationException c);
}

/**
* Invokes {@link Future#cancel(boolean)} for each future in {@code futures}. If the cancel is not successful, the
* completed result will be passed to {@code consumer}.
*
* @param futures the futures
* @param consumer the consumer
* @param mayInterruptIfRunning {@code true} if the thread executing the task should be interrupted; otherwise,
* in-progress tasks are allowed to complete
* @param <T> The result type for the {@code futures}
*/
static <T> void cancelOrConsume(
Iterable<? extends Future<T>> futures, FutureConsumer<T> consumer, boolean mayInterruptIfRunning) {
for (Future<T> future : futures) {
if (future.cancel(mayInterruptIfRunning)) {
continue;
}
final T normal;
try {
normal = getCompleted(future, false);
} catch (ExecutionException e) {
consumer.accept(null, e);
continue;
} catch (CancellationException e) {
continue;
}
consumer.accept(normal, null);
consumeCompleted(future, consumer);
}
}

private static <T> void consumeCompleted(Future<T> future, FutureConsumer<T> consumer) {
final T result;
try {
result = getCompleted(future, false);
} catch (ExecutionException e) {
consumer.accept(null, e, null);
return;
} catch (CancellationException c) {
consumer.accept(null, null, c);
return;
}
consumer.accept(result, null, null);
}


private static <T> T getCompleted(Future<T> future, boolean interrupted) throws ExecutionException {
// We know future is done
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ protected ExportService exportService() {

@Override
protected TableHandle handle(TableSpec table) {
return io.deephaven.client.impl.TableServiceImpl.ofUnchecked(exportService(), table, null);
return io.deephaven.client.impl.TableServiceImpl.executeUnchecked(exportService(), table, null);
}
};
}
Expand Down Expand Up @@ -312,8 +312,9 @@ public CompletableFuture<Void> closeFuture() {

@Override
protected TableService delegate() {
// Session.execute / Session.executeAsync will create one-off TableServices for exporting
// Session.batch / Session.serial will create stateful TableHandleManagers
// This allows Session to implement an un-cached TableService.
// Each respective execution (Session.execute(), Session.executeAsync(), Session.serial().execute(), etc)
// will create new states for that specific execution.
return newStatefulTableService();
}

Expand Down Expand Up @@ -602,7 +603,7 @@ protected ExportService exportService() {

@Override
protected TableHandle handle(TableSpec table) {
return io.deephaven.client.impl.TableServiceImpl.ofUnchecked(exportService(), table, null);
return io.deephaven.client.impl.TableServiceImpl.executeUnchecked(exportService(), table, null);
}
};
}
Expand Down
Loading

0 comments on commit 09a0b13

Please sign in to comment.