Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add failure event for queries that were posted but not submitted #24205

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,13 @@ public Optional<DispatchInfo> getDispatchInfo(QueryId queryId)
});
}

public void cancelQuery(QueryId queryId)
public boolean cancelQuery(QueryId queryId)
{
queryTracker.tryGetQuery(queryId)
.ifPresent(DispatchQuery::cancel);
return queryTracker.tryGetQuery(queryId)
.map(dispatchQuery -> {
dispatchQuery.cancel();
return true;
}).orElse(false);
}

public void failQuery(QueryId queryId, Throwable cause)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,19 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.trino.SessionRepresentation;
import io.trino.client.QueryError;
import io.trino.client.QueryResults;
import io.trino.client.StatementStats;
import io.trino.client.TypedQueryData;
import io.trino.event.QueryMonitor;
import io.trino.execution.ExecutionFailureInfo;
import io.trino.execution.LocationFactory;
import io.trino.execution.QueryManagerConfig;
import io.trino.execution.QueryState;
import io.trino.operator.RetryPolicy;
import io.trino.server.BasicQueryInfo;
import io.trino.server.BasicQueryStats;
import io.trino.server.ExternalUriInfo;
import io.trino.server.GoneException;
import io.trino.server.HttpRequestSessionContextFactory;
Expand All @@ -43,7 +49,11 @@
import io.trino.server.security.ResourceSecurity;
import io.trino.spi.ErrorCode;
import io.trino.spi.QueryId;
import io.trino.spi.TrinoException;
import io.trino.spi.security.Identity;
import io.trino.spi.type.TimeZoneKey;
import io.trino.sql.SqlEnvironmentConfig;
import io.trino.sql.SqlPath;
import io.trino.tracing.TrinoAttributes;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
Expand All @@ -68,6 +78,9 @@
import jakarta.ws.rs.core.Response;

import java.net.URI;
import java.security.Principal;
import java.time.Instant;
import java.util.Locale;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -95,7 +108,10 @@
import static io.trino.server.protocol.Slug.Context.QUEUED_QUERY;
import static io.trino.server.security.ResourceSecurity.AccessType.AUTHENTICATED_USER;
import static io.trino.server.security.ResourceSecurity.AccessType.PUBLIC;
import static io.trino.spi.StandardErrorCode.ABANDONED_QUERY;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.USER_CANCELED;
import static io.trino.util.Failures.toFailure;
import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Expand All @@ -121,6 +137,9 @@ public class QueuedStatementResource

private final boolean compressionEnabled;
private final QueryManager queryManager;
private final QueryMonitor queryMonitor;
private final String defaultSqlPath;
private final LocationFactory locationFactory;

@Inject
public QueuedStatementResource(
Expand All @@ -130,7 +149,10 @@ public QueuedStatementResource(
DispatchExecutor executor,
QueryInfoUrlFactory queryInfoUrlTemplate,
ServerConfig serverConfig,
QueryManagerConfig queryManagerConfig)
QueryManagerConfig queryManagerConfig,
QueryMonitor queryMonitor,
SqlEnvironmentConfig sqlEnvironmentConfig,
LocationFactory locationFactory)
{
this.sessionContextFactory = requireNonNull(sessionContextFactory, "sessionContextFactory is null");
this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
Expand All @@ -140,6 +162,9 @@ public QueuedStatementResource(
this.queryInfoUrlFactory = requireNonNull(queryInfoUrlTemplate, "queryInfoUrlTemplate is null");
this.compressionEnabled = serverConfig.isQueryResultsCompressionEnabled();
queryManager = new QueryManager(queryManagerConfig.getClientTimeout());
this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null");
this.defaultSqlPath = requireNonNull(sqlEnvironmentConfig.getPath(), "path is null");
this.locationFactory = requireNonNull(locationFactory, "locationFactory is null");
}

@PostConstruct
Expand Down Expand Up @@ -182,7 +207,7 @@ private Query registerQuery(String statement, HttpServletRequest servletRequest,
MultivaluedMap<String, String> headers = httpHeaders.getRequestHeaders();

SessionContext sessionContext = sessionContextFactory.createSessionContext(headers, remoteAddress, identity);
Query query = new Query(statement, sessionContext, dispatchManager, queryInfoUrlFactory, tracer);
Query query = new Query(statement, sessionContext, dispatchManager, queryInfoUrlFactory, tracer, queryMonitor, defaultSqlPath, locationFactory);
queryManager.registerQuery(query);

// let authentication filter know that identity lifecycle has been handed off
Expand Down Expand Up @@ -306,12 +331,24 @@ private static final class Query
private final Span querySpan;
private final Slug slug = Slug.createNew();
private final AtomicLong lastToken = new AtomicLong();
private final QueryMonitor queryMonitor;
private final String defaultPath;
private final LocationFactory locationFactory;

private final long initTime = System.nanoTime();
private final AtomicReference<Boolean> submissionGate = new AtomicReference<>();
private final SettableFuture<Void> creationFuture = SettableFuture.create();

public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager, QueryInfoUrlFactory queryInfoUrlFactory, Tracer tracer)
private boolean cancelled;

public Query(
String query,
SessionContext sessionContext,
DispatchManager dispatchManager,
QueryInfoUrlFactory queryInfoUrlFactory,
Tracer tracer,
QueryMonitor queryMonitor,
String defaultPath,
LocationFactory locationFactory)
{
this.query = requireNonNull(query, "query is null");
this.sessionContext = requireNonNull(sessionContext, "sessionContext is null");
Expand All @@ -323,6 +360,9 @@ public Query(String query, SessionContext sessionContext, DispatchManager dispat
this.querySpan = tracer.spanBuilder("query")
.setAttribute(TrinoAttributes.QUERY_ID, queryId.toString())
.startSpan();
this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null");
this.defaultPath = requireNonNull(defaultPath, "defaultPath is null");
this.locationFactory = requireNonNull(locationFactory, "locationFactory is null");
}

public QueryId getQueryId()
Expand Down Expand Up @@ -400,15 +440,86 @@ public QueryResults getQueryResults(long token, ExternalUriInfo externalUriInfo)

public void cancel()
{
creationFuture.addListener(() -> dispatchManager.cancelQuery(queryId), directExecutor());
cancelled = true;
creationFuture.addListener(() -> cancelInternal(), directExecutor());
}

private void cancelInternal()
{
if (!dispatchManager.cancelQuery(queryId)) {
queryMonitor.queryImmediateFailureEvent(
getBasicQueryInfoForFailure(USER_CANCELED.toErrorCode()),
toFailure(new TrinoException(USER_CANCELED, "Query was canceled")));
}
}

public void destroy()
{
querySpan.setStatus(StatusCode.ERROR).end();
if (!submissionGate.get()) {
if (cancelled) {
queryMonitor.queryImmediateFailureEvent(
getBasicQueryInfoForFailure(USER_CANCELED.toErrorCode()),
toFailure(new TrinoException(USER_CANCELED, "Query was canceled")));
}
else {
queryMonitor.queryImmediateFailureEvent(
getBasicQueryInfoForFailure(ABANDONED_QUERY.toErrorCode()),
toFailure(new TrinoException(ABANDONED_QUERY, "Query was abandoned %s was abandoned by the client, as it never checked for results after submission".formatted(queryId))));
}
}
sessionContext.getIdentity().destroy();
}

private BasicQueryInfo getBasicQueryInfoForFailure(ErrorCode errorCode)
{
return new BasicQueryInfo(
queryId,
new SessionRepresentation(
queryId.toString(),
querySpan,
sessionContext.getTransactionId(),
sessionContext.supportClientTransaction(),
sessionContext.getIdentity().getUser(),
sessionContext.getOriginalIdentity().getUser(),
sessionContext.getIdentity().getGroups(),
sessionContext.getOriginalIdentity().getGroups(),
sessionContext.getIdentity().getPrincipal().map(Principal::toString),
sessionContext.getIdentity().getEnabledRoles(),
sessionContext.getSource(),
sessionContext.getCatalog(),
sessionContext.getSchema(),
SqlPath.buildPath(sessionContext.getPath().orElse(defaultPath), sessionContext.getCatalog()),
sessionContext.getTraceToken(),
sessionContext.getTimeZoneId().map(TimeZoneKey::getTimeZoneKey).orElse(null),
sessionContext.getLanguage().map(s -> Locale.forLanguageTag(s)).orElse(Locale.getDefault()),
sessionContext.getRemoteUserAddress(),
sessionContext.getUserAgent(),
sessionContext.getClientInfo(),
sessionContext.getClientTags(),
sessionContext.getClientCapabilities(),
sessionContext.getResourceEstimates(),
Instant.now(),
sessionContext.getSystemProperties(),
sessionContext.getCatalogSessionProperties(),
sessionContext.getIdentity().getCatalogRoles(),
sessionContext.getPreparedStatements(),
sessionContext.getProtocolHeaders().getProtocolName(),
sessionContext.getQueryDataEncoding()),
Optional.empty(),
FAILED,
false,
locationFactory.createQueryLocation(queryId),
query,
Optional.empty(),
Optional.empty(),
BasicQueryStats.immediateFailureQueryStats(),
errorCode.getType(),
errorCode,
Optional.empty(),
RetryPolicy.NONE);
}

private QueryResults createQueryResults(long token, ExternalUriInfo externalUriInfo, DispatchInfo dispatchInfo)
{
URI nextUri = getNextUri(token, externalUriInfo, dispatchInfo);
Expand Down
95 changes: 50 additions & 45 deletions core/trino-main/src/main/java/io/trino/event/QueryMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,55 @@ public void queryCreatedEvent(BasicQueryInfo queryInfo)
Optional::empty)));
}

private static QueryStatistics createEmptyStatistics(Duration queuedTime)
{
return new QueryStatistics(
ofMillis(0),
ofMillis(0),
ofMillis(0),
queuedTime,
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
ImmutableList.of(),
0,
true,
ImmutableList.of(),
ImmutableList.of(),
ImmutableList.of(),
ImmutableList.of(),
ImmutableList.of(),
Optional.empty());
}

public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailureInfo failure)
{
eventListenerManager.queryCompleted(requiresAnonymizedPlan -> new QueryCompletedEvent(
Expand All @@ -194,51 +243,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur
Optional.empty(),
Optional.empty(),
Optional::empty),
new QueryStatistics(
ofMillis(0),
ofMillis(0),
ofMillis(0),
ofMillis(queryInfo.getQueryStats().getQueuedTime().toMillis()),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
ImmutableList.of(),
0,
true,
ImmutableList.of(),
ImmutableList.of(),
ImmutableList.of(),
ImmutableList.of(),
ImmutableList.of(),
Optional.empty()),
createEmptyStatistics(ofMillis(queryInfo.getQueryStats().getQueuedTime().toMillis())),
createQueryContext(
queryInfo.getSession(),
queryInfo.getResourceGroupId(),
Expand Down
Loading