Skip to content

Commit

Permalink
Try_no_split
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Aug 27, 2024
1 parent fbfcbb5 commit 3c8d8c0
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,56 @@ public void testIndexingPressureStats() throws IOException {
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

Request successfulIndexingRequest = new Request("POST", "/index_name/_bulk");

successfulIndexingRequest.setJsonEntity("""
{ "index" : { "_index" : "index_name" } }
{ "field1" : "value1" }
{ "field" : "value1" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value2" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value3" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value4" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value5" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value6" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value7" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value8" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value9" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value10" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value11" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value12" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value13" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value14" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value15" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value16" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value17" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value18" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value19" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value20" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value21" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value22" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value23" }
{ "index" : { "_index" : "index_name" } }
{ "field" : "value24" }
""");
final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest);
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void onFailure(Exception e) {

/**
 * Decides whether the incremental bulk handler should pause consuming request
 * chunks to apply back-pressure.
 *
 * @return {@code true} to back off before accepting more items; currently always
 *         {@code false} — real memory-based accounting is not implemented yet.
 */
private boolean shouldBackOff() {
    // TODO: Implement Real Memory Logic
    // NOTE(review): diff residue removed here — the superseded size-based check
    // (bulkRequest.requests().size() >= 16) was unreachable dead code after this return.
    return false;
}

public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -94,11 +96,20 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
request.param("type");
}

return new ChunkHandler(request);
return new ChunkHandler(
allowExplicitIndex,
request,
bulkHandler.newBulkRequest(
request.param("wait_for_active_shards"),
request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT),
request.param("refresh")
)
);
}

private class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
private static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {

private final boolean allowExplicitIndex;
private final RestRequest request;

private final Map<String, String> stringDeduplicator = new HashMap<>();
Expand All @@ -116,7 +127,8 @@ private class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4);
private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);

private ChunkHandler(RestRequest request) {
private ChunkHandler(boolean allowExplicitIndex, RestRequest request, IncrementalBulkService.Handler handler) {
this.allowExplicitIndex = allowExplicitIndex;
this.request = request;
this.defaultIndex = request.param("index");
this.defaultRouting = request.param("routing");
Expand All @@ -126,11 +138,7 @@ private ChunkHandler(RestRequest request) {
this.defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
this.defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
this.parser = new BulkRequestParser(true, request.getRestApiVersion());
handler = bulkHandler.newBulkRequest(
request.param("wait_for_active_shards"),
request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT),
request.param("refresh")
);
this.handler = handler;
}

@Override
Expand All @@ -142,19 +150,16 @@ public void accept(RestChannel restChannel) throws Exception {
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
assert channel == restChannel;

final ReleasableBytesReference data;
final BytesReference data;
final Releasable releasable;
try {
// TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDataStream in
// BulkRequest#add is fine

unParsedChunks.add(chunk);

if (unParsedChunks.size() > 1) {
ReleasableBytesReference[] bytesReferences = unParsedChunks.toArray(new ReleasableBytesReference[0]);
data = new ReleasableBytesReference(
CompositeBytesReference.of(bytesReferences),
() -> Releasables.close(bytesReferences)
);
data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0]));
} else {
data = chunk;
}
Expand All @@ -177,7 +182,7 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
stringDeduplicator
);

accountParsing(bytesConsumed);
releasable = accountParsing(bytesConsumed);

} catch (IOException e) {
// TODO: Exception Handling
Expand All @@ -187,10 +192,10 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
if (isLast) {
assert unParsedChunks.isEmpty();
assert channel != null;
handler.lastItems(new ArrayList<>(items), data, new RestRefCountedChunkedToXContentListener<>(channel));
handler.lastItems(new ArrayList<>(items), releasable, new RestRefCountedChunkedToXContentListener<>(channel));
items.clear();
} else if (items.isEmpty() == false) {
handler.addItems(new ArrayList<>(items), data, () -> request.contentStream().next());
handler.addItems(new ArrayList<>(items), releasable, () -> request.contentStream().next());
items.clear();
}
}
Expand All @@ -202,16 +207,20 @@ public void close() {
RequestBodyChunkConsumer.super.close();
}

private void accountParsing(int bytesConsumed) {
private Releasable accountParsing(int bytesConsumed) {
ArrayList<Releasable> releasables = new ArrayList<>(unParsedChunks.size());
while (bytesConsumed > 0) {
ReleasableBytesReference reference = unParsedChunks.removeFirst();
releasables.add(reference);
if (bytesConsumed >= reference.length()) {
bytesConsumed -= reference.length();
releasables.add(reference);
} else {
unParsedChunks.addFirst(reference.retainedSlice(bytesConsumed, reference.length() - bytesConsumed));
bytesConsumed = 0;
}
}
return () -> Releasables.close(releasables);
}
}

Expand Down

0 comments on commit 3c8d8c0

Please sign in to comment.