Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/partial-rest-requests' into incremental_bulk_rest
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Aug 28, 2024
2 parents aba3eb3 + fc4d650 commit 16cfc81
Show file tree
Hide file tree
Showing 13 changed files with 234 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;

public class IncrementalBulkIT extends ESIntegTestCase {
Expand All @@ -55,6 +56,14 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(IngestClientIT.ExtendedIngestTestPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
    // Lower the bulk-split threshold to 512 bytes so the tests in this class
    // can trigger incremental bulk splitting with only a handful of small docs.
    Settings.Builder nodeSettings = Settings.builder();
    nodeSettings.put(super.nodeSettings(nodeOrdinal, otherSettings));
    nodeSettings.put(IndexingPressure.SPLIT_BULK_THRESHOLD.getKey(), "512B");
    return nodeSettings.build();
}

public void testSingleBulkRequest() {
String index = "test";
createIndex(index);
Expand All @@ -81,6 +90,71 @@ public void testSingleBulkRequest() {
assertFalse(refCounted.hasReferences());
}

/**
 * Verifies that an incremental bulk request is rejected when the node's
 * coordinating indexing-pressure budget is already fully consumed, and that
 * the handler still releases every per-page reference it was handed.
 */
public void testIndexingPressureRejection() {
    String index = "test";
    createIndex(index);

    String nodeName = internalCluster().getRandomNodeName();
    IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
    IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);

    // Consume the node's entire coordinating memory limit up front so that any
    // further coordinating operation started by the handler must be rejected.
    try (Releasable r = indexingPressure.markCoordinatingOperationStarted(1, indexingPressure.stats().getMemoryLimit(), true)) {
        IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
        AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});

        if (randomBoolean()) {
            // Optionally send an intermediate page first; even under rejection
            // the handler still signals readiness for the next page (asserted
            // below), since the failure is surfaced on completion.
            AtomicBoolean nextPage = new AtomicBoolean(false);
            refCounted.incRef();
            handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));
            assertTrue(nextPage.get());
        }

        PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
        handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);

        // The final bulk must fail with a rejection, and every ref acquired
        // above must have been released by the handler's failure path.
        expectThrows(EsRejectedExecutionException.class, future::actionGet);
        assertFalse(refCounted.hasReferences());
    }
}

/**
 * Verifies that once the bytes accumulated by an incremental bulk cross the
 * split threshold (512B, configured in {@code nodeSettings}), the handler
 * flushes the buffered items as an intermediate bulk and the coordinating
 * memory accounted for them is released before the request completes.
 */
public void testIncrementalBulkRequestMemoryBackOff() throws Exception {
    String index = "test";
    createIndex(index);

    String nodeName = internalCluster().getRandomNodeName();
    IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
    IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);

    IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();

    AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
    AtomicBoolean nextPage = new AtomicBoolean(false);

    // Add single-item pages until the accumulated request size reaches the
    // 512B split threshold. Each page below the threshold is accepted
    // synchronously (nextPage fires before addItems returns). Note: the last
    // created indexRequest is NOT added inside the loop; it is sent via
    // lastItems below.
    IndexRequest indexRequest = indexRequest(index);
    long total = indexRequest.ramBytesUsed();
    while (total < 512) {
        refCounted.incRef();
        handler.addItems(List.of(indexRequest), refCounted::decRef, () -> nextPage.set(true));
        assertTrue(nextPage.get());
        nextPage.set(false);
        indexRequest = indexRequest(index);
        total += indexRequest.ramBytesUsed();
    }

    // The buffered items are being tracked by the indexing-pressure accounting.
    assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(0L));
    refCounted.incRef();
    // This page pushes past the threshold, triggering an intermediate bulk flush.
    handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));

    // Once the intermediate bulk completes, its coordinating bytes are released.
    assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L)));

    PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
    handler.lastItems(List.of(indexRequest), refCounted::decRef, future);

    // The combined response must be failure-free and all refs released.
    BulkResponse bulkResponse = future.actionGet();
    assertNoFailures(bulkResponse);
    assertFalse(refCounted.hasReferences());
}

public void testMultipleBulkPartsWithBackoff() {
ExecutorService executorService = Executors.newFixedThreadPool(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ private void completeBulkOperation() {
responses.toArray(new BulkItemResponse[responses.length()]),
buildTookInMillis(startTimeNanos),
BulkResponse.NO_INGEST_TOOK,
new BulkRequest.IncrementalState(shortCircuitShardFailures)
new BulkRequest.IncrementalState(shortCircuitShardFailures, bulkRequest.incrementalState().indexingPressureAccounted())
)
);
// Allow memory for bulk shard request items to be reclaimed before all items have been completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,12 +496,12 @@ public boolean isSimulated() {
return false; // Always false, but may be overridden by a subclass
}

record IncrementalState(Map<ShardId, Exception> shardLevelFailures) implements Writeable {
record IncrementalState(Map<ShardId, Exception> shardLevelFailures, boolean indexingPressureAccounted) implements Writeable {

static final IncrementalState EMPTY = new IncrementalState(Collections.emptyMap());
static final IncrementalState EMPTY = new IncrementalState(Collections.emptyMap(), false);

IncrementalState(StreamInput in) throws IOException {
this(in.readMap(ShardId::new, input -> input.readException()));
this(in.readMap(ShardId::new, input -> input.readException()), false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,55 @@

package org.elasticsearch.action.bulk;

import org.apache.lucene.util.Accountable;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.rest.action.document.RestBulkAction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class IncrementalBulkService {

private final Client client;
private final IndexingPressure indexingPressure;
private final ThreadContext threadContext;
private final Supplier<Boolean> enabled;

public IncrementalBulkService(Client client, ThreadContext threadContext) {
this.client = client;
this.threadContext = threadContext;
this.enabled = new Enabled();
public IncrementalBulkService(Client client, IndexingPressure indexingPressure, ThreadContext threadContext) {
this(client, indexingPressure, threadContext, new Enabled());
}

public IncrementalBulkService(Client client, ThreadContext threadContext, ClusterSettings clusterSettings) {
this(client, threadContext, new Enabled(clusterSettings));
public IncrementalBulkService(
Client client,
IndexingPressure indexingPressure,
ThreadContext threadContext,
ClusterSettings clusterSettings
) {
this(client, indexingPressure, threadContext, new Enabled(clusterSettings));
}

public IncrementalBulkService(Client client, ThreadContext threadContext, Supplier<Boolean> enabled) {
public IncrementalBulkService(
Client client,
IndexingPressure indexingPressure,
ThreadContext threadContext,
Supplier<Boolean> enabled
) {
this.client = client;
this.indexingPressure = indexingPressure;
this.threadContext = threadContext;
this.enabled = enabled;
}
Expand All @@ -56,7 +70,15 @@ public Handler newBulkRequest() {
}

public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
return new Handler(client, threadContext, threadContext.newStoredContext(), waitForActiveShards, timeout, refresh);
return new Handler(
client,
threadContext,
threadContext.newStoredContext(),
indexingPressure,
waitForActiveShards,
timeout,
refresh
);
}

public static class Enabled implements Supplier<Boolean> {
Expand All @@ -78,9 +100,12 @@ public Boolean get() {

public static class Handler implements Releasable {

public static final BulkRequest.IncrementalState EMPTY_STATE = new BulkRequest.IncrementalState(Collections.emptyMap(), true);

private final Client client;
private final ThreadContext threadContext;
private final ThreadContext.StoredContext requestContext;
private final IndexingPressure indexingPressure;
private final ActiveShardCount waitForActiveShards;
private final TimeValue timeout;
private final String refresh;
Expand All @@ -96,17 +121,19 @@ protected Handler(
Client client,
ThreadContext threadContext,
ThreadContext.StoredContext requestContext,
IndexingPressure indexingPressure,
@Nullable String waitForActiveShards,
@Nullable TimeValue timeout,
@Nullable String refresh
) {
this.client = client;
this.threadContext = threadContext;
this.requestContext = requestContext;
this.indexingPressure = indexingPressure;
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
this.timeout = timeout;
this.refresh = refresh;
createNewBulkRequest(BulkRequest.IncrementalState.EMPTY);
createNewBulkRequest(EMPTY_STATE);
}

public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
Expand All @@ -115,28 +142,31 @@ public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runn
nextItems.run();
} else {
assert bulkRequest != null;
internalAddItems(items, releasable);

if (shouldBackOff()) {
final boolean isFirstRequest = incrementalRequestSubmitted == false;
incrementalRequestSubmitted = true;

try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
requestContext.restore();
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {

@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
createNewBulkRequest(bulkResponse.getIncrementalState());
}

@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
}
}, nextItems));
if (internalAddItems(items, releasable)) {
if (shouldBackOff()) {
final boolean isFirstRequest = incrementalRequestSubmitted == false;
incrementalRequestSubmitted = true;
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
requestContext.restore();
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {

@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
createNewBulkRequest(
new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true)
);
}

@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
}
}, nextItems));
}
} else {
nextItems.run();
}
} else {
nextItems.run();
Expand All @@ -145,8 +175,7 @@ public void onFailure(Exception e) {
}

private boolean shouldBackOff() {
// TODO: Implement Real Memory Logic
return bulkRequest.requests().size() >= 16;
return indexingPressure.shouldSplitBulks();
}

public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
Expand All @@ -155,27 +184,30 @@ public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, Act
errorResponse(listener);
} else {
assert bulkRequest != null;
internalAddItems(items, releasable);
if (internalAddItems(items, releasable)) {

try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
requestContext.restore();
client.bulk(bulkRequest, new ActionListener<>() {
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
requestContext.restore();
client.bulk(bulkRequest, new ActionListener<>() {

private final boolean isFirstRequest = incrementalRequestSubmitted == false;
private final boolean isFirstRequest = incrementalRequestSubmitted == false;

@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
listener.onResponse(combineResponses());
}
@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
listener.onResponse(combineResponses());
}

@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
errorResponse(listener);
}
});
@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
errorResponse(listener);
}
});
}
} else {
errorResponse(listener);
}
}
}
Expand Down Expand Up @@ -216,9 +248,22 @@ private void addItemLevelFailures(List<DocWriteRequest<?>> items) {
responses.add(new BulkResponse(bulkItemResponses, 0, 0));
}

private void internalAddItems(List<DocWriteRequest<?>> items, Releasable releasable) {
bulkRequest.add(items);
releasables.add(releasable);
/**
 * Buffers {@code items} into the pending bulk request and accounts their
 * memory with the indexing-pressure tracker.
 *
 * @return {@code true} when the items were accepted; {@code false} when the
 *         node rejected the additional coordinating bytes, in which case the
 *         failure has already been recorded via {@code handleBulkFailure}.
 */
private boolean internalAddItems(List<DocWriteRequest<?>> items, Releasable releasable) {
    try {
        bulkRequest.add(items);
        // The caller's releasable is tracked before the pressure check so it
        // is not lost if the check throws. NOTE(review): on rejection,
        // handleBulkFailure presumably frees the tracked releasables — confirm.
        releasables.add(releasable);
        long bytesUsed = items.stream().mapToLong(Accountable::ramBytesUsed).sum();
        releasables.add(indexingPressure.markCoordinatingOperationStarted(items.size(), bytesUsed, false));
        return true;
    } catch (EsRejectedExecutionException e) {
        // Mark the failure; isFirstRequest distinguishes a never-submitted
        // request from one that already shipped an intermediate bulk.
        handleBulkFailure(incrementalRequestSubmitted == false, e);
        return false;
    }
}

private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
clusterService.state().metadata().getIndicesLookup(),
systemIndices
);
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
final Releasable releasable;
if (bulkRequest.incrementalState().indexingPressureAccounted()) {
releasable = () -> {};
} else {
releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
}
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final Executor executor = isOnlySystem ? systemWriteExecutor : writeExecutor;
ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ public void apply(Settings value, Settings current, Settings previous) {
FsHealthService.REFRESH_INTERVAL_SETTING,
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
IndexingPressure.MAX_INDEXING_BYTES,
IndexingPressure.SPLIT_BULK_THRESHOLD,
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN,
DataTier.ENFORCE_DEFAULT_TIER_PREFERENCE_SETTING,
CoordinationDiagnosticsService.IDENTITY_CHANGES_THRESHOLD_SETTING,
Expand Down
Loading

0 comments on commit 16cfc81

Please sign in to comment.