Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Aug 29, 2024
1 parent 4c1b20c commit 321377a
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,7 @@ public Handler newBulkRequest() {
}

public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
return new Handler(
client,
threadContext,
threadContext.newStoredContext(),
indexingPressure,
waitForActiveShards,
timeout,
refresh
);
return new Handler(client, threadContext, indexingPressure, waitForActiveShards, timeout, refresh);
}

public static class Enabled implements Supplier<Boolean> {
Expand All @@ -104,7 +96,6 @@ public static class Handler implements Releasable {

private final Client client;
private final ThreadContext threadContext;
private final ThreadContext.StoredContext requestContext;
private final IndexingPressure indexingPressure;
private final ActiveShardCount waitForActiveShards;
private final TimeValue timeout;
Expand All @@ -114,21 +105,21 @@ public static class Handler implements Releasable {
private final ArrayList<BulkResponse> responses = new ArrayList<>(2);
private boolean globalFailure = false;
private boolean incrementalRequestSubmitted = false;
private ThreadContext.StoredContext requestContext;
private Exception bulkActionLevelFailure = null;
private BulkRequest bulkRequest = null;

protected Handler(
Client client,
ThreadContext threadContext,
ThreadContext.StoredContext requestContext,
IndexingPressure indexingPressure,
@Nullable String waitForActiveShards,
@Nullable TimeValue timeout,
@Nullable String refresh
) {
this.client = client;
this.threadContext = threadContext;
this.requestContext = requestContext;
this.requestContext = threadContext.newStoredContext();
this.indexingPressure = indexingPressure;
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
this.timeout = timeout;
Expand Down Expand Up @@ -163,7 +154,10 @@ public void onResponse(BulkResponse bulkResponse) {
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
}
}, nextItems));
}, () -> {
requestContext = threadContext.newStoredContext();
nextItems.run();
}));
}
} else {
nextItems.run();
Expand All @@ -185,7 +179,6 @@ public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, Act
} else {
assert bulkRequest != null;
if (internalAddItems(items, releasable)) {

try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
requestContext.restore();
client.bulk(bulkRequest, new ActionListener<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
Expand All @@ -38,6 +39,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import static org.elasticsearch.common.settings.Setting.boolSetting;
import static org.elasticsearch.rest.RestRequest.Method.POST;
Expand Down Expand Up @@ -128,15 +130,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
request.param("type");
}

return new ChunkHandler(
allowExplicitIndex,
request,
bulkHandler.newBulkRequest(
request.param("wait_for_active_shards"),
request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT),
request.param("refresh")
)
);
String waitForActiveShards = request.param("wait_for_active_shards");
TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT);
String refresh = request.param("refresh");
return new ChunkHandler(allowExplicitIndex, request, () -> bulkHandler.newBulkRequest(waitForActiveShards, timeout, refresh));
}
}

Expand All @@ -154,14 +151,15 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
private final Boolean defaultRequireAlias;
private final boolean defaultRequireDataStream;
private final BulkRequestParser parser;
private final IncrementalBulkService.Handler handler;
private final Supplier<IncrementalBulkService.Handler> handlerSupplier;
private IncrementalBulkService.Handler handler;

private volatile RestChannel restChannel;
private boolean isException;
private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4);
private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);

ChunkHandler(boolean allowExplicitIndex, RestRequest request, IncrementalBulkService.Handler handler) {
ChunkHandler(boolean allowExplicitIndex, RestRequest request, Supplier<IncrementalBulkService.Handler> handlerSupplier) {
this.allowExplicitIndex = allowExplicitIndex;
this.request = request;
this.defaultIndex = request.param("index");
Expand All @@ -171,18 +169,21 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
this.defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false);
this.defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
this.defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
this.parser = new BulkRequestParser(true, request.getRestApiVersion());
this.handler = handler;
// TODO: Fix type deprecation logging
this.parser = new BulkRequestParser(false, request.getRestApiVersion());
this.handlerSupplier = handlerSupplier;
}

@Override
public void accept(RestChannel restChannel) {
this.restChannel = restChannel;
this.handler = handlerSupplier.get();
request.contentStream().next();
}

@Override
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
assert handler != null;
assert channel == restChannel;
if (isException) {
chunk.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public void next() {
RestBulkAction.ChunkHandler chunkHandler = new RestBulkAction.ChunkHandler(
true,
request,
new IncrementalBulkService.Handler(null, null, null, null, null, null, null) {
() -> new IncrementalBulkService.Handler(null, null, null, null, null, null) {

@Override
public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
Expand Down

0 comments on commit 321377a

Please sign in to comment.