Skip to content

Commit

Permalink
Tease out simple transform filters into their own module (kroxyliciou…
Browse files Browse the repository at this point in the history
…s#727)

* Tease out kroxylicious-simple-transform to separate module moving the filters from io.kroxylicious.proxy.internal.filter to io.kroxylicious.proxy.filter.simpletransform;

Signed-off-by: kwall <[email protected]>
  • Loading branch information
k-wall authored Nov 28, 2023
1 parent e84adc3 commit 72bd070
Show file tree
Hide file tree
Showing 43 changed files with 594 additions and 128 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@ Please enumerate **all user-facing** changes using format `<githib issue/pr numb

## 0.4.0


* [#727](https://github.com/kroxylicious/kroxylicious/pull/727): Tease out simple transform filters into their own module
* [#738](https://github.com/kroxylicious/kroxylicious/pull/738): Update to Kroxylicious Junit Ext 0.7.0
* [#723](https://github.com/kroxylicious/kroxylicious/pull/723): Bump com.fasterxml.jackson:jackson-bom from 2.15.3 to 2.16.0 #723
* [#724](https://github.com/kroxylicious/kroxylicious/pull/724): Bump io.netty.incubator:netty-incubator-transport-native-io_uring from 0.0.23.Final to 0.0.24.Final
* [#725](https://github.com/kroxylicious/kroxylicious/pull/725): Bump io.netty:netty-bom from 4.1.100.Final to 4.1.101.Final #725
* [#701](https://github.com/kroxylicious/kroxylicious/pull/701): Bump org.apache.logging.log4j:log4j-bom from 2.21.0 to 2.21.1 #701

### Changes, deprecations and removals

* The `ProduceRequestTransformationFilter` and `FetchResponseTransformationFilter` have been moved to their own module kroxylicious-simple-transform.
If you were depending on these filters, you must ensure that the kroxylicious-simple-transform JAR file is added to your classpath. The
Javadoc of these classes has been updated to convey the fact that these filters are *not* intended for production use.

## 0.3.0

* [#686](https://github.com/kroxylicious/kroxylicious/pull/686): Bump org.apache.logging.log4j:log4j-bom from 2.20.0 to 2.21.0.
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ FROM registry.access.redhat.com/ubi9/openjdk-17:1.15 AS builder
USER root
WORKDIR /opt/kroxylicious
COPY . .
RUN ./mvnw -B clean verify -Pdist,withAdditionalFilters -Dquick -pl :kroxylicious-bom,:kroxylicious-app,:kroxylicious-multitenant,:kroxylicious-record-validation -am
RUN ./mvnw -B clean verify -Pdist,withAdditionalFilters -Dquick -am -pl !kroxylicious-integration-tests
USER 185
FROM registry.access.redhat.com/ubi9/ubi-minimal:9.2

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public interface FilterFactory<C, I> {
* This method may provide extra semantic validation of the config,
* and returns some object (which may be the config, or some other object) which will be passed to {@link #createFilter(FilterFactoryContext, Object)}
*
* @param context
* @param context context
* @param config configuration
* @throws PluginConfigurationException when the configuration is invalid
*/
Expand Down
17 changes: 17 additions & 0 deletions kroxylicious-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -195,5 +195,22 @@
</plugins>
</build>
</profile>
<profile>
<id>withAdditionalFilters</id>
<dependencies>
<dependency>
<groupId>io.kroxylicious</groupId>
<artifactId>kroxylicious-multitenant</artifactId>
</dependency>
<dependency>
<groupId>io.kroxylicious</groupId>
<artifactId>kroxylicious-record-validation</artifactId>
</dependency>
<dependency>
<groupId>io.kroxylicious</groupId>
<artifactId>kroxylicious-simple-transform</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
6 changes: 6 additions & 0 deletions kroxylicious-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.kroxylicious</groupId>
<artifactId>kroxylicious-simple-transform</artifactId>
<version>${project.version}</version>
</dependency>

<!-- testing dependencies -->
<!-- note scope is *not* set at this level -->
<dependency>
Expand Down
70 changes: 70 additions & 0 deletions kroxylicious-filters/kroxylicious-simple-transform/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright Kroxylicious Authors.
Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.kroxylicious</groupId>
<artifactId>kroxylicious-parent</artifactId>
<version>0.4.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>kroxylicious-simple-transform</artifactId>
<name>Simple filters</name>
<description>Several simple filters that exist for test purposes.</description>

<dependencies>
<!-- project dependencies - runtime and compile -->
<dependency>
<groupId>io.kroxylicious</groupId>
<artifactId>kroxylicious-api</artifactId>
</dependency>

<!-- project dependencies - test -->
<dependency>
<groupId>io.kroxylicious</groupId>
<artifactId>kroxylicious-filter-test-support</artifactId>
<scope>test</scope>
</dependency>

<!-- third party dependencies - test -->
<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.kroxylicious.proxy.internal.filter;
package io.kroxylicious.proxy.filter.simpletransform;

import java.nio.ByteBuffer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.kroxylicious.proxy.internal.filter;
package io.kroxylicious.proxy.filter.simpletransform;

import io.kroxylicious.proxy.plugin.PluginConfigurationException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.kroxylicious.proxy.internal.filter;
package io.kroxylicious.proxy.filter.simpletransform;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
Expand All @@ -24,18 +23,20 @@
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.kroxylicious.proxy.filter.FetchResponseFilter;
import io.kroxylicious.proxy.filter.FilterContext;
import io.kroxylicious.proxy.filter.ResponseFilterResult;
import io.kroxylicious.proxy.internal.util.MemoryRecordsHelper;

/**
* A filter for modifying the key/value/header/topic of {@link ApiKeys#FETCH} responses.
*/
* <p>
* <strong>Not intended to production use.</strong>
* </p> */
public class FetchResponseTransformationFilter implements FetchResponseFilter {

// Version 12 was the first version that uses topic ids.
Expand Down Expand Up @@ -93,18 +94,21 @@ public CompletionStage<ResponseFilterResult> onFetchResponse(short apiVersion, R
private void applyTransformation(FilterContext context, FetchResponseData responseData) {
for (FetchResponseData.FetchableTopicResponse topicData : responseData.responses()) {
for (FetchResponseData.PartitionData partitionData : topicData.partitions()) {
MemoryRecords records = (MemoryRecords) partitionData.records();
MemoryRecordsBuilder newRecords = MemoryRecordsHelper.builder(context.createByteBufferOutputStream(records.sizeInBytes()), CompressionType.NONE,
TimestampType.CREATE_TIME, 0);

for (MutableRecordBatch batch : records.batches()) {
for (Iterator<Record> batchRecords = batch.iterator(); batchRecords.hasNext();) {
Record batchRecord = batchRecords.next();
newRecords.append(batchRecord.timestamp(), batchRecord.key(), valueTransformation.transform(topicData.topic(), batchRecord.value()));
var records = (MemoryRecords) partitionData.records();
var stream = context.createByteBufferOutputStream(records.sizeInBytes());
try (var newRecords = new MemoryRecordsBuilder(stream, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME, 0,
System.currentTimeMillis(), RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
stream.remaining())) {

for (MutableRecordBatch batch : records.batches()) {
for (Record batchRecord : batch) {
newRecords.append(batchRecord.timestamp(), batchRecord.key(), valueTransformation.transform(topicData.topic(), batchRecord.value()));
}
}
}

partitionData.setRecords(newRecords.build());
partitionData.setRecords(newRecords.build());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.kroxylicious.proxy.internal.filter;
package io.kroxylicious.proxy.filter.simpletransform;

import java.util.Objects;

import com.fasterxml.jackson.annotation.JsonProperty;

import io.kroxylicious.proxy.filter.FilterFactory;
import io.kroxylicious.proxy.filter.FilterFactoryContext;
import io.kroxylicious.proxy.internal.filter.FetchResponseTransformationFilterFactory.Config;
import io.kroxylicious.proxy.filter.simpletransform.FetchResponseTransformationFilterFactory.Config;
import io.kroxylicious.proxy.plugin.Plugin;
import io.kroxylicious.proxy.plugin.PluginImplConfig;
import io.kroxylicious.proxy.plugin.PluginImplName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.kroxylicious.proxy.internal.filter;
package io.kroxylicious.proxy.filter.simpletransform;

import java.util.Iterator;
import java.util.concurrent.CompletionStage;

import org.apache.kafka.common.message.ProduceRequestData;
Expand All @@ -17,15 +16,18 @@
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

import io.kroxylicious.proxy.filter.FilterContext;
import io.kroxylicious.proxy.filter.ProduceRequestFilter;
import io.kroxylicious.proxy.filter.RequestFilterResult;
import io.kroxylicious.proxy.internal.util.MemoryRecordsHelper;

/**
* A filter for modifying the key/value/header/topic of {@link ApiKeys#PRODUCE} requests.
* <p>
* <strong>Not intended to production use.</strong>
* </p>
*/
public class ProduceRequestTransformationFilter implements ProduceRequestFilter {

Expand All @@ -50,17 +52,20 @@ private void applyTransformation(FilterContext ctx, ProduceRequestData req) {
req.topicData().forEach(topicData -> {
for (ProduceRequestData.PartitionProduceData partitionData : topicData.partitionData()) {
MemoryRecords records = (MemoryRecords) partitionData.records();
MemoryRecordsBuilder newRecords = MemoryRecordsHelper.builder(ctx.createByteBufferOutputStream(records.sizeInBytes()), CompressionType.NONE,
TimestampType.CREATE_TIME, 0);
var stream = ctx.createByteBufferOutputStream(records.sizeInBytes());
try (var newRecords = new MemoryRecordsBuilder(stream, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME, 0,
System.currentTimeMillis(), RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
stream.remaining())) {

for (MutableRecordBatch batch : records.batches()) {
for (Iterator<Record> batchRecords = batch.iterator(); batchRecords.hasNext();) {
Record batchRecord = batchRecords.next();
newRecords.append(batchRecord.timestamp(), batchRecord.key(), valueTransformation.transform(topicData.name(), batchRecord.value()));
for (MutableRecordBatch batch : records.batches()) {
for (Record batchRecord : batch) {
newRecords.append(batchRecord.timestamp(), batchRecord.key(), valueTransformation.transform(topicData.name(), batchRecord.value()));
}
}
}

partitionData.setRecords(newRecords.build());
partitionData.setRecords(newRecords.build());
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.kroxylicious.proxy.internal.filter;
package io.kroxylicious.proxy.filter.simpletransform;

import com.fasterxml.jackson.annotation.JsonProperty;

import io.kroxylicious.proxy.filter.FilterFactory;
import io.kroxylicious.proxy.filter.FilterFactoryContext;
import io.kroxylicious.proxy.internal.filter.ProduceRequestTransformationFilterFactory.Config;
import io.kroxylicious.proxy.filter.simpletransform.ProduceRequestTransformationFilterFactory.Config;
import io.kroxylicious.proxy.plugin.Plugin;
import io.kroxylicious.proxy.plugin.PluginImplConfig;
import io.kroxylicious.proxy.plugin.PluginImplName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.kroxylicious.proxy.internal.filter;
package io.kroxylicious.proxy.filter.simpletransform;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
io.kroxylicious.proxy.filter.simpletransform.ProduceRequestTransformationFilterFactory
io.kroxylicious.proxy.filter.simpletransform.FetchResponseTransformationFilterFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#
# Copyright Kroxylicious Authors.
#
# Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
#

io.kroxylicious.proxy.filter.simpletransform.UpperCasing
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.kroxylicious.proxy.internal.filter;
package io.kroxylicious.proxy.filter.simpletransform;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -38,7 +38,6 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;

Expand All @@ -47,7 +46,6 @@
import io.kroxylicious.proxy.filter.ResponseFilterResult;
import io.kroxylicious.proxy.filter.ResponseFilterResultBuilder;
import io.kroxylicious.proxy.filter.filterresultbuilder.CloseOrTerminalStage;
import io.kroxylicious.proxy.internal.filter.FetchResponseTransformationFilterFactory.Config;
import io.kroxylicious.proxy.plugin.PluginConfigurationException;

import edu.umd.cs.findbugs.annotations.NonNull;
Expand All @@ -57,6 +55,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -116,9 +115,9 @@ void testFactory() {
FetchResponseTransformationFilterFactory factory = new FetchResponseTransformationFilterFactory();
assertThatThrownBy(() -> factory.initialize(null, null)).isInstanceOf(PluginConfigurationException.class)
.hasMessage(FetchResponseTransformationFilterFactory.class.getSimpleName() + " requires configuration, but config object is null");
FilterFactoryContext constructContext = Mockito.mock(FilterFactoryContext.class);
FilterFactoryContext constructContext = mock(FilterFactoryContext.class);
doReturn(new UpperCasing()).when(constructContext).pluginInstance(any(), any());
Config config = new Config(UpperCasing.class.getName(),
FetchResponseTransformationFilterFactory.Config config = new FetchResponseTransformationFilterFactory.Config(UpperCasing.class.getName(),
new UpperCasing.Config("UTF-8"));
assertThat(factory.createFilter(constructContext, config)).isInstanceOf(FetchResponseTransformationFilter.class);
}
Expand Down
Loading

0 comments on commit 72bd070

Please sign in to comment.