Skip to content

Commit

Permalink
Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Oct 23, 2024
1 parent 93b565c commit 3daab65
Showing 1 changed file with 71 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.core.RestApiVersion;
Expand All @@ -21,8 +22,56 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.equalTo;

public class BulkRequestParserTests extends ESTestCase {

public void testIncrementalParsing() throws IOException {
ArrayList<DocWriteRequest<?>> indexRequests = new ArrayList<>();
ArrayList<DocWriteRequest<?>> updateRequests = new ArrayList<>();
ArrayList<DocWriteRequest<?>> deleteRequests = new ArrayList<>();

BulkRequestParser parser = new BulkRequestParser(randomBoolean(), RestApiVersion.current());
BulkRequestParser.IncrementalParser incrementalParser = parser.incrementalParser(
null,
null,
null,
null,
null,
null,
null,
false,
XContentType.JSON,
(r, t) -> indexRequests.add(r),
updateRequests::add,
deleteRequests::add
);

BytesArray request = new BytesArray("""
{ "index":{ "_id": "bar", "pipeline": "foo" } }
{ "field": "value"}
{ "index":{ "require_alias": false } }
{ "field": "value" }
{ "update":{ "_id": "bus", "require_alias": true } }
{ "doc": {"field": "value" }}
{ "delete":{ "_id": "baz" } }
{ "index": { } }
{ "field": "value"}
{ "delete":{ "_id": "bop" } }
""");

int consumed = 0;
for (int i = 0; i < request.length() - 1; ++i) {
consumed += incrementalParser.parse(request.slice(consumed, i - consumed + 1), false);
}
consumed += incrementalParser.parse(request.slice(consumed, request.length() - consumed), true);
assertThat(consumed, equalTo(request.length()));

assertThat(indexRequests.size(), equalTo(3));
assertThat(updateRequests.size(), equalTo(1));
assertThat(deleteRequests.size(), equalTo(2));
}

public void testIndexRequest() throws IOException {
BytesArray request = new BytesArray("""
{ "index":{ "_id": "bar" } }
Expand Down Expand Up @@ -126,7 +175,7 @@ public void testUpdateRequest() throws IOException {
}, req -> fail());
}

public void testBarfOnLackOfTrailingNewline() {
public void testBarfOnLackOfTrailingNewline() throws IOException {
BytesArray request = new BytesArray("""
{ "index":{ "_id": "bar" } }
{}""");
Expand All @@ -150,6 +199,27 @@ public void testBarfOnLackOfTrailingNewline() {
)
);
assertEquals("The bulk request must be terminated by a newline [\\n]", e.getMessage());

BulkRequestParser.IncrementalParser incrementalParser = parser.incrementalParser(
"foo",
null,
null,
null,
null,
null,
null,
false,
XContentType.JSON,
(req, type) -> {},
req -> {},
req -> {}
);

// Should not throw because not last
incrementalParser.parse(request, false);

IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class, () -> incrementalParser.parse(request, true));
assertEquals("The bulk request must be terminated by a newline [\\n]", e2.getMessage());
}

public void testFailOnExplicitIndex() {
Expand Down

0 comments on commit 3daab65

Please sign in to comment.