Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/partial-rest-requests' into incr…
Browse files Browse the repository at this point in the history
…emental_bulk_rest
  • Loading branch information
Tim-Brooks committed Aug 28, 2024
2 parents b303cc7 + 5757e00 commit 0b9362e
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,29 @@ public void testOversizedChunkedEncodingNoLimits() throws Exception {
}
}

// ensures that we dont leak buffers in stream on 400-bad-request
// some bad requests are dispatched from rest-controller before reaching rest handler
// test relies on netty's buffer leak detection
public void testBadRequestReleaseQueuedChunks() throws Exception {
try (var ctx = setupClientCtx()) {
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
var id = opaqueId(reqNo);
var contentSize = randomIntBetween(0, maxContentLength());
var req = httpRequest(id, contentSize);
var content = randomContent(contentSize, true);

// set unacceptable content-type
req.headers().set(CONTENT_TYPE, "unknown");
ctx.clientChannel.writeAndFlush(req);
ctx.clientChannel.writeAndFlush(content);

var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
assertEquals(HttpResponseStatus.BAD_REQUEST, resp.status());
resp.release();
}
}
}

private int maxContentLength() {
return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength();
}
Expand Down Expand Up @@ -513,6 +536,11 @@ public Collection<RestHandler> getRestHandlers(
Predicate<NodeFeature> clusterSupportsFeature
) {
return List.of(new BaseRestHandler() {
@Override
public boolean allowsUnsafeBuffers() {
return true;
}

@Override
public String getName() {
return ROUTE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class Netty4HttpRequest implements HttpRequest {
EmptyHttpHeaders.INSTANCE
),
new AtomicBoolean(false),
false,
true,
contentStream,
null
);
Expand Down Expand Up @@ -115,6 +115,7 @@ public HttpBody body() {
public void release() {
if (pooled && released.compareAndSet(false, true)) {
request.release();
content.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
private final Queue<HttpContent> chunkQueue = new ArrayDeque<>();
private boolean requested = false;
private boolean hasLast = false;
private boolean closing = false;
private HttpBody.ChunkHandler handler;

public Netty4HttpRequestBodyStream(Channel channel) {
this.channel = channel;
channel.closeFuture().addListener((f) -> releaseQueuedChunks());
channel.closeFuture().addListener((f) -> doClose());
channel.config().setAutoRead(false);
}

Expand Down Expand Up @@ -70,6 +71,10 @@ public void next() {
}

public void handleNettyContent(HttpContent httpContent) {
if (closing) {
httpContent.release();
return;
}
assert handler != null : "handler must be set before processing http content";
if (requested && chunkQueue.isEmpty()) {
sendChunk(httpContent);
Expand Down Expand Up @@ -111,4 +116,18 @@ private void releaseQueuedChunks() {
}
}

@Override
public void close() {
if (channel.eventLoop().inEventLoop()) {
doClose();
} else {
channel.eventLoop().submit(this::doClose);
}
}

private void doClose() {
closing = true;
releaseQueuedChunks();
channel.config().setAutoRead(true);
}
}
6 changes: 5 additions & 1 deletion server/src/main/java/org/elasticsearch/http/HttpBody.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;

/**
* A super-interface for different HTTP content implementations
*/
public sealed interface HttpBody permits HttpBody.Full, HttpBody.Stream {
public sealed interface HttpBody extends Releasable permits HttpBody.Full, HttpBody.Stream {

static Full fromBytesReference(BytesReference bytesRef) {
return new ByteRefHttpBody(bytesRef);
Expand Down Expand Up @@ -55,6 +56,9 @@ default Stream asStream() {
*/
non-sealed interface Full extends HttpBody {
BytesReference bytes();

@Override
default void close() {}
}

/**
Expand Down

0 comments on commit 0b9362e

Please sign in to comment.