diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index c023b00ec820f..127e967742e9b 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -111,6 +111,7 @@ import org.elasticsearch.readiness.ReadinessService; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.SearchService; @@ -243,6 +244,7 @@ public void apply(Settings value, Settings current, Settings previous) { Metadata.SETTING_READ_ONLY_SETTING, Metadata.SETTING_READ_ONLY_ALLOW_DELETE_SETTING, ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE, + RestBulkAction.INCREMENTAL_BULK, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 055a23a5fa546..fc0dda1b45fe1 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -9,13 +9,16 @@ package org.elasticsearch.rest.action.document; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequestParser; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.bulk.IncrementalBulkService; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Releasable; @@ -37,6 +40,7 @@ import java.util.List; import java.util.Map; +import static org.elasticsearch.common.settings.Setting.boolSetting; import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.RestRequest.Method.PUT; @@ -51,9 +55,12 @@ */ @ServerlessScope(Scope.PUBLIC) public class RestBulkAction extends BaseRestHandler { + public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated."; + public static final Setting INCREMENTAL_BULK = boolSetting("rest.incremental_bulk", true, Setting.Property.NodeScope); private final boolean allowExplicitIndex; + private final boolean incrementalBulk; private final ThreadContext threadContext; private volatile IncrementalBulkService bulkHandler; @@ -64,6 +71,7 @@ public RestBulkAction(Settings settings) { public RestBulkAction(Settings settings, ThreadContext threadContext) { this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings); this.threadContext = threadContext; + this.incrementalBulk = INCREMENTAL_BULK.get(settings); } @Override @@ -85,26 +93,61 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - // TODO: Move this to CTOR and hook everything up - synchronized (this) { - if (bulkHandler == null) { - bulkHandler = new IncrementalBulkService(client, threadContext); + if (incrementalBulk == false) { + if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) { + request.param("type"); } - } + BulkRequest bulkRequest = new BulkRequest(); + String defaultIndex = request.param("index"); + String defaultRouting = request.param("routing"); + FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request); + String defaultPipeline = request.param("pipeline"); + boolean defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false); + String waitForActiveShards = request.param("wait_for_active_shards"); + if (waitForActiveShards != null) { + bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); + } + Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false); + boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false); + bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); + bulkRequest.setRefreshPolicy(request.param("refresh")); + bulkRequest.add( + request.requiredContent(), + defaultIndex, + defaultRouting, + defaultFetchSourceContext, + defaultPipeline, + defaultRequireAlias, + defaultRequireDataStream, + defaultListExecutedPipelines, + allowExplicitIndex, + request.getXContentType(), + request.getRestApiVersion() + ); - if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) { - request.param("type"); - } + return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel)); + } else { + // TODO: Move this to CTOR and hook everything up + synchronized (this) { + if (bulkHandler == null) { + bulkHandler = new IncrementalBulkService(client, threadContext); + } + } - return new ChunkHandler( - allowExplicitIndex, - request, - bulkHandler.newBulkRequest( - request.param("wait_for_active_shards"), - request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT), - request.param("refresh") - ) - ); + if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) { + request.param("type"); + } + + return new ChunkHandler( + allowExplicitIndex, + request, + bulkHandler.newBulkRequest( + request.param("wait_for_active_shards"), + request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT), + request.param("refresh") + ) + ); + } } private static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { @@ -195,8 +238,10 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo handler.lastItems(new ArrayList<>(items), releasable, new RestRefCountedChunkedToXContentListener<>(channel)); items.clear(); } else if (items.isEmpty() == false) { - handler.addItems(new ArrayList<>(items), releasable, () -> request.contentStream().next()); + handler.addItems(new ArrayList<>(items), releasable, () -> { request.contentStream().next(); }); items.clear(); + } else { + releasable.close(); } } @@ -214,7 +259,6 @@ private Releasable accountParsing(int bytesConsumed) { releasables.add(reference); if (bytesConsumed >= reference.length()) { bytesConsumed -= reference.length(); - releasables.add(reference); } else { unParsedChunks.addFirst(reference.retainedSlice(bytesConsumed, reference.length() - bytesConsumed)); bytesConsumed = 0; diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index caeb0f36a1000..838f96b25e099 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -50,7 +50,7 @@ public void bulk(BulkRequest request, ActionListener listener) { }; final Map params = new HashMap<>(); params.put("pipeline", "timestamps"); - new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest( + new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build()).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray(""" {"index":{"_id":"1"}} {"field1":"val1"} @@ -82,20 +82,21 @@ public void bulk(BulkRequest request, ActionListener listener) { }; Map params = new HashMap<>(); { - new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest( - new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") - .withParams(params) - .withContent(new BytesArray(""" - {"index":{"_id":"1"}} - {"field1":"val1"} - {"index":{"_id":"2"}} - {"field1":"val2"} - """), XContentType.JSON) - .withMethod(RestRequest.Method.POST) - .build(), - mock(RestChannel.class), - verifyingClient - ); + new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build()) + .handleRequest( + new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") + .withParams(params) + .withContent(new BytesArray(""" + {"index":{"_id":"1"}} + {"field1":"val1"} + {"index":{"_id":"2"}} + {"field1":"val2"} + """), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(), + mock(RestChannel.class), + verifyingClient + ); assertThat(bulkCalled.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest1.get(), equalTo(false)); assertThat(listExecutedPipelinesRequest2.get(), equalTo(false)); @@ -103,40 +104,42 @@ public void bulk(BulkRequest request, ActionListener listener) { { params.put("list_executed_pipelines", "true"); bulkCalled.set(false); - new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest( - new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") - .withParams(params) - .withContent(new BytesArray(""" - {"index":{"_id":"1"}} - {"field1":"val1"} - {"index":{"_id":"2"}} - {"field1":"val2"} - """), XContentType.JSON) - .withMethod(RestRequest.Method.POST) - .build(), - mock(RestChannel.class), - verifyingClient - ); + new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build()) + .handleRequest( + new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") + .withParams(params) + .withContent(new BytesArray(""" + {"index":{"_id":"1"}} + {"field1":"val1"} + {"index":{"_id":"2"}} + {"field1":"val2"} + """), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(), + mock(RestChannel.class), + verifyingClient + ); assertThat(bulkCalled.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest1.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest2.get(), equalTo(true)); } { bulkCalled.set(false); - new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest( - new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") - .withParams(params) - .withContent(new BytesArray(""" - {"index":{"_id":"1", "list_executed_pipelines": "false"}} - {"field1":"val1"} - {"index":{"_id":"2"}} - {"field1":"val2"} - """), XContentType.JSON) - .withMethod(RestRequest.Method.POST) - .build(), - mock(RestChannel.class), - verifyingClient - ); + new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build()) + .handleRequest( + new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") + .withParams(params) + .withContent(new BytesArray(""" + {"index":{"_id":"1", "list_executed_pipelines": "false"}} + {"field1":"val1"} + {"index":{"_id":"2"}} + {"field1":"val2"} + """), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(), + mock(RestChannel.class), + verifyingClient + ); assertThat(bulkCalled.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest1.get(), equalTo(false)); assertThat(listExecutedPipelinesRequest2.get(), equalTo(true)); @@ -144,20 +147,21 @@ public void bulk(BulkRequest request, ActionListener listener) { { params.remove("list_executed_pipelines"); bulkCalled.set(false); - new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest( - new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") - .withParams(params) - .withContent(new BytesArray(""" - {"index":{"_id":"1", "list_executed_pipelines": "true"}} - {"field1":"val1"} - {"index":{"_id":"2"}} - {"field1":"val2"} - """), XContentType.JSON) - .withMethod(RestRequest.Method.POST) - .build(), - mock(RestChannel.class), - verifyingClient - ); + new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build()) + .handleRequest( + new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") + .withParams(params) + .withContent(new BytesArray(""" + {"index":{"_id":"1", "list_executed_pipelines": "true"}} + {"field1":"val1"} + {"index":{"_id":"2"}} + {"field1":"val2"} + """), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(), + mock(RestChannel.class), + verifyingClient + ); assertThat(bulkCalled.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest1.get(), equalTo(true)); assertThat(listExecutedPipelinesRequest2.get(), equalTo(false));