diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index fc2ae139205d4..dda28fb7da2f6 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -10,6 +10,7 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; @@ -94,6 +95,11 @@ public BulkRequest(StreamInput in) throws IOException { for (DocWriteRequest request : requests) { indices.add(Objects.requireNonNull(request.index(), "request index must not be null")); } + if (in.getTransportVersion().onOrAfter(TransportVersions.BULK_INCREMENTAL_STATE)) { + incrementalState = new BulkRequest.IncrementalState(in); + } else { + incrementalState = BulkRequest.IncrementalState.EMPTY; + } } public BulkRequest(@Nullable String globalIndex) { @@ -445,6 +451,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeCollection(requests, DocWriteRequest::writeDocumentRequest); refreshPolicy.writeTo(out); out.writeTimeValue(timeout); + if (out.getTransportVersion().onOrAfter(TransportVersions.BULK_INCREMENTAL_STATE)) { + incrementalState.writeTo(out); + } } @Override