diff --git a/core/src/main/java/com/datastax/dse/driver/api/core/graph/GraphExecutionInfo.java b/core/src/main/java/com/datastax/dse/driver/api/core/graph/GraphExecutionInfo.java index 758f6b358ed..c10fdf1ce3c 100644 --- a/core/src/main/java/com/datastax/dse/driver/api/core/graph/GraphExecutionInfo.java +++ b/core/src/main/java/com/datastax/dse/driver/api/core/graph/GraphExecutionInfo.java @@ -18,8 +18,10 @@ package com.datastax.dse.driver.api.core.graph; import com.datastax.oss.driver.api.core.DefaultProtocolVersion; +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.specex.SpeculativeExecutionPolicy; +import edu.umd.cs.findbugs.annotations.Nullable; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; @@ -37,6 +39,12 @@ public interface GraphExecutionInfo { /** The statement that was executed. */ GraphStatement getStatement(); + /** @return Execution profile applied when executing given request. */ + @Nullable + default DriverExecutionProfile getExecutionProfile() { + return null; + } + /** The node that was used as a coordinator to successfully complete the query. */ Node getCoordinator(); diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java index 9a7be344721..3969baaa651 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java @@ -19,11 +19,13 @@ import com.datastax.dse.driver.api.core.DseProtocolVersion; import com.datastax.dse.driver.api.core.cql.continuous.ContinuousAsyncResultSet; +import com.datastax.dse.driver.api.core.graph.AsyncGraphResultSet; import com.datastax.dse.driver.internal.core.DseProtocolFeature; import com.datastax.dse.driver.internal.core.cql.DseConversions; import com.datastax.dse.protocol.internal.request.Revise; import com.datastax.dse.protocol.internal.response.result.DseRowsMetadata; import com.datastax.oss.driver.api.core.AllNodesFailedException; +import com.datastax.oss.driver.api.core.AsyncPagingIterable; import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.DriverTimeoutException; import com.datastax.oss.driver.api.core.NodeUnavailableException; @@ -626,7 +628,7 @@ public void operationComplete(@NonNull Future future) { Throwable error = future.cause(); if (error instanceof EncoderException && error.getCause() instanceof FrameTooLongException) { - trackNodeError(node, error.getCause()); + trackNodeError(node, error.getCause(), null); lock.lock(); try { abort(error.getCause(), false); @@ -643,7 +645,7 @@ public void operationComplete(@NonNull Future future) { .getMetricUpdater() .incrementCounter(DefaultNodeMetric.UNSENT_REQUESTS, executionProfile.getName()); recordError(node, error); - trackNodeError(node, error.getCause()); + trackNodeError(node, error.getCause(), null); sendRequest(statement, null, executionIndex, retryCount, scheduleSpeculativeExecution); } } else { @@ -738,7 +740,8 @@ private void onPageTimeout(int expectedPage) { * Invoked when a continuous paging response is received, either a successful or failed one. * *

Delegates further processing to appropriate methods: {@link #processResultResponse(Result, - * Frame)} if the response was successful, or {@link #processErrorResponse(Error)} if it wasn't. + * Frame)} if the response was successful, or {@link #processErrorResponse(Error, Frame)} if it + * wasn't. * * @param response the received {@link Frame}. */ @@ -759,15 +762,15 @@ public void onResponse(@NonNull Frame response) { processResultResponse((Result) responseMessage, response); } else if (responseMessage instanceof Error) { LOG.trace("[{}] Got error response", logPrefix); - processErrorResponse((Error) responseMessage); + processErrorResponse((Error) responseMessage, response); } else { IllegalStateException error = new IllegalStateException("Unexpected response " + responseMessage); - trackNodeError(node, error); + trackNodeError(node, error, response); abort(error, false); } } catch (Throwable t) { - trackNodeError(node, t); + trackNodeError(node, t, response); abort(t, false); } } finally { @@ -830,7 +833,8 @@ public void onFailure(@NonNull Throwable error) { private void processResultResponse(@NonNull Result result, @Nullable Frame frame) { assert lock.isHeldByCurrentThread(); try { - ExecutionInfo executionInfo = createExecutionInfo(result, frame); + ExecutionInfo executionInfo = + createExecutionInfo().withServerResponse(result, frame).build(); if (result instanceof Rows) { DseRowsMetadata rowsMetadata = (DseRowsMetadata) ((Rows) result).getMetadata(); if (columnDefinitions == null) { @@ -901,7 +905,7 @@ private void processResultResponse(@NonNull Result result, @Nullable Frame frame * @param errorMessage the error message received. */ @SuppressWarnings("GuardedBy") // this method is only called with the lock held - private void processErrorResponse(@NonNull Error errorMessage) { + private void processErrorResponse(@NonNull Error errorMessage, @NonNull Frame frame) { assert lock.isHeldByCurrentThread(); if (errorMessage instanceof Unprepared) { processUnprepared((Unprepared) errorMessage); @@ -910,7 +914,7 @@ private void processErrorResponse(@NonNull Error errorMessage) { if (error instanceof BootstrappingException) { LOG.trace("[{}] {} is bootstrapping, trying next node", logPrefix, node); recordError(node, error); - trackNodeError(node, error); + trackNodeError(node, error, frame); sendRequest(statement, null, executionIndex, retryCount, false); } else if (error instanceof QueryValidationException || error instanceof FunctionFailureException @@ -922,7 +926,7 @@ private void processErrorResponse(@NonNull Error errorMessage) { NodeMetricUpdater metricUpdater = ((DefaultNode) node).getMetricUpdater(); metricUpdater.incrementCounter( DefaultNodeMetric.OTHER_ERRORS, executionProfile.getName()); - trackNodeError(node, error); + trackNodeError(node, error, frame); abort(error, true); } else { try { @@ -1061,7 +1065,7 @@ private void processUnprepared(@NonNull Unprepared errorMessage) { + "This usually happens when you run a 'USE...' query after " + "the statement was prepared.", Bytes.toHexString(idToReprepare), Bytes.toHexString(repreparedId))); - trackNodeError(node, illegalStateException); + trackNodeError(node, illegalStateException, null); fatalError = illegalStateException; } else { LOG.trace( @@ -1080,18 +1084,18 @@ private void processUnprepared(@NonNull Unprepared errorMessage) { || prepareError instanceof FunctionFailureException || prepareError instanceof ProtocolError) { LOG.trace("[{}] Unrecoverable error on re-prepare, rethrowing", logPrefix); - trackNodeError(node, prepareError); + trackNodeError(node, prepareError, null); fatalError = prepareError; } } } else if (exception instanceof RequestThrottlingException) { - trackNodeError(node, exception); + trackNodeError(node, exception, null); fatalError = exception; } if (fatalError == null) { LOG.trace("[{}] Re-prepare failed, trying next node", logPrefix); recordError(node, exception); - trackNodeError(node, exception); + trackNodeError(node, exception, null); sendRequest(statement, null, executionIndex, retryCount, false); } } @@ -1119,18 +1123,18 @@ private void processRetryVerdict(@NonNull RetryVerdict verdict, @NonNull Throwab switch (verdict.getRetryDecision()) { case RETRY_SAME: recordError(node, error); - trackNodeError(node, error); + trackNodeError(node, error, null); sendRequest( verdict.getRetryRequest(statement), node, executionIndex, retryCount + 1, false); break; case RETRY_NEXT: recordError(node, error); - trackNodeError(node, error); + trackNodeError(node, error, null); sendRequest( verdict.getRetryRequest(statement), null, executionIndex, retryCount + 1, false); break; case RETHROW: - trackNodeError(node, error); + trackNodeError(node, error, null); abort(error, true); break; case IGNORE: @@ -1443,12 +1447,20 @@ private void reenableAutoReadIfNeeded() { // ERROR HANDLING - private void trackNodeError(@NonNull Node node, @NonNull Throwable error) { + private void trackNodeError( + @NonNull Node node, @NonNull Throwable error, @Nullable Frame frame) { if (nodeErrorReported.compareAndSet(false, true)) { long latencyNanos = System.nanoTime() - this.messageStartTimeNanos; context .getRequestTracker() - .onNodeError(this.statement, error, latencyNanos, executionProfile, node, logPrefix); + .onNodeError( + this.statement, + error, + latencyNanos, + executionProfile, + node, + createExecutionInfo().withServerResponse(frame).build(), + logPrefix); } } @@ -1562,21 +1574,32 @@ private void completeResultSetFuture( if (resultSetClass.isInstance(pageOrError)) { if (future.complete(resultSetClass.cast(pageOrError))) { throttler.signalSuccess(ContinuousRequestHandlerBase.this); + + ExecutionInfo executionInfo = null; + if (pageOrError instanceof AsyncPagingIterable) { + executionInfo = ((AsyncPagingIterable) pageOrError).getExecutionInfo(); + } else if (pageOrError instanceof AsyncGraphResultSet) { + executionInfo = ((AsyncGraphResultSet) pageOrError).getRequestExecutionInfo(); + } + if (nodeSuccessReported.compareAndSet(false, true)) { context .getRequestTracker() - .onNodeSuccess(statement, nodeLatencyNanos, executionProfile, node, logPrefix); + .onNodeSuccess( + statement, nodeLatencyNanos, executionProfile, node, executionInfo, logPrefix); } context .getRequestTracker() - .onSuccess(statement, totalLatencyNanos, executionProfile, node, logPrefix); + .onSuccess( + statement, totalLatencyNanos, executionProfile, node, executionInfo, logPrefix); } } else { Throwable error = (Throwable) pageOrError; if (future.completeExceptionally(error)) { context .getRequestTracker() - .onError(statement, error, totalLatencyNanos, executionProfile, node, logPrefix); + .onError( + statement, error, totalLatencyNanos, executionProfile, node, null, logPrefix); if (error instanceof DriverTimeoutException) { throttler.signalTimeout(ContinuousRequestHandlerBase.this); session @@ -1590,18 +1613,13 @@ private void completeResultSetFuture( } @NonNull - private ExecutionInfo createExecutionInfo(@NonNull Result result, @Nullable Frame response) { - ByteBuffer pagingState = - result instanceof Rows ? ((Rows) result).getMetadata().pagingState : null; - return new DefaultExecutionInfo( + private DefaultExecutionInfo.Builder createExecutionInfo() { + return DefaultExecutionInfo.builder( statement, node, startedSpeculativeExecutionsCount.get(), executionIndex, errors, - pagingState, - response, - true, session, context, executionProfile); diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphExecutionInfoConverter.java b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphExecutionInfoConverter.java index b6472f690d3..7500c26d691 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphExecutionInfoConverter.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphExecutionInfoConverter.java @@ -18,6 +18,7 @@ package com.datastax.dse.driver.internal.core.graph; import com.datastax.dse.driver.api.core.graph.GraphStatement; +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.cql.QueryTrace; import com.datastax.oss.driver.api.core.cql.Statement; @@ -62,6 +63,11 @@ public Statement getStatement() { throw new ClassCastException("GraphStatement cannot be cast to Statement"); } + @Override + public DriverExecutionProfile getExecutionProfile() { + return graphExecutionInfo.getExecutionProfile(); + } + @Nullable @Override public Node getCoordinator() { @@ -146,6 +152,11 @@ public GraphStatement getStatement() { return (GraphStatement) executionInfo.getRequest(); } + @Override + public DriverExecutionProfile getExecutionProfile() { + return executionInfo.getExecutionProfile(); + } + @Override public Node getCoordinator() { return executionInfo.getCoordinator(); diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java index 702da69b855..dbdb58d60b9 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java @@ -329,7 +329,18 @@ private void cancelScheduledTasks() { private void setFinalResult( Result resultMessage, Frame responseFrame, NodeResponseCallback callback) { try { - ExecutionInfo executionInfo = buildExecutionInfo(callback, responseFrame); + ExecutionInfo executionInfo = + DefaultExecutionInfo.builder( + callback.statement, + callback.node, + startedSpeculativeExecutionsCount.get(), + callback.execution, + errors, + session, + context, + callback.executionProfile) + .withServerResponse(resultMessage, responseFrame) + .build(); DriverExecutionProfile executionProfile = Conversions.resolveExecutionProfile(callback.statement, context); GraphProtocol subProtocol = @@ -360,9 +371,19 @@ private void setFinalResult( totalLatencyNanos = completionTimeNanos - startTimeNanos; long nodeLatencyNanos = completionTimeNanos - callback.nodeStartTimeNanos; requestTracker.onNodeSuccess( - callback.statement, nodeLatencyNanos, executionProfile, callback.node, logPrefix); + callback.statement, + nodeLatencyNanos, + executionProfile, + callback.node, + executionInfo, + logPrefix); requestTracker.onSuccess( - callback.statement, totalLatencyNanos, executionProfile, callback.node, logPrefix); + callback.statement, + totalLatencyNanos, + executionProfile, + callback.node, + executionInfo, + logPrefix); } if (sessionMetricUpdater.isEnabled( DseSessionMetric.GRAPH_REQUESTS, executionProfile.getName())) { @@ -416,23 +437,6 @@ private void logServerWarnings(GraphStatement statement, List warning LOG.warn("Query '{}' generated server side warning(s): {}", statementString, warning)); } - private ExecutionInfo buildExecutionInfo(NodeResponseCallback callback, Frame responseFrame) { - DriverExecutionProfile executionProfile = - Conversions.resolveExecutionProfile(callback.statement, context); - return new DefaultExecutionInfo( - callback.statement, - callback.node, - startedSpeculativeExecutionsCount.get(), - callback.execution, - errors, - null, - responseFrame, - true, - session, - context, - executionProfile); - } - @Override public void onThrottleFailure(@NonNull RequestThrottlingException error) { DriverExecutionProfile executionProfile = @@ -446,27 +450,26 @@ private void setFinalError( GraphStatement statement, Throwable error, Node node, int execution) { DriverExecutionProfile executionProfile = Conversions.resolveExecutionProfile(statement, context); + ExecutionInfo executionInfo = + DefaultExecutionInfo.builder( + statement, + node, + startedSpeculativeExecutionsCount.get(), + execution, + errors, + session, + context, + executionProfile) + .build(); if (error instanceof DriverException) { - ((DriverException) error) - .setExecutionInfo( - new DefaultExecutionInfo( - statement, - node, - startedSpeculativeExecutionsCount.get(), - execution, - errors, - null, - null, - true, - session, - context, - executionProfile)); + ((DriverException) error).setExecutionInfo(executionInfo); } if (result.completeExceptionally(error)) { cancelScheduledTasks(); if (!(requestTracker instanceof NoopRequestTracker)) { long latencyNanos = System.nanoTime() - startTimeNanos; - requestTracker.onError(statement, error, latencyNanos, executionProfile, node, logPrefix); + requestTracker.onError( + statement, error, latencyNanos, executionProfile, node, executionInfo, logPrefix); } if (error instanceof DriverTimeoutException) { throttler.signalTimeout(this); @@ -859,7 +862,8 @@ private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNan nodeResponseTimeNanos = System.nanoTime(); } long latencyNanos = nodeResponseTimeNanos - this.nodeStartTimeNanos; - requestTracker.onNodeError(statement, error, latencyNanos, executionProfile, node, logPrefix); + requestTracker.onNodeError( + statement, error, latencyNanos, executionProfile, node, null, logPrefix); } @Override diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/cql/ExecutionInfo.java b/core/src/main/java/com/datastax/oss/driver/api/core/cql/ExecutionInfo.java index 40cfca827d1..43e1f2515b2 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/cql/ExecutionInfo.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/cql/ExecutionInfo.java @@ -21,6 +21,7 @@ import com.datastax.oss.driver.api.core.DriverException; import com.datastax.oss.driver.api.core.RequestThrottlingException; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.detach.AttachmentPoint; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.retry.RetryDecision; @@ -66,6 +67,12 @@ default Request getRequest() { @Deprecated Statement getStatement(); + /** @return Execution profile applied when executing given request. */ + @Nullable + default DriverExecutionProfile getExecutionProfile() { + return null; + } + /** * The node that acted as a coordinator for the query. * diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java index d29ee48d352..bf623d3de9c 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java @@ -18,6 +18,7 @@ package com.datastax.oss.driver.api.core.tracker; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.session.Session; @@ -36,38 +37,147 @@ public interface RequestTracker extends AutoCloseable { /** - * @deprecated This method only exists for backward compatibility. Override {@link - * #onSuccess(Request, long, DriverExecutionProfile, Node, String)} instead. + * Invoked each time a request succeeds. + * + * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, + * GenericType) session.execute} call until the result is made available to the client). + * @param executionProfile the execution profile of this request. + * @param node the node that returned the successful response. + * @param executionInfo the execution info containing the results of this request + * @param requestLogPrefix the dedicated log prefix for this request */ - @Deprecated default void onSuccess( @NonNull Request request, long latencyNanos, @NonNull DriverExecutionProfile executionProfile, - @NonNull Node node) {} + @NonNull Node node, + @NonNull ExecutionInfo executionInfo, + @NonNull String requestLogPrefix) { + // delegate call to the old method + onSuccess(request, latencyNanos, executionProfile, node, requestLogPrefix); + } /** - * Invoked each time a request succeeds. + * Invoked each time a request fails. + * + * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, + * GenericType) session.execute} call until the error is propagated to the client). + * @param executionProfile the execution profile of this request. + * @param node the node that returned the error response, or {@code null} if the error occurred + * @param executionInfo the execution info being returned to the client for this request if + * available + * @param requestLogPrefix the dedicated log prefix for this request + */ + default void onError( + @NonNull Request request, + @NonNull Throwable error, + long latencyNanos, + @NonNull DriverExecutionProfile executionProfile, + @Nullable Node node, + @Nullable ExecutionInfo executionInfo, + @NonNull String requestLogPrefix) { + // delegate call to the old method + onError(request, error, latencyNanos, executionProfile, node, requestLogPrefix); + } + + /** + * Invoked each time a request succeeds at the node level. Similar to {@link #onSuccess(Request, + * long, DriverExecutionProfile, Node, String)} but at per node level. * * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, * GenericType) session.execute} call until the result is made available to the client). * @param executionProfile the execution profile of this request. * @param node the node that returned the successful response. + * @param executionInfo the execution info containing the results of this request + * @param requestLogPrefix the dedicated log prefix for this request + */ + default void onNodeSuccess( + @NonNull Request request, + long latencyNanos, + @NonNull DriverExecutionProfile executionProfile, + @NonNull Node node, + @NonNull ExecutionInfo executionInfo, + @NonNull String requestLogPrefix) { + // delegate call to the old method + onNodeSuccess(request, latencyNanos, executionProfile, node, requestLogPrefix); + } + + /** + * Invoked each time a request fails at the node level. Similar to {@link #onError(Request, + * Throwable, long, DriverExecutionProfile, Node, String)} but at a per node level. + * + * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, + * GenericType) session.execute} call until the error is propagated to the client). + * @param executionProfile the execution profile of this request. + * @param node the node that returned the error response. + * @param executionInfo the execution info containing the results of this request if available * @param requestLogPrefix the dedicated log prefix for this request */ + default void onNodeError( + @NonNull Request request, + @NonNull Throwable error, + long latencyNanos, + @NonNull DriverExecutionProfile executionProfile, + @NonNull Node node, + @Nullable ExecutionInfo executionInfo, + @NonNull String requestLogPrefix) { + // delegate call to the old method + onNodeError(request, error, latencyNanos, executionProfile, node, requestLogPrefix); + } + + /** + * Invoked when the session is ready to process user requests. + * + *

WARNING: if you use {@code session.execute()} in your tracker implementation, keep in + * mind that those requests will in turn recurse back into {@code onSuccess} / {@code onError} + * methods. Make sure you don't trigger an infinite loop; one way to do that is to use a + * custom execution profile for internal requests. + * + *

This corresponds to the moment when {@link SessionBuilder#build()} returns, or the future + * returned by {@link SessionBuilder#buildAsync()} completes. If the session initialization fails, + * this method will not get called. + * + *

Listener methods are invoked from different threads; if you store the session in a field, + * make it at least volatile to guarantee proper publication. + * + *

This method is guaranteed to be the first one invoked on this object. + * + *

The default implementation is empty. + */ + default void onSessionReady(@NonNull Session session) {} + + // ----- Below methods have been deprecated and will be removed in next releases ----- + + /** + * @deprecated This method only exists for backward compatibility. Override {@link + * #onSuccess(Request, long, DriverExecutionProfile, Node, ExecutionInfo, String)} instead. + */ + @Deprecated + default void onSuccess( + @NonNull Request request, + long latencyNanos, + @NonNull DriverExecutionProfile executionProfile, + @NonNull Node node) {} + + /** + * @deprecated This method only exists for backward compatibility. Override {@link + * #onSuccess(Request, long, DriverExecutionProfile, Node, ExecutionInfo, String)} instead. + */ + @Deprecated default void onSuccess( @NonNull Request request, long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, @NonNull String requestLogPrefix) { - // If client doesn't override onSuccess with requestLogPrefix delegate call to the old method + // delegate call to the old method onSuccess(request, latencyNanos, executionProfile, node); } /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onError(Request, Throwable, long, DriverExecutionProfile, Node, String)} instead. + * #onError(Request, Throwable, long, DriverExecutionProfile, Node, ExecutionInfo, String)} + * instead. */ @Deprecated default void onError( @@ -78,14 +188,11 @@ default void onError( @Nullable Node node) {} /** - * Invoked each time a request fails. - * - * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, - * GenericType) session.execute} call until the error is propagated to the client). - * @param executionProfile the execution profile of this request. - * @param node the node that returned the error response, or {@code null} if the error occurred - * @param requestLogPrefix the dedicated log prefix for this request + * @deprecated This method only exists for backward compatibility. Override {@link + * #onError(Request, Throwable, long, DriverExecutionProfile, Node, ExecutionInfo, String)} + * instead. */ + @Deprecated default void onError( @NonNull Request request, @NonNull Throwable error, @@ -93,13 +200,14 @@ default void onError( @NonNull DriverExecutionProfile executionProfile, @Nullable Node node, @NonNull String requestLogPrefix) { - // If client doesn't override onError with requestLogPrefix delegate call to the old method + // delegate call to the old method onError(request, error, latencyNanos, executionProfile, node); } /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onNodeError(Request, Throwable, long, DriverExecutionProfile, Node, String)} instead. + * #onNodeError(Request, Throwable, long, DriverExecutionProfile, Node, ExecutionInfo, + * String)} instead. */ @Deprecated default void onNodeError( @@ -110,15 +218,11 @@ default void onNodeError( @NonNull Node node) {} /** - * Invoked each time a request fails at the node level. Similar to {@link #onError(Request, - * Throwable, long, DriverExecutionProfile, Node, String)} but at a per node level. - * - * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, - * GenericType) session.execute} call until the error is propagated to the client). - * @param executionProfile the execution profile of this request. - * @param node the node that returned the error response. - * @param requestLogPrefix the dedicated log prefix for this request + * @deprecated This method only exists for backward compatibility. Override {@link + * #onNodeError(Request, Throwable, long, DriverExecutionProfile, Node, ExecutionInfo, + * String)} instead. */ + @Deprecated default void onNodeError( @NonNull Request request, @NonNull Throwable error, @@ -126,13 +230,14 @@ default void onNodeError( @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, @NonNull String requestLogPrefix) { - // If client doesn't override onNodeError with requestLogPrefix delegate call to the old method + // delegate call to the old method onNodeError(request, error, latencyNanos, executionProfile, node); } /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onNodeSuccess(Request, long, DriverExecutionProfile, Node, String)} instead. + * #onNodeSuccess(Request, long, DriverExecutionProfile, Node, ExecutionInfo, String)} + * instead. */ @Deprecated default void onNodeSuccess( @@ -142,44 +247,18 @@ default void onNodeSuccess( @NonNull Node node) {} /** - * Invoked each time a request succeeds at the node level. Similar to {@link #onSuccess(Request, - * long, DriverExecutionProfile, Node, String)} but at per node level. - * - * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, - * GenericType) session.execute} call until the result is made available to the client). - * @param executionProfile the execution profile of this request. - * @param node the node that returned the successful response. - * @param requestLogPrefix the dedicated log prefix for this request + * @deprecated This method only exists for backward compatibility. Override {@link + * #onNodeSuccess(Request, long, DriverExecutionProfile, Node, ExecutionInfo, String)} + * instead. */ + @Deprecated default void onNodeSuccess( @NonNull Request request, long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, @NonNull String requestLogPrefix) { - // If client doesn't override onNodeSuccess with requestLogPrefix delegate call to the old - // method + // delegate call to the old method onNodeSuccess(request, latencyNanos, executionProfile, node); } - - /** - * Invoked when the session is ready to process user requests. - * - *

WARNING: if you use {@code session.execute()} in your tracker implementation, keep in - * mind that those requests will in turn recurse back into {@code onSuccess} / {@code onError} - * methods. Make sure you don't trigger an infinite loop; one way to do that is to use a - * custom execution profile for internal requests. - * - *

This corresponds to the moment when {@link SessionBuilder#build()} returns, or the future - * returned by {@link SessionBuilder#buildAsync()} completes. If the session initialization fails, - * this method will not get called. - * - *

Listener methods are invoked from different threads; if you store the session in a field, - * make it at least volatile to guarantee proper publication. - * - *

This method is guaranteed to be the first one invoked on this object. - * - *

The default implementation is empty. - */ - default void onSessionReady(@NonNull Session session) {} } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index a1c6b0e5466..27c7b2a87b3 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -66,7 +66,6 @@ import com.datastax.oss.protocol.internal.response.Error; import com.datastax.oss.protocol.internal.response.Result; import com.datastax.oss.protocol.internal.response.error.Unprepared; -import com.datastax.oss.protocol.internal.response.result.Rows; import com.datastax.oss.protocol.internal.response.result.SchemaChange; import com.datastax.oss.protocol.internal.response.result.SetKeyspace; import com.datastax.oss.protocol.internal.response.result.Void; @@ -205,12 +204,12 @@ private Timeout scheduleTimeout(Duration timeoutDuration) { if (timeoutDuration.toNanos() > 0) { try { return this.timer.newTimeout( - (Timeout timeout1) -> - setFinalError( - initialStatement, - new DriverTimeoutException("Query timed out after " + timeoutDuration), - null, - -1), + (Timeout timeout1) -> { + ExecutionInfo executionInfo = failedExecutionInfoNoRequestSent().build(); + setFinalError( + executionInfo, + new DriverTimeoutException("Query timed out after " + timeoutDuration)); + }, timeoutDuration.toNanos(), TimeUnit.NANOSECONDS); } catch (IllegalStateException e) { @@ -263,7 +262,8 @@ private void sendRequest( // We've reached the end of the query plan without finding any node to write to if (!result.isDone() && activeExecutionsCount.decrementAndGet() == 0) { // We're the last execution so fail the result - setFinalError(statement, AllNodesFailedException.fromErrors(this.errors), null, -1); + ExecutionInfo executionInfo = failedExecutionInfoNoRequestSent(statement).build(); + setFinalError(executionInfo, AllNodesFailedException.fromErrors(this.errors)); } } else { NodeResponseCallback nodeResponseCallback = @@ -318,7 +318,10 @@ private void setFinalResult( NodeResponseCallback callback) { try { ExecutionInfo executionInfo = - buildExecutionInfo(callback, resultMessage, responseFrame, schemaInAgreement); + defaultExecutionInfo(callback) + .withServerResponse(resultMessage, responseFrame) + .withSchemaInAgreement(schemaInAgreement) + .build(); AsyncResultSet resultSet = Conversions.toResultSet(resultMessage, executionInfo, session, context); if (result.complete(resultSet)) { @@ -334,9 +337,19 @@ private void setFinalResult( totalLatencyNanos = completionTimeNanos - startTimeNanos; long nodeLatencyNanos = completionTimeNanos - callback.nodeStartTimeNanos; requestTracker.onNodeSuccess( - callback.statement, nodeLatencyNanos, executionProfile, callback.node, logPrefix); + callback.statement, + nodeLatencyNanos, + executionProfile, + callback.node, + executionInfo, + logPrefix); requestTracker.onSuccess( - callback.statement, totalLatencyNanos, executionProfile, callback.node, logPrefix); + callback.statement, + totalLatencyNanos, + executionProfile, + callback.node, + executionInfo, + logPrefix); } if (sessionMetricUpdater.isEnabled( DefaultSessionMetric.CQL_REQUESTS, executionProfile.getName())) { @@ -358,7 +371,9 @@ private void setFinalResult( logServerWarnings(callback.statement, executionProfile, executionInfo.getWarnings()); } } catch (Throwable error) { - setFinalError(callback.statement, error, callback.node, -1); + // something unpredictable unexpected happened here that we can't blame on the request itself + ExecutionInfo executionInfo = defaultExecutionInfo(callback, -1).build(); + setFinalError(executionInfo, error); } } @@ -389,61 +404,36 @@ private void logServerWarnings( LOG.warn("Query '{}' generated server side warning(s): {}", statementString, warning)); } - private ExecutionInfo buildExecutionInfo( - NodeResponseCallback callback, - Result resultMessage, - Frame responseFrame, - boolean schemaInAgreement) { - ByteBuffer pagingState = - (resultMessage instanceof Rows) ? ((Rows) resultMessage).getMetadata().pagingState : null; - return new DefaultExecutionInfo( - callback.statement, - callback.node, - startedSpeculativeExecutionsCount.get(), - callback.execution, - errors, - pagingState, - responseFrame, - schemaInAgreement, - session, - context, - executionProfile); - } - @Override public void onThrottleFailure(@NonNull RequestThrottlingException error) { sessionMetricUpdater.incrementCounter( DefaultSessionMetric.THROTTLING_ERRORS, executionProfile.getName()); - setFinalError(initialStatement, error, null, -1); + ExecutionInfo executionInfo = failedExecutionInfoNoRequestSent().build(); + setFinalError(executionInfo, error); } - private void setFinalError(Statement statement, Throwable error, Node node, int execution) { + private void setFinalError(ExecutionInfo executionInfo, Throwable error) { if (error instanceof DriverException) { - ((DriverException) error) - .setExecutionInfo( - new DefaultExecutionInfo( - statement, - node, - startedSpeculativeExecutionsCount.get(), - execution, - errors, - null, - null, - true, - session, - context, - executionProfile)); + ((DriverException) error).setExecutionInfo(executionInfo); } if (result.completeExceptionally(error)) { cancelScheduledTasks(); if (!(requestTracker instanceof NoopRequestTracker)) { long latencyNanos = System.nanoTime() - startTimeNanos; - requestTracker.onError(statement, error, latencyNanos, executionProfile, node, logPrefix); + requestTracker.onError( + executionInfo.getRequest(), + error, + latencyNanos, + executionInfo.getExecutionProfile(), + executionInfo.getCoordinator(), + executionInfo, + logPrefix); } if (error instanceof DriverTimeoutException) { throttler.signalTimeout(this); sessionMetricUpdater.incrementCounter( - DefaultSessionMetric.CQL_CLIENT_TIMEOUTS, executionProfile.getName()); + DefaultSessionMetric.CQL_CLIENT_TIMEOUTS, + executionInfo.getExecutionProfile().getName()); } else if (!(error instanceof RequestThrottlingException)) { throttler.signalError(this, error); } @@ -491,23 +481,25 @@ private NodeResponseCallback( this.logPrefix = logPrefix + "|" + execution; } - // this gets invoked once the write completes. + // this gets invoked once the write request completes. @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { Throwable error = future.cause(); + ExecutionInfo executionInfo = CqlRequestHandler.this.defaultExecutionInfo(this).build(); if (error instanceof EncoderException && error.getCause() instanceof FrameTooLongException) { - trackNodeError(node, error.getCause(), NANOTIME_NOT_MEASURED_YET); - setFinalError(statement, error.getCause(), node, execution); + trackNodeError(error.getCause(), executionInfo, NANOTIME_NOT_MEASURED_YET); + setFinalError(executionInfo, error.getCause()); } else { LOG.trace( "[{}] Failed to send request on {}, trying next node (cause: {})", logPrefix, channel, + error.getMessage(), error); recordError(node, error); - trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET); + trackNodeError(error, executionInfo, NANOTIME_NOT_MEASURED_YET); ((DefaultNode) node) .getMetricUpdater() .incrementCounter(DefaultNodeMetric.UNSENT_REQUESTS, executionProfile.getName()); @@ -643,19 +635,18 @@ public void onResponse(Frame responseFrame) { LOG.trace("[{}] Got error response, processing", logPrefix); processErrorResponse((Error) responseMessage); } else { + ExecutionInfo executionInfo = defaultExecutionInfo().build(); trackNodeError( - node, new IllegalStateException("Unexpected response " + responseMessage), + executionInfo, nodeResponseTimeNanos); setFinalError( - statement, - new IllegalStateException("Unexpected response " + responseMessage), - node, - execution); + executionInfo, new IllegalStateException("Unexpected response " + responseMessage)); } } catch (Throwable t) { - trackNodeError(node, t, nodeResponseTimeNanos); - setFinalError(statement, t, node, execution); + ExecutionInfo executionInfo = defaultExecutionInfo().build(); + trackNodeError(t, executionInfo, nodeResponseTimeNanos); + setFinalError(executionInfo, t); } } @@ -689,6 +680,7 @@ private void processErrorResponse(Error errorMessage) { .start() .handle( (repreparedId, exception) -> { + ExecutionInfo executionInfo = defaultExecutionInfo().build(); if (exception != null) { // If the error is not recoverable, surface it to the client instead of retrying if (exception instanceof UnexpectedResponseException) { @@ -701,18 +693,18 @@ private void processErrorResponse(Error errorMessage) { || prepareError instanceof FunctionFailureException || prepareError instanceof ProtocolError) { LOG.trace("[{}] Unrecoverable error on reprepare, rethrowing", logPrefix); - trackNodeError(node, prepareError, NANOTIME_NOT_MEASURED_YET); - setFinalError(statement, prepareError, node, execution); + trackNodeError(prepareError, executionInfo, NANOTIME_NOT_MEASURED_YET); + setFinalError(executionInfo, prepareError); return null; } } } else if (exception instanceof RequestThrottlingException) { - trackNodeError(node, exception, NANOTIME_NOT_MEASURED_YET); - setFinalError(statement, exception, node, execution); + trackNodeError(exception, executionInfo, NANOTIME_NOT_MEASURED_YET); + setFinalError(executionInfo, exception); return null; } recordError(node, exception); - trackNodeError(node, exception, NANOTIME_NOT_MEASURED_YET); + trackNodeError(exception, executionInfo, NANOTIME_NOT_MEASURED_YET); LOG.trace("[{}] Reprepare failed, trying next node", logPrefix); sendRequest(statement, null, queryPlan, execution, retryCount, false); } else { @@ -726,8 +718,9 @@ private void processErrorResponse(Error errorMessage) { + "the statement was prepared.", Bytes.toHexString(idToReprepare), Bytes.toHexString(repreparedId))); - trackNodeError(node, illegalStateException, NANOTIME_NOT_MEASURED_YET); - setFinalError(statement, illegalStateException, node, execution); + trackNodeError( + illegalStateException, executionInfo, NANOTIME_NOT_MEASURED_YET); + setFinalError(executionInfo, illegalStateException); } LOG.trace("[{}] Reprepare sucessful, retrying", logPrefix); sendRequest(statement, node, queryPlan, execution, retryCount, false); @@ -740,16 +733,18 @@ private void processErrorResponse(Error errorMessage) { NodeMetricUpdater metricUpdater = ((DefaultNode) node).getMetricUpdater(); if (error instanceof BootstrappingException) { LOG.trace("[{}] {} is bootstrapping, trying next node", logPrefix, node); + ExecutionInfo executionInfo = defaultExecutionInfo().build(); recordError(node, error); - trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET); + trackNodeError(error, executionInfo, NANOTIME_NOT_MEASURED_YET); sendRequest(statement, null, queryPlan, execution, retryCount, false); } else if (error instanceof QueryValidationException || error instanceof FunctionFailureException || error instanceof ProtocolError) { LOG.trace("[{}] Unrecoverable error, rethrowing", logPrefix); metricUpdater.incrementCounter(DefaultNodeMetric.OTHER_ERRORS, executionProfile.getName()); - trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET); - setFinalError(statement, error, node, execution); + ExecutionInfo executionInfo = defaultExecutionInfo().build(); + trackNodeError(error, executionInfo, NANOTIME_NOT_MEASURED_YET); + setFinalError(executionInfo, error); } else { RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(context, executionProfile); RetryVerdict verdict; @@ -820,10 +815,11 @@ private void processErrorResponse(Error errorMessage) { private void processRetryVerdict(RetryVerdict verdict, Throwable error) { LOG.trace("[{}] Processing retry decision {}", logPrefix, verdict); + ExecutionInfo executionInfo = defaultExecutionInfo().build(); switch (verdict.getRetryDecision()) { case RETRY_SAME: recordError(node, error); - trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET); + trackNodeError(error, executionInfo, NANOTIME_NOT_MEASURED_YET); sendRequest( verdict.getRetryRequest(statement), node, @@ -834,7 +830,7 @@ private void processRetryVerdict(RetryVerdict verdict, Throwable error) { break; case RETRY_NEXT: recordError(node, error); - trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET); + trackNodeError(error, executionInfo, NANOTIME_NOT_MEASURED_YET); sendRequest( verdict.getRetryRequest(statement), null, @@ -844,8 +840,8 @@ private void processRetryVerdict(RetryVerdict verdict, Throwable error) { false); break; case RETHROW: - trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET); - setFinalError(statement, error, node, execution); + trackNodeError(error, executionInfo, NANOTIME_NOT_MEASURED_YET); + setFinalError(executionInfo, error); break; case IGNORE: setFinalResult(Void.INSTANCE, null, true, this); @@ -871,7 +867,7 @@ private void updateErrorMetrics( metricUpdater.incrementCounter(ignoresOnError, executionProfile.getName()); break; case RETHROW: - // nothing do do + // nothing to do } } @@ -891,11 +887,10 @@ public void onFailure(Throwable error) { RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(context, executionProfile); verdict = retryPolicy.onRequestAbortedVerdict(statement, error, retryCount); } catch (Throwable cause) { + ExecutionInfo executionInfo = defaultExecutionInfo().build(); setFinalError( - statement, - new IllegalStateException("Unexpected error while invoking the retry policy", cause), - null, - execution); + executionInfo, + new IllegalStateException("Unexpected error while invoking the retry policy", cause)); return; } } @@ -923,7 +918,8 @@ public void cancel() { * measured. If {@link #NANOTIME_NOT_MEASURED_YET}, it hasn't and we need to measure it now * (this is to avoid unnecessary calls to System.nanoTime) */ - private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNanos) { + private void trackNodeError( + Throwable error, ExecutionInfo executionInfo, long nodeResponseTimeNanos) { if (requestTracker instanceof NoopRequestTracker) { return; } @@ -931,7 +927,18 @@ private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNan nodeResponseTimeNanos = System.nanoTime(); } long latencyNanos = nodeResponseTimeNanos - this.nodeStartTimeNanos; - requestTracker.onNodeError(statement, error, latencyNanos, executionProfile, node, logPrefix); + requestTracker.onNodeError( + statement, + error, + latencyNanos, + executionProfile, + executionInfo.getCoordinator(), + executionInfo, + logPrefix); + } + + private DefaultExecutionInfo.Builder defaultExecutionInfo() { + return CqlRequestHandler.this.defaultExecutionInfo(this, execution); } @Override @@ -939,4 +946,37 @@ public String toString() { return logPrefix; } } + + private DefaultExecutionInfo.Builder defaultExecutionInfo(NodeResponseCallback callback) { + return defaultExecutionInfo(callback, callback.execution); + } + + private DefaultExecutionInfo.Builder defaultExecutionInfo( + NodeResponseCallback callback, int execution) { + return new DefaultExecutionInfo.Builder( + callback.statement, + callback.node, + startedSpeculativeExecutionsCount.get(), + execution, + errors, + session, + context, + executionProfile); + } + + private DefaultExecutionInfo.Builder failedExecutionInfoNoRequestSent() { + return failedExecutionInfoNoRequestSent(initialStatement); + } + + private DefaultExecutionInfo.Builder failedExecutionInfoNoRequestSent(Statement statement) { + return new DefaultExecutionInfo.Builder( + statement, + null, + startedSpeculativeExecutionsCount.get(), + -1, + errors, + session, + context, + executionProfile); + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultExecutionInfo.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultExecutionInfo.java index 3ab57ddc598..1dd4354d6e6 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultExecutionInfo.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultExecutionInfo.java @@ -28,6 +28,8 @@ import com.datastax.oss.driver.internal.core.session.DefaultSession; import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; import com.datastax.oss.protocol.internal.Frame; +import com.datastax.oss.protocol.internal.response.Result; +import com.datastax.oss.protocol.internal.response.result.Rows; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import java.nio.ByteBuffer; @@ -57,7 +59,7 @@ public class DefaultExecutionInfo implements ExecutionInfo { private final InternalDriverContext context; private final DriverExecutionProfile executionProfile; - public DefaultExecutionInfo( + private DefaultExecutionInfo( Request request, Node coordinator, int speculativeExecutionCount, @@ -102,6 +104,11 @@ public Request getRequest() { return request; } + @Override + public DriverExecutionProfile getExecutionProfile() { + return executionProfile; + } + @Nullable @Override public Node getCoordinator() { @@ -189,4 +196,91 @@ public int getResponseSizeInBytes() { public int getCompressedResponseSizeInBytes() { return compressedResponseSizeInBytes; } + + public static Builder builder( + Request request, + Node coordinator, + int speculativeExecutionCount, + int successfulExecutionIndex, + List> errors, + DefaultSession session, + InternalDriverContext context, + DriverExecutionProfile executionProfile) { + return new Builder( + request, + coordinator, + speculativeExecutionCount, + successfulExecutionIndex, + errors, + session, + context, + executionProfile); + } + + public static class Builder { + private final Request request; + private final Node coordinator; + private final int speculativeExecutionCount; + private final int successfulExecutionIndex; + private final List> errors; + private final DefaultSession session; + private final InternalDriverContext context; + private final DriverExecutionProfile executionProfile; + + private Result response; + private Frame frame; + private boolean schemaInAgreement = true; + + public Builder( + Request request, + Node coordinator, + int speculativeExecutionCount, + int successfulExecutionIndex, + List> errors, + DefaultSession session, + InternalDriverContext context, + DriverExecutionProfile executionProfile) { + this.request = request; + this.coordinator = coordinator; + this.speculativeExecutionCount = speculativeExecutionCount; + this.successfulExecutionIndex = successfulExecutionIndex; + this.errors = errors; + this.session = session; + this.context = context; + this.executionProfile = executionProfile; + } + + public Builder withServerResponse(Result response, Frame frame) { + this.response = response; + this.frame = frame; + return this; + } + + /** Client received a response, but it could not be parsed to expected message. */ + public Builder withServerResponse(Frame frame) { + return withServerResponse(null, frame); + } + + public Builder withSchemaInAgreement(boolean schemaInAgreement) { + this.schemaInAgreement = schemaInAgreement; + return this; + } + + public DefaultExecutionInfo build() { + final ByteBuffer pagingState = + (response instanceof Rows) ? ((Rows) response).getMetadata().pagingState : null; + return new DefaultExecutionInfo( + request, + coordinator, + speculativeExecutionCount, + successfulExecutionIndex, + errors, + pagingState, + frame, + schemaInAgreement, + session, + context, + executionProfile); + } + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java index 47edcdfe53e..42b3358dfa6 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java @@ -23,6 +23,7 @@ import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.session.Session; @@ -240,6 +241,7 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String logPrefix) { updateResponseTimes(node); } @@ -251,6 +253,7 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @Nullable ExecutionInfo executionInfo, @NonNull String logPrefix) { updateResponseTimes(node); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java index d4d20f3eb78..3173a56b1df 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java @@ -18,6 +18,7 @@ package com.datastax.oss.driver.internal.core.tracker; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.session.Session; @@ -82,9 +83,12 @@ public void onSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String logPrefix) { invokeTrackers( - tracker -> tracker.onSuccess(request, latencyNanos, executionProfile, node, logPrefix), + tracker -> + tracker.onSuccess( + request, latencyNanos, executionProfile, node, executionInfo, logPrefix), logPrefix, "onSuccess"); } @@ -96,9 +100,12 @@ public void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @Nullable Node node, + @Nullable ExecutionInfo executionInfo, @NonNull String logPrefix) { invokeTrackers( - tracker -> tracker.onError(request, error, latencyNanos, executionProfile, node, logPrefix), + tracker -> + tracker.onError( + request, error, latencyNanos, executionProfile, node, executionInfo, logPrefix), logPrefix, "onError"); } @@ -109,9 +116,12 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String logPrefix) { invokeTrackers( - tracker -> tracker.onNodeSuccess(request, latencyNanos, executionProfile, node, logPrefix), + tracker -> + tracker.onNodeSuccess( + request, latencyNanos, executionProfile, node, executionInfo, logPrefix), logPrefix, "onNodeSuccess"); } @@ -123,10 +133,12 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + ExecutionInfo executionInfo, @NonNull String logPrefix) { invokeTrackers( tracker -> - tracker.onNodeError(request, error, latencyNanos, executionProfile, node, logPrefix), + tracker.onNodeError( + request, error, latencyNanos, executionProfile, node, executionInfo, logPrefix), logPrefix, "onNodeError"); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java index 09ac27e5e75..921a1135c03 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java @@ -19,10 +19,12 @@ import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; import net.jcip.annotations.ThreadSafe; /** @@ -42,6 +44,7 @@ public void onSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String requestPrefix) { // nothing to do } @@ -53,6 +56,7 @@ public void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, Node node, + @Nullable ExecutionInfo executionInfo, @NonNull String requestPrefix) { // nothing to do } @@ -64,6 +68,7 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @Nullable ExecutionInfo executionInfo, @NonNull String requestPrefix) { // nothing to do } @@ -74,6 +79,7 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String requestPrefix) { // nothing to do } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java index 235ef051b40..fa51e281071 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java @@ -20,11 +20,13 @@ import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.session.SessionBuilder; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; import java.time.Duration; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; @@ -86,6 +88,7 @@ public void onSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String logPrefix) { boolean successEnabled = @@ -139,6 +142,7 @@ public void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String logPrefix) { if (!executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_ERROR_ENABLED, false)) { @@ -183,6 +187,7 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @Nullable ExecutionInfo executionInfo, @NonNull String logPrefix) { // Nothing to do } @@ -193,6 +198,7 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String logPrefix) { // Nothing to do } diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java index a816183e9ee..29709e0b11d 100644 --- a/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java +++ b/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java @@ -31,6 +31,7 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.matches; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -498,6 +499,7 @@ public void should_invoke_request_tracker(DseProtocolVersion version) { anyLong(), any(DriverExecutionProfile.class), eq(node1), + nullable(ExecutionInfo.class), matches(LOG_PREFIX_PER_REQUEST)); verify(requestTracker) .onNodeSuccess( @@ -505,6 +507,7 @@ public void should_invoke_request_tracker(DseProtocolVersion version) { anyLong(), any(DriverExecutionProfile.class), eq(node2), + any(ExecutionInfo.class), matches(LOG_PREFIX_PER_REQUEST)); verify(requestTracker) .onSuccess( @@ -512,6 +515,7 @@ public void should_invoke_request_tracker(DseProtocolVersion version) { anyLong(), any(DriverExecutionProfile.class), eq(node2), + any(ExecutionInfo.class), matches(LOG_PREFIX_PER_REQUEST)); verifyNoMoreInteractions(requestTracker); }); diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandlerTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandlerTest.java index 9f325003610..09713b4ac6e 100644 --- a/core/src/test/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandlerTest.java +++ b/core/src/test/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandlerTest.java @@ -56,6 +56,7 @@ import com.datastax.oss.driver.api.core.Version; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import com.datastax.oss.driver.api.core.uuid.Uuids; import com.datastax.oss.driver.internal.core.cql.Conversions; @@ -521,6 +522,7 @@ public void should_invoke_request_tracker_and_update_metrics( anyLong(), any(DriverExecutionProfile.class), eq(node), + any(ExecutionInfo.class), matches(LOG_PREFIX_PER_REQUEST)); verify(requestTracker) .onNodeSuccess( @@ -528,6 +530,7 @@ public void should_invoke_request_tracker_and_update_metrics( anyLong(), any(DriverExecutionProfile.class), eq(node), + any(ExecutionInfo.class), matches(LOG_PREFIX_PER_REQUEST)); verifyNoMoreInteractions(requestTracker); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java index ecc087fb8ac..533fe3be87a 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java @@ -18,10 +18,13 @@ package com.datastax.oss.driver.internal.core.cql; import static com.datastax.oss.driver.Assertions.assertThatStage; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockingDetails; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -29,16 +32,21 @@ import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; +import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.servererrors.BootstrappingException; +import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker; import com.datastax.oss.protocol.internal.ProtocolConstants; import com.datastax.oss.protocol.internal.response.Error; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletionStage; import org.junit.Test; +import org.mockito.invocation.Invocation; public class CqlRequestHandlerTrackerTest extends CqlRequestHandlerTestBase { - @Test public void should_invoke_request_tracker() { try (RequestHandlerTestHarness harness = @@ -72,6 +80,7 @@ public void should_invoke_request_tracker() { anyLong(), any(DriverExecutionProfile.class), eq(node1), + nullable(ExecutionInfo.class), any(String.class)); verify(requestTracker) .onNodeSuccess( @@ -79,6 +88,7 @@ public void should_invoke_request_tracker() { anyLong(), any(DriverExecutionProfile.class), eq(node2), + any(ExecutionInfo.class), any(String.class)); verify(requestTracker) .onSuccess( @@ -86,12 +96,36 @@ public void should_invoke_request_tracker() { anyLong(), any(DriverExecutionProfile.class), eq(node2), + any(ExecutionInfo.class), any(String.class)); verifyNoMoreInteractions(requestTracker); }); + + // verify that passed ExecutionInfo object had correct details + List invocations = + new ArrayList<>(mockingDetails(requestTracker).getInvocations()); + checkExecutionInfo( + (ExecutionInfo) invocations.get(0).getRawArguments()[5], + UNDEFINED_IDEMPOTENCE_STATEMENT, + node1); + checkExecutionInfo( + (ExecutionInfo) invocations.get(1).getRawArguments()[4], + UNDEFINED_IDEMPOTENCE_STATEMENT, + node2); + checkExecutionInfo( + (ExecutionInfo) invocations.get(2).getRawArguments()[4], + UNDEFINED_IDEMPOTENCE_STATEMENT, + node2); } } + private void checkExecutionInfo( + ExecutionInfo executionInfo, Request expectedRequest, Node expectedNode) { + assertThat(executionInfo.getRequest()).isEqualTo(expectedRequest); + assertThat(executionInfo.getExecutionProfile()).isNotNull(); + assertThat(executionInfo.getCoordinator()).isEqualTo(expectedNode); + } + @Test public void should_not_invoke_noop_request_tracker() { try (RequestHandlerTestHarness harness = diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java index bcc6439a2a5..f2009b4fb9b 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java @@ -22,6 +22,7 @@ import static org.mockito.BDDMockito.given; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; @@ -34,6 +35,7 @@ public class DefaultLoadBalancingPolicyRequestTrackerTest extends LoadBalancingP @Mock Request request; @Mock DriverExecutionProfile profile; + @Mock ExecutionInfo executionInfo; final String logPrefix = "lbp-test-log-prefix"; private DefaultLoadBalancingPolicy policy; @@ -65,7 +67,7 @@ public void should_record_first_response_time_on_node_success() { nextNanoTime = 123; // When - policy.onNodeSuccess(request, 0, profile, node1, logPrefix); + policy.onNodeSuccess(request, 0, profile, node1, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) @@ -83,7 +85,7 @@ public void should_record_second_response_time_on_node_success() { nextNanoTime = 456; // When - policy.onNodeSuccess(request, 0, profile, node1, logPrefix); + policy.onNodeSuccess(request, 0, profile, node1, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) @@ -107,8 +109,8 @@ public void should_record_further_response_times_on_node_success() { nextNanoTime = 789; // When - policy.onNodeSuccess(request, 0, profile, node1, logPrefix); - policy.onNodeSuccess(request, 0, profile, node2, logPrefix); + policy.onNodeSuccess(request, 0, profile, node1, executionInfo, logPrefix); + policy.onNodeSuccess(request, 0, profile, node2, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) @@ -133,7 +135,7 @@ public void should_record_first_response_time_on_node_error() { Throwable iae = new IllegalArgumentException(); // When - policy.onNodeError(request, iae, 0, profile, node1, logPrefix); + policy.onNodeError(request, iae, 0, profile, node1, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) @@ -152,7 +154,7 @@ public void should_record_second_response_time_on_node_error() { Throwable iae = new IllegalArgumentException(); // When - policy.onNodeError(request, iae, 0, profile, node1, logPrefix); + policy.onNodeError(request, iae, 0, profile, node1, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) @@ -177,8 +179,8 @@ public void should_record_further_response_times_on_node_error() { Throwable iae = new IllegalArgumentException(); // When - policy.onNodeError(request, iae, 0, profile, node1, logPrefix); - policy.onNodeError(request, iae, 0, profile, node2, logPrefix); + policy.onNodeError(request, iae, 0, profile, node1, executionInfo, logPrefix); + policy.onNodeError(request, iae, 0, profile, node2, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java index 8dcad99b459..adbad606712 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java @@ -28,6 +28,7 @@ import ch.qos.logback.core.Appender; import com.datastax.oss.driver.api.core.DriverExecutionException; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.session.Session; @@ -51,6 +52,7 @@ public class MultiplexingRequestTrackerTest { @Mock private DriverExecutionProfile profile; @Mock private Node node; @Mock private Session session; + @Mock private ExecutionInfo executionInfo; @Mock private Appender appender; @Captor private ArgumentCaptor loggingEventCaptor; @@ -111,12 +113,12 @@ public void should_notify_onSuccess() { MultiplexingRequestTracker tracker = new MultiplexingRequestTracker(child1, child2); willThrow(new NullPointerException()) .given(child1) - .onSuccess(request, 123456L, profile, node, "test"); + .onSuccess(request, 123456L, profile, node, executionInfo, "test"); // when - tracker.onSuccess(request, 123456L, profile, node, "test"); + tracker.onSuccess(request, 123456L, profile, node, executionInfo, "test"); // then - verify(child1).onSuccess(request, 123456L, profile, node, "test"); - verify(child2).onSuccess(request, 123456L, profile, node, "test"); + verify(child1).onSuccess(request, 123456L, profile, node, executionInfo, "test"); + verify(child2).onSuccess(request, 123456L, profile, node, executionInfo, "test"); verify(appender).doAppend(loggingEventCaptor.capture()); assertThat(loggingEventCaptor.getAllValues().stream().map(ILoggingEvent::getFormattedMessage)) .contains( @@ -129,12 +131,12 @@ public void should_notify_onError() { MultiplexingRequestTracker tracker = new MultiplexingRequestTracker(child1, child2); willThrow(new NullPointerException()) .given(child1) - .onError(request, error, 123456L, profile, node, "test"); + .onError(request, error, 123456L, profile, node, executionInfo, "test"); // when - tracker.onError(request, error, 123456L, profile, node, "test"); + tracker.onError(request, error, 123456L, profile, node, executionInfo, "test"); // then - verify(child1).onError(request, error, 123456L, profile, node, "test"); - verify(child2).onError(request, error, 123456L, profile, node, "test"); + verify(child1).onError(request, error, 123456L, profile, node, executionInfo, "test"); + verify(child2).onError(request, error, 123456L, profile, node, executionInfo, "test"); verify(appender).doAppend(loggingEventCaptor.capture()); assertThat(loggingEventCaptor.getAllValues().stream().map(ILoggingEvent::getFormattedMessage)) .contains( @@ -147,12 +149,12 @@ public void should_notify_onNodeSuccess() { MultiplexingRequestTracker tracker = new MultiplexingRequestTracker(child1, child2); willThrow(new NullPointerException()) .given(child1) - .onNodeSuccess(request, 123456L, profile, node, "test"); + .onNodeSuccess(request, 123456L, profile, node, executionInfo, "test"); // when - tracker.onNodeSuccess(request, 123456L, profile, node, "test"); + tracker.onNodeSuccess(request, 123456L, profile, node, executionInfo, "test"); // then - verify(child1).onNodeSuccess(request, 123456L, profile, node, "test"); - verify(child2).onNodeSuccess(request, 123456L, profile, node, "test"); + verify(child1).onNodeSuccess(request, 123456L, profile, node, executionInfo, "test"); + verify(child2).onNodeSuccess(request, 123456L, profile, node, executionInfo, "test"); verify(appender).doAppend(loggingEventCaptor.capture()); assertThat(loggingEventCaptor.getAllValues().stream().map(ILoggingEvent::getFormattedMessage)) .contains( @@ -165,12 +167,12 @@ public void should_notify_onNodeError() { MultiplexingRequestTracker tracker = new MultiplexingRequestTracker(child1, child2); willThrow(new NullPointerException()) .given(child1) - .onNodeError(request, error, 123456L, profile, node, "test"); + .onNodeError(request, error, 123456L, profile, node, executionInfo, "test"); // when - tracker.onNodeError(request, error, 123456L, profile, node, "test"); + tracker.onNodeError(request, error, 123456L, profile, node, executionInfo, "test"); // then - verify(child1).onNodeError(request, error, 123456L, profile, node, "test"); - verify(child2).onNodeError(request, error, 123456L, profile, node, "test"); + verify(child1).onNodeError(request, error, 123456L, profile, node, executionInfo, "test"); + verify(child2).onNodeError(request, error, 123456L, profile, node, executionInfo, "test"); verify(appender).doAppend(loggingEventCaptor.capture()); assertThat(loggingEventCaptor.getAllValues().stream().map(ILoggingEvent::getFormattedMessage)) .contains( diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java index eae98339637..f22475b5aca 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java @@ -20,11 +20,13 @@ import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.internal.core.tracker.RequestLogFormatter; import com.datastax.oss.driver.internal.core.tracker.RequestLogger; import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; public class RequestNodeLoggerExample extends RequestLogger { @@ -39,6 +41,7 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @Nullable ExecutionInfo executionInfo, @NonNull String logPrefix) { if (!executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_ERROR_ENABLED)) { return; @@ -75,6 +78,7 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String logPrefix) { boolean successEnabled = executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_SUCCESS_ENABLED);