Skip to content

Commit

Permalink
Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Sep 10, 2024
1 parent 46b3ece commit 2ea1c82
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,11 @@ public void testEmptyContent() throws Exception {
assertTrue(recvChunk.isLast);
assertEquals(0, recvChunk.chunk.length());
recvChunk.chunk.close();
assertFalse(handler.streamClosed);

// send response to process following request
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
assertBusy(() -> assertTrue(handler.streamClosed));
}
assertBusy(() -> assertEquals("should receive all server responses", totalRequests, ctx.clientRespQueue.size()));
}
Expand Down Expand Up @@ -145,14 +147,16 @@ public void testReceiveAllChunks() throws Exception {
}
}

assertFalse(handler.streamClosed);
assertEquals("sent and received payloads are not the same", sendData, recvData);
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
assertBusy(() -> assertTrue(handler.streamClosed));
}
assertBusy(() -> assertEquals("should receive all server responses", totalRequests, ctx.clientRespQueue.size()));
}
}

// ensures that all received chunks are released when connection closed
// ensures that all received chunks are released when connection closed and handler notified
public void testClientConnectionCloseMidStream() throws Exception {
try (var ctx = setupClientCtx()) {
var opaqueId = opaqueId(0);
Expand All @@ -167,10 +171,14 @@ public void testClientConnectionCloseMidStream() throws Exception {

// enable auto-read to receive channel close event
handler.stream.channel().config().setAutoRead(true);
assertFalse(handler.streamClosed);

// terminate connection and wait resources are released
ctx.clientChannel.close();
assertBusy(() -> assertNull(handler.stream.buf()));
assertBusy(() -> {
assertNull(handler.stream.buf());
assertTrue(handler.streamClosed);
});
}
}

Expand All @@ -186,10 +194,14 @@ public void testServerCloseConnectionMidStream() throws Exception {
// await stream handler is ready and request full content
var handler = ctx.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotNull(handler.stream.buf()));
assertFalse(handler.streamClosed);

// terminate connection on server and wait resources are released
handler.channel.request().getHttpChannel().close();
assertBusy(() -> assertNull(handler.stream.buf()));
assertBusy(() -> {
assertNull(handler.stream.buf());
assertTrue(handler.streamClosed);
});
}
}

Expand Down Expand Up @@ -470,6 +482,7 @@ static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkCon
final Netty4HttpRequestBodyStream stream;
RestChannel channel;
boolean recvLast = false;
volatile boolean streamClosed = false;

ServerRequestHandler(String opaqueId, Netty4HttpRequestBodyStream stream) {
this.opaqueId = opaqueId;
Expand All @@ -487,6 +500,11 @@ public void accept(RestChannel channel) throws Exception {
channelAccepted.onResponse(null);
}

@Override
public void streamClose() {
streamClosed = true;
}

void sendResponse(RestResponse response) {
channel.sendResponse(response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,29 @@ public void testSingleBulkRequest() {
assertFalse(refCounted.hasReferences());
}

public void testBufferedResourcesReleasedOnClose() {
String index = "test";
createIndex(index);

String nodeName = internalCluster().getRandomNodeName();
IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);

IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
IndexRequest indexRequest = indexRequest(index);

AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
handler.addItems(List.of(indexRequest), refCounted::decRef, () -> {});

assertTrue(refCounted.hasReferences());
assertThat(indexingPressure.stats().getCurrentCoordinatingBytes(), greaterThan(0L));

handler.close();

assertFalse(refCounted.hasReferences());
assertThat(indexingPressure.stats().getCurrentCoordinatingBytes(), equalTo(0L));
}

public void testIndexingPressureRejection() {
String index = "test";
createIndex(index);
Expand Down
15 changes: 13 additions & 2 deletions server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.http.HttpBody;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.rest.action.admin.cluster.RestNodesUsageAction;

Expand Down Expand Up @@ -126,7 +127,17 @@ public final void handleRequest(RestRequest request, RestChannel channel, NodeCl
if (request.isStreamedContent()) {
assert action instanceof RequestBodyChunkConsumer;
var chunkConsumer = (RequestBodyChunkConsumer) action;
request.contentStream().setHandler((chunk, isLast) -> chunkConsumer.handleChunk(channel, chunk, isLast));
request.contentStream().setHandler(new HttpBody.ChunkHandler() {
@Override
public void onNext(ReleasableBytesReference chunk, boolean isLast) {
chunkConsumer.handleChunk(channel, chunk, isLast);
}

@Override
public void close() {
chunkConsumer.streamClose();
}
});
}

usageCount.increment();
Expand Down Expand Up @@ -192,7 +203,7 @@ public interface RequestBodyChunkConsumer extends RestChannelConsumer {
/**
* Called when the stream closes. This could happen prior to the completion of the request if the underlying channel was closed.
* Implementors should do their best to clean up resources and early terminate request processing if it is triggered before a
* response.
* response is generated.
*/
default void streamClose() {}
}
Expand Down

0 comments on commit 2ea1c82

Please sign in to comment.