diff --git a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java index d54600dd3f3..27466f2729f 100644 --- a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java +++ b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java @@ -73,12 +73,13 @@ public static void DoGetCustom( final Flight.Ticket request, final StreamObserver observer) { - final String description = "FlightService#DoGet(table=" + ticketRouter.getLogNameFor(request, "table") + ")"; + final String ticketLogName = ticketRouter.getLogNameFor(request, "table"); + final String description = "FlightService#DoGet(table=" + ticketLogName + ")"; final QueryPerformanceRecorder queryPerformanceRecorder = QueryPerformanceRecorder.newQuery( description, session.getSessionId(), QueryPerformanceNugget.DEFAULT_FACTORY); try (final SafeCloseable ignored = queryPerformanceRecorder.startQuery()) { - final SessionState.ExportObject> tableExport = + final SessionState.ExportObject tableExport = ticketRouter.resolve(session, request, "table"); final BarragePerformanceLog.SnapshotMetricsHelper metrics = @@ -91,7 +92,15 @@ public static void DoGetCustom( .onError(observer) .submit(() -> { metrics.queueNanos = System.nanoTime() - queueStartTm; - final BaseTable table = tableExport.get(); + Object export = tableExport.get(); + if (export instanceof Table) { + export = ((Table) export).coalesce(); + } + if (!(export instanceof BaseTable)) { + throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Ticket (" + + ticketLogName + ") is not a subscribable table."); + } + final BaseTable table = (BaseTable) export; metrics.tableId = Integer.toHexString(System.identityHashCode(table)); metrics.tableKey = BarragePerformanceLog.getKeyFor(table); @@ -488,13 +497,14 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) { final BarrageSnapshotRequest snapshotRequest = BarrageSnapshotRequest .getRootAsBarrageSnapshotRequest(message.app_metadata.msgPayloadAsByteBuffer()); - final String description = "FlightService#DoExchange(snapshot, table=" - + ticketRouter.getLogNameFor(snapshotRequest.ticketAsByteBuffer(), "table") + ")"; + final String ticketLogName = + ticketRouter.getLogNameFor(snapshotRequest.ticketAsByteBuffer(), "table"); + final String description = "FlightService#DoExchange(snapshot, table=" + ticketLogName + ")"; final QueryPerformanceRecorder queryPerformanceRecorder = QueryPerformanceRecorder.newQuery( description, session.getSessionId(), QueryPerformanceNugget.DEFAULT_FACTORY); try (final SafeCloseable ignored = queryPerformanceRecorder.startQuery()) { - final SessionState.ExportObject> tableExport = + final SessionState.ExportObject tableExport = ticketRouter.resolve(session, snapshotRequest.ticketAsByteBuffer(), "table"); final BarragePerformanceLog.SnapshotMetricsHelper metrics = @@ -507,7 +517,15 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) { .onError(listener) .submit(() -> { metrics.queueNanos = System.nanoTime() - queueStartTm; - final BaseTable table = tableExport.get(); + Object export = tableExport.get(); + if (export instanceof Table) { + export = ((Table) export).coalesce(); + } + if (!(export instanceof BaseTable)) { + throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Ticket (" + + ticketLogName + ") is not a subscribable table."); + } + final BaseTable table = (BaseTable) export; metrics.tableId = Integer.toHexString(System.identityHashCode(table)); metrics.tableKey = BarragePerformanceLog.getKeyFor(table); @@ -683,9 +701,9 @@ private synchronized void onExportResolved(final SessionState.ExportObject