Skip to content

Commit

Permalink
Add a test for #1264.
Browse files Browse the repository at this point in the history
And move config related test in the config package.
  • Loading branch information
cescoffier committed Jul 5, 2021
1 parent 9fd2371 commit 3b30bce
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.smallrye.reactive.messaging.kafka.config;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.enterprise.context.ApplicationScoped;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.SetEnvironmentVariable;

import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.smallrye.reactive.messaging.kafka.base.KafkaBrokerExtension;
import io.smallrye.reactive.messaging.kafka.base.KafkaTestBase;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

public class ConfigOverrideFromEnvTest extends KafkaTestBase {

final static String TOPIC = "ConfigOverrideFromEnvTest-Topic";

@Test
@SetEnvironmentVariable(key = "MP_MESSAGING_INCOMING_MY_CHANNEL_TOPIC", value = TOPIC)
public void testOverridingTopicFromEnv() throws InterruptedException {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.my-channel.graceful-shutdown", false)
.with("mp.messaging.incoming.my-channel.topic", "should not be used")
.with("mp.messaging.incoming.my-channel.bootstrap.servers", KafkaBrokerExtension.getBootstrapServers())
.with("mp.messaging.incoming.my-channel.connector",
KafkaConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.my-channel.value.deserializer",
StringDeserializer.class.getName())
.with("mp.messaging.incoming.my-channel."
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Consumer consumer = runApplication(config, Consumer.class);
await().until(() -> isReady() && isAlive());

CountDownLatch latch = new CountDownLatch(1);
usage.produceStrings(5, latch::countDown, () -> new ProducerRecord<>(TOPIC, "key", "hello"));
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();

await()
.atMost(Duration.ofMinutes(1))
.until(() -> consumer.list().size() == 5);
}

@ApplicationScoped
public static class Consumer {

private final List<String> list = new CopyOnWriteArrayList<>();

@Incoming("my-channel")
public void consume(String data) {
list.add(data);
}

public List<String> list() {
return list;
}
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.smallrye.reactive.messaging.kafka;
package io.smallrye.reactive.messaging.kafka.config;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
Expand All @@ -17,6 +17,7 @@
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.junit.jupiter.api.Test;

import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.smallrye.reactive.messaging.kafka.base.KafkaBrokerExtension;
import io.smallrye.reactive.messaging.kafka.base.KafkaTestBase;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.smallrye.reactive.messaging.kafka;
package io.smallrye.reactive.messaging.kafka.config;

import static io.smallrye.reactive.messaging.kafka.KafkaConnector.CONNECTOR_NAME;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -29,6 +29,7 @@
import org.junit.jupiter.api.Test;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig;
import io.smallrye.reactive.messaging.kafka.base.KafkaTestBase;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.smallrye.reactive.messaging.kafka;
package io.smallrye.reactive.messaging.kafka.config;

import static io.smallrye.reactive.messaging.kafka.KafkaConnector.CONNECTOR_NAME;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -29,6 +29,7 @@
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Test;

import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig;
import io.smallrye.reactive.messaging.kafka.base.KafkaTestBase;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.smallrye.reactive.messaging.kafka;
package io.smallrye.reactive.messaging.kafka.config;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
Expand Down

0 comments on commit 3b30bce

Please sign in to comment.