Skip to content

Commit

Permalink
Added BarrageSessionFactoryClient and application
Browse files Browse the repository at this point in the history
  • Loading branch information
devinrsmith committed Apr 18, 2024
1 parent 0beb368 commit a21a1a1
Show file tree
Hide file tree
Showing 31 changed files with 405 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,20 @@
import dagger.Module;
import dagger.Provides;

/**
* Module that provides {@link BarrageSessionFactoryBuilder}.
*/
@Module
public interface BarrageFactoryBuilderModule {

/**
* Equivalent to {@code DeephavenBarrageRoot.of().factoryBuilder()}.
*
* @return the barrage session factory builder
* @see DeephavenBarrageRoot
*/
@Provides
static BarrageSessionFactoryBuilder providesFactoryBuilder() {
return DaggerDeephavenBarrageRoot.create().factoryBuilder();
return DeephavenBarrageRoot.of().factoryBuilder();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,15 @@
import io.grpc.ManagedChannel;
import org.apache.arrow.memory.BufferAllocator;

/**
* Provides {@link BarrageSession}.
*/
@Module
public class BarrageSessionModule {

/**
* Delegates to {@link BarrageSession#of(SessionImpl, BufferAllocator, ManagedChannel)}.
*/
@Provides
public static BarrageSession newDeephavenClientSession(
SessionImpl session, BufferAllocator allocator, ManagedChannel managedChannel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,22 @@
import javax.inject.Named;
import java.util.concurrent.ScheduledExecutorService;

/**
* The barrage subcomponent.
*
* @see SessionImplModule
* @see FlightSessionModule
* @see BarrageSessionModule
*/
@Subcomponent(modules = {SessionImplModule.class, FlightSessionModule.class, BarrageSessionModule.class})
public interface BarrageSubcomponent extends BarrageSessionFactory {

@Override
BarrageSession newBarrageSession();

@Override
ManagedChannel managedChannel();

@Module(subcomponents = {BarrageSubcomponent.class})
interface DeephavenClientSubcomponentModule {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,22 @@
import io.deephaven.client.impl.BarrageSubcomponent.Builder;
import io.deephaven.client.impl.BarrageSubcomponent.DeephavenClientSubcomponentModule;

/**
* Component for creating {@link BarrageSubcomponent}.
*
* @see DeephavenClientSubcomponentModule
*/
@Component(modules = DeephavenClientSubcomponentModule.class)
public interface DeephavenBarrageRoot {

/**
* Equivalent to {@code DaggerDeephavenBarrageRoot.create()}.
*
* @return the barrage root
*/
static DeephavenBarrageRoot of() {
return DaggerDeephavenBarrageRoot.create();
}

Builder factoryBuilder();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import io.deephaven.client.impl.BarrageSession;
import io.deephaven.client.impl.BarrageSessionFactory;
import io.deephaven.client.impl.BarrageSubcomponent.Builder;
import io.deephaven.client.impl.DaggerDeephavenBarrageRoot;
import io.deephaven.client.impl.DeephavenBarrageRoot;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.util.SafeCloseable;
Expand Down Expand Up @@ -55,7 +55,8 @@ public final Void call() throws Exception {
.setUpdateGraph(updateGraph)
.build();

final Builder builder = DaggerDeephavenBarrageRoot.create().factoryBuilder()
final Builder builder = DeephavenBarrageRoot.of()
.factoryBuilder()
.managedChannel(managedChannel)
.scheduler(scheduler)
.allocator(bufferAllocator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,30 @@

public class BarrageSession extends FlightSession implements BarrageSubscription.Factory, BarrageSnapshot.Factory {

/**
* Creates a barrage session. Closing the barrage session does <b>not</b> close {@code channel}.
*
* @param session the session
* @param incomingAllocator the incoming allocator
* @param channel the managed channel
* @return the barrage session
*/
public static BarrageSession of(
SessionImpl session, BufferAllocator incomingAllocator, ManagedChannel channel) {
final FlightClient client = FlightGrpcUtilsExtension.createFlightClientWithSharedChannel(
incomingAllocator, channel, Collections.singletonList(new SessionMiddleware(session)));
return new BarrageSession(session, client, channel);
return new BarrageSession(session, client);
}

public static BarrageSession create(
SessionImpl session, BufferAllocator incomingAllocator, ManagedChannel channel) {
final FlightClient client = FlightGrpcUtilsExtension.createFlightClientWithSharedChannel(
incomingAllocator, channel, Collections.singletonList(new SessionMiddleware(session)));
return new BarrageSession(session, client, channel);
return new BarrageSession(session, client);
}

protected BarrageSession(
final SessionImpl session, final FlightClient client, final ManagedChannel channel) {
final SessionImpl session, final FlightClient client) {
super(session, client);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,22 @@
//
package io.deephaven.client.impl;

import io.grpc.ManagedChannel;

public interface BarrageSessionFactory {

/**
* Creates a new {@link BarrageSession}. Closing the session does <b>not</b> close the {@link #managedChannel()}.
*
* @return the new barrage session
*/
BarrageSession newBarrageSession();

/**
* The {@link ManagedChannel} associated with {@code this} factory. Use {@link ManagedChannel#shutdown()} when
* {@code this} factory and sessions are no longer needed.
*
* @return the managed channel
*/
ManagedChannel managedChannel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,22 @@
import io.deephaven.client.impl.FlightSubcomponent.Builder;
import io.deephaven.client.impl.FlightSubcomponent.FlightSubcomponentModule;

/**
* Component for creating {@link FlightSubcomponent}.
*
* @see FlightSubcomponentModule
*/
@Component(modules = FlightSubcomponentModule.class)
public interface DeephavenFlightRoot {

/**
* Equivalent to {@code DaggerDeephavenFlightRoot.create()}.
*
* @return the flight root
*/
static DeephavenFlightRoot of() {
return DaggerDeephavenFlightRoot.create();
}

Builder factoryBuilder();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@
import io.grpc.ManagedChannel;
import org.apache.arrow.memory.BufferAllocator;

/**
* Provides {@link FlightSession}.
*/
@Module
public class FlightSessionModule {

/**
* Delegates to {@link FlightSession#of(SessionImpl, BufferAllocator, ManagedChannel)}.
*/
@Provides
public static FlightSession newFlightSession(SessionImpl session, BufferAllocator allocator,
ManagedChannel managedChannel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,21 @@
import javax.inject.Named;
import java.util.concurrent.ScheduledExecutorService;

/**
* The flight subcomponent.
*
* @see SessionImplModule
* @see FlightSessionModule
*/
@Subcomponent(modules = {SessionImplModule.class, FlightSessionModule.class})
public interface FlightSubcomponent extends FlightSessionFactory {

@Override
FlightSession newFlightSession();

@Override
ManagedChannel managedChannel();

@Module(subcomponents = {FlightSubcomponent.class})
interface FlightSubcomponentModule {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//
package io.deephaven.client;

import io.deephaven.client.impl.DaggerDeephavenFlightRoot;
import io.deephaven.client.impl.DeephavenFlightRoot;
import io.deephaven.client.impl.FlightSession;
import io.deephaven.server.runner.DeephavenApiServerTestBase;
import io.grpc.ManagedChannel;
Expand All @@ -30,8 +30,13 @@ public void setUp() throws Exception {
register(channel);
sessionScheduler = Executors.newScheduledThreadPool(2);
bufferAllocator = new RootAllocator();
flightSession = DaggerDeephavenFlightRoot.create().factoryBuilder().allocator(bufferAllocator)
.managedChannel(channel).scheduler(sessionScheduler).build().newFlightSession();
flightSession = DeephavenFlightRoot.of()
.factoryBuilder()
.allocator(bufferAllocator)
.managedChannel(channel)
.scheduler(sessionScheduler)
.build()
.newFlightSession();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//
package io.deephaven.client.examples;

import io.deephaven.client.impl.DaggerDeephavenFlightRoot;
import io.deephaven.client.impl.DeephavenFlightRoot;
import io.deephaven.client.impl.FlightSession;
import io.deephaven.client.impl.TableHandle;
import io.deephaven.qst.table.TicketTable;
Expand Down Expand Up @@ -72,7 +72,8 @@ private static void close(FlightSession session) throws InterruptedException, Ex

private FlightSession session(BufferAllocator bufferAllocator, ScheduledExecutorService scheduler,
ManagedChannel sourceChannel) {
return DaggerDeephavenFlightRoot.create().factoryBuilder()
return DeephavenFlightRoot.of()
.factoryBuilder()
.managedChannel(sourceChannel)
.scheduler(scheduler)
.allocator(bufferAllocator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//
package io.deephaven.client.examples;

import io.deephaven.client.impl.DaggerDeephavenFlightRoot;
import io.deephaven.client.impl.DeephavenFlightRoot;
import io.deephaven.client.impl.FlightSession;
import io.deephaven.client.impl.FlightSessionFactory;
import io.deephaven.client.impl.FlightSubcomponent.Builder;
Expand Down Expand Up @@ -36,7 +36,8 @@ public final Void call() throws Exception {
Runtime.getRuntime()
.addShutdownHook(new Thread(() -> onShutdown(scheduler, managedChannel)));

final Builder builder = DaggerDeephavenFlightRoot.create().factoryBuilder()
final Builder builder = DeephavenFlightRoot.of()
.factoryBuilder()
.managedChannel(managedChannel)
.scheduler(scheduler)
.allocator(bufferAllocator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@

public class FlightSession implements AutoCloseable {

/**
* Creates a flight session. Closing the flight session does <b>not</b> close {@code channel}.
*
* @param session the session
* @param incomingAllocator the incoming allocator
* @param channel the managed channel
* @return the flight session
*/
public static FlightSession of(SessionImpl session, BufferAllocator incomingAllocator,
ManagedChannel channel) {
// Note: this pattern of FlightClient owning the ManagedChannel does not mesh well with the idea that some
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,21 @@
//
package io.deephaven.client.impl;

import io.grpc.ManagedChannel;

public interface FlightSessionFactory {
/**
* Creates a new {@link FlightSession}. Closing the session does <b>not</b> close the {@link #managedChannel()}.
*
* @return the new flight session
*/
FlightSession newFlightSession();

/**
* The {@link ManagedChannel} associated with {@code this} factory. Use {@link ManagedChannel#shutdown()} when
* {@code this} factory and sessions are no longer needed.
*
* @return the managed channel
*/
ManagedChannel managedChannel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,22 @@
import io.deephaven.client.SessionSubcomponent.Builder;
import io.deephaven.client.SessionSubcomponent.SessionFactorySubcomponentModule;

/**
* Component for creating {@link SessionSubcomponent}.
*
* @see SessionFactorySubcomponentModule
*/
@Component(modules = SessionFactorySubcomponentModule.class)
public interface DeephavenSessionRoot {

/**
* Equivalent to {@code DaggerDeephavenSessionRoot.create()}.
*
* @return the session root
*/
static DeephavenSessionRoot of() {
return DaggerDeephavenSessionRoot.create();
}

Builder factoryBuilder();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import dagger.Provides;
import io.deephaven.client.impl.SessionImpl;
import io.deephaven.client.impl.SessionImplConfig;
import io.deephaven.client.impl.SessionImplConfig.Builder;
import io.deephaven.proto.DeephavenChannel;
import io.deephaven.proto.DeephavenChannelImpl;
import io.grpc.Channel;
Expand All @@ -18,6 +17,9 @@
import javax.inject.Named;
import java.util.concurrent.ScheduledExecutorService;

/**
* Provides {@link Channel}, {@link DeephavenChannel}, {@link SessionImplConfig}, and {@link SessionImpl}.
*/
@Module
public interface SessionImplModule {

Expand All @@ -27,19 +29,29 @@ public interface SessionImplModule {
@Binds
DeephavenChannel bindsDeephavenChannelImpl(DeephavenChannelImpl deephavenChannelImpl);

/**
* Delegates to {@link SessionImplConfig#of(DeephavenChannel, ScheduledExecutorService, String)}.
*/
@Provides
static SessionImpl session(DeephavenChannel channel, ScheduledExecutorService scheduler,
static SessionImplConfig providesSessionImplConfig(
DeephavenChannel channel,
ScheduledExecutorService scheduler,
@Nullable @Named("authenticationTypeAndValue") String authenticationTypeAndValue) {
final Builder builder = SessionImplConfig.builder()
.executor(scheduler)
.channel(channel);
if (authenticationTypeAndValue != null) {
builder.authenticationTypeAndValue(authenticationTypeAndValue);
}
final SessionImplConfig config = builder.build();
return SessionImplConfig.of(channel, scheduler, authenticationTypeAndValue);
}

/**
* Creates a session. Equivalent to {@link SessionImplConfig#createSession()}.
*
* @param config the config
* @return the session
*/
@Provides
static SessionImpl session(SessionImplConfig config) {
try {
return config.createSession();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
Expand Down
Loading

0 comments on commit a21a1a1

Please sign in to comment.