diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java index 94db419f5a7f9..e530080863cc6 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java @@ -16,6 +16,7 @@ import org.elasticsearch.xcontent.json.JsonXContent; import java.io.IOException; +import java.util.List; import java.util.Map; import static org.elasticsearch.rest.RestStatus.OK; @@ -24,6 +25,7 @@ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0) public class IncrementalBulkRestIT extends HttpSmokeTestCase { + @SuppressWarnings("unchecked") public void testIncrementalBulk() throws IOException { Request createRequest = new Request("PUT", "/index_name"); createRequest.setJsonEntity(""" @@ -58,6 +60,9 @@ public void testIncrementalBulk() throws IOException { indexSuccessFul.getEntity().getContent(), true ); + + assertFalse((Boolean) responseMap.get("errors")); + assertThat(((List) responseMap.get("items")).size(), equalTo(1000)); } public void testIncrementalMalformed() throws IOException { @@ -86,6 +91,6 @@ public void testIncrementalMalformed() throws IOException { bulkRequest.setJsonEntity(bulk.toString()); - ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest)); + expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest)); } } diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index f12a9d461cd4b..c6140cadfd375 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -195,6 +195,8 @@ public void bulk(BulkRequest request, ActionListener listener) { public void testIncrementalParsing() { ArrayList> docs = new ArrayList<>(); + AtomicBoolean isLast = new AtomicBoolean(false); + AtomicBoolean next = new AtomicBoolean(false); FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withMethod(RestRequest.Method.POST) @@ -205,13 +207,11 @@ public ChunkHandler handler() { } @Override - public void setHandler(ChunkHandler chunkHandler) { - - } + public void setHandler(ChunkHandler chunkHandler) {} @Override public void next() { - + next.set(true); } }) .withHeaders(Map.of("Content-Type", Collections.singletonList("application/json"))) @@ -233,18 +233,51 @@ public void addItems(List> items, Releasable releasable, Runn public void lastItems(List> items, Releasable releasable, ActionListener listener) { releasable.close(); docs.addAll(items); + isLast.set(true); } } ); chunkHandler.accept(channel); - chunkHandler.handleChunk( - channel, - ReleasableBytesReference.wrap(new BytesArray("{\"index\":{\"_index\":\"index_name\"}}\n")), - false - ); + ReleasableBytesReference r1 = new ReleasableBytesReference(new BytesArray("{\"index\":{\"_index\":\"index_name\"}}\n"), () -> {}); + chunkHandler.handleChunk(channel, r1, false); assertThat(docs, empty()); - chunkHandler.handleChunk(channel, ReleasableBytesReference.wrap(new BytesArray("{\"field\":1}")), false); + assertTrue(next.get()); + next.set(false); + assertFalse(isLast.get()); + + ReleasableBytesReference r2 = new ReleasableBytesReference(new BytesArray("{\"field\":1}"), () -> {}); + chunkHandler.handleChunk(channel, r2, false); assertThat(docs, empty()); + assertTrue(next.get()); + next.set(false); + assertFalse(isLast.get()); + assertTrue(r1.hasReferences()); + assertTrue(r2.hasReferences()); + + ReleasableBytesReference r3 = new ReleasableBytesReference(new BytesArray("\n{\"delete\":"), () -> {}); + chunkHandler.handleChunk(channel, r3, false); + assertThat(docs, hasSize(1)); + assertFalse(next.get()); + assertFalse(isLast.get()); + assertFalse(r1.hasReferences()); + assertFalse(r2.hasReferences()); + assertTrue(r3.hasReferences()); + + ReleasableBytesReference r4 = new ReleasableBytesReference(new BytesArray("{\"_index\":\"test\",\"_id\":\"2\"}}"), () -> {}); + chunkHandler.handleChunk(channel, r4, false); + assertThat(docs, hasSize(1)); + assertTrue(next.get()); + next.set(false); + assertFalse(isLast.get()); + + ReleasableBytesReference r5 = new ReleasableBytesReference(new BytesArray("\n"), () -> {}); + chunkHandler.handleChunk(channel, r5, true); + assertThat(docs, hasSize(2)); + assertFalse(next.get()); + assertTrue(isLast.get()); + assertFalse(r3.hasReferences()); + assertFalse(r4.hasReferences()); + assertFalse(r5.hasReferences()); } }