diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java index 70d36063182..32712aba8b1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java @@ -100,6 +100,9 @@ public static Builder newBuilder(final String name) { public static final String DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP = "PeriodicUpdateGraph.targetCycleDurationMillis"; + public static final int DEFAULT_TARGET_CYCLE_DURATION_MILLIS = + Configuration.getInstance().getIntegerWithDefault(DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP, 1000); + private final long defaultTargetCycleDurationMillis; private volatile long targetCycleDurationMillis; private final ThreadInitializationFactory threadInitializationFactory; @@ -1166,8 +1169,7 @@ public static PeriodicUpdateGraph getInstance(final String name) { public static final class Builder { private final boolean allowUnitTestMode = Configuration.getInstance().getBooleanWithDefault(ALLOW_UNIT_TEST_MODE_PROP, false); - private long targetCycleDurationMillis = - Configuration.getInstance().getIntegerWithDefault(DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP, 1000); + private long targetCycleDurationMillis = DEFAULT_TARGET_CYCLE_DURATION_MILLIS; private long minimumCycleDurationToLogNanos = DEFAULT_MINIMUM_CYCLE_DURATION_TO_LOG_NANOSECONDS; private String name; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java index afb1d07d990..3ea376610ee 100755 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/BarrageUtil.java @@ -18,6 +18,7 @@ import io.deephaven.chunk.attributes.Values; import io.deephaven.chunk.ChunkType; import io.deephaven.configuration.Configuration; +import io.deephaven.engine.context.PoisonedUpdateGraph; import io.deephaven.engine.rowset.RowSequence; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; @@ -30,6 +31,7 @@ import io.deephaven.engine.table.impl.remote.ConstructSnapshot; import io.deephaven.engine.table.impl.sources.ReinterpretUtils; import io.deephaven.engine.table.impl.util.BarrageMessage; +import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; import io.deephaven.extensions.barrage.BarrageMessageWriter; import io.deephaven.extensions.barrage.BarrageOptions; @@ -965,9 +967,16 @@ public static void createAndSendStaticSnapshot( // very simplistic logic to take the last snapshot and extrapolate max // number of rows that will not exceed the target UGP processing time // percentage - PeriodicUpdateGraph updateGraph = table.getUpdateGraph().cast(); + final long targetCycleDurationMillis; + final UpdateGraph updateGraph = table.getUpdateGraph(); + if (updateGraph == null || updateGraph instanceof PoisonedUpdateGraph) { + targetCycleDurationMillis = PeriodicUpdateGraph.DEFAULT_TARGET_CYCLE_DURATION_MILLIS; + } else { + targetCycleDurationMillis = updateGraph.cast() + .getTargetCycleDurationMillis(); + } long targetNanos = (long) (TARGET_SNAPSHOT_PERCENTAGE - * updateGraph.getTargetCycleDurationMillis() + * targetCycleDurationMillis * 1000000); long nanosPerCell = elapsed / (msg.rowsIncluded.size() * columnCount); diff --git a/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlTicketHelper.java b/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlTicketHelper.java index f749913a823..f47211bd8df 100644 --- a/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlTicketHelper.java +++ b/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlTicketHelper.java @@ -30,6 +30,7 @@ import static io.deephaven.server.flightsql.FlightSqlSharedConstants.COMMAND_GET_EXPORTED_KEYS_TYPE_URL; import static io.deephaven.server.flightsql.FlightSqlSharedConstants.COMMAND_GET_IMPORTED_KEYS_TYPE_URL; import static io.deephaven.server.flightsql.FlightSqlSharedConstants.COMMAND_GET_PRIMARY_KEYS_TYPE_URL; +import static io.deephaven.server.flightsql.FlightSqlSharedConstants.COMMAND_GET_SQL_INFO_TYPE_URL; import static io.deephaven.server.flightsql.FlightSqlSharedConstants.COMMAND_GET_TABLES_TYPE_URL; import static io.deephaven.server.flightsql.FlightSqlSharedConstants.COMMAND_GET_TABLE_TYPES_TYPE_URL; @@ -113,6 +114,8 @@ private static T visit(Any ticket, TicketVisitor visitor, String logId) { return visitor.visit(unpack(ticket, CommandGetImportedKeys.class, logId)); case COMMAND_GET_EXPORTED_KEYS_TYPE_URL: return visitor.visit(unpack(ticket, CommandGetExportedKeys.class, logId)); + case COMMAND_GET_SQL_INFO_TYPE_URL: + return visitor.visit(unpack(ticket, CommandGetSqlInfo.class, logId)); } throw invalidTicket(logId); } diff --git a/extensions/flight-sql/src/test/java/io/deephaven/server/flightsql/FlightSqlTest.java b/extensions/flight-sql/src/test/java/io/deephaven/server/flightsql/FlightSqlTest.java index d20d8d50c9e..0b56d813c59 100644 --- a/extensions/flight-sql/src/test/java/io/deephaven/server/flightsql/FlightSqlTest.java +++ b/extensions/flight-sql/src/test/java/io/deephaven/server/flightsql/FlightSqlTest.java @@ -60,7 +60,6 @@ import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetExportedKeys; import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetImportedKeys; import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetPrimaryKeys; -import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetSqlInfo; import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTableTypes; import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTables; import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetXdbcTypeInfo; @@ -641,11 +640,21 @@ public void insertPrepared() { } @Test - public void getSqlInfo() { - getSchemaUnimplemented(() -> flightSqlClient.getSqlInfoSchema(), CommandGetSqlInfo.getDescriptor()); - commandUnimplemented(() -> flightSqlClient.getSqlInfo(), CommandGetSqlInfo.getDescriptor()); - misbehave(CommandGetSqlInfo.getDefaultInstance(), CommandGetSqlInfo.getDescriptor()); - unpackable(CommandGetSqlInfo.getDescriptor(), CommandGetSqlInfo.class); + public void getSqlInfo() throws Exception { + final SchemaResult schemaResult = flightSqlClient.getSqlInfoSchema(); + final FlightInfo info = flightSqlClient.getSqlInfo(); + try (final FlightStream stream = flightSqlClient.getStream(endpoint(info).getTicket())) { + assertThat(schemaResult.getSchema()).isEqualTo(stream.getSchema()); + + int numRows = 0; + int flightCount = 0; + while (stream.next()) { + ++flightCount; + numRows += stream.getRoot().getRowCount(); + } + assertThat(flightCount).isEqualTo(1); + assertThat(numRows).isEqualTo(8); + } } @Test