diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 18c591166e720..fd376fcd07688 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -39,6 +39,7 @@ import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.UpdateForV9; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; @@ -49,6 +50,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.tasks.MockTaskManager; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.BytesTransportRequest; import org.elasticsearch.transport.ClusterConnectionManager; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; @@ -584,8 +586,13 @@ public void sendRequest( // poor mans request cloning... BytesStreamOutput bStream = new BytesStreamOutput(); request.writeTo(bStream); - RequestHandlerRegistry reg = MockTransportService.this.getRequestHandler(action); - final TransportRequest clonedRequest = reg.newRequest(bStream.bytes().streamInput()); + final TransportRequest clonedRequest; + if (request instanceof BytesTransportRequest) { + clonedRequest = copyRawBytesForBwC(bStream); + } else { + RequestHandlerRegistry reg = MockTransportService.this.getRequestHandler(action); + clonedRequest = reg.newRequest(bStream.bytes().streamInput()); + } assert clonedRequest.getClass().equals(MasterNodeRequestHelper.unwrapTermOverride(request).getClass()) : clonedRequest + " vs " + request; @@ -633,6 +640,15 @@ protected void doRun() throws IOException { } } + // Some request handlers read back a BytesTransportRequest + // into a different class that cannot be re-serialized (i.e. JOIN_VALIDATE_ACTION_NAME), + // in those cases we just copy the raw bytes back to a BytesTransportRequest. + // This is only needed for the BwC for JOIN_VALIDATE_ACTION_NAME and can be removed in the next major + @UpdateForV9(owner = UpdateForV9.Owner.DISTRIBUTED_COORDINATION) + private static TransportRequest copyRawBytesForBwC(BytesStreamOutput bStream) throws IOException { + return new BytesTransportRequest(bStream.bytes().streamInput()); + } + @Override public void clearCallback() { synchronized (this) {