diff --git a/CHANGELOG.md b/CHANGELOG.md
index 499648f46b..ca42f3c40b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,8 @@ Please enumerate **all user-facing** changes using format `

                 "connections".equals(metricNameEntry.getKey().name()))
                 .findFirst();
-        assertThat(connectionsMetric.isPresent()).isTrue();
+        assertThat(connectionsMetric).isPresent();
         var protocol = connectionsMetric.get().getKey().tags().get("protocol");
         assertThat(protocol).startsWith("TLS");
     }
diff --git a/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/FilterIT.java b/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/FilterIT.java
index 2a8390a917..c0f5c65875 100644
--- a/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/FilterIT.java
+++ b/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/FilterIT.java
@@ -12,6 +12,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
@@ -19,7 +20,6 @@
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.message.ApiVersionsRequestData;
@@ -64,6 +64,7 @@
 import io.kroxylicious.test.tester.MockServerKroxyliciousTester;
 import io.kroxylicious.testing.kafka.api.KafkaCluster;
 import io.kroxylicious.testing.kafka.junit5ext.KafkaClusterExtension;
+import io.kroxylicious.testing.kafka.junit5ext.Topic;

 import static io.kroxylicious.UnknownTaggedFields.unknownTaggedFieldsToStrings;
 import static io.kroxylicious.proxy.filter.ApiVersionsMarkingFilter.INTERSECTED_API_VERSION_RANGE_TAG;
@@ -87,32 +88,17 @@
 import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

 @ExtendWith(KafkaClusterExtension.class)
 class FilterIT {
-    private static final String TOPIC_1 = "my-test-topic";
-    private static final String TOPIC_2 = "other-test-topic";
     private static final String PLAINTEXT = "Hello, world!";
-    private static final byte[] TOPIC_1_CIPHERTEXT = { (byte) 0x3d, (byte) 0x5a, (byte) 0x61, (byte) 0x61, (byte) 0x64, (byte) 0x21, (byte) 0x15, (byte) 0x6c,
-            (byte) 0x64, (byte) 0x67, (byte) 0x61, (byte) 0x59, (byte) 0x16 };
-    private static final byte[] TOPIC_2_CIPHERTEXT = { (byte) 0xffffffa7, (byte) 0xffffffc4, (byte) 0xffffffcb, (byte) 0xffffffcb, (byte) 0xffffffce, (byte) 0xffffff8b,
-            (byte) 0x7f, (byte) 0xffffffd6, (byte) 0xffffffce, (byte) 0xffffffd1, (byte) 0xffffffcb, (byte) 0xffffffc3, (byte) 0xffffff80 };
     private static final FilterDefinitionBuilder REJECTING_CREATE_TOPIC_FILTER = new FilterDefinitionBuilder(RejectingCreateTopicFilterFactory.class.getName());
     private static NettyLeakLogAppender appender;

     @BeforeAll
-    public static void checkReversibleEncryption() {
-        // The precise details of the cipher don't matter
-        // What matters is that it the ciphertext key depends on the topic name
-        // and that decode() is the inverse of encode()
-        assertArrayEquals(TOPIC_1_CIPHERTEXT, encode(TOPIC_1, ByteBuffer.wrap(PLAINTEXT.getBytes(StandardCharsets.UTF_8))).array());
-        assertEquals(PLAINTEXT, new String(decode(TOPIC_1, ByteBuffer.wrap(TOPIC_1_CIPHERTEXT)).array(), StandardCharsets.UTF_8));
-        assertArrayEquals(TOPIC_2_CIPHERTEXT, encode(TOPIC_2, ByteBuffer.wrap(PLAINTEXT.getBytes(StandardCharsets.UTF_8))).array());
-        assertEquals(PLAINTEXT, new String(decode(TOPIC_2, ByteBuffer.wrap(TOPIC_2_CIPHERTEXT)).array(), StandardCharsets.UTF_8));
+    public static void beforeAll() {
         final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
         final Configuration config = ctx.getConfiguration();
@@ -129,6 +115,30 @@ public void checkNoNettyLeaks() {
         appender.verifyNoLeaks();
     }

+    @Test
+    void reversibleEncryption() {
+        // The precise details of the cipher don't matter
+        // What matters is that the ciphertext key depends on the topic name
+        // and that decode() is the inverse of encode()
+        var name = UUID.randomUUID().toString();
+        var encoded = encode(name, ByteBuffer.wrap(PLAINTEXT.getBytes(StandardCharsets.UTF_8)));
+        var decoded = new String(decode(name, encoded).array(), StandardCharsets.UTF_8);
+        assertThat(decoded)
+                .isEqualTo(PLAINTEXT);
+    }
+
+    @Test
+    void encryptionDistinguishedByName() {
+        var name = UUID.randomUUID().toString();
+        var differentName = UUID.randomUUID().toString();
+        assertThat(differentName).isNotEqualTo(name);
+
+        var encoded = encode(name, ByteBuffer.wrap(PLAINTEXT.getBytes(StandardCharsets.UTF_8)));
+
+        assertThat(encoded)
+                .isNotEqualTo(encode(differentName, ByteBuffer.wrap(PLAINTEXT.getBytes(StandardCharsets.UTF_8))));
+    }
+
     static ByteBuffer encode(String topicName, ByteBuffer in) {
         var out = ByteBuffer.allocate(in.limit());
         byte rot = (byte) (topicName.hashCode() % Byte.MAX_VALUE);
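The bodies of encode() and decode() mostly fall outside the hunk context above, so the round-trip property the two new tests assert is easiest to see in a standalone sketch. The loop bodies below are assumptions consistent with the visible lines (allocate an output buffer, derive a rotation from the topic name); they are not the file's actual code.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    class RotCipherSketch {
        // The rotation is derived from the topic name, so distinct topics yield
        // distinct ciphertext for the same plaintext (encryptionDistinguishedByName),
        // and applying the opposite rotation restores the input (reversibleEncryption).
        static ByteBuffer encode(String topicName, ByteBuffer in) {
            return rot(topicName.hashCode() % Byte.MAX_VALUE, in);
        }

        static ByteBuffer decode(String topicName, ByteBuffer in) {
            return rot(-(topicName.hashCode() % Byte.MAX_VALUE), in);
        }

        private static ByteBuffer rot(int amount, ByteBuffer in) {
            var out = ByteBuffer.allocate(in.limit());
            for (int i = 0; i < in.limit(); i++) {
                out.put((byte) (in.get(i) + amount)); // byte arithmetic wraps mod 256, so +amount and -amount are inverses
            }
            return out.flip();
        }

        public static void main(String[] args) {
            var plain = "Hello, world!";
            var roundTrip = decode("some-topic", encode("some-topic", ByteBuffer.wrap(plain.getBytes(StandardCharsets.UTF_8))));
            System.out.println(new String(roundTrip.array(), StandardCharsets.UTF_8)); // prints "Hello, world!"
        }
    }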
@@ -153,18 +163,22 @@ static ByteBuffer decode(String topicName, ByteBuffer in) {
     }

     @Test
-    void shouldPassThroughRecordUnchanged(KafkaCluster cluster, Admin admin) throws Exception {
-        admin.createTopics(List.of(new NewTopic(TOPIC_1, 1, (short) 1))).all().get();
+    void shouldPassThroughRecordUnchanged(KafkaCluster cluster, Topic topic) throws Exception {
         try (var tester = kroxyliciousTester(proxy(cluster));
                 var producer = tester.producer(Map.of(CLIENT_ID_CONFIG, "shouldPassThroughRecordUnchanged", DELIVERY_TIMEOUT_MS_CONFIG, 3_600_000));
                 var consumer = tester.consumer()) {
-            producer.send(new ProducerRecord<>(TOPIC_1, "my-key", "Hello, world!")).get();
-            consumer.subscribe(Set.of(TOPIC_1));
+            producer.send(new ProducerRecord<>(topic.name(), "my-key", "Hello, world!")).get();
+            consumer.subscribe(Set.of(topic.name()));
             var records = consumer.poll(Duration.ofSeconds(10));
             consumer.close();
-            assertEquals(1, records.count());
-            assertEquals("Hello, world!", records.iterator().next().value());
+
+            assertThat(records.iterator())
+                    .toIterable()
+                    .hasSize(1)
+                    .map(ConsumerRecord::value)
+                    .containsExactly(PLAINTEXT);
+
         }
     }

@@ -174,13 +188,14 @@ void requestFiltersCanRespondWithoutProxying(KafkaCluster cluster, Admin admin)
         var config = proxy(cluster)
                 .addToFilters(REJECTING_CREATE_TOPIC_FILTER.build());

+        var topicName = UUID.randomUUID().toString();
         try (var tester = kroxyliciousTester(config);
                 var proxyAdmin = tester.admin()) {
-            assertCreatingTopicThrowsExpectedException(proxyAdmin);
+            assertCreatingTopicThrowsExpectedException(proxyAdmin, topicName);

             // check no topic created on the cluster
             Set<String> names = admin.listTopics().names().get(10, TimeUnit.SECONDS);
-            assertThat(names).doesNotContain(TOPIC_1);
+            assertThat(names).doesNotContain(topicName);
         }
     }
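The recurring change in this file: instead of sharing TOPIC_1/TOPIC_2 constants created through Admin, each test now declares a Topic parameter and KafkaClusterExtension provisions a topic for it before the test runs. A minimal sketch of that pattern, assuming (as the tests above do) that each injected Topic gets its own generated name:

    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtendWith;

    import io.kroxylicious.testing.kafka.api.KafkaCluster;
    import io.kroxylicious.testing.kafka.junit5ext.KafkaClusterExtension;
    import io.kroxylicious.testing.kafka.junit5ext.Topic;

    import static org.assertj.core.api.Assertions.assertThat;

    @ExtendWith(KafkaClusterExtension.class)
    class TopicInjectionSketch {

        @Test
        void eachParameterGetsItsOwnTopic(KafkaCluster cluster, Topic topic1, Topic topic2) {
            // The extension creates both topics on the injected cluster before the
            // test body runs; generated names keep concurrently running tests isolated.
            assertThat(topic1.name()).isNotEqualTo(topic2.name());
        }
    }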
@@ -306,12 +321,13 @@ void requestFiltersCanRespondWithoutProxyingDoesntLeakBuffers(KafkaCluster clust
         var config = proxy(cluster)
                 .addToFilters(REJECTING_CREATE_TOPIC_FILTER.build());

+        var name = UUID.randomUUID().toString();
         try (var tester = kroxyliciousTester(config);
                 var proxyAdmin = tester.admin()) {
             // loop because System.gc doesn't make any guarantees that the buffer will be collected
             for (int i = 0; i < 20; i++) {
                 // CreateTopicRejectFilter allocates a buffer and then short-circuit responds
-                assertCreatingTopicThrowsExpectedException(proxyAdmin);
+                assertCreatingTopicThrowsExpectedException(proxyAdmin, name);
                 // buffers must be garbage collected before it causes leak detection during
                 // a subsequent buffer allocation
                 System.gc();
@@ -319,13 +335,13 @@ void requestFiltersCanRespondWithoutProxyingDoesntLeakBuffers(KafkaCluster clust

             // check no topic created on the cluster
             Set<String> names = admin.listTopics().names().get(10, TimeUnit.SECONDS);
-            assertThat(names).doesNotContain(TOPIC_1);
+            assertThat(names).doesNotContain(name);
         }
     }

-    private static void assertCreatingTopicThrowsExpectedException(Admin proxyAdmin) {
+    private static void assertCreatingTopicThrowsExpectedException(Admin proxyAdmin, String topicName) {
         assertThatExceptionOfType(ExecutionException.class)
-                .isThrownBy(() -> proxyAdmin.createTopics(List.of(new NewTopic(TOPIC_1, 1, (short) 1))).all().get())
+                .isThrownBy(() -> proxyAdmin.createTopics(List.of(new NewTopic(topicName, 1, (short) 1))).all().get())
                 .withCauseInstanceOf(InvalidTopicException.class)
                 .havingCause()
                 .withMessage(RejectingCreateTopicFilter.ERROR_MESSAGE);
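assertCreatingTopicThrowsExpectedException leans on AssertJ's cause navigation instead of unwrapping the ExecutionException by hand. The same chain in self-contained form; the CompletableFuture here is a stand-in for the Admin client's future, not the test's code:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

    class CauseAssertionSketch {
        public static void main(String[] args) {
            // Kafka's Admin futures wrap broker-side failures in ExecutionException;
            // havingCause() shifts the assertion onto the wrapped exception.
            var failed = CompletableFuture.failedFuture(new IllegalStateException("rejected"));
            assertThatExceptionOfType(ExecutionException.class)
                    .isThrownBy(failed::get)
                    .withCauseInstanceOf(IllegalStateException.class)
                    .havingCause()
                    .withMessage("rejected");
        }
    }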
@@ -339,7 +355,7 @@ void testCompositeFilter() {
                 var kafkaClient = tester.simpleTestClient()) {
             tester.addMockResponseForApiKey(new ResponsePayload(METADATA, METADATA.latestVersion(), new MetadataResponseData()));
             kafkaClient.getSync(new Request(METADATA, METADATA.latestVersion(), "client", new MetadataRequestData()));
-            assertEquals("123banana", tester.getOnlyRequest().clientIdHeader());
+            assertThat(tester.getOnlyRequest().clientIdHeader()).isEqualTo("123banana");
         }
     }

@@ -363,10 +379,11 @@ void testApiVersionsAvailableToFilter() {
     }

     @Test
-    void shouldModifyProduceMessage(KafkaCluster cluster, Admin admin) throws Exception {
-        admin.createTopics(List.of(
-                new NewTopic(TOPIC_1, 1, (short) 1),
-                new NewTopic(TOPIC_2, 1, (short) 1))).all().get();
+    void shouldModifyProduceMessage(KafkaCluster cluster, Topic topic1, Topic topic2) throws Exception {
+
+        var bytes = PLAINTEXT.getBytes(StandardCharsets.UTF_8);
+        var expectedEncoded1 = encode(topic1.name(), ByteBuffer.wrap(bytes)).array();
+        var expectedEncoded2 = encode(topic2.name(), ByteBuffer.wrap(bytes)).array();

         var config = proxy(cluster)
                 .addToFilters(new FilterDefinitionBuilder(ProduceRequestTransformationFilterFactory.class.getName())
@@ -376,21 +393,23 @@ void shouldModifyProduceMessage(KafkaCluster cluster, Admin admin) throws Except
                 var producer = tester.producer(Map.of(CLIENT_ID_CONFIG, "shouldModifyProduceMessage", DELIVERY_TIMEOUT_MS_CONFIG, 3_600_000));
                 var consumer = tester
                         .consumer(Serdes.String(), Serdes.ByteArray(), Map.of(GROUP_ID_CONFIG, "my-group-id", AUTO_OFFSET_RESET_CONFIG, "earliest"))) {
-            producer.send(new ProducerRecord<>(TOPIC_1, "my-key", PLAINTEXT)).get();
-            producer.send(new ProducerRecord<>(TOPIC_2, "my-key", PLAINTEXT)).get();
+            producer.send(new ProducerRecord<>(topic1.name(), "my-key", PLAINTEXT)).get();
+            producer.send(new ProducerRecord<>(topic2.name(), "my-key", PLAINTEXT)).get();
             producer.flush();
-            ConsumerRecords<String, byte[]> records1;
-            ConsumerRecords<String, byte[]> records2;
-            consumer.subscribe(Set.of(TOPIC_1));
-            records1 = consumer.poll(Duration.ofSeconds(10));
-            consumer.subscribe(Set.of(TOPIC_2));
-            records2 = consumer.poll(Duration.ofSeconds(10));
-
-            assertEquals(1, records1.count());
-            assertArrayEquals(TOPIC_1_CIPHERTEXT, records1.iterator().next().value());
-            assertEquals(1, records2.count());
-            assertArrayEquals(TOPIC_2_CIPHERTEXT, records2.iterator().next().value());
+            consumer.subscribe(Set.of(topic1.name(), topic2.name()));
+            var records = consumer.poll(Duration.ofSeconds(10));
+
+            assertThat(records).hasSize(2);
+            assertThat(records.records(topic1.name()))
+                    .hasSize(1)
+                    .map(ConsumerRecord::value)
+                    .containsExactly(expectedEncoded1);
+
+            assertThat(records.records(topic2.name()))
+                    .hasSize(1)
+                    .map(ConsumerRecord::value)
+                    .containsExactly(expectedEncoded2);
         }
     }

@@ -461,33 +480,33 @@ void zeroAckProduceRequestsDoNotInterfereWithResponseReorderingLogic() throws Ex
     // zero-ack produce requests require special handling because they have no response associated
     // this checks that Kroxy can handle the basics of forwarding them.
     @Test
-    void shouldModifyZeroAckProduceMessage(KafkaCluster cluster, Admin admin) throws Exception {
-        admin.createTopics(List.of(new NewTopic(TOPIC_1, 1, (short) 1))).all().get();
-
+    void shouldModifyZeroAckProduceMessage(KafkaCluster cluster, Topic topic) throws Exception {
         var config = proxy(cluster)
                 .addToFilters(new FilterDefinitionBuilder(ProduceRequestTransformationFilterFactory.class.getName())
                         .withConfig("transformation", TestEncoderFactory.class.getName()).build());

+        var expectedEncoded = encode(topic.name(), ByteBuffer.wrap(PLAINTEXT.getBytes(StandardCharsets.UTF_8))).array();
+
         try (var tester = kroxyliciousTester(config);
                 var producer = tester.producer(Map.of(CLIENT_ID_CONFIG, "shouldModifyProduceMessage", DELIVERY_TIMEOUT_MS_CONFIG, 3_600_000, ACKS_CONFIG, "0"));
                 var consumer = tester
                         .consumer(Serdes.String(), Serdes.ByteArray(), Map.of(GROUP_ID_CONFIG, "my-group-id", AUTO_OFFSET_RESET_CONFIG, "earliest"))) {
-            producer.send(new ProducerRecord<>(TOPIC_1, "my-key", PLAINTEXT)).get();
+            producer.send(new ProducerRecord<>(topic.name(), "my-key", PLAINTEXT)).get();
             producer.flush();
-            ConsumerRecords<String, byte[]> records1;
-            consumer.subscribe(Set.of(TOPIC_1));
-            records1 = consumer.poll(Duration.ofSeconds(10));
+            consumer.subscribe(Set.of(topic.name()));
+            var records = consumer.poll(Duration.ofSeconds(10));

-            assertThat(records1).hasSize(1);
-            assertThat(records1.records(TOPIC_1)).map(ConsumerRecord::value)
-                    .containsExactly(TOPIC_1_CIPHERTEXT);
+            assertThat(records.iterator())
+                    .toIterable()
+                    .hasSize(1)
+                    .map(ConsumerRecord::value)
+                    .containsExactly(expectedEncoded);
         }
     }

     @Test
-    void shouldForwardUnfilteredZeroAckProduceMessage(KafkaCluster cluster, Admin admin) throws Exception {
-        admin.createTopics(List.of(new NewTopic(TOPIC_1, 1, (short) 1))).all().get();
+    void shouldForwardUnfilteredZeroAckProduceMessage(KafkaCluster cluster, Topic topic) throws Exception {

         var config = proxy(cluster);

@@ -495,23 +514,26 @@ void shouldForwardUnfilteredZeroAckProduceMessage(KafkaCluster cluster, Admin ad
                 var producer = tester.producer(Map.of(CLIENT_ID_CONFIG, "shouldModifyProduceMessage", DELIVERY_TIMEOUT_MS_CONFIG, 3_600_000, ACKS_CONFIG, "0"));
                 var consumer = tester
                         .consumer(Serdes.String(), Serdes.String(), Map.of(GROUP_ID_CONFIG, "my-group-id", AUTO_OFFSET_RESET_CONFIG, "earliest"))) {
-            producer.send(new ProducerRecord<>(TOPIC_1, "my-key", PLAINTEXT)).get();
+            producer.send(new ProducerRecord<>(topic.name(), "my-key", PLAINTEXT)).get();
             producer.flush();
-            consumer.subscribe(Set.of(TOPIC_1));
-            var records1 = consumer.poll(Duration.ofSeconds(10));
+            consumer.subscribe(Set.of(topic.name()));
+            var records = consumer.poll(Duration.ofSeconds(10));

-            assertEquals(1, records1.count());
-            assertEquals(PLAINTEXT, records1.iterator().next().value());
+            assertThat(records).hasSize(1);
+            assertThat(records.iterator())
+                    .toIterable()
+                    .map(ConsumerRecord::value)
+                    .containsExactly(PLAINTEXT);
         }
     }
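Both zero-ack tests hinge on one producer setting. With acks=0 the broker sends no Produce response at all, which is why the proxy needs the dedicated forwarding path described in the comment above: there is no response to correlate against the request. A minimal standalone producer configured that way, assuming a plain Kafka client; the bootstrap address is a placeholder:

    import java.util.Map;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    class ZeroAckProducerSketch {
        public static void main(String[] args) {
            var props = Map.<String, Object> of(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                    ProducerConfig.ACKS_CONFIG, "0",
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            try (var producer = new KafkaProducer<String, String>(props)) {
                // With acks=0 the returned future completes once the record is handed
                // to the network layer, not when the broker has stored it.
                producer.send(new ProducerRecord<>("some-topic", "my-key", "Hello, world!"));
                producer.flush();
            }
        }
    }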
     @Test
-    void shouldModifyFetchMessage(KafkaCluster cluster, Admin admin) throws Exception {
+    void shouldModifyFetchMessage(KafkaCluster cluster, Topic topic1, Topic topic2) throws Exception {

-        admin.createTopics(List.of(
-                new NewTopic(TOPIC_1, 1, (short) 1),
-                new NewTopic(TOPIC_2, 1, (short) 1))).all().get();
+        var bytes = PLAINTEXT.getBytes(StandardCharsets.UTF_8);
+        var encoded1 = encode(topic1.name(), ByteBuffer.wrap(bytes)).array();
+        var encoded2 = encode(topic2.name(), ByteBuffer.wrap(bytes)).array();

         var config = proxy(cluster)
                 .addToFilters(new FilterDefinitionBuilder(FetchResponseTransformationFilterFactory.class.getName())
@@ -522,22 +544,18 @@ void shouldModifyFetchMessage(KafkaCluster cluster, Admin admin) throws Exceptio
                         Map.of(CLIENT_ID_CONFIG, "shouldModifyFetchMessage", DELIVERY_TIMEOUT_MS_CONFIG, 3_600_000));
                 var consumer = tester.consumer()) {
-            producer.send(new ProducerRecord<>(TOPIC_1, "my-key", TOPIC_1_CIPHERTEXT)).get();
-            producer.send(new ProducerRecord<>(TOPIC_2, "my-key", TOPIC_2_CIPHERTEXT)).get();
-            ConsumerRecords<String, String> records1;
-            ConsumerRecords<String, String> records2;
-            consumer.subscribe(Set.of(TOPIC_1));
-
-            records1 = consumer.poll(Duration.ofSeconds(100));
-
-            consumer.subscribe(Set.of(TOPIC_2));
-
-            records2 = consumer.poll(Duration.ofSeconds(100));
-            assertEquals(1, records1.count());
-            assertEquals(1, records2.count());
-            assertEquals(List.of(PLAINTEXT, PLAINTEXT),
-                    List.of(records1.iterator().next().value(),
-                            records2.iterator().next().value()));
+            producer.send(new ProducerRecord<>(topic1.name(), "my-key", encoded1)).get();
+            producer.send(new ProducerRecord<>(topic2.name(), "my-key", encoded2)).get();
+
+            consumer.subscribe(Set.of(topic1.name(), topic2.name()));
+            var records = consumer.poll(Duration.ofSeconds(100));
+            assertThat(records).hasSize(2);
+            assertThat(records.records(topic1.name()))
+                    .hasSize(1)
+                    .map(ConsumerRecord::value).containsExactly(PLAINTEXT);
+            assertThat(records.records(topic2.name()))
+                    .hasSize(1)
+                    .map(ConsumerRecord::value).containsExactly(PLAINTEXT);
         }
     }
diff --git a/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/ResilienceIT.java b/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/ResilienceIT.java
index b3b92a6c1e..7340677c65 100644
--- a/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/ResilienceIT.java
+++ b/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/ResilienceIT.java
@@ -9,10 +9,8 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;

-import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -27,6 +25,7 @@
 import io.kroxylicious.testing.kafka.api.KafkaCluster;
 import io.kroxylicious.testing.kafka.common.BrokerCluster;
 import io.kroxylicious.testing.kafka.junit5ext.KafkaClusterExtension;
+import io.kroxylicious.testing.kafka.junit5ext.Topic;

 import static io.kroxylicious.test.tester.KroxyliciousConfigUtils.proxy;
 import static io.kroxylicious.test.tester.KroxyliciousTesters.kroxyliciousTester;
@@ -47,20 +46,16 @@ class ResilienceIT extends BaseIT {
     static @BrokerCluster(numBrokers = 3) KafkaCluster cluster;

     @Test
-    void kafkaProducerShouldTolerateKroxyliciousRestarting(Admin admin) throws Exception {
-        String randomTopic = UUID.randomUUID().toString();
-        createTopic(admin, randomTopic, 1);
+    void kafkaProducerShouldTolerateKroxyliciousRestarting(Topic randomTopic) throws Exception {
         testProducerCanSurviveARestart(proxy(cluster), randomTopic);
     }

     @Test
-    void kafkaConsumerShouldTolerateKroxyliciousRestarting(Admin admin) throws Exception {
-        String randomTopic = UUID.randomUUID().toString();
-        createTopic(admin, randomTopic, 1);
+    void kafkaConsumerShouldTolerateKroxyliciousRestarting(Topic randomTopic) throws Exception {
         testConsumerCanSurviveKroxyliciousRestart(proxy(cluster), randomTopic);
     }

-    private static void testConsumerCanSurviveKroxyliciousRestart(ConfigurationBuilder builder, String topic)
+    private static void testConsumerCanSurviveKroxyliciousRestart(ConfigurationBuilder builder, Topic randomTopic)
             throws Exception {
         var producerConfig = new HashMap<String, Object>(Map.of(CLIENT_ID_CONFIG, "producer",
                 DELIVERY_TIMEOUT_MS_CONFIG, 3_600_000));
@@ -72,15 +67,13 @@ private static void testConsumerCanSurviveKroxyliciousRestart(ConfigurationBuild
         try (var tester = kroxyliciousTester(builder);
                 var producer = tester.producer(producerConfig)) {
             consumer = tester.consumer(consumerConfig);
-            producer.send(new ProducerRecord<>(topic, "my-key", "Hello, world!")).get(10, TimeUnit.SECONDS);
-            consumer.subscribe(Set.of(topic));
+            producer.send(new ProducerRecord<>(randomTopic.name(), "my-key", "Hello, world!")).get(10, TimeUnit.SECONDS);
+            consumer.subscribe(Set.of(randomTopic.name()));
             var firstRecords = consumer.poll(Duration.ofSeconds(10));
             assertThat(firstRecords).hasSize(1);
             assertThat(firstRecords.iterator()).toIterable().map(ConsumerRecord::value).containsExactly("Hello, world!");
-            assertThat(firstRecords.count()).isOne();
-            assertThat(firstRecords.iterator().next().value()).isEqualTo("Hello, world!");

-            producer.send(new ProducerRecord<>(topic, "my-key", "Hello, again!")).get(10, TimeUnit.SECONDS);
+            producer.send(new ProducerRecord<>(randomTopic.name(), "my-key", "Hello, again!")).get(10, TimeUnit.SECONDS);

             LOGGER.debug("Restarting proxy");
             producer.close();
@@ -93,7 +86,7 @@ private static void testConsumerCanSurviveKroxyliciousRestart(ConfigurationBuild
         }
     }

-    private void testProducerCanSurviveARestart(ConfigurationBuilder builder, String topic) throws Exception {
+    private void testProducerCanSurviveARestart(ConfigurationBuilder builder, Topic randomTopic) throws Exception {
         var producerConfig = new HashMap<String, Object>(Map.of(CLIENT_ID_CONFIG, "producer",
                 DELIVERY_TIMEOUT_MS_CONFIG, 3_600_000,
@@ -111,15 +104,14 @@ private void testProducerCanSurviveARestart(ConfigurationBuilder builder, String
         try (var tester = kroxyliciousTester(builder)) {
             producer = tester.producer(producerConfig);
             consumer = tester.consumer(consumerConfig);
-            consumer.subscribe(Set.of(topic));
-            var response = producer.send(new ProducerRecord<>(topic, "my-key", "Hello, world!")).get(10, TimeUnit.SECONDS);
-            LOGGER.warn("response {}", response);
+            consumer.subscribe(Set.of(randomTopic.name()));
+            var response = producer.send(new ProducerRecord<>(randomTopic.name(), "my-key", "Hello, world!")).get(10, TimeUnit.SECONDS);

             LOGGER.debug("Restarting proxy");
             tester.restartProxy();
             // re-use the existing producer and consumer (made through Kroxylicious's first incarnation). This provides us the assurance
             // that they were able to reconnect successfully.
-            producer.send(new ProducerRecord<>(topic, "my-key", "Hello, again!")).get(10, TimeUnit.SECONDS);
+            producer.send(new ProducerRecord<>(randomTopic.name(), "my-key", "Hello, again!")).get(10, TimeUnit.SECONDS);
             producer.close();
             var records = consumer.poll(Duration.ofSeconds(20));
             consumer.close();
@@ -138,8 +130,6 @@ private void testProducerCanSurviveARestart(ConfigurationBuilder builder, String
                 consumer.close();
             }
         }
-    }
 }
-    }
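Both ResilienceIT tests share one idea: build the Kafka client before the restart, then keep using the same instance afterwards, so a successful post-restart send proves the client reconnected through the proxy's new incarnation. Condensed to its skeleton as a hypothetical test, reusing the helpers imported above:

    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtendWith;

    import io.kroxylicious.testing.kafka.api.KafkaCluster;
    import io.kroxylicious.testing.kafka.junit5ext.KafkaClusterExtension;
    import io.kroxylicious.testing.kafka.junit5ext.Topic;

    import static io.kroxylicious.test.tester.KroxyliciousConfigUtils.proxy;
    import static io.kroxylicious.test.tester.KroxyliciousTesters.kroxyliciousTester;

    @ExtendWith(KafkaClusterExtension.class)
    class RestartSurvivalSketch {

        @Test
        void producerSurvivesProxyRestart(KafkaCluster cluster, Topic topic) throws Exception {
            try (var tester = kroxyliciousTester(proxy(cluster));
                    var producer = tester.producer()) {
                producer.send(new ProducerRecord<>(topic.name(), "my-key", "before")).get(10, TimeUnit.SECONDS);

                tester.restartProxy();

                // Same producer instance as before the restart: this send only
                // succeeds if the client transparently reconnected.
                producer.send(new ProducerRecord<>(topic.name(), "my-key", "after")).get(10, TimeUnit.SECONDS);
            }
        }
    }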
diff --git a/kroxylicious-sample/src/test/java/io/kroxylicious/sample/SampleFilterIT.java b/kroxylicious-sample/src/test/java/io/kroxylicious/sample/SampleFilterIT.java
index 8077335ffc..aefa7f90d7 100644
--- a/kroxylicious-sample/src/test/java/io/kroxylicious/sample/SampleFilterIT.java
+++ b/kroxylicious-sample/src/test/java/io/kroxylicious/sample/SampleFilterIT.java
@@ -10,11 +10,8 @@
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;

-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -32,6 +29,7 @@
 import io.kroxylicious.testing.kafka.api.KafkaCluster;
 import io.kroxylicious.testing.kafka.common.BrokerCluster;
 import io.kroxylicious.testing.kafka.junit5ext.KafkaClusterExtension;
+import io.kroxylicious.testing.kafka.junit5ext.Topic;

 import static io.kroxylicious.test.tester.KroxyliciousConfigUtils.proxy;
 import static io.kroxylicious.test.tester.KroxyliciousTesters.kroxyliciousTester;
@@ -57,12 +55,12 @@ class SampleFilterIT {

     // Configure Cluster/Producer/Consumer values here
     private static final Integer TIMEOUT_SECONDS = 10;
-    private static final Integer TOPIC_PARTITIONS = 1;
-    private static final Short TOPIC_REPLICATION = 1;

     @BrokerCluster
     KafkaCluster cluster;

+    Topic topic; // Injected by KafkaClusterExtension
+
     FilterIntegrationTest test;

     @AfterEach
@@ -74,10 +72,9 @@ public void afterEach() {
      * Test that the SampleProduceRequestFilter will transform when given data containing its findValue.
      */
     @Test
-    void sampleProduceRequestFilterWillTransformIntegrationTest() {
+    void sampleProduceRequestFilterWillTransform() {
         test = new FilterIntegrationTest(SAMPLE_PRODUCE_REQUEST_FILTER);
-        test.withTopic("sampleProduceRequestFilterWillTransformRoundTripTest")
-                .produceMessage(PRE_TRANSFORM_VALUE)
+        test.produceMessage(PRE_TRANSFORM_VALUE)
                 .consumeSingleRecord()
                 .assertConsumerRecordEquals(PRODUCE_TRANSFORM_VALUE);
     }
@@ -86,10 +83,9 @@ void sampleProduceRequestFilterWillTransformIntegrationTest() {
      * Test that the SampleProduceRequestFilter won't transform when given data that does not contain its findValue.
      */
     @Test
-    void sampleProduceRequestFilterWontTransformIntegrationTest() {
+    void sampleProduceRequestFilterWontTransform() {
         test = new FilterIntegrationTest(SAMPLE_PRODUCE_REQUEST_FILTER);
-        test.withTopic("sampleProduceRequestFilterWontTransformRoundTripTest")
-                .produceMessage(NO_TRANSFORM_VALUE)
+        test.produceMessage(NO_TRANSFORM_VALUE)
                 .consumeSingleRecord()
                 .assertConsumerRecordEquals(NO_TRANSFORM_VALUE);
     }
@@ -98,10 +94,9 @@ void sampleProduceRequestFilterWontTransformIntegrationTest() {
      * Test that the SampleProduceRequestFilter won't drop a second message produced to a topic.
      */
     @Test
-    void sampleProduceRequestFilterWontDropSecondMessageIntegrationTest() {
+    void sampleProduceRequestFilterWontDropSecondMessage() {
         test = new FilterIntegrationTest(SAMPLE_PRODUCE_REQUEST_FILTER);
-        test.withTopic("sampleProduceRequestFilterWontDropSecondMessageRoundTripTest")
-                .produceMessage(NO_TRANSFORM_VALUE)
+        test.produceMessage(NO_TRANSFORM_VALUE)
                 .consumeSingleRecord()
                 .produceMessage(PRE_TRANSFORM_VALUE)
                 .consumeSingleRecord()
@@ -112,10 +107,9 @@ void sampleProduceRequestFilterWontDropSecondMessageIntegrationTest() {
      * Test that the SampleFetchResponseFilter will transform when given data containing its findValue.
      */
     @Test
-    void sampleFetchResponseFilterWillTransformIntegrationTest() {
+    void sampleFetchResponseFilterWillTransform() {
         test = new FilterIntegrationTest(SAMPLE_FETCH_RESPONSE_FILTER);
-        test.withTopic("sampleFetchResponseFilterWillTransformRoundTripTest")
-                .produceMessage(PRE_TRANSFORM_VALUE)
+        test.produceMessage(PRE_TRANSFORM_VALUE)
                 .consumeSingleRecord()
                 .assertConsumerRecordEquals(FETCH_TRANSFORM_VALUE);
     }
@@ -124,10 +118,9 @@ void sampleFetchResponseFilterWillTransformIntegrationTest() {
      * Test that the SampleFetchResponseFilter won't transform when given data that does not contain its findValue.
      */
     @Test
-    void sampleFetchResponseFilterWontTransformIntegrationTest() {
+    void sampleFetchResponseFilterWontTransform() {
         test = new FilterIntegrationTest(SAMPLE_FETCH_RESPONSE_FILTER);
-        test.withTopic("sampleFetchResponseFilterWontTransformRoundTripTest")
-                .produceMessage(NO_TRANSFORM_VALUE)
+        test.produceMessage(NO_TRANSFORM_VALUE)
                 .consumeSingleRecord()
                 .assertConsumerRecordEquals(NO_TRANSFORM_VALUE);
     }
@@ -136,10 +129,9 @@ void sampleFetchResponseFilterWontTransformIntegrationTest() {
      * Test that the SampleFetchResponseFilter won't drop a second message produced to a topic.
      */
     @Test
-    void sampleFetchResponseFilterWontDropSecondMessageIntegrationTest() {
+    void sampleFetchResponseFilterWontDropSecondMessage() {
         test = new FilterIntegrationTest(SAMPLE_FETCH_RESPONSE_FILTER);
-        test.withTopic("sampleFetchResponseFilterWontDropSecondMessageRoundTripTest")
-                .produceMessage(NO_TRANSFORM_VALUE)
+        test.produceMessage(NO_TRANSFORM_VALUE)
                 .consumeSingleRecord()
                 .produceMessage(PRE_TRANSFORM_VALUE)
                 .consumeSingleRecord()
@@ -153,8 +145,6 @@ private class FilterIntegrationTest {
         private final KroxyliciousTester tester;
         private final Producer producer;
         private final Consumer consumer;
-        private final Admin admin;
-        private String topic;
         private ConsumerRecord record;

         /**
@@ -170,23 +160,6 @@
             producer = tester.producer();
             consumer = tester.consumer(Serdes.String(), Serdes.ByteArray(), Map.of(ConsumerConfig.GROUP_ID_CONFIG, "group-id-0", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
-            admin = tester.admin();
-        }
-
-        /**
-         * Create a topic with the given name for this test.
-         * @param name the name of the topic to create
-         * @return the SingleFilterIntegrationTest object (itself)
-         */
-        FilterIntegrationTest withTopic(String name) {
-            try {
-                admin.createTopics(List.of(new NewTopic(name, TOPIC_PARTITIONS, TOPIC_REPLICATION))).all().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-            }
-            catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-            this.topic = name;
-            return this;
         }

         /**
@@ -195,9 +168,8 @@ FilterIntegrationTest withTopic(String name) {
          * @return the SingleFilterIntegrationTest object (itself)
          */
         FilterIntegrationTest produceMessage(String value) {
-            this.ensureTopicExists();
             try {
-                this.producer.send(new ProducerRecord<>(this.topic, value)).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+                this.producer.send(new ProducerRecord<>(topic.name(), value)).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
             }
             catch (Exception e) {
                 throw new RuntimeException(e);
@@ -210,15 +182,12 @@ FilterIntegrationTest produceMessage(String value) {
          * @return the SingleFilterIntegrationTest object (itself)
          */
         FilterIntegrationTest consumeSingleRecord() {
-            if (this.topic == null) {
-                fail("Could not consumeSingleRecord - this test has no topic");
-            }
-            this.consumer.subscribe(List.of(this.topic));
+            this.consumer.subscribe(List.of(topic.name()));
             ConsumerRecords<String, byte[]> poll = this.consumer.poll(Duration.ofSeconds(TIMEOUT_SECONDS));
             if (poll.count() == 0) {
-                fail(String.format("No records could be consumed from topic: %s.", this.topic));
+                fail(String.format("No records could be consumed from topic: %s.", topic.name()));
             }
-            this.record = poll.records(this.topic).iterator().next();
+            this.record = poll.records(topic.name()).iterator().next();
             return this;
         }

@@ -239,19 +208,10 @@ void assertConsumerRecordEquals(String value) {
          */
         void close() {
             this.tester.close();
-            this.admin.close();
             this.producer.close();
             this.consumer.close();
         }

-        /**
-         * Creates a topic with a random UUID name if none exists for this test.
-         */
-        private void ensureTopicExists() {
-            if (this.topic == null) {
-                this.withTopic(UUID.randomUUID().toString());
-            }
-        }
     }

     private record TestFilter(String name, Map config) {}