From 74d69cfb8ecf5a10ceb478d02e9fdee188d8a1cf Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 27 Aug 2024 23:08:04 -0600 Subject: [PATCH] More --- .../netty4/Netty4HttpServerTransport.java | 12 +++++-- .../Netty4HttpServerTransportTests.java | 4 ++- .../action/bulk/IncrementalBulkService.java | 31 +++++++++++++++++++ .../elasticsearch/node/NodeConstruction.java | 6 +++- .../rest/action/document/RestBulkAction.java | 4 +-- 5 files changed, 49 insertions(+), 8 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index bca6f224d9682..8dc0169fcf630 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.ThreadWatchdog; @@ -96,6 +97,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { private final TLSConfig tlsConfig; private final AcceptChannelHandler.AcceptPredicate acceptChannelPredicate; private final HttpValidator httpValidator; + private final IncrementalBulkService.Enabled enabled; private final ThreadWatchdog threadWatchdog; private final int readTimeoutMillis; @@ -134,6 +136,7 @@ public Netty4HttpServerTransport( this.acceptChannelPredicate = acceptChannelPredicate; this.httpValidator = httpValidator; this.threadWatchdog = networkService.getThreadWatchdog(); + this.enabled = new IncrementalBulkService.Enabled(clusterSettings); this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings); @@ -279,7 +282,7 @@ public void onException(HttpChannel channel, Exception cause) { } public ChannelHandler configureServerChannelHandler() { - return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator); + return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator, enabled); } static final AttributeKey HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel"); @@ -292,19 +295,22 @@ protected static class HttpChannelHandler extends ChannelInitializer { private final TLSConfig tlsConfig; private final BiPredicate acceptChannelPredicate; private final HttpValidator httpValidator; + private final IncrementalBulkService.Enabled enabled; protected HttpChannelHandler( final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings, final TLSConfig tlsConfig, @Nullable final BiPredicate acceptChannelPredicate, - @Nullable final HttpValidator httpValidator + @Nullable final HttpValidator httpValidator, + IncrementalBulkService.Enabled enabled ) { this.transport = transport; this.handlingSettings = handlingSettings; this.tlsConfig = tlsConfig; this.acceptChannelPredicate = acceptChannelPredicate; this.httpValidator = httpValidator; + this.enabled = enabled; } @Override @@ -367,7 +373,7 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { // combines the HTTP message pieces into a single full HTTP request (with headers and body) final HttpObjectAggregator aggregator = new Netty4HttpAggregator( handlingSettings.maxContentLength(), - httpPreRequest -> httpPreRequest.uri().contains("_bulk") == false + httpPreRequest -> enabled.incrementalBulkEnabled() && httpPreRequest.uri().contains("_bulk") == false ); aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); ch.pipeline() diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java index 9e213e6468356..ad25c35283cac 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java @@ -45,6 +45,7 @@ import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchWrapperException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.Request; @@ -418,7 +419,8 @@ public ChannelHandler configureServerChannelHandler() { handlingSettings, TLSConfig.noTLS(), null, - randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null) + randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null), + new IncrementalBulkService.Enabled(clusterSettings) ) { @Override protected void initChannel(Channel ch) throws Exception { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index 4a735884fd244..fe686c47d96f6 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -12,23 +12,38 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.rest.action.document.RestBulkAction; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; public class IncrementalBulkService { private final Client client; private final ThreadContext threadContext; + private final Enabled enabled; public IncrementalBulkService(Client client, ThreadContext threadContext) { this.client = client; this.threadContext = threadContext; + this.enabled = new Enabled(); + } + + public IncrementalBulkService(Client client, ThreadContext threadContext, ClusterSettings clusterSettings) { + this.client = client; + this.threadContext = threadContext; + this.enabled = new Enabled(clusterSettings); + } + + public boolean incrementalBulkEnabled() { + return enabled.incrementalBulkEnabled(); } public Handler newBulkRequest() { @@ -39,6 +54,22 @@ public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable Ti return new Handler(client, threadContext, threadContext.newStoredContext(), waitForActiveShards, timeout, refresh); } + public static class Enabled { + + private final AtomicBoolean incrementalBulksEnabled = new AtomicBoolean(true); + + public Enabled() {} + + public Enabled(ClusterSettings clusterSettings) { + incrementalBulksEnabled.set(clusterSettings.get(RestBulkAction.INCREMENTAL_BULK)); + clusterSettings.addSettingsUpdateConsumer(RestBulkAction.INCREMENTAL_BULK, incrementalBulksEnabled::set); + } + + public boolean incrementalBulkEnabled() { + return incrementalBulksEnabled.get(); + } + } + public static class Handler implements Releasable { private final Client client; diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index b9e49c8373f58..c52633aae5b97 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -877,7 +877,11 @@ record PluginServiceInstances( terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null); final IndexingPressure indexingLimits = new IndexingPressure(settings); - final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, threadPool.getThreadContext()); + final IncrementalBulkService incrementalBulkService = new IncrementalBulkService( + client, + threadPool.getThreadContext(), + clusterService.getClusterSettings() + ); ActionModule actionModule = new ActionModule( settings, diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index b3766cd071c69..042bebfc3eb57 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -59,13 +59,11 @@ public class RestBulkAction extends BaseRestHandler { public static final Setting INCREMENTAL_BULK = boolSetting("rest.incremental_bulk", true, Setting.Property.NodeScope); private final boolean allowExplicitIndex; - private final boolean incrementalBulk; private final IncrementalBulkService bulkHandler; public RestBulkAction(Settings settings, IncrementalBulkService bulkHandler) { this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings); this.bulkHandler = bulkHandler; - this.incrementalBulk = INCREMENTAL_BULK.get(settings); } @Override @@ -87,7 +85,7 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - if (incrementalBulk == false) { + if (bulkHandler.incrementalBulkEnabled() == false) { if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) { request.param("type"); }