diff --git a/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java b/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java index 2f103832a0a..6fb7b4cdf76 100644 --- a/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java +++ b/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java @@ -23,6 +23,8 @@ import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.table.impl.util.ColumnHolder; +import io.deephaven.engine.updategraph.NotificationQueue; +import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.util.TableTools; import io.deephaven.extensions.barrage.util.ArrowIpcUtil; import io.deephaven.extensions.barrage.util.BarrageUtil; @@ -33,10 +35,11 @@ import io.deephaven.io.logger.Logger; import io.deephaven.proto.backplane.grpc.ExportNotification; import io.deephaven.proto.util.ByteHelper; +import io.deephaven.qst.TableCreator; +import io.deephaven.qst.table.ParentsVisitor; import io.deephaven.qst.table.TableSpec; import io.deephaven.qst.table.TicketTable; import io.deephaven.server.auth.AuthorizationProvider; -import io.deephaven.server.console.ScopeTicketResolver; import io.deephaven.server.session.ActionResolver; import io.deephaven.server.session.CommandResolver; import io.deephaven.server.session.SessionState; @@ -92,6 +95,7 @@ import java.security.SecureRandom; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -165,10 +169,6 @@ public final class FlightSqlResolver implements ActionResolver, CommandResolver @VisibleForTesting static final Schema DATASET_SCHEMA_SENTINEL = new Schema(List.of(Field.nullable("DO_NOT_USE", Utf8.INSTANCE))); - // Unable to depends on TicketRouter, would be a circular dependency atm (since TicketRouter depends on all the - // TicketResolvers). - // private final TicketRouter router; - private final ScopeTicketResolver scopeTicketResolver; private final Scheduler scheduler; private final Authorization authorization; private final KeyedObjectHashMap queries; @@ -177,10 +177,8 @@ public final class FlightSqlResolver implements ActionResolver, CommandResolver @Inject public FlightSqlResolver( final AuthorizationProvider authProvider, - final ScopeTicketResolver scopeTicketResolver, final Scheduler scheduler) { this.authorization = Objects.requireNonNull(authProvider.getTicketResolverAuthorization()); - this.scopeTicketResolver = Objects.requireNonNull(scopeTicketResolver); this.scheduler = Objects.requireNonNull(scheduler); this.queries = new KeyedObjectHashMap<>(QUERY_KEY); this.preparedStatements = new KeyedObjectHashMap<>(PREPARED_STATEMENT_KEY); @@ -642,26 +640,82 @@ interface TicketHandlerReleasable extends TicketHandler { void release(); } - private Table executeSqlQuery(SessionState session, String sql) { - // See SQLTODO(catalog-reader-implementation) - final QueryScope queryScope = ExecutionContext.getContext().getQueryScope(); - // noinspection unchecked,rawtypes - final Map queryScopeTables = - (Map) (Map) queryScope.toMap(queryScope::unwrapObject, (n, t) -> t instanceof Table); - final TableSpec tableSpec = Sql.parseSql(sql, queryScopeTables, TicketTable::fromQueryScopeField, null); - // Note: this is doing io.deephaven.server.session.TicketResolver.Authorization.transform, but not - // io.deephaven.auth.ServiceAuthWiring - // TODO(deephaven-core#6307): Declarative server-side table execution logic that preserves authorization logic + private Table executeSqlQuery(String sql) { + final ExecutionContext executionContext = ExecutionContext.getContext(); + final QueryScope queryScope = executionContext.getQueryScope(); + // We aren't managing the liveness of Tables that come verbatim (authorization un-transformed) from the query + // scope (we are ensuring that any transformed, or operation created, tables don't escape to a higher-layer's + // liveness scope). In the case where they either are already not live, or become not live by the time the + // operation logic is executed, an appropriate exception will be thrown. While this is a liveness race, it isn't + // technically much different than a liveness race possible via ScopeTicketResolver.resolve. + // + // The proper way to do this would be to re-model the table execution logic of GrpcTableOperation (gRPC) into a + // QST form, whereby table dependencies are presented as properly-scoped, liveness-managed Exports for the + // duration of the operation. try (final SafeCloseable ignored = LivenessScopeStack.open()) { - final Table table = tableSpec.logic() - .create(new TableCreatorScopeTickets(TableCreatorImpl.INSTANCE, scopeTicketResolver, session)); - if (table.isRefreshing()) { - table.retainReference(); + // See SQLTODO(catalog-reader-implementation) + // Unfortunately, we must do authorization.transform on all the query scope tables up-front; to make this + // on-demand, we need to implement a Calcite Catalog Reader (non-trivial). Technically, parseSql only needs + // to know the definitions of the tables; we could consider a table-specific authorization interface that + // presents the specialization Authorization.transformedDefinition(Table) to make this cheaper in a lot of + // cases. + final Map queryScopeTables = + queryScope.toMap(o -> queryScopeAuthorizedTableMapper(queryScope, o), (n, t) -> t != null); + final TableSpec tableSpec = + Sql.parseSql(sql, queryScopeTables, TableCreatorScopeTickets::ticketTable, null); + final TableCreator tableCreator = + new TableCreatorScopeTickets(TableCreatorImpl.INSTANCE, queryScopeTables); + // We could consider doing finer-grained sharedLock in the future; right now, taking it for the whole + // operation if any of the TicketTable sources are refreshing. + final List
refreshingTables = new ArrayList<>(); + for (final TableSpec node : ParentsVisitor.reachable(List.of(tableSpec))) { + // Of the source tables, SQL can produce a NewTable or a TicketTable (until we introduce custom + // functions, where we could conceivable have it produce EmptyTable, TimeTable, etc). + if (!(node instanceof TicketTable)) { + continue; + } + final Table sourceTable = tableCreator.of((TicketTable) node); + if (sourceTable.isRefreshing()) { + refreshingTables.add(sourceTable); + } + } + final UpdateGraph updateGraph = refreshingTables.isEmpty() + ? null + : NotificationQueue.Dependency.getUpdateGraph(null, refreshingTables.toArray(new Table[0])); + // Note: Authorization.transform has already been performed, but we are _not_ doing + // io.deephaven.auth.ServiceAuthWiring checks. + // TODO(deephaven-core#6307): Declarative server-side table execution logic that preserves authorization + // logic + try ( + final SafeCloseable ignored0 = + updateGraph == null ? null : executionContext.withUpdateGraph(updateGraph).open(); + final SafeCloseable ignored1 = + updateGraph == null ? null : updateGraph.sharedLock().lockCloseable()) { + final Table table = tableSpec.logic().create(tableCreator); + if (table.isRefreshing()) { + table.retainReference(); + } + return table; } - return table; } } + private Table queryScopeTableMapper(QueryScope queryScope, Object object) { + if (object == null) { + return null; + } + object = queryScope.unwrapObject(object); + if (!(object instanceof Table)) { + return null; + } + return (Table) object; + } + + private Table queryScopeAuthorizedTableMapper(QueryScope queryScope, Object object) { + final Table table = queryScopeTableMapper(queryScope, object); + return table == null ? null : authorization.transform(table); + } + /** * This is the base class for "easy" commands; that is, commands that have a fixed schema and are cheap to * initialize. @@ -841,7 +895,7 @@ private synchronized QueryBase executeImpl(C command) { protected void executeSql(String sql) { try { - table = executeSqlQuery(session, sql); + table = executeSqlQuery(sql); } catch (SqlParseException e) { throw error(Code.INVALID_ARGUMENT, "query can't be parsed", e); } catch (UnsupportedSqlOperation e) { @@ -1319,8 +1373,11 @@ private Table getTablesEmpty(boolean includeSchema, Map attribut private Table getTables(boolean includeSchema, QueryScope queryScope, Map attributes, Predicate tableNameFilter) { Objects.requireNonNull(attributes); - final Map queryScopeTables = - (Map) (Map) queryScope.toMap(queryScope::unwrapObject, (n, t) -> t instanceof Table); + // Note: _not_ using queryScopeAuthorizedTable mapper; we can have a more efficient implementation when + // !includeSchema that only needs to check authorization.isDeniedAccess. + final Map queryScopeTables = queryScope.toMap( + o -> queryScopeTableMapper(queryScope, o), + (tableName, table) -> table != null && tableNameFilter.test(tableName)); final int size = queryScopeTables.size(); final String[] catalogNames = new String[size]; final String[] dbSchemaNames = new String[size]; @@ -1330,9 +1387,6 @@ private Table getTables(boolean includeSchema, QueryScope queryScope, Map e : queryScopeTables.entrySet()) { final String tableName = e.getKey(); - if (!tableNameFilter.test(tableName)) { - continue; - } final Schema schema; if (includeSchema) { final Table table = authorization.transform(e.getValue()); diff --git a/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/TableCreatorScopeTickets.java b/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/TableCreatorScopeTickets.java index c254d5ad99a..e2cd52c52b6 100644 --- a/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/TableCreatorScopeTickets.java +++ b/extensions/flight-sql/src/main/java/io/deephaven/server/flightsql/TableCreatorScopeTickets.java @@ -3,31 +3,35 @@ // package io.deephaven.server.flightsql; +import io.deephaven.base.verify.Assert; import io.deephaven.engine.table.Table; import io.deephaven.qst.TableCreator; import io.deephaven.qst.TableCreatorDelegate; import io.deephaven.qst.table.TicketTable; -import io.deephaven.server.console.ScopeTicketResolver; -import io.deephaven.server.session.SessionState; -import java.nio.ByteBuffer; +import java.util.Map; import java.util.Objects; final class TableCreatorScopeTickets extends TableCreatorDelegate
{ - private final ScopeTicketResolver scopeTicketResolver; - private final SessionState session; + static TicketTable ticketTable(String variableName) { + return TicketTable.fromQueryScopeField(variableName); + } + + private final Map map; - TableCreatorScopeTickets(TableCreator
delegate, ScopeTicketResolver scopeTicketResolver, - SessionState session) { + TableCreatorScopeTickets(TableCreator
delegate, Map map) { super(delegate); - this.scopeTicketResolver = Objects.requireNonNull(scopeTicketResolver); - this.session = session; + this.map = Objects.requireNonNull(map); } @Override public Table of(TicketTable ticketTable) { - return scopeTicketResolver.
resolve(session, ByteBuffer.wrap(ticketTable.ticket()), - TableCreatorScopeTickets.class.getSimpleName()).get(); + final byte[] ticket = ticketTable.ticket(); + Assert.gt(ticket.length, "ticket.length", 2); + Assert.eq(ticket[0], "ticket[0]", (byte) 's'); + Assert.eq(ticket[1], "ticket[1]", (byte) '/'); + final String variableName = new String(ticket, 2, ticket.length - 2); + return Objects.requireNonNull(map.get(variableName)); } } diff --git a/extensions/flight-sql/src/test/java/io/deephaven/server/flightsql/FlightSqlTest.java b/extensions/flight-sql/src/test/java/io/deephaven/server/flightsql/FlightSqlTest.java index d20d8d50c9e..b761c7aa039 100644 --- a/extensions/flight-sql/src/test/java/io/deephaven/server/flightsql/FlightSqlTest.java +++ b/extensions/flight-sql/src/test/java/io/deephaven/server/flightsql/FlightSqlTest.java @@ -11,6 +11,7 @@ import dagger.Component; import dagger.Module; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; @@ -86,6 +87,8 @@ import javax.inject.Singleton; import java.io.PrintStream; import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; @@ -893,6 +896,55 @@ public void unknownAction() { actionNoResolver(() -> doAction(action), type); } + @Test + public void refreshingTableTest() throws Exception { + // Set a start time so we can test we get out the expected number of rows + final Instant startTime = Instant.now().minus(Duration.ofHours(1)); + final Table tt1 = TableTools.timeTableBuilder() + .startTime(startTime) + .period(Duration.ofSeconds(1)) + .build() + .view("Timestamp1=Timestamp", "Id=ii % 11") + .lastBy("Id"); + final Table tt2 = TableTools.timeTableBuilder() + .startTime(startTime) + .period(Duration.ofSeconds(5)) + .build() + .view("Timestamp2=Timestamp", "Id=ii % 11") + .lastBy("Id"); + final QueryScope queryScope = ExecutionContext.getContext().getQueryScope(); + queryScope.putParam("my_table_1", tt1); + queryScope.putParam("my_table_2", tt2); + try { + final String query = "SELECT\n" + + " my_table_1.Id,\n" + + " my_table_1.Timestamp1,\n" + + " my_table_2.Timestamp2\n" + + "FROM\n" + + " my_table_1\n" + + " INNER JOIN my_table_2 ON my_table_1.Id = my_table_2.Id"; + { + final FlightInfo info = flightSqlClient.execute(query); + consume(info, 1, 11, false); + } + { + final PreparedStatement prepared = flightSqlClient.prepare(query); + { + final FlightInfo info = prepared.execute(); + consume(info, 1, 11, false); + } + { + final FlightInfo info = prepared.execute(); + consume(info, 1, 11, false); + } + } + } finally { + queryScope.putParam("my_table_2", null); + queryScope.putParam("my_table_1", null); + } + + } + private Result doAction(Action action) { final Iterator it = flightClient.doAction(action); if (!it.hasNext()) {