Skip to content

Commit

Permalink
Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Aug 27, 2024
1 parent 2b3bcd5 commit 1011086
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,28 @@
public class IncrementalBulkService {

private final Client client;
private final ThreadContext threadContext;

public IncrementalBulkService(Client client, ThreadContext threadContext) {
public IncrementalBulkService(Client client) {
this.client = client;
this.threadContext = threadContext;
}

public Handler newBulkRequest() {
return newBulkRequest(null, null, null);
return newBulkRequest(() -> {}, null, null, null);
}

public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
return new Handler(client, threadContext, waitForActiveShards, timeout, refresh);
public Handler newBulkRequest(
ThreadContext.StoredContext storedContext,
@Nullable String waitForActiveShards,
@Nullable TimeValue timeout,
@Nullable String refresh
) {
return new Handler(client, storedContext, waitForActiveShards, timeout, refresh);
}

public static class Handler implements Releasable {

private final Client client;
private final ThreadContext threadContext;
private final ThreadContext.StoredContext storedContext;
private final ActiveShardCount waitForActiveShards;
private final TimeValue timeout;
private final String refresh;
Expand All @@ -56,13 +59,13 @@ public static class Handler implements Releasable {

private Handler(
Client client,
ThreadContext threadContext,
ThreadContext.StoredContext storedContext,
@Nullable String waitForActiveShards,
@Nullable TimeValue timeout,
@Nullable String refresh
) {
this.client = client;
this.threadContext = threadContext;
this.storedContext = storedContext;
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
this.timeout = timeout;
this.refresh = refresh;
Expand All @@ -81,6 +84,7 @@ public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runn
final boolean isFirstRequest = incrementalRequestSubmitted == false;
incrementalRequestSubmitted = true;

storedContext.restore();
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {

@Override
Expand Down Expand Up @@ -114,8 +118,7 @@ public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, Act
assert bulkRequest != null;
internalAddItems(items, releasable);

threadContext.addResponseHeader("X-elastic-product", "Elasticsearch");

storedContext.restore();
client.bulk(bulkRequest, new ActionListener<>() {

private final boolean isFirstRequest = incrementalRequestSubmitted == false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,7 @@ record PluginServiceInstances(
);
final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule);
final IndexingPressure indexingLimits = new IndexingPressure(settings);
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, threadPool.getThreadContext());
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client);

SnapshotsService snapshotsService = new SnapshotsService(
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// TODO: Move this to CTOR and hook everything up
synchronized (this) {
if (bulkHandler == null) {
bulkHandler = new IncrementalBulkService(client, threadContext);
bulkHandler = new IncrementalBulkService(client);
}
}

Expand All @@ -142,6 +142,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
allowExplicitIndex,
request,
bulkHandler.newBulkRequest(
threadContext.newStoredContext(),
request.param("wait_for_active_shards"),
request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT),
request.param("refresh")
Expand Down

0 comments on commit 1011086

Please sign in to comment.