Skip to content

Commit

Permalink
Update thread leak scope in ingestion IT
Browse files Browse the repository at this point in the history
Signed-off-by: Varun Bharadwaj <[email protected]>
  • Loading branch information
varunbharadwaj committed Feb 3, 2025
1 parent c20567f commit 3d45a2a
Showing 1 changed file with 27 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

package org.opensearch.plugin.kafka;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
Expand Down Expand Up @@ -45,7 +45,7 @@
/**
* Integration test for Kafka ingestion
*/
@ThreadLeakLingering(linger = 15000) // wait for container pull thread to die
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
public class IngestFromKafkaIT extends OpenSearchIntegTestCase {
static final String topicName = "test";

Expand Down Expand Up @@ -75,29 +75,31 @@ public void testPluginsAreInstalled() {
}

public void testKafkaIngestion() {
setupKafka();
// create an index with ingestion source from kafka
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.param.topic", "test")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
refresh("test");
SearchResponse response = client().prepareSearch("test").setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
});

stopKafka();
try {
setupKafka();
// create an index with ingestion source from kafka
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.param.topic", "test")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
refresh("test");
SearchResponse response = client().prepareSearch("test").setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
});
} finally {
stopKafka();
}
}

private void setupKafka() {
Expand Down

0 comments on commit 3d45a2a

Please sign in to comment.