From 4b3aab502d88eeb2745956801ebe9a9c787ac174 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 28 Aug 2024 15:30:07 -0600 Subject: [PATCH] Fix --- .../http/netty4/Netty4HttpAggregator.java | 6 +--- .../netty4/Netty4HttpServerTransport.java | 2 +- .../http/IncrementalBulkRestIT.java | 33 +++++++++++++++---- .../action/bulk/IncrementalBulkService.java | 16 ++++++--- .../rest/action/document/RestBulkAction.java | 10 +++--- .../action/document/RestBulkActionTests.java | 23 +++++++------ 6 files changed, 57 insertions(+), 33 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java index 031e803737ee8..d7c5fc0963224 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java @@ -35,10 +35,6 @@ public class Netty4HttpAggregator extends HttpObjectAggregator { private boolean aggregating = true; private boolean ignoreContentAfterContinueResponse = false; - public Netty4HttpAggregator(int maxContentLength) { - this(maxContentLength, IGNORE_TEST); - } - public Netty4HttpAggregator(int maxContentLength, Predicate decider) { super(maxContentLength); this.decider = decider; @@ -49,7 +45,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception assert msg instanceof HttpObject; if (msg instanceof HttpRequest request) { var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request); - aggregating = decider.test(preReq); + aggregating = decider.test(preReq) && IGNORE_TEST.test(preReq); } if (aggregating || msg instanceof FullHttpRequest) { super.channelRead(ctx, msg); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 8dc0169fcf630..8d9f132028578 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -373,7 +373,7 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { // combines the HTTP message pieces into a single full HTTP request (with headers and body) final HttpObjectAggregator aggregator = new Netty4HttpAggregator( handlingSettings.maxContentLength(), - httpPreRequest -> enabled.incrementalBulkEnabled() && httpPreRequest.uri().contains("_bulk") == false + httpPreRequest -> enabled.get() && httpPreRequest.uri().contains("_bulk") == false ); aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); ch.pipeline() diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java index e530080863cc6..fcb0c1044d22f 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java @@ -41,28 +41,49 @@ public void testIncrementalBulk() throws IOException { final Response indexCreatedResponse = getRestClient().performRequest(createRequest); assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); - Request successfulIndexingRequest = new Request("POST", "/index_name/_bulk"); + Request firstBulkRequest = new Request("POST", "/index_name/_bulk"); + + // index documents for the rollup job + String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n" + + "{\"field\":1}\n" + + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n" + + "{\"field\":1}\n" + + "\r\n"; + + firstBulkRequest.setJsonEntity(bulkBody); + + final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest); + assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + + Request bulkRequest = new Request("POST", "/index_name/_bulk"); // index documents for the rollup job final StringBuilder bulk = new StringBuilder(); + bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"); + int updates = 0; for (int i = 0; i < 1000; i++) { bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n"); bulk.append("{\"field\":").append(i).append("}\n"); + if (randomBoolean() && randomBoolean() && randomBoolean() && randomBoolean()) { + ++updates; + bulk.append("{\"update\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"); + bulk.append("{\"doc\":{\"field\":").append(i).append("}}\n"); + } } bulk.append("\r\n"); - successfulIndexingRequest.setJsonEntity(bulk.toString()); + bulkRequest.setJsonEntity(bulk.toString()); - final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest); - assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + final Response bulkResponse = getRestClient().performRequest(bulkRequest); + assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); Map responseMap = XContentHelper.convertToMap( JsonXContent.jsonXContent, - indexSuccessFul.getEntity().getContent(), + bulkResponse.getEntity().getContent(), true ); assertFalse((Boolean) responseMap.get("errors")); - assertThat(((List) responseMap.get("items")).size(), equalTo(1000)); + assertThat(((List) responseMap.get("items")).size(), equalTo(1001 + updates)); } public void testIncrementalMalformed() throws IOException { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java index fe686c47d96f6..2eb681f012576 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java @@ -23,12 +23,13 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; public class IncrementalBulkService { private final Client client; private final ThreadContext threadContext; - private final Enabled enabled; + private final Supplier enabled; public IncrementalBulkService(Client client, ThreadContext threadContext) { this.client = client; @@ -37,13 +38,17 @@ public IncrementalBulkService(Client client, ThreadContext threadContext) { } public IncrementalBulkService(Client client, ThreadContext threadContext, ClusterSettings clusterSettings) { + this(client, threadContext, new Enabled(clusterSettings)); + } + + public IncrementalBulkService(Client client, ThreadContext threadContext, Supplier enabled) { this.client = client; this.threadContext = threadContext; - this.enabled = new Enabled(clusterSettings); + this.enabled = enabled; } public boolean incrementalBulkEnabled() { - return enabled.incrementalBulkEnabled(); + return enabled.get(); } public Handler newBulkRequest() { @@ -54,7 +59,7 @@ public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable Ti return new Handler(client, threadContext, threadContext.newStoredContext(), waitForActiveShards, timeout, refresh); } - public static class Enabled { + public static class Enabled implements Supplier { private final AtomicBoolean incrementalBulksEnabled = new AtomicBoolean(true); @@ -65,7 +70,8 @@ public Enabled(ClusterSettings clusterSettings) { clusterSettings.addSettingsUpdateConsumer(RestBulkAction.INCREMENTAL_BULK, incrementalBulksEnabled::set); } - public boolean incrementalBulkEnabled() { + @Override + public Boolean get() { return incrementalBulksEnabled.get(); } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 8afdc3d7ab3a5..cddd595996530 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -234,15 +234,13 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo if (isLast) { assert unParsedChunks.isEmpty(); assert channel != null; - handler.lastItems( - new ArrayList<>(items), - () -> Releasables.close(releasables), - new RestRefCountedChunkedToXContentListener<>(channel) - ); + ArrayList> toPass = new ArrayList<>(items); items.clear(); + handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel)); } else if (items.isEmpty() == false) { - handler.addItems(new ArrayList<>(items), () -> Releasables.close(releasables), () -> request.contentStream().next()); + ArrayList> toPass = new ArrayList<>(items); items.clear(); + handler.addItems(toPass, () -> Releasables.close(releasables), () -> request.contentStream().next()); } else { assert releasables.isEmpty(); request.contentStream().next(); diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index c6140cadfd375..5a6629a2bb0ea 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -64,8 +64,8 @@ public void bulk(BulkRequest request, ActionListener listener) { final Map params = new HashMap<>(); params.put("pipeline", "timestamps"); new RestBulkAction( - settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(), - new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY)) + settings(IndexVersion.current()).build(), + new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY), () -> false) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray(""" {"index":{"_id":"1"}} @@ -99,8 +99,8 @@ public void bulk(BulkRequest request, ActionListener listener) { Map params = new HashMap<>(); { new RestBulkAction( - settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(), - new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY)) + settings(IndexVersion.current()).build(), + new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY), () -> false) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -123,8 +123,8 @@ public void bulk(BulkRequest request, ActionListener listener) { params.put("list_executed_pipelines", "true"); bulkCalled.set(false); new RestBulkAction( - settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(), - new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY)) + settings(IndexVersion.current()).build(), + new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY), () -> false) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -146,8 +146,8 @@ public void bulk(BulkRequest request, ActionListener listener) { { bulkCalled.set(false); new RestBulkAction( - settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(), - new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY)) + settings(IndexVersion.current()).build(), + new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY), () -> false) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -170,8 +170,8 @@ public void bulk(BulkRequest request, ActionListener listener) { params.remove("list_executed_pipelines"); bulkCalled.set(false); new RestBulkAction( - settings(IndexVersion.current()).put(RestBulkAction.INCREMENTAL_BULK.getKey(), false).build(), - new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY)) + settings(IndexVersion.current()).build(), + new IncrementalBulkService(mock(Client.class), new ThreadContext(Settings.EMPTY), () -> false) ).handleRequest( new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withParams(params) @@ -201,6 +201,9 @@ public void testIncrementalParsing() { FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withMethod(RestRequest.Method.POST) .withBody(new HttpBody.Stream() { + @Override + public void close() {} + @Override public ChunkHandler handler() { return null;