Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Aug 28, 2024
1 parent 0b9362e commit 4b3aab5
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ public class Netty4HttpAggregator extends HttpObjectAggregator {
private boolean aggregating = true;
private boolean ignoreContentAfterContinueResponse = false;

public Netty4HttpAggregator(int maxContentLength) {
this(maxContentLength, IGNORE_TEST);
}

public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
super(maxContentLength);
this.decider = decider;
Expand All @@ -49,7 +45,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
assert msg instanceof HttpObject;
if (msg instanceof HttpRequest request) {
var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request);
aggregating = decider.test(preReq);
aggregating = decider.test(preReq) && IGNORE_TEST.test(preReq);
}
if (aggregating || msg instanceof FullHttpRequest) {
super.channelRead(ctx, msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception {
// combines the HTTP message pieces into a single full HTTP request (with headers and body)
final HttpObjectAggregator aggregator = new Netty4HttpAggregator(
handlingSettings.maxContentLength(),
httpPreRequest -> enabled.incrementalBulkEnabled() && httpPreRequest.uri().contains("_bulk") == false
httpPreRequest -> enabled.get() && httpPreRequest.uri().contains("_bulk") == false
);
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
ch.pipeline()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,49 @@ public void testIncrementalBulk() throws IOException {
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

Request successfulIndexingRequest = new Request("POST", "/index_name/_bulk");
Request firstBulkRequest = new Request("POST", "/index_name/_bulk");

// index documents for the rollup job
String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
+ "{\"field\":1}\n"
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
+ "{\"field\":1}\n"
+ "\r\n";

firstBulkRequest.setJsonEntity(bulkBody);

final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest);
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

Request bulkRequest = new Request("POST", "/index_name/_bulk");

// index documents for the rollup job
final StringBuilder bulk = new StringBuilder();
bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n");
int updates = 0;
for (int i = 0; i < 1000; i++) {
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
bulk.append("{\"field\":").append(i).append("}\n");
if (randomBoolean() && randomBoolean() && randomBoolean() && randomBoolean()) {
++updates;
bulk.append("{\"update\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n");
bulk.append("{\"doc\":{\"field\":").append(i).append("}}\n");
}
}
bulk.append("\r\n");

successfulIndexingRequest.setJsonEntity(bulk.toString());
bulkRequest.setJsonEntity(bulk.toString());

final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest);
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
final Response bulkResponse = getRestClient().performRequest(bulkRequest);
assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
Map<String, Object> responseMap = XContentHelper.convertToMap(
JsonXContent.jsonXContent,
indexSuccessFul.getEntity().getContent(),
bulkResponse.getEntity().getContent(),
true
);

assertFalse((Boolean) responseMap.get("errors"));
assertThat(((List<Object>) responseMap.get("items")).size(), equalTo(1000));
assertThat(((List<Object>) responseMap.get("items")).size(), equalTo(1001 + updates));
}

public void testIncrementalMalformed() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class IncrementalBulkService {

private final Client client;
private final ThreadContext threadContext;
private final Enabled enabled;
private final Supplier<Boolean> enabled;

public IncrementalBulkService(Client client, ThreadContext threadContext) {
this.client = client;
Expand All @@ -37,13 +38,17 @@ public IncrementalBulkService(Client client, ThreadContext threadContext) {
}

public IncrementalBulkService(Client client, ThreadContext threadContext, ClusterSettings clusterSettings) {
this(client, threadContext, new Enabled(clusterSettings));
}

public IncrementalBulkService(Client client, ThreadContext threadContext, Supplier<Boolean> enabled) {
this.client = client;
this.threadContext = threadContext;
this.enabled = new Enabled(clusterSettings);
this.enabled = enabled;
}

public boolean incrementalBulkEnabled() {
return enabled.incrementalBulkEnabled();
return enabled.get();
}

public Handler newBulkRequest() {
Expand All @@ -54,7 +59,7 @@ public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable Ti
return new Handler(client, threadContext, threadContext.newStoredContext(), waitForActiveShards, timeout, refresh);
}

public static class Enabled {
public static class Enabled implements Supplier<Boolean> {

private final AtomicBoolean incrementalBulksEnabled = new AtomicBoolean(true);

Expand All @@ -65,7 +70,8 @@ public Enabled(ClusterSettings clusterSettings) {
clusterSettings.addSettingsUpdateConsumer(RestBulkAction.INCREMENTAL_BULK, incrementalBulksEnabled::set);
}

public boolean incrementalBulkEnabled() {
@Override
public Boolean get() {
return incrementalBulksEnabled.get();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,15 +234,13 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
if (isLast) {
assert unParsedChunks.isEmpty();
assert channel != null;
handler.lastItems(
new ArrayList<>(items),
() -> Releasables.close(releasables),
new RestRefCountedChunkedToXContentListener<>(channel)
);
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
items.clear();
handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
} else if (items.isEmpty() == false) {
handler.addItems(new ArrayList<>(items), () -> Releasables.close(releasables), () -> request.contentStream().next());
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
items.clear();
handler.addItems(toPass, () -> Releasables.close(releasables), () -> request.contentStream().next());
} else {
assert releasables.isEmpty();
request.contentStream().next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
final Map<String, String> params = new HashMap<>();
params.put("pipeline", "timestamps");
new RestBulkAction(
settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(),
new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY))
settings(IndexVersion.current()).build(),
new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY), () -> false)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray("""
{"index":{"_id":"1"}}
Expand Down Expand Up @@ -99,8 +99,8 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
Map<String, String> params = new HashMap<>();
{
new RestBulkAction(
settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(),
new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY))
settings(IndexVersion.current()).build(),
new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY), () -> false)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
Expand All @@ -123,8 +123,8 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
params.put("list_executed_pipelines", "true");
bulkCalled.set(false);
new RestBulkAction(
settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(),
new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY))
settings(IndexVersion.current()).build(),
new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY), () -> false)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
Expand All @@ -146,8 +146,8 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
{
bulkCalled.set(false);
new RestBulkAction(
settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(),
new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY))
settings(IndexVersion.current()).build(),
new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY), () -> false)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
Expand All @@ -170,8 +170,8 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
params.remove("list_executed_pipelines");
bulkCalled.set(false);
new RestBulkAction(
settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(),
new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY))
settings(IndexVersion.current()).build(),
new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY), () -> false)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
Expand Down Expand Up @@ -201,6 +201,9 @@ public void testIncrementalParsing() {
FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withMethod(RestRequest.Method.POST)
.withBody(new HttpBody.Stream() {
@Override
public void close() {}

@Override
public ChunkHandler handler() {
return null;
Expand Down

0 comments on commit 4b3aab5

Please sign in to comment.