diff --git a/src/main/java/io/kestra/plugin/elasticsearch/AbstractSearch.java b/src/main/java/io/kestra/plugin/elasticsearch/AbstractSearch.java index 5b1b32a..028a016 100644 --- a/src/main/java/io/kestra/plugin/elasticsearch/AbstractSearch.java +++ b/src/main/java/io/kestra/plugin/elasticsearch/AbstractSearch.java @@ -1,24 +1,17 @@ package io.kestra.plugin.elasticsearch; import co.elastic.clients.elasticsearch.core.SearchRequest; -import co.elastic.clients.json.JsonpMapper; -import co.elastic.clients.transport.rest_client.RestClientTransport; -import com.fasterxml.jackson.databind.ObjectMapper; import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.runners.RunContext; -import io.kestra.core.serializers.JacksonMapper; import io.kestra.plugin.elasticsearch.model.XContentType; import io.swagger.v3.oas.annotations.media.Schema; -import jakarta.json.stream.JsonParser; +import jakarta.validation.constraints.NotNull; import lombok.*; import lombok.experimental.SuperBuilder; import java.io.IOException; -import java.io.Reader; -import java.io.StringReader; import java.util.List; -import java.util.Map; @SuperBuilder @ToString @@ -26,7 +19,6 @@ @Getter @NoArgsConstructor public abstract class AbstractSearch extends AbstractTask { - private static ObjectMapper MAPPER = JacksonMapper.ofJson(); @Schema( title = "The ElasticSearch indices.", @@ -40,6 +32,7 @@ public abstract class AbstractSearch extends AbstractTask { description = "Can be a JSON string. In this case, the contentType will be used or a raw Map." ) @PluginProperty(dynamic = true) + @NotNull private Object request; @Schema( @@ -54,14 +47,7 @@ public abstract class AbstractSearch extends AbstractTask { protected SearchRequest.Builder request(RunContext runContext) throws IllegalVariableEvaluationException, IOException { SearchRequest.Builder request; - if (this.request instanceof String requestStr) { - request = parseQuery(requestStr); - } else if (this.request instanceof Map requestMap) { - String requestStr = MAPPER.writeValueAsString(requestMap); - request = parseQuery(requestStr); - } else { - throw new IllegalArgumentException("The `request` property must be a String or an Object"); - } + request = QueryService.request(runContext, this.request); if (this.indexes != null) { request.index(runContext.render(this.indexes)); @@ -74,9 +60,4 @@ protected SearchRequest.Builder request(RunContext runContext) throws IllegalVar return request; } - private SearchRequest.Builder parseQuery(String query) throws IOException { - try (Reader reader = new StringReader(query)) { - return new SearchRequest.Builder().withJson(reader); - } - } } diff --git a/src/main/java/io/kestra/plugin/elasticsearch/Esql.java b/src/main/java/io/kestra/plugin/elasticsearch/Esql.java new file mode 100644 index 0000000..2e20905 --- /dev/null +++ b/src/main/java/io/kestra/plugin/elasticsearch/Esql.java @@ -0,0 +1,226 @@ +package io.kestra.plugin.elasticsearch; + +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch._helpers.esql.objects.ObjectsEsqlAdapter; +import co.elastic.clients.elasticsearch.core.SearchRequest; +import co.elastic.clients.elasticsearch.esql.QueryRequest; +import co.elastic.clients.transport.rest_client.RestClientTransport; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Iterables; +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.models.tasks.common.FetchType; +import io.kestra.core.runners.RunContext; +import io.kestra.core.serializers.FileSerde; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.*; +import lombok.experimental.SuperBuilder; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import reactor.core.publisher.Flux; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.net.URI; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static io.kestra.core.utils.Rethrow.throwFunction; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Send a ES|QL search request.", + description = "Get documents from a ES|QL search request and store it as outputs." +) +@Plugin( + examples = { + @Example( + full = true, + code = """ + id: elasticsearch_search + namespace: company.team + + tasks: + - id: search + type: io.kestra.plugin.elasticsearch.Search + connection: + hosts: + - "http://localhost:9200" + query: | + FROM library + | KEEP author, name, page_count, release_date + | SORT page_count DESC + | LIMIT 5 + """ + ) + } +) +public class Esql extends AbstractTask implements RunnableTask { + private static final TypeReference> TYPE_REFERENCE = new TypeReference<>() {}; + + @Schema( + title = "The way you want to store the data.", + description = "FETCH_ONE output the first row, " + + "FETCH output all the rows, " + + "STORE store all rows in a file, " + + "NONE do nothing." + ) + @Builder.Default + @PluginProperty + @NotNull + private FetchType fetchType = FetchType.FETCH; + + @Schema( + title = "The ElasticSearch value.", + description = "Can be a JSON string. In this case, the contentType will be used or a raw Map." + ) + @PluginProperty(dynamic = true) + @NotNull + private String query; + + @Schema( + title = "A Query DSL query filter.", + description = "Specify a Query DSL query in the filter parameter to filter the set of documents that an ES|QL query runs on." + ) + @PluginProperty(dynamic = true) + private Object filter; + + @Override + public Esql.Output run(RunContext runContext) throws Exception { + Logger logger = runContext.logger(); + + try (RestClientTransport transport = this.connection.client(runContext)) { + ElasticsearchClient client = new ElasticsearchClient(transport); + + // build request + QueryRequest queryRequest = QueryRequest.of(throwFunction(builder -> { + builder.query(runContext.render(this.query)); + + if (filter != null) { + SearchRequest.Builder request = QueryService.request(runContext, this.filter); + builder.filter(request.build().query()); + } + + return builder; + } + )); + + logger.debug("Starting query: {}", query); + + Iterable> queryResponse = client + .esql() + .query(ObjectsEsqlAdapter.of(TYPE_REFERENCE.getType()), queryRequest); + + Output.OutputBuilder outputBuilder = Esql.Output.builder(); + + switch (fetchType) { + case FETCH: + Pair>, Integer> fetch = this.fetch(queryResponse); + outputBuilder + .rows(fetch.getLeft()) + .size(fetch.getRight()); + break; + + case FETCH_ONE: + var o = this.fetchOne(queryResponse); + + outputBuilder + .row(o) + .size(o != null ? 1 : 0); + break; + + case STORE: + Pair store = this.store(runContext, queryResponse); + outputBuilder + .uri(store.getLeft()) + .size(store.getRight().intValue()); + break; + } + + int size = Iterables.size(queryResponse); + + runContext.metric(Counter.of("records", size)); + outputBuilder.total((long) size); + + // metrics + runContext.metric(Counter.of("requests.count", 1)); + + // outputs + return outputBuilder + .build(); + } + } + + + protected Pair store(RunContext runContext, Iterable> searchResponse) throws IOException { + File tempFile = runContext.workingDir().createTempFile(".ion").toFile(); + + try (var output = new BufferedWriter(new FileWriter(tempFile), FileSerde.BUFFER_SIZE)) { + Flux> hitFlux = Flux.fromIterable(searchResponse); + Long count = FileSerde.writeAll(output, hitFlux).block(); + + return Pair.of( + runContext.storage().putFile(tempFile), + count + ); + } + } + + protected Pair>, Integer> fetch(Iterable> searchResponse) { + List> result = StreamSupport.stream(searchResponse.spliterator(), false) + .collect(Collectors.toList()); + + return Pair.of(result, result.size()); + } + + protected Map fetchOne(Iterable> searchResponse) { + if (!searchResponse.iterator().hasNext()) { + return null; + } + + return searchResponse.iterator().next(); + } + + @Builder + @Getter + public static class Output implements io.kestra.core.models.tasks.Output { + @Schema( + title = "The size of the rows fetched." + ) + private Integer size; + + @Schema( + title = "The total of the rows fetched without pagination." + ) + private Long total; + + @Schema( + title = "List containing the fetched data.", + description = "Only populated if using `fetchType=FETCH`." + ) + private List> rows; + + @Schema( + title = "Map containing the first row of fetched data.", + description = "Only populated if using `fetchType=FETCH_ONE`." + ) + private Map row; + + @Schema( + title = "The URI of the stored data.", + description = "Only populated if using `fetchType=STORE`." + ) + private URI uri; + } +} diff --git a/src/main/java/io/kestra/plugin/elasticsearch/QueryService.java b/src/main/java/io/kestra/plugin/elasticsearch/QueryService.java new file mode 100644 index 0000000..71a9791 --- /dev/null +++ b/src/main/java/io/kestra/plugin/elasticsearch/QueryService.java @@ -0,0 +1,35 @@ +package io.kestra.plugin.elasticsearch; + +import co.elastic.clients.elasticsearch.core.SearchRequest; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.runners.RunContext; +import io.kestra.core.serializers.JacksonMapper; + +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.util.Map; + +public abstract class QueryService { + private static ObjectMapper MAPPER = JacksonMapper.ofJson(); + + @SuppressWarnings("rawtypes") + public static SearchRequest.Builder request(RunContext runContext, Object request) throws IllegalVariableEvaluationException, IOException { + if (request instanceof String requestStr) { + return parseQuery(runContext.render(requestStr)); + } else if (request instanceof Map requestMap) { + String requestStr = runContext.render(MAPPER.writeValueAsString(requestMap)); + return parseQuery(requestStr); + } else { + throw new IllegalArgumentException("The `request` property must be a String or an Object"); + } + } + + private static SearchRequest.Builder parseQuery(String query) throws IOException { + try (Reader reader = new StringReader(query)) { + return new SearchRequest.Builder().withJson(reader); + } + } +} + diff --git a/src/test/java/io/kestra/plugin/elasticsearch/EsqlTest.java b/src/test/java/io/kestra/plugin/elasticsearch/EsqlTest.java new file mode 100644 index 0000000..a872054 --- /dev/null +++ b/src/test/java/io/kestra/plugin/elasticsearch/EsqlTest.java @@ -0,0 +1,122 @@ +package io.kestra.plugin.elasticsearch; + +import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.models.tasks.common.FetchType; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.serializers.FileSerde; +import io.kestra.core.storages.StorageInterface; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +@KestraTest +class EsqlTest { + @Inject + private RunContextFactory runContextFactory; + + @Value("${elasticsearch-hosts}") + private List hosts; + + @Inject + private StorageInterface storageInterface; + + @Test + void run() throws Exception { + RunContext runContext = runContextFactory.of(); + + Esql task = Esql.builder() + .connection(ElasticsearchConnection.builder().hosts(hosts).build()) + .query(""" + FROM gbif + | WHERE key == 925277090 + """) + .build(); + + Esql.Output run = task.run(runContext); + + assertThat(run.getSize(), is(1)); + assertThat(run.getRows().getFirst().get("genericName"), is("Larus")); + } + + @Test + void filter() throws Exception { + RunContext runContext = runContextFactory.of(); + + Esql task = Esql.builder() + .connection(ElasticsearchConnection.builder().hosts(hosts).build()) + .query("FROM gbif") + .filter(""" + { + "query": { + "term": { + "key": "925277090" + } + } + }""") + .build(); + + Esql.Output run = task.run(runContext); + + assertThat(run.getSize(), is(1)); + assertThat(run.getRows().getFirst().get("genericName"), is("Larus")); + } + + @Test + void runFetchOne() throws Exception { + RunContext runContext = runContextFactory.of(); + + Esql task = Esql.builder() + .connection(ElasticsearchConnection.builder().hosts(hosts).build()) + .query(""" + FROM gbif + | WHERE publishingCountry.keyword == "BE" + """) + .fetchType(FetchType.FETCH_ONE) + .build(); + + Esql.Output run = task.run(runContext); + + assertThat(run.getSize(), is(1)); + assertThat(run.getTotal(), is(28L)); + assertThat(run.getRow().get("key"), is(925277090L)); + } + + @SuppressWarnings("unchecked") + @Test + void runStored() throws Exception { + RunContext runContext = runContextFactory.of(); + + Esql task = Esql.builder() + .connection(ElasticsearchConnection.builder().hosts(hosts).build()) + .query(""" + FROM gbif + | WHERE publishingCountry.keyword == "BE" + | LIMIT 10 + """) + .fetchType(FetchType.STORE) + .build(); + + Esql.Output run = task.run(runContext); + + assertThat(run.getSize(), is(10)); + assertThat(run.getTotal(), is(10L)); + assertThat(run.getUri(), notNullValue()); + + BufferedReader inputStream = new BufferedReader(new InputStreamReader(storageInterface.get(null, run.getUri()))); + List> result = new ArrayList<>(); + FileSerde.reader(inputStream, r -> result.add((Map) r)); + + assertThat(result.get(8).get("key"), is(925311404)); + } +}