Skip to content

Commit

Permalink
Change
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Aug 27, 2024
1 parent 3c8d8c0 commit 2b3bcd5
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.elasticsearch.readiness.ReadinessService;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.SearchService;
Expand Down Expand Up @@ -243,6 +244,7 @@ public void apply(Settings value, Settings current, Settings previous) {
Metadata.SETTING_READ_ONLY_SETTING,
Metadata.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
RestBulkAction.INCREMENTAL_BULK,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
package org.elasticsearch.rest.action.document;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestParser;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Releasable;
Expand All @@ -37,6 +40,7 @@
import java.util.List;
import java.util.Map;

import static org.elasticsearch.common.settings.Setting.boolSetting;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestRequest.Method.PUT;

Expand All @@ -51,9 +55,12 @@
*/
@ServerlessScope(Scope.PUBLIC)
public class RestBulkAction extends BaseRestHandler {

public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated.";
public static final Setting<Boolean> INCREMENTAL_BULK = boolSetting("rest.incremental_bulk", true, Setting.Property.NodeScope);

private final boolean allowExplicitIndex;
private final boolean incrementalBulk;
private final ThreadContext threadContext;
private volatile IncrementalBulkService bulkHandler;

Expand All @@ -64,6 +71,7 @@ public RestBulkAction(Settings settings) {
/**
 * Builds the bulk REST handler from node settings.
 *
 * @param settings      node settings; supplies {@code rest.action.multi.allow_explicit_index}
 *                      and the {@code rest.incremental_bulk} flag read here
 * @param threadContext thread context handed to the incremental bulk service when enabled
 */
public RestBulkAction(Settings settings, ThreadContext threadContext) {
    this.threadContext = threadContext;
    this.incrementalBulk = INCREMENTAL_BULK.get(settings);
    this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
}

@Override
Expand All @@ -85,26 +93,61 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
// TODO: Move this to CTOR and hook everything up
synchronized (this) {
if (bulkHandler == null) {
bulkHandler = new IncrementalBulkService(client, threadContext);
if (incrementalBulk == false) {
if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
request.param("type");
}
}
BulkRequest bulkRequest = new BulkRequest();
String defaultIndex = request.param("index");
String defaultRouting = request.param("routing");
FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
String defaultPipeline = request.param("pipeline");
boolean defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false);
String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {
bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
}
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(
request.requiredContent(),
defaultIndex,
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
defaultRequireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
request.getXContentType(),
request.getRestApiVersion()
);

if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
request.param("type");
}
return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel));
} else {
// TODO: Move this to CTOR and hook everything up
synchronized (this) {
if (bulkHandler == null) {
bulkHandler = new IncrementalBulkService(client, threadContext);
}
}

return new ChunkHandler(
allowExplicitIndex,
request,
bulkHandler.newBulkRequest(
request.param("wait_for_active_shards"),
request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT),
request.param("refresh")
)
);
if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
request.param("type");
}

return new ChunkHandler(
allowExplicitIndex,
request,
bulkHandler.newBulkRequest(
request.param("wait_for_active_shards"),
request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT),
request.param("refresh")
)
);
}
}

private static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
Expand Down Expand Up @@ -195,8 +238,10 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
handler.lastItems(new ArrayList<>(items), releasable, new RestRefCountedChunkedToXContentListener<>(channel));
items.clear();
} else if (items.isEmpty() == false) {
handler.addItems(new ArrayList<>(items), releasable, () -> request.contentStream().next());
handler.addItems(new ArrayList<>(items), releasable, () -> { request.contentStream().next(); });
items.clear();
} else {
releasable.close();
}
}

Expand All @@ -214,7 +259,6 @@ private Releasable accountParsing(int bytesConsumed) {
releasables.add(reference);
if (bytesConsumed >= reference.length()) {
bytesConsumed -= reference.length();
releasables.add(reference);
} else {
unParsedChunks.addFirst(reference.retainedSlice(bytesConsumed, reference.length() - bytesConsumed));
bytesConsumed = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
};
final Map<String, String> params = new HashMap<>();
params.put("pipeline", "timestamps");
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build()).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray("""
{"index":{"_id":"1"}}
{"field1":"val1"}
Expand Down Expand Up @@ -82,82 +82,86 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
};
Map<String, String> params = new HashMap<>();
{
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
.withContent(new BytesArray("""
{"index":{"_id":"1"}}
{"field1":"val1"}
{"index":{"_id":"2"}}
{"field1":"val2"}
"""), XContentType.JSON)
.withMethod(RestRequest.Method.POST)
.build(),
mock(RestChannel.class),
verifyingClient
);
new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build())
.handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
.withContent(new BytesArray("""
{"index":{"_id":"1"}}
{"field1":"val1"}
{"index":{"_id":"2"}}
{"field1":"val2"}
"""), XContentType.JSON)
.withMethod(RestRequest.Method.POST)
.build(),
mock(RestChannel.class),
verifyingClient
);
assertThat(bulkCalled.get(), equalTo(true));
assertThat(listExecutedPipelinesRequest1.get(), equalTo(false));
assertThat(listExecutedPipelinesRequest2.get(), equalTo(false));
}
{
params.put("list_executed_pipelines", "true");
bulkCalled.set(false);
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
.withContent(new BytesArray("""
{"index":{"_id":"1"}}
{"field1":"val1"}
{"index":{"_id":"2"}}
{"field1":"val2"}
"""), XContentType.JSON)
.withMethod(RestRequest.Method.POST)
.build(),
mock(RestChannel.class),
verifyingClient
);
new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build())
.handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
.withContent(new BytesArray("""
{"index":{"_id":"1"}}
{"field1":"val1"}
{"index":{"_id":"2"}}
{"field1":"val2"}
"""), XContentType.JSON)
.withMethod(RestRequest.Method.POST)
.build(),
mock(RestChannel.class),
verifyingClient
);
assertThat(bulkCalled.get(), equalTo(true));
assertThat(listExecutedPipelinesRequest1.get(), equalTo(true));
assertThat(listExecutedPipelinesRequest2.get(), equalTo(true));
}
{
bulkCalled.set(false);
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
.withContent(new BytesArray("""
{"index":{"_id":"1", "list_executed_pipelines": "false"}}
{"field1":"val1"}
{"index":{"_id":"2"}}
{"field1":"val2"}
"""), XContentType.JSON)
.withMethod(RestRequest.Method.POST)
.build(),
mock(RestChannel.class),
verifyingClient
);
new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build())
.handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
.withContent(new BytesArray("""
{"index":{"_id":"1", "list_executed_pipelines": "false"}}
{"field1":"val1"}
{"index":{"_id":"2"}}
{"field1":"val2"}
"""), XContentType.JSON)
.withMethod(RestRequest.Method.POST)
.build(),
mock(RestChannel.class),
verifyingClient
);
assertThat(bulkCalled.get(), equalTo(true));
assertThat(listExecutedPipelinesRequest1.get(), equalTo(false));
assertThat(listExecutedPipelinesRequest2.get(), equalTo(true));
}
{
params.remove("list_executed_pipelines");
bulkCalled.set(false);
new RestBulkAction(settings(IndexVersion.current()).build()).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
.withContent(new BytesArray("""
{"index":{"_id":"1", "list_executed_pipelines": "true"}}
{"field1":"val1"}
{"index":{"_id":"2"}}
{"field1":"val2"}
"""), XContentType.JSON)
.withMethod(RestRequest.Method.POST)
.build(),
mock(RestChannel.class),
verifyingClient
);
new RestBulkAction(settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build())
.handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
.withContent(new BytesArray("""
{"index":{"_id":"1", "list_executed_pipelines": "true"}}
{"field1":"val1"}
{"index":{"_id":"2"}}
{"field1":"val2"}
"""), XContentType.JSON)
.withMethod(RestRequest.Method.POST)
.build(),
mock(RestChannel.class),
verifyingClient
);
assertThat(bulkCalled.get(), equalTo(true));
assertThat(listExecutedPipelinesRequest1.get(), equalTo(true));
assertThat(listExecutedPipelinesRequest2.get(), equalTo(false));
Expand Down

0 comments on commit 2b3bcd5

Please sign in to comment.