Skip to content

Commit

Permalink
More
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Aug 28, 2024
1 parent c007b5a commit 74d69cf
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.ThreadWatchdog;
Expand Down Expand Up @@ -96,6 +97,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
private final TLSConfig tlsConfig;
private final AcceptChannelHandler.AcceptPredicate acceptChannelPredicate;
private final HttpValidator httpValidator;
private final IncrementalBulkService.Enabled enabled;
private final ThreadWatchdog threadWatchdog;
private final int readTimeoutMillis;

Expand Down Expand Up @@ -134,6 +136,7 @@ public Netty4HttpServerTransport(
this.acceptChannelPredicate = acceptChannelPredicate;
this.httpValidator = httpValidator;
this.threadWatchdog = networkService.getThreadWatchdog();
this.enabled = new IncrementalBulkService.Enabled(clusterSettings);

this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);

Expand Down Expand Up @@ -279,7 +282,7 @@ public void onException(HttpChannel channel, Exception cause) {
}

public ChannelHandler configureServerChannelHandler() {
return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator);
return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator, enabled);
}

static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel");
Expand All @@ -292,19 +295,22 @@ protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
private final TLSConfig tlsConfig;
private final BiPredicate<String, InetSocketAddress> acceptChannelPredicate;
private final HttpValidator httpValidator;
private final IncrementalBulkService.Enabled enabled;

protected HttpChannelHandler(
final Netty4HttpServerTransport transport,
final HttpHandlingSettings handlingSettings,
final TLSConfig tlsConfig,
@Nullable final BiPredicate<String, InetSocketAddress> acceptChannelPredicate,
@Nullable final HttpValidator httpValidator
@Nullable final HttpValidator httpValidator,
IncrementalBulkService.Enabled enabled
) {
this.transport = transport;
this.handlingSettings = handlingSettings;
this.tlsConfig = tlsConfig;
this.acceptChannelPredicate = acceptChannelPredicate;
this.httpValidator = httpValidator;
this.enabled = enabled;
}

@Override
Expand Down Expand Up @@ -367,7 +373,7 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception {
// combines the HTTP message pieces into a single full HTTP request (with headers and body)
final HttpObjectAggregator aggregator = new Netty4HttpAggregator(
handlingSettings.maxContentLength(),
httpPreRequest -> httpPreRequest.uri().contains("_bulk") == false
httpPreRequest -> enabled.incrementalBulkEnabled() && httpPreRequest.uri().contains("_bulk") == false
);
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
ch.pipeline()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ElasticsearchWrapperException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.Request;
Expand Down Expand Up @@ -418,7 +419,8 @@ public ChannelHandler configureServerChannelHandler() {
handlingSettings,
TLSConfig.noTLS(),
null,
randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null)
randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null),
new IncrementalBulkService.Enabled(clusterSettings)
) {
@Override
protected void initChannel(Channel ch) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,38 @@
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.action.document.RestBulkAction;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class IncrementalBulkService {

private final Client client;
private final ThreadContext threadContext;
private final Enabled enabled;

public IncrementalBulkService(Client client, ThreadContext threadContext) {
this.client = client;
this.threadContext = threadContext;
this.enabled = new Enabled();
}

public IncrementalBulkService(Client client, ThreadContext threadContext, ClusterSettings clusterSettings) {
this.client = client;
this.threadContext = threadContext;
this.enabled = new Enabled(clusterSettings);
}

public boolean incrementalBulkEnabled() {
return enabled.incrementalBulkEnabled();
}

public Handler newBulkRequest() {
Expand All @@ -39,6 +54,22 @@ public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable Ti
return new Handler(client, threadContext, threadContext.newStoredContext(), waitForActiveShards, timeout, refresh);
}

public static class Enabled {

private final AtomicBoolean incrementalBulksEnabled = new AtomicBoolean(true);

public Enabled() {}

public Enabled(ClusterSettings clusterSettings) {
incrementalBulksEnabled.set(clusterSettings.get(RestBulkAction.INCREMENTAL_BULK));
clusterSettings.addSettingsUpdateConsumer(RestBulkAction.INCREMENTAL_BULK, incrementalBulksEnabled::set);
}

public boolean incrementalBulkEnabled() {
return incrementalBulksEnabled.get();
}
}

public static class Handler implements Releasable {

private final Client client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,11 @@ record PluginServiceInstances(
terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null);

final IndexingPressure indexingLimits = new IndexingPressure(settings);
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, threadPool.getThreadContext());
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(
client,
threadPool.getThreadContext(),
clusterService.getClusterSettings()
);

ActionModule actionModule = new ActionModule(
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,11 @@ public class RestBulkAction extends BaseRestHandler {
public static final Setting<Boolean> INCREMENTAL_BULK = boolSetting("rest.incremental_bulk", true, Setting.Property.NodeScope);

private final boolean allowExplicitIndex;
private final boolean incrementalBulk;
private final IncrementalBulkService bulkHandler;

public RestBulkAction(Settings settings, IncrementalBulkService bulkHandler) {
this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
this.bulkHandler = bulkHandler;
this.incrementalBulk = INCREMENTAL_BULK.get(settings);
}

@Override
Expand All @@ -87,7 +85,7 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
if (incrementalBulk == false) {
if (bulkHandler.incrementalBulkEnabled() == false) {
if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
request.param("type");
}
Expand Down

0 comments on commit 74d69cf

Please sign in to comment.