Skip to content

Commit

Permalink
feat: support ion as input for Bulk
Browse files Browse the repository at this point in the history
close #18
  • Loading branch information
tchiotludo committed Nov 20, 2024
1 parent fc62a40 commit 33d6221
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 18 deletions.
39 changes: 31 additions & 8 deletions src/main/java/io/kestra/plugin/elasticsearch/Bulk.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.elasticsearch;

import co.elastic.clients.elasticsearch.core.bulk.*;
import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.annotations.Example;
Expand Down Expand Up @@ -61,16 +62,34 @@ public class Bulk extends AbstractLoad implements RunnableTask<Bulk.Output> {
/**
 * Builds the stream of bulk operations read from the given input.
 *
 * The reader is wrapped in a sink consumer and handed to {@code Flux.create},
 * requesting the {@code BUFFER} overflow strategy.
 *
 * @param runContext the current run context (not used in the lines visible here)
 * @param inputStream reader over the bulk payload
 * @return a flux emitting one {@code BulkOperation} per action found in the input
 * @throws IOException declared by the reader-consumer factory
 */
@Override
protected Flux<BulkOperation> source(RunContext runContext, BufferedReader inputStream) throws IOException {
return Flux
.create(this.esNdJSonReader(inputStream), FluxSink.OverflowStrategy.BUFFER);
.create(this.fileReader(inputStream), FluxSink.OverflowStrategy.BUFFER);
}

@SuppressWarnings("unchecked")
public Consumer<FluxSink<BulkOperation>> esNdJSonReader(BufferedReader input) throws IOException {
public Consumer<FluxSink<BulkOperation>> fileReader(BufferedReader input) throws IOException {
return throwConsumer(s -> {
String row;
Boolean isJson = null;

while ((row = input.readLine()) != null) {
Map.Entry<String, Object> operation = JacksonMapper.toMap(row).entrySet().iterator().next();
// validate if it's json or ion
if (isJson == null) {
try {
OBJECT_MAPPER.readTree(row);
isJson = true;
} catch (JacksonException e) {
isJson = false;
}
}

Map<String, Object> data;
if (isJson) {
data = JacksonMapper.toMap(row);
} else {
data = JacksonMapper.ofIon().readValue(row, JacksonMapper.MAP_TYPE_REFERENCE);
}

Map.Entry<String, Object> operation = data.entrySet().iterator().next();
Map<String, Object> value = (Map<String, Object>) operation.getValue();

var bulkOperation = new BulkOperation.Builder();
Expand All @@ -80,15 +99,15 @@ public Consumer<FluxSink<BulkOperation>> esNdJSonReader(BufferedReader input) th
var indexOperation = new IndexOperation.Builder<>()
.id((String) value.get("_id"))
.index((String) value.get("_index"))
.document(parseline(input.readLine()));
.document(parseline(isJson, input.readLine()));
bulkOperation.index(indexOperation.build());
break;
case "create":
var createOperation = new CreateOperation.Builder<>()
.id((String) value.get("_id"))
.index((String) value.get("_index"))
.ifPrimaryTerm(0L) //FIXME opType
.document(parseline(input.readLine()));
.document(parseline(isJson, input.readLine()));
bulkOperation.create(createOperation.build());
break;
case "update":
Expand All @@ -97,7 +116,7 @@ public Consumer<FluxSink<BulkOperation>> esNdJSonReader(BufferedReader input) th
.index((String) value.get("_index"))
.action(new UpdateAction.Builder<>()
.docAsUpsert(true)
.doc(parseline(input.readLine()))
.doc(parseline(isJson, input.readLine()))
.build());
bulkOperation.update(updateOperation.build());
break;
Expand All @@ -118,7 +137,11 @@ public Consumer<FluxSink<BulkOperation>> esNdJSonReader(BufferedReader input) th
});
}

private Map<?,?> parseline(String line) throws JsonProcessingException {
return OBJECT_MAPPER.readValue(line, JacksonMapper.MAP_TYPE_REFERENCE);
/**
 * Parses one document line of the bulk payload into a map.
 *
 * @param isJson {@code true} to parse the line with {@code OBJECT_MAPPER},
 *               {@code false} to parse it with the Ion mapper
 * @param line   the raw document line that follows an action line
 * @return the parsed document
 * @throws JsonProcessingException  if the line cannot be parsed in the selected format
 * @throws IllegalArgumentException if {@code line} is {@code null}, i.e. the input ended
 *                                  right after an action line that requires a document
 */
private static Map<?,?> parseline(Boolean isJson, String line) throws JsonProcessingException {
    // BufferedReader.readLine() yields null at end of input; fail with an explicit message
    // instead of letting the mapper reject a null argument with a generic one.
    if (line == null) {
        throw new IllegalArgumentException("Missing document line after bulk action: unexpected end of input");
    }

    if (isJson) {
        return OBJECT_MAPPER.readValue(line, JacksonMapper.MAP_TYPE_REFERENCE);
    } else {
        return JacksonMapper.ofIon().readValue(line, JacksonMapper.MAP_TYPE_REFERENCE);
    }
}
}
62 changes: 52 additions & 10 deletions src/test/java/io/kestra/plugin/elasticsearch/BulkTest.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package io.kestra.plugin.elasticsearch;

import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import io.micronaut.context.annotation.Value;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

Expand All @@ -17,7 +19,10 @@
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

import static io.kestra.core.utils.Rethrow.throwConsumer;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

Expand All @@ -32,22 +37,59 @@ class BulkTest {
@Inject
private StorageInterface storageInterface;

// Bulk payload fixture, parameterised by index name: each map is one line of the payload.
// Action lines (index, delete, create, update, create) are followed by their document line,
// except "delete" which has none. That is five actions in total; the last "create" carries no "_id".
private static final Function<String, List<Map<String, Object>>> DATA = (String indice) -> List.of(
Map.of("index", Map.of("_index", indice, "_id", "1")),
Map.of("field1", "value1"),
Map.of("delete", Map.of("_index", indice, "_id", "1")),
Map.of("create", Map.of("_index", indice, "_id", "3")),
Map.of("field1", "value3"),
Map.of("update", Map.of("_index", indice, "_id", "1")),
Map.of("doc", Map.of("field2", "value2")),
Map.of("create", Map.of("_index", indice)),
Map.of("field1", "value4")
);

/**
 * Writes the bulk payload as newline-delimited JSON to a temp file, uploads it to storage,
 * runs the Bulk task against it and checks the reported size and metrics.
 */
@Test
void run() throws Exception {
RunContext runContext = runContextFactory.of();

// Unique, lower-cased index name per test run.
String indice = "ut_" + IdUtils.create().toLowerCase(Locale.ROOT);

File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".trs");
try (OutputStream output = new FileOutputStream(tempFile)) {
output.write(("{ \"index\" : { \"_index\" : \"" + indice + "\", \"_id\" : \"1\" } }\n").getBytes(StandardCharsets.UTF_8));
output.write(("{ \"field1\" : \"value1\" }\n").getBytes(StandardCharsets.UTF_8));
output.write(("{ \"delete\" : { \"_index\" : \"" + indice + "\", \"_id\" : \"1\" } }\n").getBytes(StandardCharsets.UTF_8));
output.write(("{ \"create\" : { \"_index\" : \"" + indice + "\", \"_id\" : \"3\" } }\n").getBytes(StandardCharsets.UTF_8));
output.write(("{ \"field1\" : \"value3\" }\n").getBytes(StandardCharsets.UTF_8));
output.write(("{ \"update\" : {\"_id\" : \"1\", \"_index\" : \"" + indice + "\"} }\n").getBytes(StandardCharsets.UTF_8));
output.write(("{ \"doc\" : {\"field2\" : \"value2\"} }\n").getBytes(StandardCharsets.UTF_8));
output.write(("{ \"create\" : { \"_index\" : \"" + indice + "\"}}\n").getBytes(StandardCharsets.UTF_8));
output.write(("{ \"field1\" : \"value4\" }\n").getBytes(StandardCharsets.UTF_8));
// One JSON document per line, each terminated by "\n", encoded as UTF-8.
DATA.apply(indice)
.forEach(throwConsumer(s -> output.write((JacksonMapper
.ofJson()
.writeValueAsString(s) + "\n")
.getBytes(StandardCharsets.UTF_8)
)));
}

// NOTE(review): the stored URI ends in ".ion" although the content written above is JSON;
// presumably deliberate, to show the format is detected from content rather than extension (confirm).
URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

Bulk put = Bulk.builder()
.connection(ElasticsearchConnection.builder().hosts(hosts).build())
.from(uri.toString())
.chunk(10)
.build();

Bulk.Output runOutput = put.run(runContext);

// The payload holds five actions; with chunk(10) they are presumably sent in a single request (confirm).
assertThat(runOutput.getSize(), is(5L));
assertThat(runContext.metrics().stream().filter(e -> e.getName().equals("requests.count")).findFirst().orElseThrow().getValue(), is(1D));
assertThat(runContext.metrics().stream().filter(e -> e.getName().equals("records")).findFirst().orElseThrow().getValue(), is(5D));
}

@Test
void runIon() throws Exception {
RunContext runContext = runContextFactory.of();

String indice = "ut_" + IdUtils.create().toLowerCase(Locale.ROOT);

File tempFile = File.createTempFile(this.getClass().getSimpleName().toLowerCase() + "_", ".ion");
try (OutputStream output = new FileOutputStream(tempFile)) {
DATA.apply(indice)
.forEach(throwConsumer(s -> FileSerde.write(output, s)));
}

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
Expand Down

0 comments on commit 33d6221

Please sign in to comment.