Skip to content

Commit

Permalink
finish getSqlInfo up to new schema
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Dec 2, 2024
1 parent 65d596d commit efa5dee
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ public static Builder newBuilder(final String name) {

public static final String DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP =
"PeriodicUpdateGraph.targetCycleDurationMillis";
public static final int DEFAULT_TARGET_CYCLE_DURATION_MILLIS =
Configuration.getInstance().getIntegerWithDefault(DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP, 1000);

private final long defaultTargetCycleDurationMillis;
private volatile long targetCycleDurationMillis;
private final ThreadInitializationFactory threadInitializationFactory;
Expand Down Expand Up @@ -1166,8 +1169,7 @@ public static PeriodicUpdateGraph getInstance(final String name) {
public static final class Builder {
private final boolean allowUnitTestMode =
Configuration.getInstance().getBooleanWithDefault(ALLOW_UNIT_TEST_MODE_PROP, false);
private long targetCycleDurationMillis =
Configuration.getInstance().getIntegerWithDefault(DEFAULT_TARGET_CYCLE_DURATION_MILLIS_PROP, 1000);
private long targetCycleDurationMillis = DEFAULT_TARGET_CYCLE_DURATION_MILLIS;
private long minimumCycleDurationToLogNanos = DEFAULT_MINIMUM_CYCLE_DURATION_TO_LOG_NANOSECONDS;

private String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.ChunkType;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.PoisonedUpdateGraph;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
Expand All @@ -30,6 +31,7 @@
import io.deephaven.engine.table.impl.remote.ConstructSnapshot;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.extensions.barrage.BarrageMessageWriter;
import io.deephaven.extensions.barrage.BarrageOptions;
Expand Down Expand Up @@ -965,9 +967,16 @@ public static void createAndSendStaticSnapshot(
// very simplistic logic to take the last snapshot and extrapolate max
// number of rows that will not exceed the target UGP processing time
// percentage
PeriodicUpdateGraph updateGraph = table.getUpdateGraph().cast();
final long targetCycleDurationMillis;
final UpdateGraph updateGraph = table.getUpdateGraph();
if (updateGraph == null || updateGraph instanceof PoisonedUpdateGraph) {
targetCycleDurationMillis = PeriodicUpdateGraph.DEFAULT_TARGET_CYCLE_DURATION_MILLIS;
} else {
targetCycleDurationMillis = updateGraph.<PeriodicUpdateGraph>cast()
.getTargetCycleDurationMillis();
}
long targetNanos = (long) (TARGET_SNAPSHOT_PERCENTAGE
* updateGraph.getTargetCycleDurationMillis()
* targetCycleDurationMillis
* 1000000);

long nanosPerCell = elapsed / (msg.rowsIncluded.size() * columnCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static io.deephaven.server.flightsql.FlightSqlSharedConstants.COMMAND_GET_EXPORTED_KEYS_TYPE_URL;
import static io.deephaven.server.flightsql.FlightSqlSharedConstants.COMMAND_GET_IMPORTED_KEYS_TYPE_URL;
import static io.deephaven.server.flightsql.FlightSqlSharedConstants.COMMAND_GET_PRIMARY_KEYS_TYPE_URL;
import static io.deephaven.server.flightsql.FlightSqlSharedConstants.COMMAND_GET_SQL_INFO_TYPE_URL;
import static io.deephaven.server.flightsql.FlightSqlSharedConstants.COMMAND_GET_TABLES_TYPE_URL;
import static io.deephaven.server.flightsql.FlightSqlSharedConstants.COMMAND_GET_TABLE_TYPES_TYPE_URL;

Expand Down Expand Up @@ -113,6 +114,8 @@ private static <T> T visit(Any ticket, TicketVisitor<T> visitor, String logId) {
return visitor.visit(unpack(ticket, CommandGetImportedKeys.class, logId));
case COMMAND_GET_EXPORTED_KEYS_TYPE_URL:
return visitor.visit(unpack(ticket, CommandGetExportedKeys.class, logId));
case COMMAND_GET_SQL_INFO_TYPE_URL:
return visitor.visit(unpack(ticket, CommandGetSqlInfo.class, logId));
}
throw invalidTicket(logId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetExportedKeys;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetImportedKeys;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetPrimaryKeys;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetSqlInfo;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTableTypes;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTables;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetXdbcTypeInfo;
Expand Down Expand Up @@ -641,11 +640,21 @@ public void insertPrepared() {
}

@Test
public void getSqlInfo() {
getSchemaUnimplemented(() -> flightSqlClient.getSqlInfoSchema(), CommandGetSqlInfo.getDescriptor());
commandUnimplemented(() -> flightSqlClient.getSqlInfo(), CommandGetSqlInfo.getDescriptor());
misbehave(CommandGetSqlInfo.getDefaultInstance(), CommandGetSqlInfo.getDescriptor());
unpackable(CommandGetSqlInfo.getDescriptor(), CommandGetSqlInfo.class);
public void getSqlInfo() throws Exception {
final SchemaResult schemaResult = flightSqlClient.getSqlInfoSchema();
final FlightInfo info = flightSqlClient.getSqlInfo();
try (final FlightStream stream = flightSqlClient.getStream(endpoint(info).getTicket())) {
assertThat(schemaResult.getSchema()).isEqualTo(stream.getSchema());

int numRows = 0;
int flightCount = 0;
while (stream.next()) {
++flightCount;
numRows += stream.getRoot().getRowCount();
}
assertThat(flightCount).isEqualTo(1);
assertThat(numRows).isEqualTo(8);
}
}

@Test
Expand Down

0 comments on commit efa5dee

Please sign in to comment.