From 101108676a5be32677833cd07128fca2936fa383 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 27 Aug 2024 16:59:13 -0600 Subject: [PATCH] Changes --- .../action/bulk/IncrementalBulkService.java | 25 +++++++++++-------- .../elasticsearch/node/NodeConstruction.java | 2 +- .../rest/action/document/RestBulkAction.java | 3 ++- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index d2c64e62e4e00..28c92e5bcd5c4 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -24,25 +24,28 @@ public class IncrementalBulkService { private final Client client; - private final ThreadContext threadContext; - public IncrementalBulkService(Client client, ThreadContext threadContext) { + public IncrementalBulkService(Client client) { this.client = client; - this.threadContext = threadContext; } public Handler newBulkRequest() { - return newBulkRequest(null, null, null); + return newBulkRequest(() -> {}, null, null, null); } - public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) { - return new Handler(client, threadContext, waitForActiveShards, timeout, refresh); + public Handler newBulkRequest( + ThreadContext.StoredContext storedContext, + @Nullable String waitForActiveShards, + @Nullable TimeValue timeout, + @Nullable String refresh + ) { + return new Handler(client, storedContext, waitForActiveShards, timeout, refresh); } public static class Handler implements Releasable { private final Client client; - private final ThreadContext threadContext; + private final ThreadContext.StoredContext storedContext; private final ActiveShardCount waitForActiveShards; private final TimeValue timeout; private final String refresh; @@ -56,13 +59,13 @@ public static class Handler implements Releasable { private Handler( Client client, - ThreadContext threadContext, + ThreadContext.StoredContext storedContext, @Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh ) { this.client = client; - this.threadContext = threadContext; + this.storedContext = storedContext; this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null; this.timeout = timeout; this.refresh = refresh; @@ -81,6 +84,7 @@ public void addItems(List> items, Releasable releasable, Runn final boolean isFirstRequest = incrementalRequestSubmitted == false; incrementalRequestSubmitted = true; + storedContext.restore(); client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() { @Override @@ -114,8 +118,7 @@ public void lastItems(List> items, Releasable releasable, Act assert bulkRequest != null; internalAddItems(items, releasable); - threadContext.addResponseHeader("X-elastic-product", "Elasticsearch"); - + storedContext.restore(); client.bulk(bulkRequest, new ActionListener<>() { private final boolean isFirstRequest = incrementalRequestSubmitted == false; diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index b59d1d23fea92..2f76daa7e42ed 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -965,7 +965,7 @@ record PluginServiceInstances( ); final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule); final IndexingPressure indexingLimits = new IndexingPressure(settings); - final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, threadPool.getThreadContext()); + final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client); SnapshotsService snapshotsService = new SnapshotsService( settings, diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index fc0dda1b45fe1..ee923daa6b877 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -130,7 +130,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC // TODO: Move this to CTOR and hook everything up synchronized (this) { if (bulkHandler == null) { - bulkHandler = new IncrementalBulkService(client, threadContext); + bulkHandler = new IncrementalBulkService(client); } } @@ -142,6 +142,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC allowExplicitIndex, request, bulkHandler.newBulkRequest( + threadContext.newStoredContext(), request.param("wait_for_active_shards"), request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT), request.param("refresh")