Skip to content

Commit

Permalink
Merge pull request kroxylicious#910 from robobario/batch-aware-transform
Browse files Browse the repository at this point in the history
Envelope encryption preserve batches within MemoryRecords
  • Loading branch information
robobario authored Jan 23, 2024
2 parents 1e52156 + 2d25534 commit ba4fc98
Show file tree
Hide file tree
Showing 21 changed files with 1,769 additions and 398 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Please enumerate **all user-facing** changes using format `<githib issue/pr numb

## 0.5.0

* [#910](https://github.com/kroxylicious/kroxylicious/pull/910): Envelope encryption preserve batches within MemoryRecords
* [#883](https://github.com/kroxylicious/kroxylicious/pull/883): Ensure we only initialise a filter factory once.
* [#912](https://github.com/kroxylicious/kroxylicious/pull/912): Bump io.netty:netty-bom from 4.1.104.Final to 4.1.106.Final
* [#909](https://github.com/kroxylicious/kroxylicious/pull/909): [build] use maven maven-dependency-plugin to detect missing/superfluous dependencies at build time
Expand All @@ -18,6 +19,8 @@ Please enumerate **all user-facing** changes using format `<githib issue/pr numb
deprecated. Use `passwordFile` instead.
* As a result of the work of #909, some superfluous transitive dependencies have been removed from some kroxylicious. If you were relying on those, you will need to
adjust your dependencies as your adopt this release.
* `io.kroxylicious:kroxylicious-filter-test-support` now contains RecordTestUtils for creating example `Record`, `RecordBatch` and `MemoryRecords`. It also contains
assertj assertions for those same classes to enable us to write fluent assertions, accessible via `io.kroxylicious.test.assertj.KafkaAssertions`.

## 0.4.1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
public class HeaderAssert extends AbstractAssert<HeaderAssert, Header> {
protected HeaderAssert(Header header) {
super(header, HeaderAssert.class);
describedAs(header == null ? "null header" : "header");
}

public static HeaderAssert assertThat(Header actual) {
Expand All @@ -29,7 +30,8 @@ public HeaderAssert hasKeyEqualTo(String expected) {

public HeaderAssert hasValueEqualTo(String expected) {
isNotNull();
Assertions.assertThat(new String(actual.value()))
String valueString = actual.value() == null ? null : new String(actual.value());
Assertions.assertThat(valueString)
.describedAs("header value")
.isEqualTo(expected);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
public class MemoryRecordsAssert extends AbstractAssert<MemoryRecordsAssert, MemoryRecords> {
protected MemoryRecordsAssert(MemoryRecords memoryRecords) {
super(memoryRecords, MemoryRecordsAssert.class);
describedAs(memoryRecords == null ? "null memory records" : "memory records");
}

public static MemoryRecordsAssert assertThat(MemoryRecords actual) {
Expand All @@ -47,7 +48,7 @@ public Iterable<RecordBatchAssert> batches() {
isNotNull();
return () -> {
var it = actual.batches().iterator();
return new Iterator<RecordBatchAssert>() {
return new Iterator<>() {
@Override
public boolean hasNext() {
return it.hasNext();
Expand All @@ -62,17 +63,27 @@ public RecordBatchAssert next() {
}

public RecordBatchAssert firstBatch() {
isNotNull();
isNotEmpty();
return batchesIterable()
.first(new InstanceOfAssertFactory<>(RecordBatch.class, RecordBatchAssert::assertThat))
.describedAs("first batch");
}

public RecordBatchAssert lastBatch() {
isNotNull();
isNotEmpty();
return batchesIterable()
.last(new InstanceOfAssertFactory<>(RecordBatch.class, RecordBatchAssert::assertThat))
.describedAs("last batch");
}

private void isNotEmpty() {
Assertions.assertThat(actual.batches())
.describedAs("number of batches")
.hasSizeGreaterThan(0);
}

public MemoryRecordsAssert hasNumBatches(int expected) {
isNotNull();
Assertions.assertThat(actual.batches())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,31 @@
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.Record;
import org.assertj.core.api.AbstractAssert;
import org.assertj.core.api.AbstractByteArrayAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractObjectAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectArrayAssert;

import io.kroxylicious.test.record.RecordTestUtils;

public class RecordAssert extends AbstractAssert<RecordAssert, Record> {

private static final String RECORD_VALUE_DESCRIPTION = "record value";
private static final String RECORD_KEY_DESCRIPTION = "record key";

protected RecordAssert(Record record) {
super(record, RecordAssert.class);
describedAs(record == null ? "null record" : "record");
}

public static RecordAssert assertThat(Record actual) {
return new RecordAssert(actual);
}

public RecordAssert hasOffsetEqualTo(int expect) {
isNotNull();
AbstractLongAssert<?> offset = offsetAssert();
offset.isEqualTo(expect);
return this;
Expand All @@ -38,6 +46,7 @@ private AbstractLongAssert<?> offsetAssert() {
}

public RecordAssert hasTimestampEqualTo(int expect) {
isNotNull();
AbstractLongAssert<?> timestamp = timestampAssert();
timestamp.isEqualTo(expect);
return this;
Expand All @@ -50,36 +59,65 @@ private AbstractLongAssert<?> timestampAssert() {
}

private AbstractObjectAssert<?, String> keyStrAssert() {
isNotNull();
return Assertions.assertThat(actual).extracting(RecordTestUtils::recordKeyAsString)
.describedAs("record key");
.describedAs(RECORD_KEY_DESCRIPTION);
}

public RecordAssert hasKeyEqualTo(String expect) {
isNotNull();
Assertions.assertThat(actual).extracting(RecordTestUtils::recordKeyAsString)
.describedAs("record key")
.describedAs(RECORD_KEY_DESCRIPTION)
.isEqualTo(expect);
return this;
}

public RecordAssert hasNullKey() {
keyStrAssert()
.isNull();
isNotNull();
keyStrAssert().isNull();
return this;
}

private AbstractObjectAssert<?, String> valueStrAssert() {
return Assertions.assertThat(actual).extracting(RecordTestUtils::recordValueAsString)
.describedAs("record value");
private AbstractStringAssert<?> valueStrAssert() {
isNotNull();
return Assertions.assertThat(RecordTestUtils.recordValueAsString(actual))
.describedAs(RECORD_VALUE_DESCRIPTION);
}

private AbstractByteArrayAssert<?> valueBytesAssert() {
isNotNull();
return Assertions.assertThat(RecordTestUtils.recordValueAsBytes(actual))
.describedAs(RECORD_VALUE_DESCRIPTION);
}

public RecordAssert hasValueEqualTo(String expect) {
isNotNull();
valueStrAssert().isEqualTo(expect);
return this;
}

public RecordAssert hasValueEqualTo(byte[] expect) {
isNotNull();
valueBytesAssert().isEqualTo(expect);
return this;
}

public RecordAssert hasValueNotEqualTo(String notExpected) {
isNotNull();
valueStrAssert().isNotEqualTo(notExpected);
return this;
}

public RecordAssert hasValueEqualTo(Record expected) {
isNotNull();
hasValueEqualTo(RecordTestUtils.recordValueAsBytes(expected));
return this;
}

public RecordAssert hasNullValue() {
isNotNull();
Assertions.assertThat(actual).extracting(RecordTestUtils::recordValueAsString)
.describedAs("record value")
.describedAs(RECORD_VALUE_DESCRIPTION)
.isNull();
return this;
}
Expand All @@ -91,33 +129,39 @@ public ObjectArrayAssert<Header> headersAssert() {
}

public RecordAssert hasEmptyHeaders() {
isNotNull();
headersAssert().isEmpty();
return this;
}

public HeaderAssert singleHeader() {
isNotNull();
headersAssert().singleElement();
return HeaderAssert.assertThat(actual.headers()[0])
.describedAs("record header");
}

public RecordAssert hasHeadersSize(int expect) {
isNotNull();
headersAssert().hasSize(expect);
return this;
}

public RecordAssert containsHeaderWithKey(String expectedKey) {
isNotNull();
headersAssert().anyMatch(h -> h.key().equals(expectedKey));
return this;
}

public HeaderAssert firstHeader() {
isNotNull();
headersAssert().isNotEmpty();
return HeaderAssert.assertThat(actual.headers()[0])
.describedAs("first record header");
}

public HeaderAssert lastHeader() {
isNotNull();
headersAssert().isNotEmpty();
return HeaderAssert.assertThat(actual.headers()[actual.headers().length - 1])
.describedAs("last record header");
Expand Down
Loading

0 comments on commit ba4fc98

Please sign in to comment.