From a21a1a10795604d822348c0b80284e179da89f4b Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Thu, 18 Apr 2024 13:06:40 -0700 Subject: [PATCH] Added BarrageSessionFactoryClient and application --- .../impl/BarrageFactoryBuilderModule.java | 11 +- .../client/impl/BarrageSessionModule.java | 7 ++ .../client/impl/BarrageSubcomponent.java | 11 ++ .../client/impl/DeephavenBarrageRoot.java | 14 +++ .../examples/BarrageClientExampleBase.java | 5 +- .../deephaven/client/impl/BarrageSession.java | 14 ++- .../client/impl/BarrageSessionFactory.java | 16 +++ .../client/impl/DeephavenFlightRoot.java | 14 +++ .../client/impl/FlightSessionModule.java | 6 ++ .../client/impl/FlightSubcomponent.java | 10 ++ .../DeephavenFlightSessionTestBase.java | 11 +- .../deephaven/client/examples/DoPutSpray.java | 5 +- .../client/examples/FlightExampleBase.java | 5 +- .../deephaven/client/impl/FlightSession.java | 8 ++ .../client/impl/FlightSessionFactory.java | 15 +++ .../client/DeephavenSessionRoot.java | 14 +++ .../deephaven/client/SessionImplModule.java | 30 ++++-- .../deephaven/client/SessionSubcomponent.java | 10 +- .../client/DeephavenSessionTestBase.java | 9 +- .../deephaven/client/SessionPublishTest.java | 6 +- .../client/examples/SessionExampleBase.java | 5 +- .../client/impl/ClientChannelFactory.java | 6 ++ .../deephaven/client/impl/ClientConfig.java | 21 ++++ .../deephaven/client/impl/SessionFactory.java | 16 ++- .../io/deephaven/client/impl/SessionImpl.java | 7 ++ .../client/impl/SessionImplConfig.java | 22 ++++ .../server/appmode/ApplicationInjector.java | 3 +- .../server/barrage/BarrageClientModule.java | 25 +++++ .../barrage/BarrageSessionFactoryClient.java | 100 ++++++++++++++++++ .../server/uri/BarrageTableResolver.java | 37 +------ .../test/FlightMessageRoundTripTest.java | 12 +-- 31 files changed, 405 insertions(+), 70 deletions(-) create mode 100644 server/src/main/java/io/deephaven/server/barrage/BarrageSessionFactoryClient.java diff --git a/java-client/barrage-dagger/src/main/java/io/deephaven/client/impl/BarrageFactoryBuilderModule.java b/java-client/barrage-dagger/src/main/java/io/deephaven/client/impl/BarrageFactoryBuilderModule.java index 6e400aa1176..9e232b7ea7d 100644 --- a/java-client/barrage-dagger/src/main/java/io/deephaven/client/impl/BarrageFactoryBuilderModule.java +++ b/java-client/barrage-dagger/src/main/java/io/deephaven/client/impl/BarrageFactoryBuilderModule.java @@ -6,11 +6,20 @@ import dagger.Module; import dagger.Provides; +/** + * Module that provides {@link BarrageSessionFactoryBuilder}. + */ @Module public interface BarrageFactoryBuilderModule { + /** + * Equivalent to {@code DeephavenBarrageRoot.of().factoryBuilder()}. + * + * @return the barrage session factory builder + * @see DeephavenBarrageRoot + */ @Provides static BarrageSessionFactoryBuilder providesFactoryBuilder() { - return DaggerDeephavenBarrageRoot.create().factoryBuilder(); + return DeephavenBarrageRoot.of().factoryBuilder(); } } diff --git a/java-client/barrage-dagger/src/main/java/io/deephaven/client/impl/BarrageSessionModule.java b/java-client/barrage-dagger/src/main/java/io/deephaven/client/impl/BarrageSessionModule.java index f986afc93f1..b620a8c1e65 100644 --- a/java-client/barrage-dagger/src/main/java/io/deephaven/client/impl/BarrageSessionModule.java +++ b/java-client/barrage-dagger/src/main/java/io/deephaven/client/impl/BarrageSessionModule.java @@ -8,8 +8,15 @@ import io.grpc.ManagedChannel; import org.apache.arrow.memory.BufferAllocator; +/** + * Provides {@link BarrageSession}. + */ @Module public class BarrageSessionModule { + + /** + * Delegates to {@link BarrageSession#of(SessionImpl, BufferAllocator, ManagedChannel)}. + */ @Provides public static BarrageSession newDeephavenClientSession( SessionImpl session, BufferAllocator allocator, ManagedChannel managedChannel) { diff --git a/java-client/barrage-dagger/src/main/java/io/deephaven/client/impl/BarrageSubcomponent.java b/java-client/barrage-dagger/src/main/java/io/deephaven/client/impl/BarrageSubcomponent.java index b1bc8bf3807..44c7e06f3c5 100644 --- a/java-client/barrage-dagger/src/main/java/io/deephaven/client/impl/BarrageSubcomponent.java +++ b/java-client/barrage-dagger/src/main/java/io/deephaven/client/impl/BarrageSubcomponent.java @@ -14,11 +14,22 @@ import javax.inject.Named; import java.util.concurrent.ScheduledExecutorService; +/** + * The barrage subcomponent. + * + * @see SessionImplModule + * @see FlightSessionModule + * @see BarrageSessionModule + */ @Subcomponent(modules = {SessionImplModule.class, FlightSessionModule.class, BarrageSessionModule.class}) public interface BarrageSubcomponent extends BarrageSessionFactory { + @Override BarrageSession newBarrageSession(); + @Override + ManagedChannel managedChannel(); + @Module(subcomponents = {BarrageSubcomponent.class}) interface DeephavenClientSubcomponentModule { diff --git a/java-client/barrage-dagger/src/main/java/io/deephaven/client/impl/DeephavenBarrageRoot.java b/java-client/barrage-dagger/src/main/java/io/deephaven/client/impl/DeephavenBarrageRoot.java index ac21187c6d9..4e8266c56c6 100644 --- a/java-client/barrage-dagger/src/main/java/io/deephaven/client/impl/DeephavenBarrageRoot.java +++ b/java-client/barrage-dagger/src/main/java/io/deephaven/client/impl/DeephavenBarrageRoot.java @@ -7,8 +7,22 @@ import io.deephaven.client.impl.BarrageSubcomponent.Builder; import io.deephaven.client.impl.BarrageSubcomponent.DeephavenClientSubcomponentModule; +/** + * Component for creating {@link BarrageSubcomponent}. + * + * @see DeephavenClientSubcomponentModule + */ @Component(modules = DeephavenClientSubcomponentModule.class) public interface DeephavenBarrageRoot { + /** + * Equivalent to {@code DaggerDeephavenBarrageRoot.create()}. + * + * @return the barrage root + */ + static DeephavenBarrageRoot of() { + return DaggerDeephavenBarrageRoot.create(); + } + Builder factoryBuilder(); } diff --git a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/BarrageClientExampleBase.java b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/BarrageClientExampleBase.java index cba2ae4c048..c11d783bd21 100644 --- a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/BarrageClientExampleBase.java +++ b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/BarrageClientExampleBase.java @@ -6,7 +6,7 @@ import io.deephaven.client.impl.BarrageSession; import io.deephaven.client.impl.BarrageSessionFactory; import io.deephaven.client.impl.BarrageSubcomponent.Builder; -import io.deephaven.client.impl.DaggerDeephavenBarrageRoot; +import io.deephaven.client.impl.DeephavenBarrageRoot; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; import io.deephaven.util.SafeCloseable; @@ -55,7 +55,8 @@ public final Void call() throws Exception { .setUpdateGraph(updateGraph) .build(); - final Builder builder = DaggerDeephavenBarrageRoot.create().factoryBuilder() + final Builder builder = DeephavenBarrageRoot.of() + .factoryBuilder() .managedChannel(managedChannel) .scheduler(scheduler) .allocator(bufferAllocator); diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java index a86e8a82c56..15c592a71e0 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSession.java @@ -16,22 +16,30 @@ public class BarrageSession extends FlightSession implements BarrageSubscription.Factory, BarrageSnapshot.Factory { + /** + * Creates a barrage session. Closing the barrage session does not close {@code channel}. + * + * @param session the session + * @param incomingAllocator the incoming allocator + * @param channel the managed channel + * @return the barrage session + */ public static BarrageSession of( SessionImpl session, BufferAllocator incomingAllocator, ManagedChannel channel) { final FlightClient client = FlightGrpcUtilsExtension.createFlightClientWithSharedChannel( incomingAllocator, channel, Collections.singletonList(new SessionMiddleware(session))); - return new BarrageSession(session, client, channel); + return new BarrageSession(session, client); } public static BarrageSession create( SessionImpl session, BufferAllocator incomingAllocator, ManagedChannel channel) { final FlightClient client = FlightGrpcUtilsExtension.createFlightClientWithSharedChannel( incomingAllocator, channel, Collections.singletonList(new SessionMiddleware(session))); - return new BarrageSession(session, client, channel); + return new BarrageSession(session, client); } protected BarrageSession( - final SessionImpl session, final FlightClient client, final ManagedChannel channel) { + final SessionImpl session, final FlightClient client) { super(session, client); } diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSessionFactory.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSessionFactory.java index c29b017212c..e3145073bb6 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSessionFactory.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSessionFactory.java @@ -3,6 +3,22 @@ // package io.deephaven.client.impl; +import io.grpc.ManagedChannel; + public interface BarrageSessionFactory { + + /** + * Creates a new {@link BarrageSession}. Closing the session does not close the {@link #managedChannel()}. + * + * @return the new barrage session + */ BarrageSession newBarrageSession(); + + /** + * The {@link ManagedChannel} associated with {@code this} factory. Use {@link ManagedChannel#shutdown()} when + * {@code this} factory and sessions are no longer needed. + * + * @return the managed channel + */ + ManagedChannel managedChannel(); } diff --git a/java-client/flight-dagger/src/main/java/io/deephaven/client/impl/DeephavenFlightRoot.java b/java-client/flight-dagger/src/main/java/io/deephaven/client/impl/DeephavenFlightRoot.java index 94866580543..4561ad8c528 100644 --- a/java-client/flight-dagger/src/main/java/io/deephaven/client/impl/DeephavenFlightRoot.java +++ b/java-client/flight-dagger/src/main/java/io/deephaven/client/impl/DeephavenFlightRoot.java @@ -7,8 +7,22 @@ import io.deephaven.client.impl.FlightSubcomponent.Builder; import io.deephaven.client.impl.FlightSubcomponent.FlightSubcomponentModule; +/** + * Component for creating {@link FlightSubcomponent}. + * + * @see FlightSubcomponentModule + */ @Component(modules = FlightSubcomponentModule.class) public interface DeephavenFlightRoot { + /** + * Equivalent to {@code DaggerDeephavenFlightRoot.create()}. + * + * @return the flight root + */ + static DeephavenFlightRoot of() { + return DaggerDeephavenFlightRoot.create(); + } + Builder factoryBuilder(); } diff --git a/java-client/flight-dagger/src/main/java/io/deephaven/client/impl/FlightSessionModule.java b/java-client/flight-dagger/src/main/java/io/deephaven/client/impl/FlightSessionModule.java index 5d536b13b44..7570c522024 100644 --- a/java-client/flight-dagger/src/main/java/io/deephaven/client/impl/FlightSessionModule.java +++ b/java-client/flight-dagger/src/main/java/io/deephaven/client/impl/FlightSessionModule.java @@ -8,9 +8,15 @@ import io.grpc.ManagedChannel; import org.apache.arrow.memory.BufferAllocator; +/** + * Provides {@link FlightSession}. + */ @Module public class FlightSessionModule { + /** + * Delegates to {@link FlightSession#of(SessionImpl, BufferAllocator, ManagedChannel)}. + */ @Provides public static FlightSession newFlightSession(SessionImpl session, BufferAllocator allocator, ManagedChannel managedChannel) { diff --git a/java-client/flight-dagger/src/main/java/io/deephaven/client/impl/FlightSubcomponent.java b/java-client/flight-dagger/src/main/java/io/deephaven/client/impl/FlightSubcomponent.java index c3abd2c6e31..5e747871419 100644 --- a/java-client/flight-dagger/src/main/java/io/deephaven/client/impl/FlightSubcomponent.java +++ b/java-client/flight-dagger/src/main/java/io/deephaven/client/impl/FlightSubcomponent.java @@ -14,11 +14,21 @@ import javax.inject.Named; import java.util.concurrent.ScheduledExecutorService; +/** + * The flight subcomponent. + * + * @see SessionImplModule + * @see FlightSessionModule + */ @Subcomponent(modules = {SessionImplModule.class, FlightSessionModule.class}) public interface FlightSubcomponent extends FlightSessionFactory { + @Override FlightSession newFlightSession(); + @Override + ManagedChannel managedChannel(); + @Module(subcomponents = {FlightSubcomponent.class}) interface FlightSubcomponentModule { diff --git a/java-client/flight-dagger/src/test/java/io/deephaven/client/DeephavenFlightSessionTestBase.java b/java-client/flight-dagger/src/test/java/io/deephaven/client/DeephavenFlightSessionTestBase.java index 88ba50b9688..b90396929e0 100644 --- a/java-client/flight-dagger/src/test/java/io/deephaven/client/DeephavenFlightSessionTestBase.java +++ b/java-client/flight-dagger/src/test/java/io/deephaven/client/DeephavenFlightSessionTestBase.java @@ -3,7 +3,7 @@ // package io.deephaven.client; -import io.deephaven.client.impl.DaggerDeephavenFlightRoot; +import io.deephaven.client.impl.DeephavenFlightRoot; import io.deephaven.client.impl.FlightSession; import io.deephaven.server.runner.DeephavenApiServerTestBase; import io.grpc.ManagedChannel; @@ -30,8 +30,13 @@ public void setUp() throws Exception { register(channel); sessionScheduler = Executors.newScheduledThreadPool(2); bufferAllocator = new RootAllocator(); - flightSession = DaggerDeephavenFlightRoot.create().factoryBuilder().allocator(bufferAllocator) - .managedChannel(channel).scheduler(sessionScheduler).build().newFlightSession(); + flightSession = DeephavenFlightRoot.of() + .factoryBuilder() + .allocator(bufferAllocator) + .managedChannel(channel) + .scheduler(sessionScheduler) + .build() + .newFlightSession(); } @Override diff --git a/java-client/flight-examples/src/main/java/io/deephaven/client/examples/DoPutSpray.java b/java-client/flight-examples/src/main/java/io/deephaven/client/examples/DoPutSpray.java index bb4dd535afa..08fcbd468fd 100644 --- a/java-client/flight-examples/src/main/java/io/deephaven/client/examples/DoPutSpray.java +++ b/java-client/flight-examples/src/main/java/io/deephaven/client/examples/DoPutSpray.java @@ -3,7 +3,7 @@ // package io.deephaven.client.examples; -import io.deephaven.client.impl.DaggerDeephavenFlightRoot; +import io.deephaven.client.impl.DeephavenFlightRoot; import io.deephaven.client.impl.FlightSession; import io.deephaven.client.impl.TableHandle; import io.deephaven.qst.table.TicketTable; @@ -72,7 +72,8 @@ private static void close(FlightSession session) throws InterruptedException, Ex private FlightSession session(BufferAllocator bufferAllocator, ScheduledExecutorService scheduler, ManagedChannel sourceChannel) { - return DaggerDeephavenFlightRoot.create().factoryBuilder() + return DeephavenFlightRoot.of() + .factoryBuilder() .managedChannel(sourceChannel) .scheduler(scheduler) .allocator(bufferAllocator) diff --git a/java-client/flight-examples/src/main/java/io/deephaven/client/examples/FlightExampleBase.java b/java-client/flight-examples/src/main/java/io/deephaven/client/examples/FlightExampleBase.java index 7e8ea63d14d..583eb1ae6d4 100644 --- a/java-client/flight-examples/src/main/java/io/deephaven/client/examples/FlightExampleBase.java +++ b/java-client/flight-examples/src/main/java/io/deephaven/client/examples/FlightExampleBase.java @@ -3,7 +3,7 @@ // package io.deephaven.client.examples; -import io.deephaven.client.impl.DaggerDeephavenFlightRoot; +import io.deephaven.client.impl.DeephavenFlightRoot; import io.deephaven.client.impl.FlightSession; import io.deephaven.client.impl.FlightSessionFactory; import io.deephaven.client.impl.FlightSubcomponent.Builder; @@ -36,7 +36,8 @@ public final Void call() throws Exception { Runtime.getRuntime() .addShutdownHook(new Thread(() -> onShutdown(scheduler, managedChannel))); - final Builder builder = DaggerDeephavenFlightRoot.create().factoryBuilder() + final Builder builder = DeephavenFlightRoot.of() + .factoryBuilder() .managedChannel(managedChannel) .scheduler(scheduler) .allocator(bufferAllocator); diff --git a/java-client/flight/src/main/java/io/deephaven/client/impl/FlightSession.java b/java-client/flight/src/main/java/io/deephaven/client/impl/FlightSession.java index e6a7d2a4f18..35e7e8eaeee 100644 --- a/java-client/flight/src/main/java/io/deephaven/client/impl/FlightSession.java +++ b/java-client/flight/src/main/java/io/deephaven/client/impl/FlightSession.java @@ -18,6 +18,14 @@ public class FlightSession implements AutoCloseable { + /** + * Creates a flight session. Closing the flight session does not close {@code channel}. + * + * @param session the session + * @param incomingAllocator the incoming allocator + * @param channel the managed channel + * @return the flight session + */ public static FlightSession of(SessionImpl session, BufferAllocator incomingAllocator, ManagedChannel channel) { // Note: this pattern of FlightClient owning the ManagedChannel does not mesh well with the idea that some diff --git a/java-client/flight/src/main/java/io/deephaven/client/impl/FlightSessionFactory.java b/java-client/flight/src/main/java/io/deephaven/client/impl/FlightSessionFactory.java index e7746201f6a..31ccf870b57 100644 --- a/java-client/flight/src/main/java/io/deephaven/client/impl/FlightSessionFactory.java +++ b/java-client/flight/src/main/java/io/deephaven/client/impl/FlightSessionFactory.java @@ -3,6 +3,21 @@ // package io.deephaven.client.impl; +import io.grpc.ManagedChannel; + public interface FlightSessionFactory { + /** + * Creates a new {@link FlightSession}. Closing the session does not close the {@link #managedChannel()}. + * + * @return the new flight session + */ FlightSession newFlightSession(); + + /** + * The {@link ManagedChannel} associated with {@code this} factory. Use {@link ManagedChannel#shutdown()} when + * {@code this} factory and sessions are no longer needed. + * + * @return the managed channel + */ + ManagedChannel managedChannel(); } diff --git a/java-client/session-dagger/src/main/java/io/deephaven/client/DeephavenSessionRoot.java b/java-client/session-dagger/src/main/java/io/deephaven/client/DeephavenSessionRoot.java index 6b8c4ac8e96..40545e88fe2 100644 --- a/java-client/session-dagger/src/main/java/io/deephaven/client/DeephavenSessionRoot.java +++ b/java-client/session-dagger/src/main/java/io/deephaven/client/DeephavenSessionRoot.java @@ -7,8 +7,22 @@ import io.deephaven.client.SessionSubcomponent.Builder; import io.deephaven.client.SessionSubcomponent.SessionFactorySubcomponentModule; +/** + * Component for creating {@link SessionSubcomponent}. + * + * @see SessionFactorySubcomponentModule + */ @Component(modules = SessionFactorySubcomponentModule.class) public interface DeephavenSessionRoot { + /** + * Equivalent to {@code DaggerDeephavenSessionRoot.create()}. + * + * @return the session root + */ + static DeephavenSessionRoot of() { + return DaggerDeephavenSessionRoot.create(); + } + Builder factoryBuilder(); } diff --git a/java-client/session-dagger/src/main/java/io/deephaven/client/SessionImplModule.java b/java-client/session-dagger/src/main/java/io/deephaven/client/SessionImplModule.java index cfdba83d842..cafa7fba3c3 100644 --- a/java-client/session-dagger/src/main/java/io/deephaven/client/SessionImplModule.java +++ b/java-client/session-dagger/src/main/java/io/deephaven/client/SessionImplModule.java @@ -8,7 +8,6 @@ import dagger.Provides; import io.deephaven.client.impl.SessionImpl; import io.deephaven.client.impl.SessionImplConfig; -import io.deephaven.client.impl.SessionImplConfig.Builder; import io.deephaven.proto.DeephavenChannel; import io.deephaven.proto.DeephavenChannelImpl; import io.grpc.Channel; @@ -18,6 +17,9 @@ import javax.inject.Named; import java.util.concurrent.ScheduledExecutorService; +/** + * Provides {@link Channel}, {@link DeephavenChannel}, {@link SessionImplConfig}, and {@link SessionImpl}. + */ @Module public interface SessionImplModule { @@ -27,19 +29,29 @@ public interface SessionImplModule { @Binds DeephavenChannel bindsDeephavenChannelImpl(DeephavenChannelImpl deephavenChannelImpl); + /** + * Delegates to {@link SessionImplConfig#of(DeephavenChannel, ScheduledExecutorService, String)}. + */ @Provides - static SessionImpl session(DeephavenChannel channel, ScheduledExecutorService scheduler, + static SessionImplConfig providesSessionImplConfig( + DeephavenChannel channel, + ScheduledExecutorService scheduler, @Nullable @Named("authenticationTypeAndValue") String authenticationTypeAndValue) { - final Builder builder = SessionImplConfig.builder() - .executor(scheduler) - .channel(channel); - if (authenticationTypeAndValue != null) { - builder.authenticationTypeAndValue(authenticationTypeAndValue); - } - final SessionImplConfig config = builder.build(); + return SessionImplConfig.of(channel, scheduler, authenticationTypeAndValue); + } + + /** + * Creates a session. Equivalent to {@link SessionImplConfig#createSession()}. + * + * @param config the config + * @return the session + */ + @Provides + static SessionImpl session(SessionImplConfig config) { try { return config.createSession(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException(e); } } diff --git a/java-client/session-dagger/src/main/java/io/deephaven/client/SessionSubcomponent.java b/java-client/session-dagger/src/main/java/io/deephaven/client/SessionSubcomponent.java index 8f2cf21e401..1d477e7eba2 100644 --- a/java-client/session-dagger/src/main/java/io/deephaven/client/SessionSubcomponent.java +++ b/java-client/session-dagger/src/main/java/io/deephaven/client/SessionSubcomponent.java @@ -12,14 +12,22 @@ import javax.annotation.Nullable; import javax.inject.Named; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; +/** + * The session subcomponent. + * + * @see SessionImplModule + */ @Subcomponent(modules = SessionImplModule.class) public interface SessionSubcomponent extends SessionFactory { + @Override SessionImpl newSession(); + @Override + ManagedChannel managedChannel(); + @Module(subcomponents = SessionSubcomponent.class) interface SessionFactorySubcomponentModule { diff --git a/java-client/session-dagger/src/test/java/io/deephaven/client/DeephavenSessionTestBase.java b/java-client/session-dagger/src/test/java/io/deephaven/client/DeephavenSessionTestBase.java index 5b0775ec2de..57e2259cac0 100644 --- a/java-client/session-dagger/src/test/java/io/deephaven/client/DeephavenSessionTestBase.java +++ b/java-client/session-dagger/src/test/java/io/deephaven/client/DeephavenSessionTestBase.java @@ -29,9 +29,12 @@ public void setUp() throws Exception { ManagedChannel channel = channelBuilder().build(); register(channel); sessionScheduler = Executors.newScheduledThreadPool(2); - final SessionImpl clientSessionImpl = - DaggerDeephavenSessionRoot.create().factoryBuilder().managedChannel(channel) - .scheduler(sessionScheduler).build().newSession(); + final SessionImpl clientSessionImpl = DeephavenSessionRoot.of() + .factoryBuilder() + .managedChannel(channel) + .scheduler(sessionScheduler) + .build() + .newSession(); session = clientSessionImpl; serverSessionState = Require.neqNull(server().sessionService().getSessionForToken( clientSessionImpl._hackBearerHandler().getCurrentToken()), "SessionState"); diff --git a/java-client/session-dagger/src/test/java/io/deephaven/client/SessionPublishTest.java b/java-client/session-dagger/src/test/java/io/deephaven/client/SessionPublishTest.java index aac8c25dd22..a43761d89b6 100644 --- a/java-client/session-dagger/src/test/java/io/deephaven/client/SessionPublishTest.java +++ b/java-client/session-dagger/src/test/java/io/deephaven/client/SessionPublishTest.java @@ -36,10 +36,12 @@ public void setUp() throws Exception { // Create a second client with its own channel and session. final ManagedChannel channel2 = channelBuilder().build(); register(channel2); - session2 = DaggerDeephavenSessionRoot.create().factoryBuilder() + session2 = DeephavenSessionRoot.of() + .factoryBuilder() .managedChannel(channel2) .scheduler(sessionScheduler) - .build().newSession(); + .build() + .newSession(); } @Override diff --git a/java-client/session-examples/src/main/java/io/deephaven/client/examples/SessionExampleBase.java b/java-client/session-examples/src/main/java/io/deephaven/client/examples/SessionExampleBase.java index c9a0a1dc539..70c1933dc96 100644 --- a/java-client/session-examples/src/main/java/io/deephaven/client/examples/SessionExampleBase.java +++ b/java-client/session-examples/src/main/java/io/deephaven/client/examples/SessionExampleBase.java @@ -3,7 +3,7 @@ // package io.deephaven.client.examples; -import io.deephaven.client.DaggerDeephavenSessionRoot; +import io.deephaven.client.DeephavenSessionRoot; import io.deephaven.client.SessionSubcomponent.Builder; import io.deephaven.client.impl.SessionFactory; import io.grpc.ManagedChannel; @@ -33,7 +33,8 @@ public final Void call() throws Exception { Runtime.getRuntime() .addShutdownHook(new Thread(() -> onShutdown(scheduler, managedChannel))); - final Builder builder = DaggerDeephavenSessionRoot.create().factoryBuilder() + final Builder builder = DeephavenSessionRoot.of() + .factoryBuilder() .managedChannel(managedChannel) .scheduler(scheduler); if (authenticationOptions != null) { diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/ClientChannelFactory.java b/java-client/session/src/main/java/io/deephaven/client/impl/ClientChannelFactory.java index e4831e86ab0..fa0cb0cdcfd 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/ClientChannelFactory.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/ClientChannelFactory.java @@ -6,6 +6,12 @@ import io.grpc.ManagedChannel; public interface ClientChannelFactory { + + /** + * The default client channel factory. Equivalent to {@link ChannelHelper#channel(ClientConfig)}. + * + * @return the default client channel factory. + */ static ClientChannelFactory defaultInstance() { return ChannelHelper::channel; } diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/ClientConfig.java b/java-client/session/src/main/java/io/deephaven/client/impl/ClientConfig.java index 3b746beaa3a..fb75b7e142f 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/ClientConfig.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/ClientConfig.java @@ -58,6 +58,27 @@ public int maxInboundMessageSize() { return DEFAULT_MAX_INBOUND_MESSAGE_SIZE; } + /** + * Returns {@code this} if {@code sslConfig} equals {@link #ssl()}, otherwise creates a new client config with + * {@link #ssl()} as {@code sslConfig}. + * + * @param sslConfig the new SSL config + * @return the client config + */ + public ClientConfig withSsl(SSLConfig sslConfig) { + if (sslConfig.equals(ssl().orElse(null))) { + return this; + } + final Builder builder = builder() + .target(target()) + .ssl(sslConfig) + .putAllExtraHeaders(extraHeaders()) + .maxInboundMessageSize(maxInboundMessageSize()); + userAgent().ifPresent(builder::userAgent); + overrideAuthority().ifPresent(builder::overrideAuthority); + return builder.build(); + } + public interface Builder { Builder target(DeephavenTarget target); diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/SessionFactory.java b/java-client/session/src/main/java/io/deephaven/client/impl/SessionFactory.java index 1fe25ed84b0..77edf4840b9 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/SessionFactory.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/SessionFactory.java @@ -3,7 +3,21 @@ // package io.deephaven.client.impl; -public interface SessionFactory { +import io.grpc.ManagedChannel; +public interface SessionFactory { + /** + * Creates a new {@link Session}. Closing the session does not close the {@link #managedChannel()}. + * + * @return the new session + */ Session newSession(); + + /** + * The {@link ManagedChannel} associated with {@code this} factory. Use {@link ManagedChannel#shutdown()} when + * {@code this} factory and sessions are no longer needed. + * + * @return the managed channel + */ + ManagedChannel managedChannel(); } diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/SessionImpl.java b/java-client/session/src/main/java/io/deephaven/client/impl/SessionImpl.java index a72e0dacddc..4a6eb7a68c5 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/SessionImpl.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/SessionImpl.java @@ -60,6 +60,13 @@ public final class SessionImpl extends SessionBase { private static final Logger log = LoggerFactory.getLogger(SessionImpl.class); + /** + * Creates a session. Closing the session does not close the underlying channel. + * + * @param config the config + * @return the session + * @throws InterruptedException if the thread is interrupted + */ public static SessionImpl create(SessionImplConfig config) throws InterruptedException { final Authentication authentication = Authentication.authenticate(config.channel(), config.authenticationTypeAndValue()); diff --git a/java-client/session/src/main/java/io/deephaven/client/impl/SessionImplConfig.java b/java-client/session/src/main/java/io/deephaven/client/impl/SessionImplConfig.java index 8111448a891..80463713810 100644 --- a/java-client/session/src/main/java/io/deephaven/client/impl/SessionImplConfig.java +++ b/java-client/session/src/main/java/io/deephaven/client/impl/SessionImplConfig.java @@ -8,6 +8,8 @@ import org.immutables.value.Value.Default; import org.immutables.value.Value.Immutable; +import javax.annotation.Nullable; +import javax.inject.Named; import java.time.Duration; import java.util.concurrent.ScheduledExecutorService; @@ -26,6 +28,19 @@ public static Builder builder() { return ImmutableSessionImplConfig.builder(); } + public static SessionImplConfig of( + DeephavenChannel channel, + ScheduledExecutorService scheduler, + @Nullable @Named("authenticationTypeAndValue") String authenticationTypeAndValue) { + final Builder builder = SessionImplConfig.builder() + .executor(scheduler) + .channel(channel); + if (authenticationTypeAndValue != null) { + builder.authenticationTypeAndValue(authenticationTypeAndValue); + } + return builder.build(); + } + public abstract ScheduledExecutorService executor(); public abstract DeephavenChannel channel(); @@ -81,6 +96,13 @@ public Duration closeTimeout() { return Duration.parse(System.getProperty(DEEPHAVEN_SESSION_CLOSE_TIMEOUT, "PT5s")); } + /** + * Equivalent to {@code SessionImpl.create(this)}. + * + * @return the session + * @throws InterruptedException if the thread is interrupted + * @see SessionImpl#create(SessionImplConfig) + */ public final SessionImpl createSession() throws InterruptedException { return SessionImpl.create(this); } diff --git a/server/src/main/java/io/deephaven/server/appmode/ApplicationInjector.java b/server/src/main/java/io/deephaven/server/appmode/ApplicationInjector.java index 39c9c8b037c..51b5cb0e533 100644 --- a/server/src/main/java/io/deephaven/server/appmode/ApplicationInjector.java +++ b/server/src/main/java/io/deephaven/server/appmode/ApplicationInjector.java @@ -102,7 +102,8 @@ private void loadApplicationFactory(ApplicationState.Factory factory) { app = factory.create(applicationListener); } int numExports = app.listFields().size(); - log.info().append("\tfound ").append(numExports).append(" exports").endl(); + log.info().append("\tapp.id=").append(app.id()).append(", found ").append(numExports).append(" exports") + .endl(); ticketResolver.onApplicationLoad(app); } } diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageClientModule.java b/server/src/main/java/io/deephaven/server/barrage/BarrageClientModule.java index 2fecf1275aa..ef0648d6794 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageClientModule.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageClientModule.java @@ -3,8 +3,11 @@ // package io.deephaven.server.barrage; +import dagger.Binds; import dagger.Module; import dagger.Provides; +import dagger.multibindings.IntoSet; +import io.deephaven.appmode.ApplicationState; import io.deephaven.client.impl.BarrageFactoryBuilderModule; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -13,18 +16,40 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +/** + * Provides a singleton {@link BufferAllocator}, a singleton {@link ScheduledExecutorService}, and binds + * {@link BarrageSessionFactoryClient.Application}. + */ @Module(includes = BarrageFactoryBuilderModule.class) public interface BarrageClientModule { + /** + * Equivalent to {@code new RootAllocator()}. + * + * @see RootAllocator + */ @Provides @Singleton static BufferAllocator providesAllocator() { return new RootAllocator(); } + /** + * Equivalent to {@code Executors.newScheduledThreadPool(4)}. + * + * @see Executors#newScheduledThreadPool(int) + */ @Provides @Singleton static ScheduledExecutorService providesScheduler() { return Executors.newScheduledThreadPool(4); } + + /** + * Binds {@link BarrageSessionFactoryClient.Application}. + */ + @Binds + @IntoSet + ApplicationState.Factory bindBarrageSessionFactoryClientApplication( + BarrageSessionFactoryClient.Application application); } diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageSessionFactoryClient.java b/server/src/main/java/io/deephaven/server/barrage/BarrageSessionFactoryClient.java new file mode 100644 index 00000000000..c2ae7a6decd --- /dev/null +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageSessionFactoryClient.java @@ -0,0 +1,100 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.server.barrage; + +import io.deephaven.appmode.ApplicationState; +import io.deephaven.appmode.ApplicationState.Factory; +import io.deephaven.appmode.ApplicationState.Listener; +import io.deephaven.client.impl.BarrageSessionFactory; +import io.deephaven.client.impl.BarrageSessionFactoryBuilder; +import io.deephaven.client.impl.ClientChannelFactory; +import io.deephaven.client.impl.ClientConfig; +import io.deephaven.ssl.config.SSLConfig; +import org.apache.arrow.memory.BufferAllocator; + +import javax.annotation.Nullable; +import javax.inject.Inject; +import javax.inject.Named; +import java.util.Objects; +import java.util.concurrent.ScheduledExecutorService; + +public final class BarrageSessionFactoryClient { + + private final BarrageSessionFactoryBuilder factoryBuilder; + private final ScheduledExecutorService scheduler; + private final BufferAllocator allocator; + private final ClientChannelFactory clientChannelFactory; + private final SSLConfig sslConfig; + + @Inject + public BarrageSessionFactoryClient( + BarrageSessionFactoryBuilder factoryBuilder, + ScheduledExecutorService scheduler, + BufferAllocator allocator, + @Named("client.sslConfig") SSLConfig sslConfig, + ClientChannelFactory clientChannelFactory) { + this.factoryBuilder = Objects.requireNonNull(factoryBuilder); + this.scheduler = Objects.requireNonNull(scheduler); + this.allocator = Objects.requireNonNull(allocator); + this.sslConfig = Objects.requireNonNull(sslConfig); + this.clientChannelFactory = Objects.requireNonNull(clientChannelFactory); + } + + /** + * Creates a {@link BarrageSessionFactory}. If {@code clientConfig} does not specify {@link ClientConfig#ssl()} and + * the target is secure, a {@code clientConfig} with {@code sslConfig} will be used. Equivalent to + * + *
+     * factoryBuilder
+     *         .managedChannel(clientChannelFactory.create(clientConfig))
+     *         .scheduler(scheduler)
+     *         .allocator(allocator)
+     *         .authenticationTypeAndValue(authenticationTypeAndValue)
+     *         .build()
+     * 
+ * + * @param clientConfig the client configuration + * @param authenticationTypeAndValue the authentication type and value + * @return the barrage session factory + */ + public BarrageSessionFactory factory(ClientConfig clientConfig, @Nullable String authenticationTypeAndValue) { + final ClientConfig config; + if (clientConfig.ssl().isEmpty() && clientConfig.target().isSecure()) { + config = clientConfig.withSsl(sslConfig); + } else { + config = clientConfig; + } + return factoryBuilder + .managedChannel(clientChannelFactory.create(config)) + .scheduler(scheduler) + .allocator(allocator) + .authenticationTypeAndValue(authenticationTypeAndValue) + .build(); + } + + /** + * Provides an application id as {@link BarrageSessionFactoryClient} class name. A + * {@link BarrageSessionFactoryClient} is set as the field name {@value INSTANCE}. + */ + public static final class Application implements Factory { + + public static final String INSTANCE = "instance"; + + private final BarrageSessionFactoryClient barrageSessionFactoryClient; + + @Inject + public Application(BarrageSessionFactoryClient barrageSessionFactoryClient) { + this.barrageSessionFactoryClient = Objects.requireNonNull(barrageSessionFactoryClient); + } + + @Override + public ApplicationState create(Listener appStateListener) { + final ApplicationState state = + new ApplicationState(appStateListener, BarrageSessionFactoryClient.class.getName(), + Application.class.getSimpleName()); + state.setField(INSTANCE, barrageSessionFactoryClient); + return state; + } + } +} diff --git a/server/src/main/java/io/deephaven/server/uri/BarrageTableResolver.java b/server/src/main/java/io/deephaven/server/uri/BarrageTableResolver.java index b70a4199f43..40fd9b915bd 100644 --- a/server/src/main/java/io/deephaven/server/uri/BarrageTableResolver.java +++ b/server/src/main/java/io/deephaven/server/uri/BarrageTableResolver.java @@ -13,7 +13,7 @@ import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; import io.deephaven.qst.table.TableSpec; import io.deephaven.qst.table.TicketTable; -import io.deephaven.ssl.config.SSLConfig; +import io.deephaven.server.barrage.BarrageSessionFactoryClient; import io.deephaven.uri.ApplicationUri; import io.deephaven.uri.DeephavenTarget; import io.deephaven.uri.DeephavenUri; @@ -23,18 +23,14 @@ import io.deephaven.uri.StructuredUri.Visitor; import io.deephaven.uri.resolver.UriResolver; import io.deephaven.uri.resolver.UriResolversInstance; -import io.grpc.ManagedChannel; -import org.apache.arrow.memory.BufferAllocator; import javax.inject.Inject; -import javax.inject.Named; import javax.inject.Singleton; import java.net.URI; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; /** * The barrage table resolver is able to resolve {@link RemoteUri remote URIs} into {@link Table tables}. @@ -70,25 +66,12 @@ public static BarrageTableResolver get() { return UriResolversInstance.get().find(BarrageTableResolver.class).get(); } - private final BarrageSessionFactoryBuilder builder; - private final ScheduledExecutorService executor; - private final BufferAllocator allocator; - private final SSLConfig sslConfig; - private final ClientChannelFactory clientChannelFactory; + private final BarrageSessionFactoryClient sessionFactoryClient; private final Map sessions; @Inject - public BarrageTableResolver( - BarrageSessionFactoryBuilder builder, - ScheduledExecutorService executor, - BufferAllocator allocator, - @Named("client.sslConfig") SSLConfig sslConfig, - ClientChannelFactory clientChannelFactory) { - this.builder = Objects.requireNonNull(builder); - this.executor = Objects.requireNonNull(executor); - this.allocator = Objects.requireNonNull(allocator); - this.sslConfig = Objects.requireNonNull(sslConfig); - this.clientChannelFactory = Objects.requireNonNull(clientChannelFactory); + public BarrageTableResolver(BarrageSessionFactoryClient sessionFactoryClient) { + this.sessionFactoryClient = Objects.requireNonNull(sessionFactoryClient); this.sessions = new ConcurrentHashMap<>(); } @@ -310,22 +293,12 @@ private BarrageSession newSession(DeephavenTarget target) { return newSession(ClientConfig.builder() .target(target) .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE) - .ssl(sslConfig) .build()); } private BarrageSession newSession(ClientConfig config) { - return newSession(clientChannelFactory.create(config)); - } - - private BarrageSession newSession(ManagedChannel channel) { // TODO(deephaven-core#3421): DH URI / BarrageTableResolver authentication support - return builder - .allocator(allocator) - .managedChannel(channel) - .scheduler(executor) - .build() - .newBarrageSession(); + return sessionFactoryClient.factory(config, null).newBarrageSession(); } static class RemoteResolver implements Visitor { diff --git a/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java b/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java index 94bdea7ad1f..4273e42b9ee 100644 --- a/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java +++ b/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java @@ -272,12 +272,12 @@ public void onCallCompleted(CallStatus status) {} .build(); clientScheduler = Executors.newSingleThreadScheduledExecutor(); - FlightSessionFactory flightSessionFactory = - DaggerDeephavenFlightRoot.create().factoryBuilder() - .managedChannel(clientChannel) - .scheduler(clientScheduler) - .allocator(new RootAllocator()) - .build(); + FlightSessionFactory flightSessionFactory = DeephavenFlightRoot.of() + .factoryBuilder() + .managedChannel(clientChannel) + .scheduler(clientScheduler) + .allocator(new RootAllocator()) + .build(); clientSession = flightSessionFactory.newFlightSession(); }