From 1bdb92d9078f3a1d05c558ba6f5952a7917f6e21 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 27 Aug 2024 18:57:48 -0600 Subject: [PATCH] Change --- .../http/IncrementalBulkRestIT.java | 10 +++----- .../action/bulk/IncrementalBulkService.java | 3 +-- .../rest/action/document/RestBulkAction.java | 23 +++++++++---------- 3 files changed, 15 insertions(+), 21 deletions(-) diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java index 663964223840c..94db419f5a7f9 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java @@ -75,7 +75,7 @@ public void testIncrementalMalformed() throws IOException { final Response indexCreatedResponse = getRestClient().performRequest(createRequest); assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); - Request successfulIndexingRequest = new Request("POST", "/index_name/_bulk"); + Request bulkRequest = new Request("POST", "/index_name/_bulk"); // index documents for the rollup job final StringBuilder bulk = new StringBuilder(); @@ -84,12 +84,8 @@ public void testIncrementalMalformed() throws IOException { bulk.append("{}\n"); bulk.append("\r\n"); - successfulIndexingRequest.setJsonEntity(bulk.toString()); - - ResponseException responseException = expectThrows( - ResponseException.class, - () -> getRestClient().performRequest(successfulIndexingRequest) - ); + bulkRequest.setJsonEntity(bulk.toString()); + ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest)); } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index 7c2dd3c4f9a76..a7ba5980260d3 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -130,8 +130,7 @@ public void lastItems(List> items, Releasable releasable, Act public void onResponse(BulkResponse bulkResponse) { responses.add(bulkResponse); releaseCurrentReferences(); - BulkResponse response = combineResponses(); - listener.onResponse(response); + listener.onResponse(combineResponses()); } @Override diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index cd6d6f283f892..be2acbaefdc3b 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -186,11 +186,8 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo } final BytesReference data; - final Releasable releasable; + int bytesConsumed; try { - // TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in - // BulkRequest#add is fine - unParsedChunks.add(chunk); if (unParsedChunks.size() > 1) { @@ -199,7 +196,9 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo data = chunk; } - int bytesConsumed = parser.incrementalParse( + // TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in + // BulkRequest#add is fine + bytesConsumed = parser.incrementalParse( data, defaultIndex, defaultRouting, @@ -217,21 +216,24 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo stringDeduplicator ); - releasable = accountParsing(bytesConsumed); - } catch (Exception e) { + // TODO: This needs to be better + Releasables.close(handler); + Releasables.close(unParsedChunks); + unParsedChunks.clear(); new RestToXContentListener<>(channel).onFailure(e); isException = true; return; } + final Releasable releasable = accountParsing(bytesConsumed); if (isLast) { assert unParsedChunks.isEmpty(); assert channel != null; handler.lastItems(new ArrayList<>(items), releasable, new RestRefCountedChunkedToXContentListener<>(channel)); items.clear(); } else if (items.isEmpty() == false) { - handler.addItems(new ArrayList<>(items), releasable, () -> { request.contentStream().next(); }); + handler.addItems(new ArrayList<>(items), releasable, () -> request.contentStream().next()); items.clear(); } else { releasable.close(); @@ -240,8 +242,6 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo @Override public void close() { - Releasables.close(handler); - Releasables.close(unParsedChunks); RequestBodyChunkConsumer.super.close(); } @@ -268,7 +268,6 @@ public boolean supportsBulkContent() { @Override public boolean allowsUnsafeBuffers() { - // TODO: Does this change with the chunking? - return true; + return false; } }