From 2da13148ac970af48f25a3b04b7e406aacb1b8e8 Mon Sep 17 00:00:00 2001 From: mduggan-starburst Date: Mon, 18 Nov 2024 23:33:40 -0500 Subject: [PATCH] Add failure event for queries that were posted but not submitted --- .../io/trino/dispatcher/DispatchManager.java | 9 +- .../dispatcher/QueuedStatementResource.java | 121 +++++++++++++++++- .../java/io/trino/event/QueryMonitor.java | 95 +++++++------- 3 files changed, 172 insertions(+), 53 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/dispatcher/DispatchManager.java b/core/trino-main/src/main/java/io/trino/dispatcher/DispatchManager.java index f49114735147a..905d5f4e9531d 100644 --- a/core/trino-main/src/main/java/io/trino/dispatcher/DispatchManager.java +++ b/core/trino-main/src/main/java/io/trino/dispatcher/DispatchManager.java @@ -376,10 +376,13 @@ public Optional getDispatchInfo(QueryId queryId) }); } - public void cancelQuery(QueryId queryId) + public boolean cancelQuery(QueryId queryId) { - queryTracker.tryGetQuery(queryId) - .ifPresent(DispatchQuery::cancel); + return queryTracker.tryGetQuery(queryId) + .map(dispatchQuery -> { + dispatchQuery.cancel(); + return true; + }).orElse(false); } public void failQuery(QueryId queryId, Throwable cause) diff --git a/core/trino-main/src/main/java/io/trino/dispatcher/QueuedStatementResource.java b/core/trino-main/src/main/java/io/trino/dispatcher/QueuedStatementResource.java index 1b10eb1f97968..e41aae00c09b3 100644 --- a/core/trino-main/src/main/java/io/trino/dispatcher/QueuedStatementResource.java +++ b/core/trino-main/src/main/java/io/trino/dispatcher/QueuedStatementResource.java @@ -25,13 +25,19 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; +import io.trino.SessionRepresentation; import io.trino.client.QueryError; import io.trino.client.QueryResults; import io.trino.client.StatementStats; import io.trino.client.TypedQueryData; +import io.trino.event.QueryMonitor; import io.trino.execution.ExecutionFailureInfo; +import io.trino.execution.LocationFactory; import io.trino.execution.QueryManagerConfig; import io.trino.execution.QueryState; +import io.trino.operator.RetryPolicy; +import io.trino.server.BasicQueryInfo; +import io.trino.server.BasicQueryStats; import io.trino.server.ExternalUriInfo; import io.trino.server.GoneException; import io.trino.server.HttpRequestSessionContextFactory; @@ -43,7 +49,11 @@ import io.trino.server.security.ResourceSecurity; import io.trino.spi.ErrorCode; import io.trino.spi.QueryId; +import io.trino.spi.TrinoException; import io.trino.spi.security.Identity; +import io.trino.spi.type.TimeZoneKey; +import io.trino.sql.SqlEnvironmentConfig; +import io.trino.sql.SqlPath; import io.trino.tracing.TrinoAttributes; import jakarta.annotation.Nullable; import jakarta.annotation.PostConstruct; @@ -68,6 +78,9 @@ import jakarta.ws.rs.core.Response; import java.net.URI; +import java.security.Principal; +import java.time.Instant; +import java.util.Locale; import java.util.Optional; import java.util.OptionalDouble; import java.util.concurrent.ConcurrentHashMap; @@ -95,7 +108,10 @@ import static io.trino.server.protocol.Slug.Context.QUEUED_QUERY; import static io.trino.server.security.ResourceSecurity.AccessType.AUTHENTICATED_USER; import static io.trino.server.security.ResourceSecurity.AccessType.PUBLIC; +import static io.trino.spi.StandardErrorCode.ABANDONED_QUERY; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; +import static io.trino.spi.StandardErrorCode.USER_CANCELED; +import static io.trino.util.Failures.toFailure; import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; import static java.util.Objects.requireNonNull; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; @@ -121,6 +137,9 @@ public class QueuedStatementResource private final boolean compressionEnabled; private final QueryManager queryManager; + private final QueryMonitor queryMonitor; + private final String defaultSqlPath; + private final LocationFactory locationFactory; @Inject public QueuedStatementResource( @@ -130,7 +149,10 @@ public QueuedStatementResource( DispatchExecutor executor, QueryInfoUrlFactory queryInfoUrlTemplate, ServerConfig serverConfig, - QueryManagerConfig queryManagerConfig) + QueryManagerConfig queryManagerConfig, + QueryMonitor queryMonitor, + SqlEnvironmentConfig sqlEnvironmentConfig, + LocationFactory locationFactory) { this.sessionContextFactory = requireNonNull(sessionContextFactory, "sessionContextFactory is null"); this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null"); @@ -140,6 +162,9 @@ public QueuedStatementResource( this.queryInfoUrlFactory = requireNonNull(queryInfoUrlTemplate, "queryInfoUrlTemplate is null"); this.compressionEnabled = serverConfig.isQueryResultsCompressionEnabled(); queryManager = new QueryManager(queryManagerConfig.getClientTimeout()); + this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null"); + this.defaultSqlPath = requireNonNull(sqlEnvironmentConfig.getPath(), "path is null"); + this.locationFactory = requireNonNull(locationFactory, "locationFactory is null"); } @PostConstruct @@ -182,7 +207,7 @@ private Query registerQuery(String statement, HttpServletRequest servletRequest, MultivaluedMap headers = httpHeaders.getRequestHeaders(); SessionContext sessionContext = sessionContextFactory.createSessionContext(headers, remoteAddress, identity); - Query query = new Query(statement, sessionContext, dispatchManager, queryInfoUrlFactory, tracer); + Query query = new Query(statement, sessionContext, dispatchManager, queryInfoUrlFactory, tracer, queryMonitor, defaultSqlPath, locationFactory); queryManager.registerQuery(query); // let authentication filter know that identity lifecycle has been handed off @@ -306,12 +331,24 @@ private static final class Query private final Span querySpan; private final Slug slug = Slug.createNew(); private final AtomicLong lastToken = new AtomicLong(); + private final QueryMonitor queryMonitor; + private final String defaultPath; + private final LocationFactory locationFactory; private final long initTime = System.nanoTime(); private final AtomicReference submissionGate = new AtomicReference<>(); private final SettableFuture creationFuture = SettableFuture.create(); - - public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager, QueryInfoUrlFactory queryInfoUrlFactory, Tracer tracer) + private boolean cancelled; + + public Query( + String query, + SessionContext sessionContext, + DispatchManager dispatchManager, + QueryInfoUrlFactory queryInfoUrlFactory, + Tracer tracer, + QueryMonitor queryMonitor, + String defaultPath, + LocationFactory locationFactory) { this.query = requireNonNull(query, "query is null"); this.sessionContext = requireNonNull(sessionContext, "sessionContext is null"); @@ -323,6 +360,9 @@ public Query(String query, SessionContext sessionContext, DispatchManager dispat this.querySpan = tracer.spanBuilder("query") .setAttribute(TrinoAttributes.QUERY_ID, queryId.toString()) .startSpan(); + this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null"); + this.defaultPath = requireNonNull(defaultPath, "defaultPath is null"); + this.locationFactory = requireNonNull(locationFactory, "locationFactory is null"); } public QueryId getQueryId() @@ -400,15 +440,86 @@ public QueryResults getQueryResults(long token, ExternalUriInfo externalUriInfo) public void cancel() { - creationFuture.addListener(() -> dispatchManager.cancelQuery(queryId), directExecutor()); + cancelled = true; + creationFuture.addListener(() -> cancelInternal(), directExecutor()); + } + + private void cancelInternal() + { + if (!dispatchManager.cancelQuery(queryId)) { + queryMonitor.queryImmediateFailureEvent( + getBasicQueryInfoForFailure(USER_CANCELED.toErrorCode()), + toFailure(new TrinoException(USER_CANCELED, "Query was canceled"))); + } } public void destroy() { querySpan.setStatus(StatusCode.ERROR).end(); + if (!submissionGate.get()) { + if (cancelled) { + queryMonitor.queryImmediateFailureEvent( + getBasicQueryInfoForFailure(USER_CANCELED.toErrorCode()), + toFailure(new TrinoException(USER_CANCELED, "Query was canceled"))); + } + else { + queryMonitor.queryImmediateFailureEvent( + getBasicQueryInfoForFailure(ABANDONED_QUERY.toErrorCode()), + toFailure(new TrinoException(ABANDONED_QUERY, "Query was abandoned %s was abandoned by the client, as it never checked for results after submission".formatted(queryId)))); + } + } sessionContext.getIdentity().destroy(); } + private BasicQueryInfo getBasicQueryInfoForFailure(ErrorCode errorCode) + { + return new BasicQueryInfo( + queryId, + new SessionRepresentation( + queryId.toString(), + querySpan, + sessionContext.getTransactionId(), + sessionContext.supportClientTransaction(), + sessionContext.getIdentity().getUser(), + sessionContext.getOriginalIdentity().getUser(), + sessionContext.getIdentity().getGroups(), + sessionContext.getOriginalIdentity().getGroups(), + sessionContext.getIdentity().getPrincipal().map(Principal::toString), + sessionContext.getIdentity().getEnabledRoles(), + sessionContext.getSource(), + sessionContext.getCatalog(), + sessionContext.getSchema(), + SqlPath.buildPath(sessionContext.getPath().orElse(defaultPath), sessionContext.getCatalog()), + sessionContext.getTraceToken(), + sessionContext.getTimeZoneId().map(TimeZoneKey::getTimeZoneKey).orElse(null), + sessionContext.getLanguage().map(s -> Locale.forLanguageTag(s)).orElse(Locale.getDefault()), + sessionContext.getRemoteUserAddress(), + sessionContext.getUserAgent(), + sessionContext.getClientInfo(), + sessionContext.getClientTags(), + sessionContext.getClientCapabilities(), + sessionContext.getResourceEstimates(), + Instant.now(), + sessionContext.getSystemProperties(), + sessionContext.getCatalogSessionProperties(), + sessionContext.getIdentity().getCatalogRoles(), + sessionContext.getPreparedStatements(), + sessionContext.getProtocolHeaders().getProtocolName(), + sessionContext.getQueryDataEncoding()), + Optional.empty(), + FAILED, + false, + locationFactory.createQueryLocation(queryId), + query, + Optional.empty(), + Optional.empty(), + BasicQueryStats.immediateFailureQueryStats(), + errorCode.getType(), + errorCode, + Optional.empty(), + RetryPolicy.NONE); + } + private QueryResults createQueryResults(long token, ExternalUriInfo externalUriInfo, DispatchInfo dispatchInfo) { URI nextUri = getNextUri(token, externalUriInfo, dispatchInfo); diff --git a/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java b/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java index 858680e218c90..972f5cde3cc40 100644 --- a/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java +++ b/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java @@ -177,6 +177,55 @@ public void queryCreatedEvent(BasicQueryInfo queryInfo) Optional::empty))); } + private static QueryStatistics createEmptyStatistics(Duration queuedTime) + { + return new QueryStatistics( + ofMillis(0), + ofMillis(0), + ofMillis(0), + queuedTime, + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ImmutableList.of(), + 0, + true, + ImmutableList.of(), + ImmutableList.of(), + ImmutableList.of(), + ImmutableList.of(), + ImmutableList.of(), + Optional.empty()); + } + public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailureInfo failure) { eventListenerManager.queryCompleted(requiresAnonymizedPlan -> new QueryCompletedEvent( @@ -194,51 +243,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur Optional.empty(), Optional.empty(), Optional::empty), - new QueryStatistics( - ofMillis(0), - ofMillis(0), - ofMillis(0), - ofMillis(queryInfo.getQueryStats().getQueuedTime().toMillis()), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - 0, - ImmutableList.of(), - 0, - true, - ImmutableList.of(), - ImmutableList.of(), - ImmutableList.of(), - ImmutableList.of(), - ImmutableList.of(), - Optional.empty()), + createEmptyStatistics(ofMillis(queryInfo.getQueryStats().getQueuedTime().toMillis())), createQueryContext( queryInfo.getSession(), queryInfo.getResourceGroupId(),