diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/config/ConfigOverrideFromEnvTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/config/ConfigOverrideFromEnvTest.java new file mode 100644 index 0000000000..cd7901a351 --- /dev/null +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/config/ConfigOverrideFromEnvTest.java @@ -0,0 +1,71 @@ +package io.smallrye.reactive.messaging.kafka.config; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.enterprise.context.ApplicationScoped; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.junit.jupiter.api.Test; +import org.junitpioneer.jupiter.SetEnvironmentVariable; + +import io.smallrye.reactive.messaging.kafka.KafkaConnector; +import io.smallrye.reactive.messaging.kafka.base.KafkaBrokerExtension; +import io.smallrye.reactive.messaging.kafka.base.KafkaTestBase; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class ConfigOverrideFromEnvTest extends KafkaTestBase { + + final static String TOPIC = "ConfigOverrideFromEnvTest-Topic"; + + @Test + @SetEnvironmentVariable(key = "MP_MESSAGING_INCOMING_MY_CHANNEL_TOPIC", value = TOPIC) + public void testOverridingTopicFromEnv() throws InterruptedException { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.my-channel.graceful-shutdown", false) + .with("mp.messaging.incoming.my-channel.topic", "should not be used") + .with("mp.messaging.incoming.my-channel.bootstrap.servers", KafkaBrokerExtension.getBootstrapServers()) + .with("mp.messaging.incoming.my-channel.connector", + KafkaConnector.CONNECTOR_NAME) + .with("mp.messaging.incoming.my-channel.value.deserializer", + StringDeserializer.class.getName()) + .with("mp.messaging.incoming.my-channel." + + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + Consumer consumer = runApplication(config, Consumer.class); + await().until(() -> isReady() && isAlive()); + + CountDownLatch latch = new CountDownLatch(1); + usage.produceStrings(5, latch::countDown, () -> new ProducerRecord<>(TOPIC, "key", "hello")); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + + await() + .atMost(Duration.ofMinutes(1)) + .until(() -> consumer.list().size() == 5); + } + + @ApplicationScoped + public static class Consumer { + + private final List list = new CopyOnWriteArrayList<>(); + + @Incoming("my-channel") + public void consume(String data) { + list.add(data); + } + + public List list() { + return list; + } + } + +} diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ConfigWithDotsTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/config/ConfigWithDotsTest.java similarity index 96% rename from smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ConfigWithDotsTest.java rename to smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/config/ConfigWithDotsTest.java index 80cbbd2e83..b8c47cc79c 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ConfigWithDotsTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/config/ConfigWithDotsTest.java @@ -1,4 +1,4 @@ -package io.smallrye.reactive.messaging.kafka; +package io.smallrye.reactive.messaging.kafka.config; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -17,6 +17,7 @@ import org.eclipse.microprofile.reactive.messaging.Incoming; import org.junit.jupiter.api.Test; +import io.smallrye.reactive.messaging.kafka.KafkaConnector; import io.smallrye.reactive.messaging.kafka.base.KafkaBrokerExtension; import io.smallrye.reactive.messaging.kafka.base.KafkaTestBase; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/DefaultConfigTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/config/DefaultConfigTest.java similarity index 97% rename from smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/DefaultConfigTest.java rename to smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/config/DefaultConfigTest.java index d4ed7ce7ce..96de9041e4 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/DefaultConfigTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/config/DefaultConfigTest.java @@ -1,4 +1,4 @@ -package io.smallrye.reactive.messaging.kafka; +package io.smallrye.reactive.messaging.kafka.config; import static io.smallrye.reactive.messaging.kafka.KafkaConnector.CONNECTOR_NAME; import static org.assertj.core.api.Assertions.assertThat; @@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test; import io.smallrye.common.annotation.Identifier; +import io.smallrye.reactive.messaging.kafka.KafkaRecord; import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig; import io.smallrye.reactive.messaging.kafka.base.KafkaTestBase; diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/DeprecatedDefaultConfigTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/config/DeprecatedDefaultConfigTest.java similarity index 97% rename from smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/DeprecatedDefaultConfigTest.java rename to smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/config/DeprecatedDefaultConfigTest.java index 30bff71698..ae1e98c5c5 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/DeprecatedDefaultConfigTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/config/DeprecatedDefaultConfigTest.java @@ -1,4 +1,4 @@ -package io.smallrye.reactive.messaging.kafka; +package io.smallrye.reactive.messaging.kafka.config; import static io.smallrye.reactive.messaging.kafka.KafkaConnector.CONNECTOR_NAME; import static org.assertj.core.api.Assertions.assertThat; @@ -29,6 +29,7 @@ import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.junit.jupiter.api.Test; +import io.smallrye.reactive.messaging.kafka.KafkaRecord; import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig; import io.smallrye.reactive.messaging.kafka.base.KafkaTestBase; diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/EnvAndSysConfigTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/config/EnvAndSysConfigTest.java similarity index 98% rename from smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/EnvAndSysConfigTest.java rename to smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/config/EnvAndSysConfigTest.java index c5871a9479..a5381d12da 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/EnvAndSysConfigTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/config/EnvAndSysConfigTest.java @@ -1,4 +1,4 @@ -package io.smallrye.reactive.messaging.kafka; +package io.smallrye.reactive.messaging.kafka.config; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await;