Skip to content

Commit

Permalink
Change
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Aug 28, 2024
1 parent d3bbac1 commit 1bdb92d
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void testIncrementalMalformed() throws IOException {
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

Request successfulIndexingRequest = new Request("POST", "/index_name/_bulk");
Request bulkRequest = new Request("POST", "/index_name/_bulk");

// index documents for the rollup job
final StringBuilder bulk = new StringBuilder();
Expand All @@ -84,12 +84,8 @@ public void testIncrementalMalformed() throws IOException {
bulk.append("{}\n");
bulk.append("\r\n");

successfulIndexingRequest.setJsonEntity(bulk.toString());

ResponseException responseException = expectThrows(
ResponseException.class,
() -> getRestClient().performRequest(successfulIndexingRequest)
);
bulkRequest.setJsonEntity(bulk.toString());

ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, Act
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
BulkResponse response = combineResponses();
listener.onResponse(response);
listener.onResponse(combineResponses());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,8 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
}

final BytesReference data;
final Releasable releasable;
int bytesConsumed;
try {
// TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in
// BulkRequest#add is fine

unParsedChunks.add(chunk);

if (unParsedChunks.size() > 1) {
Expand All @@ -199,7 +196,9 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
data = chunk;
}

int bytesConsumed = parser.incrementalParse(
// TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in
// BulkRequest#add is fine
bytesConsumed = parser.incrementalParse(
data,
defaultIndex,
defaultRouting,
Expand All @@ -217,21 +216,24 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
stringDeduplicator
);

releasable = accountParsing(bytesConsumed);

} catch (Exception e) {
// TODO: This needs to be better
Releasables.close(handler);
Releasables.close(unParsedChunks);
unParsedChunks.clear();
new RestToXContentListener<>(channel).onFailure(e);
isException = true;
return;
}

final Releasable releasable = accountParsing(bytesConsumed);
if (isLast) {
assert unParsedChunks.isEmpty();
assert channel != null;
handler.lastItems(new ArrayList<>(items), releasable, new RestRefCountedChunkedToXContentListener<>(channel));
items.clear();
} else if (items.isEmpty() == false) {
handler.addItems(new ArrayList<>(items), releasable, () -> { request.contentStream().next(); });
handler.addItems(new ArrayList<>(items), releasable, () -> request.contentStream().next());
items.clear();
} else {
releasable.close();
Expand All @@ -240,8 +242,6 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo

@Override
public void close() {
Releasables.close(handler);
Releasables.close(unParsedChunks);
RequestBodyChunkConsumer.super.close();
}

Expand All @@ -268,7 +268,6 @@ public boolean supportsBulkContent() {

@Override
public boolean allowsUnsafeBuffers() {
// TODO: Does this change with the chunking?
return true;
return false;
}
}

0 comments on commit 1bdb92d

Please sign in to comment.