diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/StreamkapElasticConnectorTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/StreamkapElasticConnectorTest.java index 3729e2a..4d6d63a 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/StreamkapElasticConnectorTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/StreamkapElasticConnectorTest.java @@ -10,6 +10,7 @@ import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; @@ -26,13 +27,13 @@ import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTaskContext; import org.apache.kafka.connect.storage.OffsetStorageReader; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.opensearch.testcontainers.OpensearchContainer; import org.testcontainers.utility.DockerImageName; -import com.github.castorm.kafka.connect.http.model.Offset; import com.github.castorm.kafka.connect.http.model.Partition; import lombok.extern.slf4j.Slf4j; @@ -79,6 +80,56 @@ private Map getConf() { return props; } + private String createAliases(Integer nbIndexes) throws Exception { + String url = opensearch.getHttpHostAddress() + "/_aliases"; + HttpURLConnection con = (HttpURLConnection) new URL(url).openConnection(); + con.setRequestMethod("POST"); + con.setRequestProperty("Content-Type", "application/json"); + con.setRequestProperty("Accept", "application/json"); + con.setDoOutput(true); + + ArrayList aliases = new ArrayList<>(); + StringBuilder input = getAliasesRequestInput(nbIndexes, aliases); + + try (OutputStream os = con.getOutputStream()) { + byte[] byteInput = input.toString().getBytes(StandardCharsets.UTF_8); + os.write(byteInput, 0, byteInput.length); + + assertThat(con.getResponseCode()).isEqualTo(200); + } + + return StringUtils.join(aliases, ","); + } + + @NotNull + private static StringBuilder getAliasesRequestInput(Integer nbIndexes, ArrayList aliases) { + StringBuilder input = new StringBuilder("{\"actions\": [ "); + for (int i = 0; i < nbIndexes; i++) { + String alias = "alias-" + i; + input.append("{\"add\": {\"index\": \"index-").append(i) + .append("\",\"alias\": \"").append(alias).append("\"}}, "); + aliases.add(alias); + } + input.replace(input.length() - 2, input.length() - 1, ""); + input.append("]}"); + return input; + } + + private void deleteAliases(String indexes, String aliases) throws Exception { + String[] aliasesList = aliases.split(","); + String[] indexesList = indexes.split(","); + for (int i = 0; i < aliasesList.length; i++) { + String url = opensearch.getHttpHostAddress() + "/" + indexesList[i] + "/_alias/" + aliasesList[i]; + HttpURLConnection con = (HttpURLConnection) new URL(url).openConnection(); + con.setRequestMethod("DELETE"); + con.setRequestProperty("Content-Type", "application/json"); + con.setRequestProperty("Accept", "application/json"); + con.setDoOutput(true); + + assertThat(con.getResponseCode()).isEqualTo(200); + } + } + private String loadTestData(int nbIndexes) throws Exception { return loadTestData(nbIndexes, 1); } @@ -155,10 +206,16 @@ void testNominal() throws Exception { @Test void testTimestamps() throws Exception { - String endpointIncludeList = loadTestData(4, 4); + // before + String indexes = loadTestData(4, 4); + String aliases = createAliases(4); Map config = getConf(); - config.put(HttpSourceConnectorConfig.ENDPOINT_INCLUDE_LIST, endpointIncludeList); + config.put(HttpSourceConnectorConfig.ENDPOINT_INCLUDE_LIST, aliases); + + // when List records = runTasks(config, 4, 4); + + // then assertThat(records).hasSize(16); assertThat(records.get(0).value()).isInstanceOf(Struct.class); assertThat(((Struct) records.get(0).value()).get("_streamkap_value").toString()).contains("my_timestamp"); @@ -171,6 +228,9 @@ void testTimestamps() throws Exception { "1-0", "1-1", "1-2", "1-3", "2-0", "2-1", "2-2", "2-3", "3-0", "3-1", "3-2", "3-3"); + + // after + deleteAliases(indexes, aliases); } @Test