refactor: take advantage of the topic injection in several integration tests (kroxylicious#837)

* refactor: take advantage of the topic injection in several integration tests

used the framework feature to create each test's topic
also made use of assertj consistently in several places

why: reducing boiler-plate
Signed-off-by: kwall <[email protected]>
k-wall authored Dec 21, 2023
1 parent 859ca83 commit d8aa2ab
Showing 5 changed files with 135 additions and 165 deletions.
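For context, the topic-injection feature this refactor leans on works roughly as sketched below: `KafkaClusterExtension` resolves `Topic` test-method parameters by creating a uniquely named topic on the injected cluster before the test body runs. Only `KafkaClusterExtension`, `KafkaCluster`, and `Topic#name()` appear in the diff; the class and test names here are invented for illustration.

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import io.kroxylicious.testing.kafka.api.KafkaCluster;
import io.kroxylicious.testing.kafka.junit5ext.KafkaClusterExtension;
import io.kroxylicious.testing.kafka.junit5ext.Topic;

@ExtendWith(KafkaClusterExtension.class)
class TopicInjectionSketchIT {

    @Test
    void frameworkCreatesTheTopic(KafkaCluster cluster, Topic topic) {
        // The extension provisions the cluster and creates the topic before this
        // body runs; topic.name() yields the generated per-test topic name,
        // replacing the Admin#createTopics boiler-plate this commit deletes.
        assertThat(topic.name()).isNotBlank();
    }
}
```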
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,8 @@ Please enumerate **all user-facing** changes using format `<github issue/pr numb

## 0.5.0


* [#837](https://github.com/kroxylicious/kroxylicious/pull/837): refactor: take advantage of the topic injection in several integration tests (including the SampleFilterIT)
* [#827](https://github.com/kroxylicious/kroxylicious/issues/827): Release process should update version number references in container image versions too
* [#825](https://github.com/kroxylicious/kroxylicious/pull/825): Improve the topic encryption example
* [#832](https://github.com/kroxylicious/kroxylicious/pull/832): Bump io.netty:netty-bom from 4.1.101.Final to 4.1.104.Final
@@ -98,7 +98,7 @@ void exposesSingleUpstreamClusterOverTls(String name,

var connectionsMetric = admin.metrics().entrySet().stream().filter(metricNameEntry -> "connections".equals(metricNameEntry.getKey().name()))
.findFirst();
assertThat(connectionsMetric.isPresent()).isTrue();
assertThat(connectionsMetric).isPresent();
var protocol = connectionsMetric.get().getKey().tags().get("protocol");
assertThat(protocol).startsWith("TLS");
}
@@ -12,14 +12,14 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.message.ApiVersionsRequestData;
@@ -64,6 +64,7 @@
import io.kroxylicious.test.tester.MockServerKroxyliciousTester;
import io.kroxylicious.testing.kafka.api.KafkaCluster;
import io.kroxylicious.testing.kafka.junit5ext.KafkaClusterExtension;
import io.kroxylicious.testing.kafka.junit5ext.Topic;

import static io.kroxylicious.UnknownTaggedFields.unknownTaggedFieldsToStrings;
import static io.kroxylicious.proxy.filter.ApiVersionsMarkingFilter.INTERSECTED_API_VERSION_RANGE_TAG;
@@ -87,32 +88,17 @@
import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

@ExtendWith(KafkaClusterExtension.class)
class FilterIT {

private static final String TOPIC_1 = "my-test-topic";
private static final String TOPIC_2 = "other-test-topic";
private static final String PLAINTEXT = "Hello, world!";
private static final byte[] TOPIC_1_CIPHERTEXT = { (byte) 0x3d, (byte) 0x5a, (byte) 0x61, (byte) 0x61, (byte) 0x64, (byte) 0x21, (byte) 0x15, (byte) 0x6c,
(byte) 0x64, (byte) 0x67, (byte) 0x61, (byte) 0x59, (byte) 0x16 };
private static final byte[] TOPIC_2_CIPHERTEXT = { (byte) 0xffffffa7, (byte) 0xffffffc4, (byte) 0xffffffcb, (byte) 0xffffffcb, (byte) 0xffffffce, (byte) 0xffffff8b,
(byte) 0x7f, (byte) 0xffffffd6, (byte) 0xffffffce, (byte) 0xffffffd1, (byte) 0xffffffcb, (byte) 0xffffffc3, (byte) 0xffffff80 };
private static final FilterDefinitionBuilder REJECTING_CREATE_TOPIC_FILTER = new FilterDefinitionBuilder(RejectingCreateTopicFilterFactory.class.getName());
private static NettyLeakLogAppender appender;

@BeforeAll
public static void checkReversibleEncryption() {
// The precise details of the cipher don't matter
// What matters is that the ciphertext depends on the topic name
// and that decode() is the inverse of encode()
assertArrayEquals(TOPIC_1_CIPHERTEXT, encode(TOPIC_1, ByteBuffer.wrap(PLAINTEXT.getBytes(StandardCharsets.UTF_8))).array());
assertEquals(PLAINTEXT, new String(decode(TOPIC_1, ByteBuffer.wrap(TOPIC_1_CIPHERTEXT)).array(), StandardCharsets.UTF_8));
assertArrayEquals(TOPIC_2_CIPHERTEXT, encode(TOPIC_2, ByteBuffer.wrap(PLAINTEXT.getBytes(StandardCharsets.UTF_8))).array());
assertEquals(PLAINTEXT, new String(decode(TOPIC_2, ByteBuffer.wrap(TOPIC_2_CIPHERTEXT)).array(), StandardCharsets.UTF_8));
public static void beforeAll() {

final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
final Configuration config = ctx.getConfiguration();
@@ -129,6 +115,30 @@ public void checkNoNettyLeaks() {
appender.verifyNoLeaks();
}

@Test
void reversibleEncryption() {
// The precise details of the cipher don't matter
// What matters is that the ciphertext depends on the topic name
// and that decode() is the inverse of encode()
var name = UUID.randomUUID().toString();
var encoded = encode(name, ByteBuffer.wrap(PLAINTEXT.getBytes(StandardCharsets.UTF_8)));
var decoded = new String(decode(name, encoded).array(), StandardCharsets.UTF_8);
assertThat(decoded)
.isEqualTo(PLAINTEXT);
}

@Test
void encryptionDistinguishedByName() {
var name = UUID.randomUUID().toString();
var differentName = UUID.randomUUID().toString();
assertThat(differentName).isNotEqualTo(name);

var encoded = encode(name, ByteBuffer.wrap(PLAINTEXT.getBytes(StandardCharsets.UTF_8)));

assertThat(encoded)
.isNotEqualTo(encode(differentName, ByteBuffer.wrap(PLAINTEXT.getBytes(StandardCharsets.UTF_8))));
}

static ByteBuffer encode(String topicName, ByteBuffer in) {
var out = ByteBuffer.allocate(in.limit());
byte rot = (byte) (topicName.hashCode() % Byte.MAX_VALUE);
@@ -153,18 +163,22 @@ static ByteBuffer decode(String topicName, ByteBuffer in) {
}
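The diff collapses the cipher bodies. Below is a hedged reconstruction of a byte-rotation pair consistent with the visible lines: only the signatures and the derivation of `rot` appear above; the loop bodies are an assumption that merely satisfies the two properties the new tests assert (topic-dependence, and decode() inverting encode()).

```java
static ByteBuffer encode(String topicName, ByteBuffer in) {
    var out = ByteBuffer.allocate(in.limit());
    byte rot = (byte) (topicName.hashCode() % Byte.MAX_VALUE);
    for (int i = 0; i < in.limit(); i++) {
        out.put((byte) (in.get(i) + rot)); // shift each byte by a topic-derived amount
    }
    return out; // callers read via array()/absolute gets, so no flip() is needed
}

static ByteBuffer decode(String topicName, ByteBuffer in) {
    var out = ByteBuffer.allocate(in.limit());
    byte rot = (byte) (topicName.hashCode() % Byte.MAX_VALUE);
    for (int i = 0; i < in.limit(); i++) {
        out.put((byte) (in.get(i) - rot)); // inverse shift restores the plaintext
    }
    return out;
}
```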

@Test
void shouldPassThroughRecordUnchanged(KafkaCluster cluster, Admin admin) throws Exception {
admin.createTopics(List.of(new NewTopic(TOPIC_1, 1, (short) 1))).all().get();
void shouldPassThroughRecordUnchanged(KafkaCluster cluster, Topic topic) throws Exception {

try (var tester = kroxyliciousTester(proxy(cluster));
var producer = tester.producer(Map.of(CLIENT_ID_CONFIG, "shouldPassThroughRecordUnchanged", DELIVERY_TIMEOUT_MS_CONFIG, 3_600_000));
var consumer = tester.consumer()) {
producer.send(new ProducerRecord<>(TOPIC_1, "my-key", "Hello, world!")).get();
consumer.subscribe(Set.of(TOPIC_1));
producer.send(new ProducerRecord<>(topic.name(), "my-key", "Hello, world!")).get();
consumer.subscribe(Set.of(topic.name()));
var records = consumer.poll(Duration.ofSeconds(10));
consumer.close();
assertEquals(1, records.count());
assertEquals("Hello, world!", records.iterator().next().value());

assertThat(records.iterator())
.toIterable()
.hasSize(1)
.map(ConsumerRecord::value)
.containsExactly(PLAINTEXT);

}
}

@@ -174,13 +188,14 @@ void requestFiltersCanRespondWithoutProxying(KafkaCluster cluster, Admin admin)
var config = proxy(cluster)
.addToFilters(REJECTING_CREATE_TOPIC_FILTER.build());

var topicName = UUID.randomUUID().toString();
try (var tester = kroxyliciousTester(config);
var proxyAdmin = tester.admin()) {
assertCreatingTopicThrowsExpectedException(proxyAdmin);
assertCreatingTopicThrowsExpectedException(proxyAdmin, topicName);

// check no topic created on the cluster
Set<String> names = admin.listTopics().names().get(10, TimeUnit.SECONDS);
assertThat(names).doesNotContain(TOPIC_1);
assertThat(names).doesNotContain(topicName);
}
}

@@ -306,26 +321,27 @@ void requestFiltersCanRespondWithoutProxyingDoesntLeakBuffers(KafkaCluster clust
var config = proxy(cluster)
.addToFilters(REJECTING_CREATE_TOPIC_FILTER.build());

var name = UUID.randomUUID().toString();
try (var tester = kroxyliciousTester(config);
var proxyAdmin = tester.admin()) {
// loop because System.gc doesn't make any guarantees that the buffer will be collected
for (int i = 0; i < 20; i++) {
// CreateTopicRejectFilter allocates a buffer and then short-circuit responds
assertCreatingTopicThrowsExpectedException(proxyAdmin);
assertCreatingTopicThrowsExpectedException(proxyAdmin, name);
// buffers must be garbage collected before they trigger leak detection during
// a subsequent buffer allocation
System.gc();
}

// check no topic created on the cluster
Set<String> names = admin.listTopics().names().get(10, TimeUnit.SECONDS);
assertThat(names).doesNotContain(TOPIC_1);
assertThat(names).doesNotContain(name);
}
}

private static void assertCreatingTopicThrowsExpectedException(Admin proxyAdmin) {
private static void assertCreatingTopicThrowsExpectedException(Admin proxyAdmin, String topicName) {
assertThatExceptionOfType(ExecutionException.class)
.isThrownBy(() -> proxyAdmin.createTopics(List.of(new NewTopic(TOPIC_1, 1, (short) 1))).all().get())
.isThrownBy(() -> proxyAdmin.createTopics(List.of(new NewTopic(topicName, 1, (short) 1))).all().get())
.withCauseInstanceOf(InvalidTopicException.class)
.havingCause()
.withMessage(RejectingCreateTopicFilter.ERROR_MESSAGE);
@@ -339,7 +355,7 @@ void testCompositeFilter() {
var kafkaClient = tester.simpleTestClient()) {
tester.addMockResponseForApiKey(new ResponsePayload(METADATA, METADATA.latestVersion(), new MetadataResponseData()));
kafkaClient.getSync(new Request(METADATA, METADATA.latestVersion(), "client", new MetadataRequestData()));
assertEquals("123banana", tester.getOnlyRequest().clientIdHeader());
assertThat(tester.getOnlyRequest().clientIdHeader()).isEqualTo("123banana");
}
}

@@ -363,10 +379,11 @@ void testApiVersionsAvailableToFilter() {
}

@Test
void shouldModifyProduceMessage(KafkaCluster cluster, Admin admin) throws Exception {
admin.createTopics(List.of(
new NewTopic(TOPIC_1, 1, (short) 1),
new NewTopic(TOPIC_2, 1, (short) 1))).all().get();
void shouldModifyProduceMessage(KafkaCluster cluster, Topic topic1, Topic topic2) throws Exception {

var bytes = PLAINTEXT.getBytes(StandardCharsets.UTF_8);
var expectedEncoded1 = encode(topic1.name(), ByteBuffer.wrap(bytes)).array();
var expectedEncoded2 = encode(topic2.name(), ByteBuffer.wrap(bytes)).array();

var config = proxy(cluster)
.addToFilters(new FilterDefinitionBuilder(ProduceRequestTransformationFilterFactory.class.getName())
Expand All @@ -376,21 +393,23 @@ void shouldModifyProduceMessage(KafkaCluster cluster, Admin admin) throws Except
var producer = tester.producer(Map.of(CLIENT_ID_CONFIG, "shouldModifyProduceMessage", DELIVERY_TIMEOUT_MS_CONFIG, 3_600_000));
var consumer = tester
.consumer(Serdes.String(), Serdes.ByteArray(), Map.of(GROUP_ID_CONFIG, "my-group-id", AUTO_OFFSET_RESET_CONFIG, "earliest"))) {
producer.send(new ProducerRecord<>(TOPIC_1, "my-key", PLAINTEXT)).get();
producer.send(new ProducerRecord<>(TOPIC_2, "my-key", PLAINTEXT)).get();
producer.send(new ProducerRecord<>(topic1.name(), "my-key", PLAINTEXT)).get();
producer.send(new ProducerRecord<>(topic2.name(), "my-key", PLAINTEXT)).get();
producer.flush();

ConsumerRecords<String, byte[]> records1;
ConsumerRecords<String, byte[]> records2;
consumer.subscribe(Set.of(TOPIC_1));
records1 = consumer.poll(Duration.ofSeconds(10));
consumer.subscribe(Set.of(TOPIC_2));
records2 = consumer.poll(Duration.ofSeconds(10));

assertEquals(1, records1.count());
assertArrayEquals(TOPIC_1_CIPHERTEXT, records1.iterator().next().value());
assertEquals(1, records2.count());
assertArrayEquals(TOPIC_2_CIPHERTEXT, records2.iterator().next().value());
consumer.subscribe(Set.of(topic1.name(), topic2.name()));
var records = consumer.poll(Duration.ofSeconds(10));

assertThat(records).hasSize(2);
assertThat(records.records(topic1.name()))
.hasSize(1)
.map(ConsumerRecord::value)
.containsExactly(expectedEncoded1);

assertThat(records.records(topic2.name()))
.hasSize(1)
.map(ConsumerRecord::value)
.containsExactly(expectedEncoded2);
}
}

@@ -461,57 +480,60 @@ void zeroAckProduceRequestsDoNotInterfereWithResponseReorderingLogic() throws Ex
// zero-ack produce requests require special handling because they have no associated response;
// this checks that Kroxy can handle the basics of forwarding them (a standalone sketch follows this test).
@Test
void shouldModifyZeroAckProduceMessage(KafkaCluster cluster, Admin admin) throws Exception {
admin.createTopics(List.of(new NewTopic(TOPIC_1, 1, (short) 1))).all().get();

void shouldModifyZeroAckProduceMessage(KafkaCluster cluster, Topic topic) throws Exception {
var config = proxy(cluster)
.addToFilters(new FilterDefinitionBuilder(ProduceRequestTransformationFilterFactory.class.getName())
.withConfig("transformation", TestEncoderFactory.class.getName()).build());

var expectedEncoded = encode(topic.name(), ByteBuffer.wrap(PLAINTEXT.getBytes(StandardCharsets.UTF_8))).array();

try (var tester = kroxyliciousTester(config);
var producer = tester.producer(Map.of(CLIENT_ID_CONFIG, "shouldModifyProduceMessage", DELIVERY_TIMEOUT_MS_CONFIG, 3_600_000, ACKS_CONFIG, "0"));
var consumer = tester
.consumer(Serdes.String(), Serdes.ByteArray(), Map.of(GROUP_ID_CONFIG, "my-group-id", AUTO_OFFSET_RESET_CONFIG, "earliest"))) {
producer.send(new ProducerRecord<>(TOPIC_1, "my-key", PLAINTEXT)).get();
producer.send(new ProducerRecord<>(topic.name(), "my-key", PLAINTEXT)).get();
producer.flush();

ConsumerRecords<String, byte[]> records1;
consumer.subscribe(Set.of(TOPIC_1));
records1 = consumer.poll(Duration.ofSeconds(10));
consumer.subscribe(Set.of(topic.name()));
var records = consumer.poll(Duration.ofSeconds(10));

assertThat(records1).hasSize(1);
assertThat(records1.records(TOPIC_1)).map(ConsumerRecord::value)
.containsExactly(TOPIC_1_CIPHERTEXT);
assertThat(records.iterator())
.toIterable()
.hasSize(1)
.map(ConsumerRecord::value)
.containsExactly(expectedEncoded);
}
}
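To make the zero-ack note above concrete, here is a self-contained sketch of a fire-and-forget producer; the bootstrap address and topic name are placeholders supplied by the caller, not taken from the diff.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

class ZeroAckSketch {
    static void sendFireAndForget(String bootstrapServers, String topicName) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, "0"); // broker sends no Produce response at all
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (var producer = new KafkaProducer<String, String>(props)) {
            // The future completes once the record is written to the socket; the
            // RecordMetadata carries offset -1 because no acknowledgement comes back.
            // This is why a proxy must forward acks=0 requests without holding a
            // slot for a correlated reply in its response-ordering logic.
            producer.send(new ProducerRecord<>(topicName, "my-key", "Hello, world!")).get();
        }
    }
}
```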

@Test
void shouldForwardUnfilteredZeroAckProduceMessage(KafkaCluster cluster, Admin admin) throws Exception {
admin.createTopics(List.of(new NewTopic(TOPIC_1, 1, (short) 1))).all().get();
void shouldForwardUnfilteredZeroAckProduceMessage(KafkaCluster cluster, Topic topic) throws Exception {

var config = proxy(cluster);

try (var tester = kroxyliciousTester(config);
var producer = tester.producer(Map.of(CLIENT_ID_CONFIG, "shouldModifyProduceMessage", DELIVERY_TIMEOUT_MS_CONFIG, 3_600_000, ACKS_CONFIG, "0"));
var consumer = tester
.consumer(Serdes.String(), Serdes.String(), Map.of(GROUP_ID_CONFIG, "my-group-id", AUTO_OFFSET_RESET_CONFIG, "earliest"))) {
producer.send(new ProducerRecord<>(TOPIC_1, "my-key", PLAINTEXT)).get();
producer.send(new ProducerRecord<>(topic.name(), "my-key", PLAINTEXT)).get();
producer.flush();

consumer.subscribe(Set.of(TOPIC_1));
var records1 = consumer.poll(Duration.ofSeconds(10));
consumer.subscribe(Set.of(topic.name()));
var records = consumer.poll(Duration.ofSeconds(10));

assertEquals(1, records1.count());
assertEquals(PLAINTEXT, records1.iterator().next().value());
assertThat(records).hasSize(1);
assertThat(records.iterator())
.toIterable()
.map(ConsumerRecord::value)
.containsExactly(PLAINTEXT);
}
}

@Test
void shouldModifyFetchMessage(KafkaCluster cluster, Admin admin) throws Exception {
void shouldModifyFetchMessage(KafkaCluster cluster, Topic topic1, Topic topic2) throws Exception {

admin.createTopics(List.of(
new NewTopic(TOPIC_1, 1, (short) 1),
new NewTopic(TOPIC_2, 1, (short) 1))).all().get();
var bytes = PLAINTEXT.getBytes(StandardCharsets.UTF_8);
var encoded1 = encode(topic1.name(), ByteBuffer.wrap(bytes)).array();
var encoded2 = encode(topic2.name(), ByteBuffer.wrap(bytes)).array();

var config = proxy(cluster)
.addToFilters(new FilterDefinitionBuilder(FetchResponseTransformationFilterFactory.class.getName())
@@ -522,22 +544,18 @@ void shouldModifyFetchMessage(KafkaCluster cluster, Admin admin) throws Exceptio
Map.of(CLIENT_ID_CONFIG, "shouldModifyFetchMessage", DELIVERY_TIMEOUT_MS_CONFIG, 3_600_000));
var consumer = tester.consumer()) {

producer.send(new ProducerRecord<>(TOPIC_1, "my-key", TOPIC_1_CIPHERTEXT)).get();
producer.send(new ProducerRecord<>(TOPIC_2, "my-key", TOPIC_2_CIPHERTEXT)).get();
ConsumerRecords<String, String> records1;
ConsumerRecords<String, String> records2;
consumer.subscribe(Set.of(TOPIC_1));

records1 = consumer.poll(Duration.ofSeconds(100));

consumer.subscribe(Set.of(TOPIC_2));

records2 = consumer.poll(Duration.ofSeconds(100));
assertEquals(1, records1.count());
assertEquals(1, records2.count());
assertEquals(List.of(PLAINTEXT, PLAINTEXT),
List.of(records1.iterator().next().value(),
records2.iterator().next().value()));
producer.send(new ProducerRecord<>(topic1.name(), "my-key", encoded1)).get();
producer.send(new ProducerRecord<>(topic2.name(), "my-key", encoded2)).get();

consumer.subscribe(Set.of(topic1.name(), topic2.name()));
var records = consumer.poll(Duration.ofSeconds(100));
assertThat(records).hasSize(2);
assertThat(records.records(topic1.name()))
.hasSize(1)
.map(ConsumerRecord::value).containsExactly(PLAINTEXT);
assertThat(records.records(topic2.name()))
.hasSize(1)
.map(ConsumerRecord::value).containsExactly(PLAINTEXT);
}
}
