From 5757e000d4a9cebd68289f1665f6cdf0b60dfa94 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Tue, 27 Aug 2024 10:33:57 -0700 Subject: [PATCH] release stream chunk queue on bad request (#112227) --- .../Netty4IncrementalRequestHandlingIT.java | 28 +++++++++++++++++++ .../http/netty4/Netty4HttpRequest.java | 3 +- .../netty4/Netty4HttpRequestBodyStream.java | 21 +++++++++++++- .../java/org/elasticsearch/http/HttpBody.java | 6 +++- 4 files changed, 55 insertions(+), 3 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index b3139fd336a70..363af80c2eb13 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -315,6 +315,29 @@ public void testOversizedChunkedEncodingNoLimits() throws Exception { } } + // ensures that we dont leak buffers in stream on 400-bad-request + // some bad requests are dispatched from rest-controller before reaching rest handler + // test relies on netty's buffer leak detection + public void testBadRequestReleaseQueuedChunks() throws Exception { + try (var ctx = setupClientCtx()) { + for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) { + var id = opaqueId(reqNo); + var contentSize = randomIntBetween(0, maxContentLength()); + var req = httpRequest(id, contentSize); + var content = randomContent(contentSize, true); + + // set unacceptable content-type + req.headers().set(CONTENT_TYPE, "unknown"); + ctx.clientChannel.writeAndFlush(req); + ctx.clientChannel.writeAndFlush(content); + + var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue); + assertEquals(HttpResponseStatus.BAD_REQUEST, resp.status()); + resp.release(); + } + } + } + private int maxContentLength() { return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength(); } @@ -513,6 +536,11 @@ public Collection getRestHandlers( Predicate clusterSupportsFeature ) { return List.of(new BaseRestHandler() { + @Override + public boolean allowsUnsafeBuffers() { + return true; + } + @Override public String getName() { return ROUTE; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index 2d1caba3c477e..56cbbe0265927 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -60,7 +60,7 @@ public class Netty4HttpRequest implements HttpRequest { EmptyHttpHeaders.INSTANCE ), new AtomicBoolean(false), - false, + true, contentStream, null ); @@ -115,6 +115,7 @@ public HttpBody body() { public void release() { if (pooled && released.compareAndSet(false, true)) { request.release(); + content.close(); } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index 8497e3ee8a40d..02e5139a0e3aa 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -30,11 +30,12 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { private final Queue chunkQueue = new ArrayDeque<>(); private boolean requested = false; private boolean hasLast = false; + private boolean closing = false; private HttpBody.ChunkHandler handler; public Netty4HttpRequestBodyStream(Channel channel) { this.channel = channel; - channel.closeFuture().addListener((f) -> releaseQueuedChunks()); + channel.closeFuture().addListener((f) -> doClose()); channel.config().setAutoRead(false); } @@ -70,6 +71,10 @@ public void next() { } public void handleNettyContent(HttpContent httpContent) { + if (closing) { + httpContent.release(); + return; + } assert handler != null : "handler must be set before processing http content"; if (requested && chunkQueue.isEmpty()) { sendChunk(httpContent); @@ -111,4 +116,18 @@ private void releaseQueuedChunks() { } } + @Override + public void close() { + if (channel.eventLoop().inEventLoop()) { + doClose(); + } else { + channel.eventLoop().submit(this::doClose); + } + } + + private void doClose() { + closing = true; + releaseQueuedChunks(); + channel.config().setAutoRead(true); + } } diff --git a/server/src/main/java/org/elasticsearch/http/HttpBody.java b/server/src/main/java/org/elasticsearch/http/HttpBody.java index b4d88b837b117..9da1bc85b2a29 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpBody.java +++ b/server/src/main/java/org/elasticsearch/http/HttpBody.java @@ -12,11 +12,12 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; /** * A super-interface for different HTTP content implementations */ -public sealed interface HttpBody permits HttpBody.Full, HttpBody.Stream { +public sealed interface HttpBody extends Releasable permits HttpBody.Full, HttpBody.Stream { static Full fromBytesReference(BytesReference bytesRef) { return new ByteRefHttpBody(bytesRef); @@ -55,6 +56,9 @@ default Stream asStream() { */ non-sealed interface Full extends HttpBody { BytesReference bytes(); + + @Override + default void close() {} } /**