From 495fdda5183ecba4143b81ae17768d77610b8ce8 Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Fri, 7 Jun 2024 13:01:46 +0200 Subject: [PATCH] Add ExecutionInfo to RequestTracker callbacks --- .../api/core/graph/GraphExecutionInfo.java | 8 + .../ContinuousRequestHandlerBase.java | 32 +-- .../graph/GraphExecutionInfoConverter.java | 11 + .../core/graph/GraphRequestHandler.java | 52 ++--- .../driver/api/core/cql/ExecutionInfo.java | 7 + .../api/core/tracker/RequestTracker.java | 211 +++++++++--------- .../internal/core/cql/CqlRequestHandler.java | 203 +++++++++-------- .../core/cql/DefaultExecutionInfo.java | 96 +++++++- .../cql/CqlRequestHandlerTrackerTest.java | 30 +++ 9 files changed, 393 insertions(+), 257 deletions(-) diff --git a/core/src/main/java/com/datastax/dse/driver/api/core/graph/GraphExecutionInfo.java b/core/src/main/java/com/datastax/dse/driver/api/core/graph/GraphExecutionInfo.java index 758f6b358ed..c10fdf1ce3c 100644 --- a/core/src/main/java/com/datastax/dse/driver/api/core/graph/GraphExecutionInfo.java +++ b/core/src/main/java/com/datastax/dse/driver/api/core/graph/GraphExecutionInfo.java @@ -18,8 +18,10 @@ package com.datastax.dse.driver.api.core.graph; import com.datastax.oss.driver.api.core.DefaultProtocolVersion; +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.specex.SpeculativeExecutionPolicy; +import edu.umd.cs.findbugs.annotations.Nullable; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; @@ -37,6 +39,12 @@ public interface GraphExecutionInfo { /** The statement that was executed. */ GraphStatement getStatement(); + /** @return Execution profile applied when executing given request. */ + @Nullable + default DriverExecutionProfile getExecutionProfile() { + return null; + } + /** The node that was used as a coordinator to successfully complete the query. */ Node getCoordinator(); diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java index 32ebf4ad640..3969baaa651 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java @@ -833,7 +833,8 @@ public void onFailure(@NonNull Throwable error) { private void processResultResponse(@NonNull Result result, @Nullable Frame frame) { assert lock.isHeldByCurrentThread(); try { - ExecutionInfo executionInfo = createExecutionInfo(result, frame); + ExecutionInfo executionInfo = + createExecutionInfo().withServerResponse(result, frame).build(); if (result instanceof Rows) { DseRowsMetadata rowsMetadata = (DseRowsMetadata) ((Rows) result).getMetadata(); if (columnDefinitions == null) { @@ -1458,7 +1459,7 @@ private void trackNodeError( latencyNanos, executionProfile, node, - createExecutionInfo(frame), + createExecutionInfo().withServerResponse(frame).build(), logPrefix); } } @@ -1576,7 +1577,7 @@ private void completeResultSetFuture( ExecutionInfo executionInfo = null; if (pageOrError instanceof AsyncPagingIterable) { - executionInfo = ((AsyncPagingIterable) pageOrError).getExecutionInfo(); + executionInfo = ((AsyncPagingIterable) pageOrError).getExecutionInfo(); } else if (pageOrError instanceof AsyncGraphResultSet) { executionInfo = ((AsyncGraphResultSet) pageOrError).getRequestExecutionInfo(); } @@ -1612,34 +1613,13 @@ private void completeResultSetFuture( } @NonNull - private ExecutionInfo createExecutionInfo(@NonNull Result result, @Nullable Frame response) { - ByteBuffer pagingState = - result instanceof Rows ? ((Rows) result).getMetadata().pagingState : null; - return new DefaultExecutionInfo( + private DefaultExecutionInfo.Builder createExecutionInfo() { + return DefaultExecutionInfo.builder( statement, node, startedSpeculativeExecutionsCount.get(), executionIndex, errors, - pagingState, - response, - true, - session, - context, - executionProfile); - } - - @NonNull - private ExecutionInfo createExecutionInfo(@Nullable Frame response) { - return new DefaultExecutionInfo( - statement, - node, - startedSpeculativeExecutionsCount.get(), - executionIndex, - errors, - null, - response, - true, session, context, executionProfile); diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphExecutionInfoConverter.java b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphExecutionInfoConverter.java index b6472f690d3..7500c26d691 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphExecutionInfoConverter.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphExecutionInfoConverter.java @@ -18,6 +18,7 @@ package com.datastax.dse.driver.internal.core.graph; import com.datastax.dse.driver.api.core.graph.GraphStatement; +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.cql.QueryTrace; import com.datastax.oss.driver.api.core.cql.Statement; @@ -62,6 +63,11 @@ public Statement getStatement() { throw new ClassCastException("GraphStatement cannot be cast to Statement"); } + @Override + public DriverExecutionProfile getExecutionProfile() { + return graphExecutionInfo.getExecutionProfile(); + } + @Nullable @Override public Node getCoordinator() { @@ -146,6 +152,11 @@ public GraphStatement getStatement() { return (GraphStatement) executionInfo.getRequest(); } + @Override + public DriverExecutionProfile getExecutionProfile() { + return executionInfo.getExecutionProfile(); + } + @Override public Node getCoordinator() { return executionInfo.getCoordinator(); diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java index 1e5bf982ad5..dbdb58d60b9 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java @@ -329,7 +329,18 @@ private void cancelScheduledTasks() { private void setFinalResult( Result resultMessage, Frame responseFrame, NodeResponseCallback callback) { try { - ExecutionInfo executionInfo = buildExecutionInfo(callback, responseFrame); + ExecutionInfo executionInfo = + DefaultExecutionInfo.builder( + callback.statement, + callback.node, + startedSpeculativeExecutionsCount.get(), + callback.execution, + errors, + session, + context, + callback.executionProfile) + .withServerResponse(resultMessage, responseFrame) + .build(); DriverExecutionProfile executionProfile = Conversions.resolveExecutionProfile(callback.statement, context); GraphProtocol subProtocol = @@ -426,23 +437,6 @@ private void logServerWarnings(GraphStatement statement, List warning LOG.warn("Query '{}' generated server side warning(s): {}", statementString, warning)); } - private ExecutionInfo buildExecutionInfo(NodeResponseCallback callback, Frame responseFrame) { - DriverExecutionProfile executionProfile = - Conversions.resolveExecutionProfile(callback.statement, context); - return new DefaultExecutionInfo( - callback.statement, - callback.node, - startedSpeculativeExecutionsCount.get(), - callback.execution, - errors, - null, - responseFrame, - true, - session, - context, - executionProfile); - } - @Override public void onThrottleFailure(@NonNull RequestThrottlingException error) { DriverExecutionProfile executionProfile = @@ -457,18 +451,16 @@ private void setFinalError( DriverExecutionProfile executionProfile = Conversions.resolveExecutionProfile(statement, context); ExecutionInfo executionInfo = - new DefaultExecutionInfo( - statement, - node, - startedSpeculativeExecutionsCount.get(), - execution, - errors, - null, - null, - true, - session, - context, - executionProfile); + DefaultExecutionInfo.builder( + statement, + node, + startedSpeculativeExecutionsCount.get(), + execution, + errors, + session, + context, + executionProfile) + .build(); if (error instanceof DriverException) { ((DriverException) error).setExecutionInfo(executionInfo); } diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/cql/ExecutionInfo.java b/core/src/main/java/com/datastax/oss/driver/api/core/cql/ExecutionInfo.java index 40cfca827d1..43e1f2515b2 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/cql/ExecutionInfo.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/cql/ExecutionInfo.java @@ -21,6 +21,7 @@ import com.datastax.oss.driver.api.core.DriverException; import com.datastax.oss.driver.api.core.RequestThrottlingException; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.detach.AttachmentPoint; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.retry.RetryDecision; @@ -66,6 +67,12 @@ default Request getRequest() { @Deprecated Statement getStatement(); + /** @return Execution profile applied when executing given request. */ + @Nullable + default DriverExecutionProfile getExecutionProfile() { + return null; + } + /** * The node that acted as a coordinator for the query. * diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java index a3988f360f4..bf623d3de9c 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java @@ -37,33 +37,52 @@ public interface RequestTracker extends AutoCloseable { /** - * @deprecated This method only exists for backward compatibility. Override {@link - * #onSuccess(Request, long, DriverExecutionProfile, Node, ExecutionInfo, String)} instead. + * Invoked each time a request succeeds. + * + * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, + * GenericType) session.execute} call until the result is made available to the client). + * @param executionProfile the execution profile of this request. + * @param node the node that returned the successful response. + * @param executionInfo the execution info containing the results of this request + * @param requestLogPrefix the dedicated log prefix for this request */ - @Deprecated default void onSuccess( @NonNull Request request, long latencyNanos, @NonNull DriverExecutionProfile executionProfile, - @NonNull Node node) {} + @NonNull Node node, + @NonNull ExecutionInfo executionInfo, + @NonNull String requestLogPrefix) { + // delegate call to the old method + onSuccess(request, latencyNanos, executionProfile, node, requestLogPrefix); + } /** - * @deprecated This method only exists for backward compatibility. Override {@link - * #onSuccess(Request, long, DriverExecutionProfile, Node, ExecutionInfo, String)} instead. + * Invoked each time a request fails. + * + * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, + * GenericType) session.execute} call until the error is propagated to the client). + * @param executionProfile the execution profile of this request. + * @param node the node that returned the error response, or {@code null} if the error occurred + * @param executionInfo the execution info being returned to the client for this request if + * available + * @param requestLogPrefix the dedicated log prefix for this request */ - @Deprecated - default void onSuccess( + default void onError( @NonNull Request request, + @NonNull Throwable error, long latencyNanos, @NonNull DriverExecutionProfile executionProfile, - @NonNull Node node, + @Nullable Node node, + @Nullable ExecutionInfo executionInfo, @NonNull String requestLogPrefix) { - // If client doesn't override onSuccess with requestLogPrefix delegate call to the old method - onSuccess(request, latencyNanos, executionProfile, node); + // delegate call to the old method + onError(request, error, latencyNanos, executionProfile, node, requestLogPrefix); } /** - * Invoked each time a request succeeds. + * Invoked each time a request succeeds at the node level. Similar to {@link #onSuccess(Request, + * long, DriverExecutionProfile, Node, String)} but at per node level. * * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, * GenericType) session.execute} call until the result is made available to the client). @@ -72,29 +91,88 @@ default void onSuccess( * @param executionInfo the execution info containing the results of this request * @param requestLogPrefix the dedicated log prefix for this request */ - default void onSuccess( + default void onNodeSuccess( @NonNull Request request, long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, @NonNull ExecutionInfo executionInfo, @NonNull String requestLogPrefix) { - // If client doesn't override onSuccess with executionInfo delegate call to the old method - onSuccess(request, latencyNanos, executionProfile, node, requestLogPrefix); + // delegate call to the old method + onNodeSuccess(request, latencyNanos, executionProfile, node, requestLogPrefix); + } + + /** + * Invoked each time a request fails at the node level. Similar to {@link #onError(Request, + * Throwable, long, DriverExecutionProfile, Node, String)} but at a per node level. + * + * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, + * GenericType) session.execute} call until the error is propagated to the client). + * @param executionProfile the execution profile of this request. + * @param node the node that returned the error response. + * @param executionInfo the execution info containing the results of this request if available + * @param requestLogPrefix the dedicated log prefix for this request + */ + default void onNodeError( + @NonNull Request request, + @NonNull Throwable error, + long latencyNanos, + @NonNull DriverExecutionProfile executionProfile, + @NonNull Node node, + @Nullable ExecutionInfo executionInfo, + @NonNull String requestLogPrefix) { + // delegate call to the old method + onNodeError(request, error, latencyNanos, executionProfile, node, requestLogPrefix); } + /** + * Invoked when the session is ready to process user requests. + * + *

WARNING: if you use {@code session.execute()} in your tracker implementation, keep in + * mind that those requests will in turn recurse back into {@code onSuccess} / {@code onError} + * methods. Make sure you don't trigger an infinite loop; one way to do that is to use a + * custom execution profile for internal requests. + * + *

This corresponds to the moment when {@link SessionBuilder#build()} returns, or the future + * returned by {@link SessionBuilder#buildAsync()} completes. If the session initialization fails, + * this method will not get called. + * + *

Listener methods are invoked from different threads; if you store the session in a field, + * make it at least volatile to guarantee proper publication. + * + *

This method is guaranteed to be the first one invoked on this object. + * + *

The default implementation is empty. + */ + default void onSessionReady(@NonNull Session session) {} + + // ----- Below methods have been deprecated and will be removed in next releases ----- + /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onError(Request, Throwable, long, DriverExecutionProfile, Node, ExecutionInfo, String)} - * instead. + * #onSuccess(Request, long, DriverExecutionProfile, Node, ExecutionInfo, String)} instead. */ @Deprecated - default void onError( + default void onSuccess( @NonNull Request request, - @NonNull Throwable error, long latencyNanos, @NonNull DriverExecutionProfile executionProfile, - @Nullable Node node) {} + @NonNull Node node) {} + + /** + * @deprecated This method only exists for backward compatibility. Override {@link + * #onSuccess(Request, long, DriverExecutionProfile, Node, ExecutionInfo, String)} instead. + */ + @Deprecated + default void onSuccess( + @NonNull Request request, + long latencyNanos, + @NonNull DriverExecutionProfile executionProfile, + @NonNull Node node, + @NonNull String requestLogPrefix) { + // delegate call to the old method + onSuccess(request, latencyNanos, executionProfile, node); + } /** * @deprecated This method only exists for backward compatibility. Override {@link @@ -107,33 +185,23 @@ default void onError( @NonNull Throwable error, long latencyNanos, @NonNull DriverExecutionProfile executionProfile, - @Nullable Node node, - @NonNull String requestLogPrefix) { - // If client doesn't override onError with requestLogPrefix delegate call to the old method - onError(request, error, latencyNanos, executionProfile, node); - } + @Nullable Node node) {} /** - * Invoked each time a request fails. - * - * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, - * GenericType) session.execute} call until the error is propagated to the client). - * @param executionProfile the execution profile of this request. - * @param node the node that returned the error response, or {@code null} if the error occurred - * @param executionInfo the execution info being returned to the client for this request if - * available - * @param requestLogPrefix the dedicated log prefix for this request + * @deprecated This method only exists for backward compatibility. Override {@link + * #onError(Request, Throwable, long, DriverExecutionProfile, Node, ExecutionInfo, String)} + * instead. */ + @Deprecated default void onError( @NonNull Request request, @NonNull Throwable error, long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @Nullable Node node, - @Nullable ExecutionInfo executionInfo, @NonNull String requestLogPrefix) { // delegate call to the old method - onError(request, error, latencyNanos, executionProfile, node, requestLogPrefix); + onError(request, error, latencyNanos, executionProfile, node); } /** @@ -162,33 +230,10 @@ default void onNodeError( @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, @NonNull String requestLogPrefix) { - // If client doesn't override onNodeError with requestLogPrefix delegate call to the old method + // delegate call to the old method onNodeError(request, error, latencyNanos, executionProfile, node); } - /** - * Invoked each time a request fails at the node level. Similar to {@link #onError(Request, - * Throwable, long, DriverExecutionProfile, Node, String)} but at a per node level. - * - * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, - * GenericType) session.execute} call until the error is propagated to the client). - * @param executionProfile the execution profile of this request. - * @param node the node that returned the error response. - * @param executionInfo the execution info containing the results of this request if available - * @param requestLogPrefix the dedicated log prefix for this request - */ - default void onNodeError( - @NonNull Request request, - @NonNull Throwable error, - long latencyNanos, - @NonNull DriverExecutionProfile executionProfile, - @NonNull Node node, - @Nullable ExecutionInfo executionInfo, - @NonNull String requestLogPrefix) { - // If client doesn't override onNodeError with requestLogPrefix delegate call to the old method - onNodeError(request, error, latencyNanos, executionProfile, node, requestLogPrefix); - } - /** * @deprecated This method only exists for backward compatibility. Override {@link * #onNodeSuccess(Request, long, DriverExecutionProfile, Node, ExecutionInfo, String)} @@ -213,51 +258,7 @@ default void onNodeSuccess( @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, @NonNull String requestLogPrefix) { - // If client doesn't override onNodeSuccess with requestLogPrefix delegate call to the old - // method - onNodeSuccess(request, latencyNanos, executionProfile, node); - } - - /** - * Invoked each time a request succeeds at the node level. Similar to {@link #onSuccess(Request, - * long, DriverExecutionProfile, Node, String)} but at per node level. - * - * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, - * GenericType) session.execute} call until the result is made available to the client). - * @param executionProfile the execution profile of this request. - * @param node the node that returned the successful response. - * @param executionInfo the execution info containing the results of this request - * @param requestLogPrefix the dedicated log prefix for this request - */ - default void onNodeSuccess( - @NonNull Request request, - long latencyNanos, - @NonNull DriverExecutionProfile executionProfile, - @NonNull Node node, - @NonNull ExecutionInfo executionInfo, - @NonNull String requestLogPrefix) { // delegate call to the old method - onNodeSuccess(request, latencyNanos, executionProfile, node, requestLogPrefix); + onNodeSuccess(request, latencyNanos, executionProfile, node); } - - /** - * Invoked when the session is ready to process user requests. - * - *

WARNING: if you use {@code session.execute()} in your tracker implementation, keep in - * mind that those requests will in turn recurse back into {@code onSuccess} / {@code onError} - * methods. Make sure you don't trigger an infinite loop; one way to do that is to use a - * custom execution profile for internal requests. - * - *

This corresponds to the moment when {@link SessionBuilder#build()} returns, or the future - * returned by {@link SessionBuilder#buildAsync()} completes. If the session initialization fails, - * this method will not get called. - * - *

Listener methods are invoked from different threads; if you store the session in a field, - * make it at least volatile to guarantee proper publication. - * - *

This method is guaranteed to be the first one invoked on this object. - * - *

The default implementation is empty. - */ - default void onSessionReady(@NonNull Session session) {} } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index 54c9cdefadd..27c7b2a87b3 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -66,7 +66,6 @@ import com.datastax.oss.protocol.internal.response.Error; import com.datastax.oss.protocol.internal.response.Result; import com.datastax.oss.protocol.internal.response.error.Unprepared; -import com.datastax.oss.protocol.internal.response.result.Rows; import com.datastax.oss.protocol.internal.response.result.SchemaChange; import com.datastax.oss.protocol.internal.response.result.SetKeyspace; import com.datastax.oss.protocol.internal.response.result.Void; @@ -205,12 +204,12 @@ private Timeout scheduleTimeout(Duration timeoutDuration) { if (timeoutDuration.toNanos() > 0) { try { return this.timer.newTimeout( - (Timeout timeout1) -> - setFinalError( - initialStatement, - new DriverTimeoutException("Query timed out after " + timeoutDuration), - null, - -1), + (Timeout timeout1) -> { + ExecutionInfo executionInfo = failedExecutionInfoNoRequestSent().build(); + setFinalError( + executionInfo, + new DriverTimeoutException("Query timed out after " + timeoutDuration)); + }, timeoutDuration.toNanos(), TimeUnit.NANOSECONDS); } catch (IllegalStateException e) { @@ -263,7 +262,8 @@ private void sendRequest( // We've reached the end of the query plan without finding any node to write to if (!result.isDone() && activeExecutionsCount.decrementAndGet() == 0) { // We're the last execution so fail the result - setFinalError(statement, AllNodesFailedException.fromErrors(this.errors), null, -1); + ExecutionInfo executionInfo = failedExecutionInfoNoRequestSent(statement).build(); + setFinalError(executionInfo, AllNodesFailedException.fromErrors(this.errors)); } } else { NodeResponseCallback nodeResponseCallback = @@ -318,7 +318,10 @@ private void setFinalResult( NodeResponseCallback callback) { try { ExecutionInfo executionInfo = - buildExecutionInfo(callback, resultMessage, responseFrame, schemaInAgreement); + defaultExecutionInfo(callback) + .withServerResponse(resultMessage, responseFrame) + .withSchemaInAgreement(schemaInAgreement) + .build(); AsyncResultSet resultSet = Conversions.toResultSet(resultMessage, executionInfo, session, context); if (result.complete(resultSet)) { @@ -368,7 +371,9 @@ private void setFinalResult( logServerWarnings(callback.statement, executionProfile, executionInfo.getWarnings()); } } catch (Throwable error) { - setFinalError(callback.statement, error, callback.node, -1); + // something unpredictable unexpected happened here that we can't blame on the request itself + ExecutionInfo executionInfo = defaultExecutionInfo(callback, -1).build(); + setFinalError(executionInfo, error); } } @@ -399,50 +404,15 @@ private void logServerWarnings( LOG.warn("Query '{}' generated server side warning(s): {}", statementString, warning)); } - private ExecutionInfo buildExecutionInfo( - NodeResponseCallback callback, - Result resultMessage, - Frame responseFrame, - boolean schemaInAgreement) { - ByteBuffer pagingState = - (resultMessage instanceof Rows) ? ((Rows) resultMessage).getMetadata().pagingState : null; - return new DefaultExecutionInfo( - callback.statement, - callback.node, - startedSpeculativeExecutionsCount.get(), - callback.execution, - errors, - pagingState, - responseFrame, - schemaInAgreement, - session, - context, - executionProfile); - } - @Override public void onThrottleFailure(@NonNull RequestThrottlingException error) { sessionMetricUpdater.incrementCounter( DefaultSessionMetric.THROTTLING_ERRORS, executionProfile.getName()); - setFinalError(initialStatement, error, null, -1); + ExecutionInfo executionInfo = failedExecutionInfoNoRequestSent().build(); + setFinalError(executionInfo, error); } - private void setFinalError(Statement statement, Throwable error, Node node, int execution) { - DriverExecutionProfile executionProfile = - Conversions.resolveExecutionProfile(statement, context); - ExecutionInfo executionInfo = - new DefaultExecutionInfo( - statement, - node, - startedSpeculativeExecutionsCount.get(), - execution, - errors, - null, - null, - true, - session, - context, - executionProfile); + private void setFinalError(ExecutionInfo executionInfo, Throwable error) { if (error instanceof DriverException) { ((DriverException) error).setExecutionInfo(executionInfo); } @@ -451,12 +421,19 @@ private void setFinalError(Statement statement, Throwable error, Node node, i if (!(requestTracker instanceof NoopRequestTracker)) { long latencyNanos = System.nanoTime() - startTimeNanos; requestTracker.onError( - statement, error, latencyNanos, executionProfile, node, executionInfo, logPrefix); + executionInfo.getRequest(), + error, + latencyNanos, + executionInfo.getExecutionProfile(), + executionInfo.getCoordinator(), + executionInfo, + logPrefix); } if (error instanceof DriverTimeoutException) { throttler.signalTimeout(this); sessionMetricUpdater.incrementCounter( - DefaultSessionMetric.CQL_CLIENT_TIMEOUTS, executionProfile.getName()); + DefaultSessionMetric.CQL_CLIENT_TIMEOUTS, + executionInfo.getExecutionProfile().getName()); } else if (!(error instanceof RequestThrottlingException)) { throttler.signalError(this, error); } @@ -504,23 +481,25 @@ private NodeResponseCallback( this.logPrefix = logPrefix + "|" + execution; } - // this gets invoked once the write completes. + // this gets invoked once the write request completes. @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { Throwable error = future.cause(); + ExecutionInfo executionInfo = CqlRequestHandler.this.defaultExecutionInfo(this).build(); if (error instanceof EncoderException && error.getCause() instanceof FrameTooLongException) { - trackNodeError(node, error.getCause(), NANOTIME_NOT_MEASURED_YET); - setFinalError(statement, error.getCause(), node, execution); + trackNodeError(error.getCause(), executionInfo, NANOTIME_NOT_MEASURED_YET); + setFinalError(executionInfo, error.getCause()); } else { LOG.trace( "[{}] Failed to send request on {}, trying next node (cause: {})", logPrefix, channel, + error.getMessage(), error); recordError(node, error); - trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET); + trackNodeError(error, executionInfo, NANOTIME_NOT_MEASURED_YET); ((DefaultNode) node) .getMetricUpdater() .incrementCounter(DefaultNodeMetric.UNSENT_REQUESTS, executionProfile.getName()); @@ -656,19 +635,18 @@ public void onResponse(Frame responseFrame) { LOG.trace("[{}] Got error response, processing", logPrefix); processErrorResponse((Error) responseMessage); } else { + ExecutionInfo executionInfo = defaultExecutionInfo().build(); trackNodeError( - node, new IllegalStateException("Unexpected response " + responseMessage), + executionInfo, nodeResponseTimeNanos); setFinalError( - statement, - new IllegalStateException("Unexpected response " + responseMessage), - node, - execution); + executionInfo, new IllegalStateException("Unexpected response " + responseMessage)); } } catch (Throwable t) { - trackNodeError(node, t, nodeResponseTimeNanos); - setFinalError(statement, t, node, execution); + ExecutionInfo executionInfo = defaultExecutionInfo().build(); + trackNodeError(t, executionInfo, nodeResponseTimeNanos); + setFinalError(executionInfo, t); } } @@ -702,6 +680,7 @@ private void processErrorResponse(Error errorMessage) { .start() .handle( (repreparedId, exception) -> { + ExecutionInfo executionInfo = defaultExecutionInfo().build(); if (exception != null) { // If the error is not recoverable, surface it to the client instead of retrying if (exception instanceof UnexpectedResponseException) { @@ -714,18 +693,18 @@ private void processErrorResponse(Error errorMessage) { || prepareError instanceof FunctionFailureException || prepareError instanceof ProtocolError) { LOG.trace("[{}] Unrecoverable error on reprepare, rethrowing", logPrefix); - trackNodeError(node, prepareError, NANOTIME_NOT_MEASURED_YET); - setFinalError(statement, prepareError, node, execution); + trackNodeError(prepareError, executionInfo, NANOTIME_NOT_MEASURED_YET); + setFinalError(executionInfo, prepareError); return null; } } } else if (exception instanceof RequestThrottlingException) { - trackNodeError(node, exception, NANOTIME_NOT_MEASURED_YET); - setFinalError(statement, exception, node, execution); + trackNodeError(exception, executionInfo, NANOTIME_NOT_MEASURED_YET); + setFinalError(executionInfo, exception); return null; } recordError(node, exception); - trackNodeError(node, exception, NANOTIME_NOT_MEASURED_YET); + trackNodeError(exception, executionInfo, NANOTIME_NOT_MEASURED_YET); LOG.trace("[{}] Reprepare failed, trying next node", logPrefix); sendRequest(statement, null, queryPlan, execution, retryCount, false); } else { @@ -739,8 +718,9 @@ private void processErrorResponse(Error errorMessage) { + "the statement was prepared.", Bytes.toHexString(idToReprepare), Bytes.toHexString(repreparedId))); - trackNodeError(node, illegalStateException, NANOTIME_NOT_MEASURED_YET); - setFinalError(statement, illegalStateException, node, execution); + trackNodeError( + illegalStateException, executionInfo, NANOTIME_NOT_MEASURED_YET); + setFinalError(executionInfo, illegalStateException); } LOG.trace("[{}] Reprepare sucessful, retrying", logPrefix); sendRequest(statement, node, queryPlan, execution, retryCount, false); @@ -753,16 +733,18 @@ private void processErrorResponse(Error errorMessage) { NodeMetricUpdater metricUpdater = ((DefaultNode) node).getMetricUpdater(); if (error instanceof BootstrappingException) { LOG.trace("[{}] {} is bootstrapping, trying next node", logPrefix, node); + ExecutionInfo executionInfo = defaultExecutionInfo().build(); recordError(node, error); - trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET); + trackNodeError(error, executionInfo, NANOTIME_NOT_MEASURED_YET); sendRequest(statement, null, queryPlan, execution, retryCount, false); } else if (error instanceof QueryValidationException || error instanceof FunctionFailureException || error instanceof ProtocolError) { LOG.trace("[{}] Unrecoverable error, rethrowing", logPrefix); metricUpdater.incrementCounter(DefaultNodeMetric.OTHER_ERRORS, executionProfile.getName()); - trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET); - setFinalError(statement, error, node, execution); + ExecutionInfo executionInfo = defaultExecutionInfo().build(); + trackNodeError(error, executionInfo, NANOTIME_NOT_MEASURED_YET); + setFinalError(executionInfo, error); } else { RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(context, executionProfile); RetryVerdict verdict; @@ -833,10 +815,11 @@ private void processErrorResponse(Error errorMessage) { private void processRetryVerdict(RetryVerdict verdict, Throwable error) { LOG.trace("[{}] Processing retry decision {}", logPrefix, verdict); + ExecutionInfo executionInfo = defaultExecutionInfo().build(); switch (verdict.getRetryDecision()) { case RETRY_SAME: recordError(node, error); - trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET); + trackNodeError(error, executionInfo, NANOTIME_NOT_MEASURED_YET); sendRequest( verdict.getRetryRequest(statement), node, @@ -847,7 +830,7 @@ private void processRetryVerdict(RetryVerdict verdict, Throwable error) { break; case RETRY_NEXT: recordError(node, error); - trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET); + trackNodeError(error, executionInfo, NANOTIME_NOT_MEASURED_YET); sendRequest( verdict.getRetryRequest(statement), null, @@ -857,8 +840,8 @@ private void processRetryVerdict(RetryVerdict verdict, Throwable error) { false); break; case RETHROW: - trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET); - setFinalError(statement, error, node, execution); + trackNodeError(error, executionInfo, NANOTIME_NOT_MEASURED_YET); + setFinalError(executionInfo, error); break; case IGNORE: setFinalResult(Void.INSTANCE, null, true, this); @@ -884,7 +867,7 @@ private void updateErrorMetrics( metricUpdater.incrementCounter(ignoresOnError, executionProfile.getName()); break; case RETHROW: - // nothing do do + // nothing to do } } @@ -904,11 +887,10 @@ public void onFailure(Throwable error) { RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(context, executionProfile); verdict = retryPolicy.onRequestAbortedVerdict(statement, error, retryCount); } catch (Throwable cause) { + ExecutionInfo executionInfo = defaultExecutionInfo().build(); setFinalError( - statement, - new IllegalStateException("Unexpected error while invoking the retry policy", cause), - null, - execution); + executionInfo, + new IllegalStateException("Unexpected error while invoking the retry policy", cause)); return; } } @@ -936,7 +918,8 @@ public void cancel() { * measured. If {@link #NANOTIME_NOT_MEASURED_YET}, it hasn't and we need to measure it now * (this is to avoid unnecessary calls to System.nanoTime) */ - private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNanos) { + private void trackNodeError( + Throwable error, ExecutionInfo executionInfo, long nodeResponseTimeNanos) { if (requestTracker instanceof NoopRequestTracker) { return; } @@ -944,21 +927,18 @@ private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNan nodeResponseTimeNanos = System.nanoTime(); } long latencyNanos = nodeResponseTimeNanos - this.nodeStartTimeNanos; - ExecutionInfo executionInfo = - new DefaultExecutionInfo( - statement, - node, - startedSpeculativeExecutionsCount.get(), - execution, - errors, - null, - null, - true, - session, - context, - executionProfile); requestTracker.onNodeError( - statement, error, latencyNanos, executionProfile, node, executionInfo, logPrefix); + statement, + error, + latencyNanos, + executionProfile, + executionInfo.getCoordinator(), + executionInfo, + logPrefix); + } + + private DefaultExecutionInfo.Builder defaultExecutionInfo() { + return CqlRequestHandler.this.defaultExecutionInfo(this, execution); } @Override @@ -966,4 +946,37 @@ public String toString() { return logPrefix; } } + + private DefaultExecutionInfo.Builder defaultExecutionInfo(NodeResponseCallback callback) { + return defaultExecutionInfo(callback, callback.execution); + } + + private DefaultExecutionInfo.Builder defaultExecutionInfo( + NodeResponseCallback callback, int execution) { + return new DefaultExecutionInfo.Builder( + callback.statement, + callback.node, + startedSpeculativeExecutionsCount.get(), + execution, + errors, + session, + context, + executionProfile); + } + + private DefaultExecutionInfo.Builder failedExecutionInfoNoRequestSent() { + return failedExecutionInfoNoRequestSent(initialStatement); + } + + private DefaultExecutionInfo.Builder failedExecutionInfoNoRequestSent(Statement statement) { + return new DefaultExecutionInfo.Builder( + statement, + null, + startedSpeculativeExecutionsCount.get(), + -1, + errors, + session, + context, + executionProfile); + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultExecutionInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultExecutionInfo.java index 3ab57ddc598..1dd4354d6e6 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultExecutionInfo.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultExecutionInfo.java @@ -28,6 +28,8 @@ import com.datastax.oss.driver.internal.core.session.DefaultSession; import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; import com.datastax.oss.protocol.internal.Frame; +import com.datastax.oss.protocol.internal.response.Result; +import com.datastax.oss.protocol.internal.response.result.Rows; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import java.nio.ByteBuffer; @@ -57,7 +59,7 @@ public class DefaultExecutionInfo implements ExecutionInfo { private final InternalDriverContext context; private final DriverExecutionProfile executionProfile; - public DefaultExecutionInfo( + private DefaultExecutionInfo( Request request, Node coordinator, int speculativeExecutionCount, @@ -102,6 +104,11 @@ public Request getRequest() { return request; } + @Override + public DriverExecutionProfile getExecutionProfile() { + return executionProfile; + } + @Nullable @Override public Node getCoordinator() { @@ -189,4 +196,91 @@ public int getResponseSizeInBytes() { public int getCompressedResponseSizeInBytes() { return compressedResponseSizeInBytes; } + + public static Builder builder( + Request request, + Node coordinator, + int speculativeExecutionCount, + int successfulExecutionIndex, + List> errors, + DefaultSession session, + InternalDriverContext context, + DriverExecutionProfile executionProfile) { + return new Builder( + request, + coordinator, + speculativeExecutionCount, + successfulExecutionIndex, + errors, + session, + context, + executionProfile); + } + + public static class Builder { + private final Request request; + private final Node coordinator; + private final int speculativeExecutionCount; + private final int successfulExecutionIndex; + private final List> errors; + private final DefaultSession session; + private final InternalDriverContext context; + private final DriverExecutionProfile executionProfile; + + private Result response; + private Frame frame; + private boolean schemaInAgreement = true; + + public Builder( + Request request, + Node coordinator, + int speculativeExecutionCount, + int successfulExecutionIndex, + List> errors, + DefaultSession session, + InternalDriverContext context, + DriverExecutionProfile executionProfile) { + this.request = request; + this.coordinator = coordinator; + this.speculativeExecutionCount = speculativeExecutionCount; + this.successfulExecutionIndex = successfulExecutionIndex; + this.errors = errors; + this.session = session; + this.context = context; + this.executionProfile = executionProfile; + } + + public Builder withServerResponse(Result response, Frame frame) { + this.response = response; + this.frame = frame; + return this; + } + + /** Client received a response, but it could not be parsed to expected message. */ + public Builder withServerResponse(Frame frame) { + return withServerResponse(null, frame); + } + + public Builder withSchemaInAgreement(boolean schemaInAgreement) { + this.schemaInAgreement = schemaInAgreement; + return this; + } + + public DefaultExecutionInfo build() { + final ByteBuffer pagingState = + (response instanceof Rows) ? ((Rows) response).getMetadata().pagingState : null; + return new DefaultExecutionInfo( + request, + coordinator, + speculativeExecutionCount, + successfulExecutionIndex, + errors, + pagingState, + frame, + schemaInAgreement, + session, + context, + executionProfile); + } + } } diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java index c5a9d6d4cb2..533fe3be87a 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java @@ -18,11 +18,13 @@ package com.datastax.oss.driver.internal.core.cql; import static com.datastax.oss.driver.Assertions.assertThatStage; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockingDetails; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -31,13 +33,18 @@ import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; +import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.servererrors.BootstrappingException; +import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker; import com.datastax.oss.protocol.internal.ProtocolConstants; import com.datastax.oss.protocol.internal.response.Error; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletionStage; import org.junit.Test; +import org.mockito.invocation.Invocation; public class CqlRequestHandlerTrackerTest extends CqlRequestHandlerTestBase { @Test @@ -93,9 +100,32 @@ public void should_invoke_request_tracker() { any(String.class)); verifyNoMoreInteractions(requestTracker); }); + + // verify that passed ExecutionInfo object had correct details + List invocations = + new ArrayList<>(mockingDetails(requestTracker).getInvocations()); + checkExecutionInfo( + (ExecutionInfo) invocations.get(0).getRawArguments()[5], + UNDEFINED_IDEMPOTENCE_STATEMENT, + node1); + checkExecutionInfo( + (ExecutionInfo) invocations.get(1).getRawArguments()[4], + UNDEFINED_IDEMPOTENCE_STATEMENT, + node2); + checkExecutionInfo( + (ExecutionInfo) invocations.get(2).getRawArguments()[4], + UNDEFINED_IDEMPOTENCE_STATEMENT, + node2); } } + private void checkExecutionInfo( + ExecutionInfo executionInfo, Request expectedRequest, Node expectedNode) { + assertThat(executionInfo.getRequest()).isEqualTo(expectedRequest); + assertThat(executionInfo.getExecutionProfile()).isNotNull(); + assertThat(executionInfo.getCoordinator()).isEqualTo(expectedNode); + } + @Test public void should_not_invoke_noop_request_tracker() { try (RequestHandlerTestHarness harness =