Skip to content

Commit

Permalink
Add support for data streaming CREATE requests (slackhq#752)
Browse files Browse the repository at this point in the history
Co-authored-by: Bryan Burkholder <[email protected]>
  • Loading branch information
bryanlb and bryanlb authored Jan 24, 2024
1 parent baa7262 commit 0adba74
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ protected static List<IndexRequest> parseBulkRequest(byte[] postBody) throws IOE
bulkRequest.add(postBody, 0, postBody.length, null, MediaTypeRegistry.JSON);
List<DocWriteRequest<?>> requests = bulkRequest.requests();
for (DocWriteRequest<?> request : requests) {
if (request.opType() == DocWriteRequest.OpType.INDEX) {
if (request.opType() == DocWriteRequest.OpType.INDEX
| request.opType() == DocWriteRequest.OpType.CREATE) {

// The client makes a DocWriteRequest and sends it to the server
// IngestService#innerExecute is where the server eventually reads when request is an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,21 +143,37 @@ public void testIndexRequestWithSpecialChars() throws Exception {
public void testBulkRequests() throws Exception {
byte[] rawRequest = getRawQueryBytes("bulk_requests");
List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
assertThat(indexRequests.size()).isEqualTo(1);
assertThat(indexRequests.size()).isEqualTo(2);

Map<String, List<Trace.Span>> indexDocs =
BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
assertThat(indexDocs.keySet().size()).isEqualTo(1);
assertThat(indexDocs.get("test").size()).isEqualTo(1);
assertThat(indexDocs.keySet().size()).isEqualTo(2);
assertThat(indexDocs.get("test1").size()).isEqualTo(1);
assertThat(indexDocs.get("test3").size()).isEqualTo(1);

assertThat(indexDocs.get("test").get(0).getId().toStringUtf8()).isEqualTo("1");
assertThat(indexDocs.get("test").get(0).getTagsList().size()).isEqualTo(2);
Trace.Span indexDoc1 = indexDocs.get("test1").get(0);
Trace.Span indexDoc3 = indexDocs.get("test3").get(0);

assertThat(indexDoc1.getId().toStringUtf8()).isEqualTo("1");
assertThat(indexDoc3.getId().toStringUtf8()).isEqualTo("3");

assertThat(indexDoc1.getTagsList().size()).isEqualTo(2);
assertThat(
indexDocs.get("test").get(0).getTagsList().stream()
indexDoc1.getTagsList().stream()
.filter(
keyValue ->
keyValue.getKey().equals("service_name")
&& keyValue.getVStr().equals("test"))
&& keyValue.getVStr().equals("test1"))
.count())
.isEqualTo(1);

assertThat(indexDoc3.getTagsList().size()).isEqualTo(2);
assertThat(
indexDoc3.getTagsList().stream()
.filter(
keyValue ->
keyValue.getKey().equals("service_name")
&& keyValue.getVStr().equals("test3"))
.count())
.isEqualTo(1);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "index" : { "_index" : "test1", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "delete" : { "_index" : "test2", "_id" : "2" } }
{ "create" : { "_index" : "test3", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "update" : {"_id" : "1", "_index" : "test4"} }
{ "doc" : {"field2" : "value2"} }

0 comments on commit 0adba74

Please sign in to comment.