Skip to content

Commit

Permalink
Ensure partial bulks released if channel closes
Browse files Browse the repository at this point in the history
Currently, the entire close pipeline is not hooked up in case of a
channel close while a request is being buffered or executed. This commit
resolves the issue by hooking the bulk handler up to stream-closure notifications, so buffered resources are released when the channel closes.
  • Loading branch information
Tim-Brooks committed Sep 10, 2024
1 parent 2caf43c commit 46b3ece
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ public void close() {

private void doClose() {
closing = true;
if (handler != null) {
handler.close();
}
if (buf != null) {
buf.release();
buf = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public static class Handler implements Releasable {

private final ArrayList<Releasable> releasables = new ArrayList<>(4);
private final ArrayList<BulkResponse> responses = new ArrayList<>(2);
private boolean closed = false;
private boolean globalFailure = false;
private boolean incrementalRequestSubmitted = false;
private ThreadContext.StoredContext requestContext;
Expand All @@ -126,6 +127,7 @@ protected Handler(
}

public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
assert closed == false;
if (bulkActionLevelFailure != null) {
shortCircuitDueToTopLevelFailure(items, releasable);
nextItems.run();
Expand All @@ -139,10 +141,11 @@ public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runn
requestContext.restore();
client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {

private final ArrayList<Releasable> toRelease = new ArrayList<>(releasables);

@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
successResponse(bulkResponse, toRelease);
createNewBulkRequest(
new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true)
);
Expand All @@ -151,12 +154,14 @@ public void onResponse(BulkResponse bulkResponse) {
@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
toRelease.forEach(Releasable::close);
}
}, () -> {
requestContext = threadContext.newStoredContext();
nextItems.run();
}));
}
releasables.clear();
} else {
nextItems.run();
}
Expand All @@ -182,27 +187,36 @@ public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, Act
client.bulk(bulkRequest, new ActionListener<>() {

private final boolean isFirstRequest = incrementalRequestSubmitted == false;
private final ArrayList<Releasable> toRelease = new ArrayList<>(releasables);

@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
successResponse(bulkResponse, toRelease);
listener.onResponse(combineResponses());
}

@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
toRelease.forEach(Releasable::close);
errorResponse(listener);
}
});
}
releasables.clear();
} else {
errorResponse(listener);
}
}
}

@Override
public void close() {
    // Mark the handler closed so later addItems/lastItems calls trip the assertion,
    // then free every buffered reference that has not yet been handed to a bulk request.
    closed = true;
    for (Releasable releasable : releasables) {
        releasable.close();
    }
    releasables.clear();
}

private void shortCircuitDueToTopLevelFailure(List<DocWriteRequest<?>> items, Releasable releasable) {
assert releasables.isEmpty();
assert bulkRequest == null;
Expand All @@ -212,6 +226,12 @@ private void shortCircuitDueToTopLevelFailure(List<DocWriteRequest<?>> items, Re
Releasables.close(releasable);
}

/**
 * Records a successful bulk response, releases the references that backed the
 * just-completed request, and clears {@code bulkRequest} so a fresh one is built.
 */
private void successResponse(BulkResponse bulkResponse, ArrayList<Releasable> toRelease) {
    responses.add(bulkResponse);
    for (Releasable releasable : toRelease) {
        releasable.close();
    }
    bulkRequest = null;
}

private void errorResponse(ActionListener<BulkResponse> listener) {
if (globalFailure) {
listener.onFailure(bulkActionLevelFailure);
Expand All @@ -225,7 +245,7 @@ private void handleBulkFailure(boolean isFirstRequest, Exception e) {
globalFailure = isFirstRequest;
bulkActionLevelFailure = e;
addItemLevelFailures(bulkRequest.requests());
releaseCurrentReferences();
bulkRequest = null;
}

private void addItemLevelFailures(List<DocWriteRequest<?>> items) {
Expand Down Expand Up @@ -297,10 +317,5 @@ private BulkResponse combineResponses() {

return new BulkResponse(bulkItemResponses, tookInMillis, ingestTookInMillis);
}

@Override
public void close() {
// TODO: Implement
}
}
}
5 changes: 4 additions & 1 deletion server/src/main/java/org/elasticsearch/http/HttpBody.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,11 @@ non-sealed interface Stream extends HttpBody {
}

@FunctionalInterface
interface ChunkHandler {
interface ChunkHandler extends Releasable {
void onNext(ReleasableBytesReference chunk, boolean isLast);

@Override
default void close() {}
}

record ByteRefHttpBody(BytesReference bytes) implements Full {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ default void close() {}

public interface RequestBodyChunkConsumer extends RestChannelConsumer {
/**
 * Consumes the next chunk of the request body. {@code isLast} is true for the final chunk.
 * Implementations take ownership of {@code chunk} and are responsible for releasing it.
 */
void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast);

/**
* Called when the stream closes. This could happen prior to the completion of the request if the underlying channel was closed.
* Implementors should do their best to clean up resources and early terminate request processing if it is triggered before a
* response.
*/
default void streamClose() {}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.transport.Transports;

import java.io.IOException;
import java.util.ArrayDeque;
Expand Down Expand Up @@ -147,7 +148,7 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
private IncrementalBulkService.Handler handler;

private volatile RestChannel restChannel;
private boolean isException;
private boolean shortCircuited;
private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4);
private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);

Expand Down Expand Up @@ -175,9 +176,10 @@ public void accept(RestChannel restChannel) {

@Override
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
assert Transports.assertTransportThread();
assert handler != null;
assert channel == restChannel;
if (isException) {
if (shortCircuited) {
chunk.close();
return;
}
Expand Down Expand Up @@ -214,12 +216,8 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
);

} catch (Exception e) {
// TODO: This needs to be better
Releasables.close(handler);
Releasables.close(unParsedChunks);
unParsedChunks.clear();
shortCircuit();
new RestToXContentListener<>(channel).onFailure(e);
isException = true;
return;
}

Expand All @@ -241,8 +239,16 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
}

@Override
public void close() {
RequestBodyChunkConsumer.super.close();
public void streamClose() {
assert Transports.assertTransportThread();
shortCircuit();
}

// Abort further processing: set the flag first so any in-flight handleChunk call
// sees it and drops incoming chunks, then release the bulk handler and every
// chunk that was buffered but never parsed.
private void shortCircuit() {
shortCircuited = true;
Releasables.close(handler);
Releasables.close(unParsedChunks);
// NOTE(review): Releasables.close presumably does not clear the collection — confirm; hence the explicit clear().
unParsedChunks.clear();
}

private ArrayList<Releasable> accountParsing(int bytesConsumed) {
Expand Down

0 comments on commit 46b3ece

Please sign in to comment.