Skip to content

Commit

Permalink
fix: kafka header extraction expression in composite zilla.yaml (#1362)
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitk-me authored Jan 7, 2025
1 parent 811415b commit 760e698
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2021-2024 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.kafka.config;

public class KafkaTopicHeaderConfig
{
public final String name;
public final String path;

public KafkaTopicHeaderConfig(
String name,
String path)
{
this.name = name;
this.path = path;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class KafkaTopicTransformsConfig
{
public final String extractKey;

public final List<KafkaTopicHeaderType> extractHeaders;
public final List<KafkaTopicHeaderConfig> extractHeaders;

public static KafkaTopicTransformsConfigBuilder<KafkaTopicTransformsConfig> builder()
{
Expand All @@ -37,7 +37,7 @@ public static <T> KafkaTopicTransformsConfigBuilder<T> builder(

KafkaTopicTransformsConfig(
String extractKey,
List<KafkaTopicHeaderType> extractHeaders)
List<KafkaTopicHeaderConfig> extractHeaders)
{
this.extractKey = extractKey;
this.extractHeaders = extractHeaders;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,20 @@
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.aklivity.zilla.runtime.engine.config.ConfigBuilder;

public final class KafkaTopicTransformsConfigBuilder<T> extends ConfigBuilder<T, KafkaTopicTransformsConfigBuilder<T>>
{
private static final String PATH = "^\\$\\{message\\.(key|value)\\.([A-Za-z_][A-Za-z0-9_]*)\\}$";
private static final Pattern PATH_PATTERN = Pattern.compile(PATH);
private static final String INTERNAL_VALUE = "$.%s";
private static final String INTERNAL_PATH = "^\\$\\..*$";
private static final Pattern INTERNAL_PATH_PATTERN = Pattern.compile(INTERNAL_PATH);

private final Matcher matcher;
private final Matcher internalMatcher;
private final Function<KafkaTopicTransformsConfig, T> mapper;
private String extractKey;
private List<KafkaTopicHeaderType> extractHeaders;
private List<KafkaTopicHeaderConfig> extractHeaders;

KafkaTopicTransformsConfigBuilder(
Function<KafkaTopicTransformsConfig, T> mapper)
{
this.mapper = mapper;
this.extractHeaders = new ArrayList<>();
this.matcher = PATH_PATTERN.matcher("");
this.internalMatcher = INTERNAL_PATH_PATTERN.matcher("");
}

@Override
Expand All @@ -56,14 +44,13 @@ protected Class<KafkaTopicTransformsConfigBuilder<T>> thisType()
public KafkaTopicTransformsConfigBuilder<T> extractKey(
String extractKey)
{
this.extractKey = extractKey != null && matcher.reset(extractKey).matches()
? String.format(INTERNAL_VALUE, matcher.group(2))
: extractKey;
this.extractKey = extractKey;

return this;
}

public KafkaTopicTransformsConfigBuilder<T> extractHeaders(
List<KafkaTopicHeaderType> extractHeaders)
List<KafkaTopicHeaderConfig> extractHeaders)
{
if (extractHeaders != null)
{
Expand All @@ -73,7 +60,7 @@ public KafkaTopicTransformsConfigBuilder<T> extractHeaders(
}

public KafkaTopicTransformsConfigBuilder<T> extractHeader(
KafkaTopicHeaderType header)
KafkaTopicHeaderConfig header)
{
return extractHeader(header.name, header.path);
}
Expand All @@ -86,15 +73,9 @@ public KafkaTopicTransformsConfigBuilder<T> extractHeader(
{
this.extractHeaders = new ArrayList<>();
}
if (matcher.reset(path).matches())
{
this.extractHeaders.add(new KafkaTopicHeaderType(name,
String.format(INTERNAL_VALUE, matcher.group(2))));
}
else if (internalMatcher.reset(path).matches())
{
this.extractHeaders.add(new KafkaTopicHeaderType(name, path));
}

this.extractHeaders.add(new KafkaTopicHeaderConfig(name, path));

return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@
import org.agrona.io.DirectBufferInputStream;
import org.agrona.io.ExpandableDirectBufferOutputStream;

import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicHeaderType;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicTransformsConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaTopicHeaderType;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaTopicTransformsType;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Array32FW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight;
Expand Down Expand Up @@ -126,7 +126,7 @@ public final class KafkaCachePartition

private final Varint32FW varintRO = new Varint32FW();
private final KafkaCachePaddedKeyFW.Builder paddedKeyRW = new KafkaCachePaddedKeyFW.Builder()
.wrap(new UnsafeBuffer(new byte[8192]), 0, 8192);;
.wrap(new UnsafeBuffer(new byte[8192]), 0, 8192);
private final String32FW.Builder stringRW = new String32FW.Builder()
.wrap(new UnsafeBuffer(new byte[256]), 0, 256);
private final Varint32FW.Builder varintRW = new Varint32FW.Builder().wrap(new UnsafeBuffer(new byte[5]), 0, 5);
Expand Down Expand Up @@ -364,7 +364,7 @@ public void writeEntry(
ConverterHandler convertKey,
ConverterHandler convertValue,
boolean verbose,
KafkaTopicTransformsConfig transforms)
KafkaTopicTransformsType transforms)
{
final int valueLength = value != null ? value.sizeof() : -1;
writeEntryStart(context, traceId, bindingId, offset, entryMark, valueMark, timestamp, producerId, key,
Expand All @@ -391,7 +391,7 @@ public void writeEntryStart(
OctetsFW payload,
ConverterHandler convertKey,
ConverterHandler convertValue,
KafkaTopicTransformsConfig transforms,
KafkaTopicTransformsType transforms,
boolean verbose)
{
assert offset > this.progress : String.format("%d > %d", offset, this.progress);
Expand Down Expand Up @@ -573,7 +573,7 @@ public void writeEntryFinish(
ConverterHandler convertKey,
ConverterHandler convertValue,
boolean verbose,
KafkaTopicTransformsConfig transforms)
KafkaTopicTransformsType transforms)
{
final Node head = sentinel.previous;
assert head != sentinel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.kafka.config;
package io.aklivity.zilla.runtime.binding.kafka.internal.config;

public class KafkaTopicHeaderType
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import jakarta.json.JsonValue;
import jakarta.json.bind.adapter.JsonbAdapter;

import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicHeaderType;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicHeaderConfig;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicTransformsConfig;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicTransformsConfigBuilder;

Expand All @@ -47,7 +47,7 @@ public JsonObject adaptToJson(
if (transforms.extractHeaders != null && !transforms.extractHeaders.isEmpty())
{
JsonObjectBuilder headers = Json.createObjectBuilder();
for (KafkaTopicHeaderType header : transforms.extractHeaders)
for (KafkaTopicHeaderConfig header : transforms.extractHeaders)
{
headers.add(header.name, header.path);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2021-2024 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.kafka.internal.config;

import java.util.List;

public class KafkaTopicTransformsType
{
public final String extractKey;

public final List<KafkaTopicHeaderType> extractHeaders;

KafkaTopicTransformsType(
String extractKey,
List<KafkaTopicHeaderType> extractHeaders)
{
this.extractKey = extractKey;
this.extractHeaders = extractHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,34 @@
*/
package io.aklivity.zilla.runtime.binding.kafka.internal.config;

import static java.util.Collections.emptyList;

import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicConfig;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicHeaderType;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicTransformsConfig;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.model.ConverterHandler;

public class KafkaTopicType
{
private static final String TRANSFORM_PATH = "^\\$\\{message\\.(key|value)\\.([A-Za-z_][A-Za-z0-9_]*)\\}$";
private static final Pattern TRANSFORM_PATH_PATTERN = Pattern.compile(TRANSFORM_PATH);
private static final String TRANSFORM_INTERNAL_PATH = "$.%s";

public static final KafkaTopicType DEFAULT_TOPIC_TYPE = new KafkaTopicType();

public final ConverterHandler keyReader;
public final ConverterHandler keyWriter;
public final ConverterHandler valueReader;
public final ConverterHandler valueWriter;
public final KafkaTopicTransformsConfig transforms;
public final KafkaTopicTransformsType transforms;

private final Matcher topicMatch;
private final Matcher matcher;

private KafkaTopicType()
{
Expand All @@ -45,14 +52,16 @@ private KafkaTopicType()
this.valueReader = ConverterHandler.NONE;
this.valueWriter = ConverterHandler.NONE;
this.transforms = null;
this.matcher = TRANSFORM_PATH_PATTERN.matcher("");
}

public KafkaTopicType(
EngineContext context,
KafkaTopicConfig topicConfig)
{
this.matcher = TRANSFORM_PATH_PATTERN.matcher("");
this.topicMatch = topicConfig.name != null ? asMatcher(topicConfig.name) : null;
this.transforms = topicConfig.transforms;
this.transforms = topicConfig.transforms != null ? transforms(topicConfig.transforms) : null;
this.keyReader = Optional.ofNullable(topicConfig.key)
.map(context::supplyReadConverter)
.map(this::key)
Expand Down Expand Up @@ -100,6 +109,26 @@ private ConverterHandler headers(
return handler;
}


private KafkaTopicTransformsType transforms(
KafkaTopicTransformsConfig transforms)
{
String transformKey = Optional.ofNullable(transforms.extractKey)
.filter(key -> matcher.reset(key).matches())
.map(key -> String.format(TRANSFORM_INTERNAL_PATH, matcher.group(2)))
.orElse(transforms.extractKey);

List<KafkaTopicHeaderType> transformHeaders = Optional.ofNullable(transforms.extractHeaders)
.orElse(emptyList())
.stream()
.filter(header -> matcher.reset(header.path).matches())
.map(header -> new KafkaTopicHeaderType(header.name,
String.format(TRANSFORM_INTERNAL_PATH, matcher.group(2))))
.toList();

return new KafkaTopicTransformsType(transformKey, transformHeaders);
}

private static Matcher asMatcher(
String topic)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.agrona.collections.MutableInteger;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.kafka.config.KafkaTopicTransformsConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding;
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration;
import io.aklivity.zilla.runtime.binding.kafka.internal.cache.KafkaCache;
Expand All @@ -54,6 +53,7 @@
import io.aklivity.zilla.runtime.binding.kafka.internal.cache.KafkaCacheTopic;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaTopicTransformsType;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaTopicType;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Array32FW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW;
Expand Down Expand Up @@ -486,7 +486,7 @@ final class KafkaCacheServerFetchFanout
private final ConverterHandler convertValue;
private final MutableInteger entryMark;
private final MutableInteger valueMark;
private final KafkaTopicTransformsConfig transforms;
private final KafkaTopicTransformsType transforms;

private long leaderId;
private long initialId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public void shouldReadHeadersOptions()
assertThat(options, not(nullValue()));
assertThat(options.bootstrap, equalTo(singletonList("test")));
assertEquals(options.topics.get(0).transforms.extractHeaders.get(0).name, "correlation-id");
assertEquals(options.topics.get(0).transforms.extractHeaders.get(0).path, "$.correlationId");
assertEquals(options.topics.get(0).transforms.extractHeaders.get(0).path, "${message.value.correlationId}");
}

@Test
Expand All @@ -264,6 +264,6 @@ public void shouldWriteHeadersOptions()

assertThat(text, not(nullValue()));
assertThat(text, equalTo("{\"bootstrap\":[\"test\"],\"topics\":[{\"name\":\"test\",\"value\":\"test\"," +
"\"transforms\":{\"extract-headers\":{\"correlation-id\":\"$.correlationId\"}}}]}"));
"\"transforms\":{\"extract-headers\":{\"correlation-id\":\"${message.value.correlationId}\"}}}]}"));
}
}

0 comments on commit 760e698

Please sign in to comment.