diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java
index ebe35689d93f9..52d114224450c 100644
--- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java
+++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java
@@ -40,9 +40,56 @@ public void testIndexingPressureStats() throws IOException {

         assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

         Request successfulIndexingRequest = new Request("POST", "/index_name/_bulk");
+        successfulIndexingRequest.setJsonEntity("""
             { "index" : { "_index" : "index_name" } }
-            { "field1" : "value1" }
+            { "field" : "value1" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value2" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value3" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value4" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value5" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value6" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value7" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value8" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value9" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value10" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value11" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value12" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value13" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value14" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value15" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value16" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value17" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value18" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value19" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value20" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value21" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value22" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value23" }
+            { "index" : { "_index" : "index_name" } }
+            { "field" : "value24" }
             """);
         final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest);
         assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java
index ea9cffc472202..d2c64e62e4e00 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java
@@ -103,7 +103,7 @@ public void onFailure(Exception e) {

         private boolean shouldBackOff() {
             // TODO: Implement Real Memory Logic
-            return bulkRequest.requests().size() >= 16;
+            return false;
         }

         public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java
index 6ea8d1abced43..055a23a5fa546 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java
@@ -13,10 +13,12 @@
 import org.elasticsearch.action.bulk.BulkShardRequest;
 import org.elasticsearch.action.bulk.IncrementalBulkService;
 import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.CompositeBytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.rest.BaseRestHandler;
@@ -94,11 +96,20 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
             request.param("type");
         }

-        return new ChunkHandler(request);
+        return new ChunkHandler(
+            allowExplicitIndex,
+            request,
+            bulkHandler.newBulkRequest(
+                request.param("wait_for_active_shards"),
+                request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT),
+                request.param("refresh")
+            )
+        );
     }

-    private class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
+    private static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {

+        private final boolean allowExplicitIndex;
         private final RestRequest request;

         private final Map<String, String> stringDeduplicator = new HashMap<>();
@@ -116,7 +127,8 @@ private class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
         private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4);
         private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);

-        private ChunkHandler(RestRequest request) {
+        private ChunkHandler(boolean allowExplicitIndex, RestRequest request, IncrementalBulkService.Handler handler) {
+            this.allowExplicitIndex = allowExplicitIndex;
             this.request = request;
             this.defaultIndex = request.param("index");
             this.defaultRouting = request.param("routing");
@@ -126,11 +138,7 @@ private ChunkHandler(RestRequest request) {
             this.defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
             this.defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
             this.parser = new BulkRequestParser(true, request.getRestApiVersion());
-            handler = bulkHandler.newBulkRequest(
-                request.param("wait_for_active_shards"),
-                request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT),
-                request.param("refresh")
-            );
+            this.handler = handler;
         }

         @Override
@@ -142,7 +150,8 @@ public void accept(RestChannel restChannel) throws Exception {
         public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
             assert channel == restChannel;

-            final ReleasableBytesReference data;
+            final BytesReference data;
+            final Releasable releasable;
             try {
                 // TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in
                 // BulkRequest#add is fine
@@ -150,11 +159,7 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
                 unParsedChunks.add(chunk);

                 if (unParsedChunks.size() > 1) {
-                    ReleasableBytesReference[] bytesReferences = unParsedChunks.toArray(new ReleasableBytesReference[0]);
-                    data = new ReleasableBytesReference(
-                        CompositeBytesReference.of(bytesReferences),
-                        () -> Releasables.close(bytesReferences)
-                    );
+                    data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0]));
                 } else {
                     data = chunk;
                 }
@@ -177,7 +182,7 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
                     stringDeduplicator
                 );

-                accountParsing(bytesConsumed);
+                releasable = accountParsing(bytesConsumed);

             } catch (IOException e) {
                 // TODO: Exception Handling
@@ -187,10 +192,10 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
             if (isLast) {
                 assert unParsedChunks.isEmpty();
                 assert channel != null;
-                handler.lastItems(new ArrayList<>(items), data, new RestRefCountedChunkedToXContentListener<>(channel));
+                handler.lastItems(new ArrayList<>(items), releasable, new RestRefCountedChunkedToXContentListener<>(channel));
                 items.clear();
             } else if (items.isEmpty() == false) {
-                handler.addItems(new ArrayList<>(items), data, () -> request.contentStream().next());
+                handler.addItems(new ArrayList<>(items), releasable, () -> request.contentStream().next());
                 items.clear();
             }
         }
@@ -202,16 +207,19 @@ public void close() {
             RequestBodyChunkConsumer.super.close();
         }

-        private void accountParsing(int bytesConsumed) {
+        private Releasable accountParsing(int bytesConsumed) {
+            ArrayList<Releasable> releasables = new ArrayList<>(unParsedChunks.size());
             while (bytesConsumed > 0) {
                 ReleasableBytesReference reference = unParsedChunks.removeFirst();
+                releasables.add(reference);
                 if (bytesConsumed >= reference.length()) {
                     bytesConsumed -= reference.length();
                 } else {
                     unParsedChunks.addFirst(reference.retainedSlice(bytesConsumed, reference.length() - bytesConsumed));
                     bytesConsumed = 0;
                 }
             }
+            return () -> Releasables.close(releasables);
         }

     }
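
Note on the ownership hand-off in the last hunks: handleChunk now parses out of a plain CompositeBytesReference view, and accountParsing returns a single Releasable that is passed along with the parsed items, so the network buffers stay alive until the bulk items have actually been processed. Below is a minimal, self-contained sketch of that accounting pattern. Chunk, Releasable, and ParseAccountingSketch are simplified stand-ins invented for illustration; they are not Elasticsearch's ReleasableBytesReference, org.elasticsearch.core.Releasable, or the real RestBulkAction code.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Stand-in for org.elasticsearch.core.Releasable: must be closed exactly once. */
interface Releasable {
    void close();
}

/** Stand-in for ReleasableBytesReference: a view onto a shared, ref-counted buffer. */
final class Chunk implements Releasable {
    private final AtomicInteger refCount; // shared by every view of the same buffer
    private final int length;

    Chunk(int length) {
        this(new AtomicInteger(1), length);
    }

    private Chunk(AtomicInteger refCount, int length) {
        this.refCount = refCount;
        this.length = length;
    }

    int length() {
        return length;
    }

    /** Bumps the shared ref count and returns a new view, like retainedSlice. */
    Chunk retainedSlice(int from, int len) {
        refCount.incrementAndGet();
        return new Chunk(refCount, len);
    }

    @Override
    public void close() {
        int refs = refCount.decrementAndGet();
        if (refs == 0) {
            System.out.println("buffer freed");
        } else if (refs < 0) {
            throw new IllegalStateException("double release");
        }
    }
}

public class ParseAccountingSketch {
    static final ArrayDeque<Chunk> unParsedChunks = new ArrayDeque<>();

    /** Mirrors the shape of accountParsing: pop consumed chunks, keep the tail alive. */
    static Releasable accountParsing(int bytesConsumed) {
        List<Releasable> releasables = new ArrayList<>(unParsedChunks.size());
        while (bytesConsumed > 0) {
            Chunk chunk = unParsedChunks.removeFirst();
            releasables.add(chunk); // released when the bulk items finish, not immediately
            if (bytesConsumed >= chunk.length()) {
                bytesConsumed -= chunk.length();
            } else {
                // Partially consumed: the remainder goes back, retained so it survives chunk.close().
                unParsedChunks.addFirst(chunk.retainedSlice(bytesConsumed, chunk.length() - bytesConsumed));
                bytesConsumed = 0;
            }
        }
        return () -> releasables.forEach(Releasable::close);
    }

    public static void main(String[] args) {
        unParsedChunks.add(new Chunk(100));
        unParsedChunks.add(new Chunk(100));

        Releasable releasable = accountParsing(150); // the parser consumed 1.5 chunks
        releasable.close();           // frees buffer 1; buffer 2 survives via its retained slice
        unParsedChunks.pop().close(); // later parsing finishes the tail -> buffer 2 freed
    }
}

The design point the sketch demonstrates: a partially consumed chunk is handed to the release set and replaced on the deque by a retained slice, so closing the consumed views can never free bytes the parser still needs.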