feat: added a new text embedded format (#858)
* feat: added a new text embedded format

Signed-off-by: Antonio Pedro <[email protected]>

* test: added tests

Signed-off-by: Antonio Pedro <[email protected]>

* doc: updated docs for the new format

Signed-off-by: Antonio Pedro <[email protected]>

* Update generated documentation

Signed-off-by: Paolo Patierno <[email protected]>
Signed-off-by: Antonio Pedro <[email protected]>

* add get content type to match on Poll request

Signed-off-by: Antonio Pedro <[email protected]>

* handling incorrect value

Signed-off-by: Antonio Pedro <[email protected]>

* added test

Signed-off-by: Antonio Pedro <[email protected]>

* remove unused method

Signed-off-by: Antonio Pedro <[email protected]>

* replace deprecated put in ObjectNode object

Signed-off-by: Antonio Pedro <[email protected]>

* replace deprecated put in ObjectNode object

Signed-off-by: Antonio Pedro <[email protected]>

* added test

Signed-off-by: Antonio Pedro <[email protected]>

* remove unused method

Signed-off-by: Antonio Pedro <[email protected]>

* rebase with upstream/main

Signed-off-by: Antonio Pedro <[email protected]>

* Revert "rebase with upstream/main"

This reverts commit b009700.

Signed-off-by: Antonio Pedro <[email protected]>

* change serializer and content type

Signed-off-by: Antonio Pedro <[email protected]>

* added missing docs and CHANGELOG

Signed-off-by: Antonio Pedro <[email protected]>

* added missing word xD

Signed-off-by: António Pedro <[email protected]>

---------

Signed-off-by: Antonio Pedro <[email protected]>
Signed-off-by: Paolo Patierno <[email protected]>
Signed-off-by: António Pedro <[email protected]>
Co-authored-by: Paolo Patierno <[email protected]>
antonio-pedro99 and ppatierno authored Mar 4, 2024
1 parent e1180e4 commit c3e96c7
Showing 13 changed files with 331 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@

* Dependency updates (Kafka 3.7.0, Kubernetes configuration provider 1.1.2, Vert.x 4.5.4, Netty 4.1.107.Final, Jackson FasterXML 2.16.1, Micrometer 1.12.3)
* Fixed missing messaging semantic attributes to the Kafka consumer spans
* Introduced a new text embedded format to send and receive plain strings for record key and value.

## 0.27.0

3 changes: 3 additions & 0 deletions documentation/book/api/paths.adoc
@@ -616,6 +616,7 @@ __optional__|The maximum amount of time, in milliseconds, that the HTTP Bridge s

* `application/vnd.kafka.json.v2+json`
* `application/vnd.kafka.binary.v2+json`
* `application/vnd.kafka.text.v2+json`
* `application/vnd.kafka.v2+json`


@@ -1046,6 +1047,7 @@ __required__||<<_producerrecordlist,ProducerRecordList>>

* `application/vnd.kafka.json.v2+json`
* `application/vnd.kafka.binary.v2+json`
* `application/vnd.kafka.text.v2+json`


==== Produces
@@ -1312,6 +1314,7 @@ __required__|List of records to send to a given topic partition, including a val

* `application/vnd.kafka.json.v2+json`
* `application/vnd.kafka.binary.v2+json`
* `application/vnd.kafka.text.v2+json`


==== Produces
13 changes: 9 additions & 4 deletions documentation/modules/con-requests-kafka-bridge.adoc
@@ -19,7 +19,7 @@ API request and response bodies are always encoded as JSON.
Content-Type: application/vnd.kafka.v2+json
----

- * When performing producer operations, `POST` requests must provide `Content-Type` headers specifying the _embedded data format_ of the messages produced. This can be either `json` or `binary`.
+ * When performing producer operations, `POST` requests must provide `Content-Type` headers specifying the _embedded data format_ of the messages produced. This can be either `json`, `binary` or `text`.
+
[cols="35,65",options="header",stripes="none",separator=¦]
|===
@@ -33,6 +33,9 @@ m¦Content-Type: application/vnd.kafka.json.v2+json
¦Binary
m¦Content-Type: application/vnd.kafka.binary.v2+json

¦Text
m¦Content-Type: application/vnd.kafka.text.v2+json

|===

The embedded data format is set per consumer, as described in the next section.
@@ -42,9 +45,9 @@ An empty body can be used to create a consumer with the default values.

== Embedded data format

- The embedded data format is the format of the Kafka messages that are transmitted, over HTTP, from a producer to a consumer using the Kafka Bridge. Two embedded data formats are supported: JSON and binary.
+ The embedded data format is the format of the Kafka messages that are transmitted, over HTTP, from a producer to a consumer using the Kafka Bridge. Three embedded data formats are supported: JSON, binary and text.

- When creating a consumer using the `/consumers/_groupid_` endpoint, the `POST` request body must specify an embedded data format of either JSON or binary. This is specified in the `format` field, for example:
+ When creating a consumer using the `/consumers/_groupid_` endpoint, the `POST` request body must specify an embedded data format of either JSON, binary or text. This is specified in the `format` field, for example:

[source,json,subs="attributes+"]
----
@@ -109,6 +112,8 @@ curl -X POST \
----
<1> The header value in binary format and encoded as Base64.

Note that if your consumer is configured to use the text embedded data format, the `value` and `key` fields in the `records` parameter must be strings and not JSON objects.
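For example, a `POST` request sending a plain string record with the text embedded data format might look like this (the bridge address and topic name are illustrative):

[source,curl]
----
curl -X POST http://localhost:8080/topics/my-topic \
  -H 'Content-Type: application/vnd.kafka.text.v2+json' \
  -d '{
    "records": [
        {
            "key": "my-key",
            "value": "hello from the text format"
        }
    ]
}'
----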

== Accept headers

After creating a consumer, all subsequent GET requests must provide an `Accept` header in the following format:
@@ -118,7 +123,7 @@ After creating a consumer, all subsequent GET requests must provide an `Accept`
Accept: application/vnd.kafka._EMBEDDED-DATA-FORMAT_.v2+json
----

- The `EMBEDDED-DATA-FORMAT` is either `json` or `binary`.
+ The `EMBEDDED-DATA-FORMAT` is either `json`, `binary` or `text`.
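With the text embedded data format, for instance, the header would be:

[source]
----
Accept: application/vnd.kafka.text.v2+json
----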

For example, when retrieving records for a subscribed consumer using an embedded data format of JSON, include this Accept header:

3 changes: 3 additions & 0 deletions src/main/java/io/strimzi/kafka/bridge/BridgeContentType.java
@@ -16,6 +16,9 @@ public class BridgeContentType {
/** JSON encoding with BINARY embedded format */
public static final String KAFKA_JSON_BINARY = "application/vnd.kafka.binary.v2+json";

/** JSON encoding with TEXT embedded format */
public static final String KAFKA_JSON_TEXT = "application/vnd.kafka.text.v2+json";

/** Specific Kafka JSON encoding */
public static final String KAFKA_JSON = "application/vnd.kafka.v2+json";

7 changes: 6 additions & 1 deletion src/main/java/io/strimzi/kafka/bridge/EmbeddedFormat.java
@@ -14,7 +14,10 @@ public enum EmbeddedFormat {
BINARY,

/** Define "json" data as embedded format */
-    JSON;
+    JSON,

/** Define "text" data as embedded format */
TEXT;

/**
* Convert the String value in the corresponding enum
@@ -28,6 +31,8 @@ public static EmbeddedFormat from(String value) {
return JSON;
case "binary":
return BINARY;
case "text":
return TEXT;
}
throw new IllegalEmbeddedFormatException("Invalid format type.");
}
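For reference, a quick sketch of the mapping this change adds (illustrative, not part of the diff; "avro" stands in for any unsupported value):

    EmbeddedFormat.from("text");   // returns EmbeddedFormat.TEXT
    EmbeddedFormat.from("avro");   // throws IllegalEmbeddedFormatException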
2 changes: 2 additions & 0 deletions src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java
@@ -638,6 +638,8 @@ private EmbeddedFormat contentTypeToFormat(String contentType) {
return EmbeddedFormat.BINARY;
case BridgeContentType.KAFKA_JSON_JSON:
return EmbeddedFormat.JSON;
case BridgeContentType.KAFKA_JSON_TEXT:
return EmbeddedFormat.TEXT;
}
throw new IllegalArgumentException(contentType);
}
src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java
@@ -20,6 +20,7 @@
import io.strimzi.kafka.bridge.converter.MessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpBinaryMessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpTextMessageConverter;
import io.strimzi.kafka.bridge.http.converter.JsonDecodeException;
import io.strimzi.kafka.bridge.http.converter.JsonUtils;
import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
@@ -305,9 +306,8 @@ private void pollHandler(ConsumerRecords<K, V> records, Throwable ex, RoutingCon
BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson()));
} else {
responseStatus = HttpResponseStatus.OK;
-                HttpUtils.sendResponse(routingContext, responseStatus.code(),
-                    this.format == EmbeddedFormat.BINARY ? BridgeContentType.KAFKA_JSON_BINARY : BridgeContentType.KAFKA_JSON_JSON,
-                    buffer);
+
+                HttpUtils.sendResponse(routingContext, responseStatus.code(), getContentType(), buffer);
}
} catch (JsonDecodeException e) {
LOGGER.error("Error decoding records as JSON", e);
@@ -614,16 +614,32 @@ private MessageConverter<K, V, byte[], byte[]> buildMessageConverter() {
return (MessageConverter<K, V, byte[], byte[]>) new HttpJsonMessageConverter();
case BINARY:
return (MessageConverter<K, V, byte[], byte[]>) new HttpBinaryMessageConverter();
case TEXT:
return (MessageConverter<K, V, byte[], byte[]>) new HttpTextMessageConverter();
}
return null;
}

private String getContentType() {
switch (this.format) {
case JSON:
return BridgeContentType.KAFKA_JSON_JSON;
case BINARY:
return BridgeContentType.KAFKA_JSON_BINARY;
case TEXT:
return BridgeContentType.KAFKA_JSON_TEXT;
}
throw new IllegalArgumentException();
}

private boolean checkAcceptedBody(String accept) {
switch (accept) {
case BridgeContentType.KAFKA_JSON_JSON:
return format == EmbeddedFormat.JSON;
case BridgeContentType.KAFKA_JSON_BINARY:
return format == EmbeddedFormat.BINARY;
case BridgeContentType.KAFKA_JSON_TEXT:
return format == EmbeddedFormat.TEXT;
}
return false;
}
src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java
@@ -16,6 +16,7 @@
import io.strimzi.kafka.bridge.converter.MessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpBinaryMessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpJsonMessageConverter;
import io.strimzi.kafka.bridge.http.converter.HttpTextMessageConverter;
import io.strimzi.kafka.bridge.http.converter.JsonUtils;
import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
import io.strimzi.kafka.bridge.http.model.HttpBridgeResult;
@@ -228,6 +229,8 @@ private MessageConverter<K, V, byte[], byte[]> buildMessageConverter(String cont
return (MessageConverter<K, V, byte[], byte[]>) new HttpJsonMessageConverter();
case BridgeContentType.KAFKA_JSON_BINARY:
return (MessageConverter<K, V, byte[], byte[]>) new HttpBinaryMessageConverter();
case BridgeContentType.KAFKA_JSON_TEXT:
return (MessageConverter<K, V, byte[], byte[]>) new HttpTextMessageConverter();
}
return null;
}
117 changes: 117 additions & 0 deletions src/main/java/io/strimzi/kafka/bridge/http/converter/HttpTextMessageConverter.java
@@ -0,0 +1,117 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.bridge.http.converter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import io.strimzi.kafka.bridge.converter.MessageConverter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

import javax.xml.bind.DatatypeConverter;
import java.util.ArrayList;
import java.util.List;

/**
* Implementation of a message converter to deal with the "text" embedded data format
*/
@SuppressWarnings("checkstyle:NPathComplexity")
public class HttpTextMessageConverter implements MessageConverter<byte[], byte[], byte[], byte[]> {
@Override
public ProducerRecord<byte[], byte[]> toKafkaRecord(String kafkaTopic, Integer partition, byte[] message) {

Integer partitionFromBody = null;
byte[] key = null;
byte[] value = null;
Headers headers = new RecordHeaders();

JsonNode json = JsonUtils.bytesToJson(message);

if (!json.isEmpty()) {
if (json.has("key")) {
key = json.get("key").asText().getBytes();
}
if (json.has("value")) {
JsonNode valueNode = json.get("value");
if (!valueNode.isTextual()) {
throw new IllegalStateException("Because the embedded format is 'text', the value must be a string");
}
value = valueNode.asText().getBytes();
}
if (json.has("headers")) {
ArrayNode jsonArray = (ArrayNode) json.get("headers");
for (JsonNode jsonObject : jsonArray) {
headers.add(new RecordHeader(jsonObject.get("key").asText(), DatatypeConverter.parseBase64Binary(jsonObject.get("value").asText())));
}
}
if (json.has("partition")) {
partitionFromBody = json.get("partition").asInt();
}
if (partition != null && partitionFromBody != null) {
throw new IllegalStateException("Partition specified in body and in request path");
}
if (partition != null) {
partitionFromBody = partition;
}
}
return new ProducerRecord<>(kafkaTopic, partitionFromBody, key, value, headers);
}

@Override
public List<ProducerRecord<byte[], byte[]>> toKafkaRecords(String kafkaTopic, Integer partition, byte[] messages) {

List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();

JsonNode json = JsonUtils.bytesToJson(messages);
ArrayNode jsonArray = (ArrayNode) json.get("records");

for (JsonNode jsonObj : jsonArray) {
records.add(toKafkaRecord(kafkaTopic, partition, JsonUtils.jsonToBytes(jsonObj)));
}
return records;
}

@Override
public byte[] toMessage(String address, ConsumerRecord<byte[], byte[]> record) {
throw new UnsupportedOperationException();
}

@Override
public byte[] toMessages(ConsumerRecords<byte[], byte[]> records) {
ArrayNode jsonArray = JsonUtils.createArrayNode();

for (ConsumerRecord<byte[], byte[]> record : records) {
ObjectNode jsonObject = JsonUtils.createObjectNode();

jsonObject.set("topic", new TextNode(record.topic()));
jsonObject.set("key", record.key() != null ? new TextNode(new String(record.key())) : null);
jsonObject.set("value", record.value() != null ? new TextNode(new String(record.value())) : null);
jsonObject.put("partition", record.partition());
jsonObject.put("offset", record.offset());

ArrayNode headers = JsonUtils.createArrayNode();

for (Header kafkaHeader : record.headers()) {
ObjectNode header = JsonUtils.createObjectNode();

header.set("key", new TextNode(kafkaHeader.key()));
header.put("value", DatatypeConverter.printBase64Binary(kafkaHeader.value()));
headers.add(header);
}
if (!headers.isEmpty()) {
jsonObject.set("headers", headers);
}
jsonArray.add(jsonObject);
}
return JsonUtils.jsonToBytes(jsonArray);
}
}
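To see how the new converter behaves end to end, here is a minimal sketch (not part of the PR; the topic name and payload are made up) that feeds a text-format request body through `toKafkaRecords`:

    import io.strimzi.kafka.bridge.http.converter.HttpTextMessageConverter;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.nio.charset.StandardCharsets;
    import java.util.List;

    public class HttpTextMessageConverterSketch {
        public static void main(String[] args) {
            HttpTextMessageConverter converter = new HttpTextMessageConverter();

            // A "text" embedded-format body: key and value are plain strings, not JSON objects.
            byte[] body = "{\"records\":[{\"key\":\"my-key\",\"value\":\"hello world\"}]}"
                    .getBytes(StandardCharsets.UTF_8);

            List<ProducerRecord<byte[], byte[]>> records =
                    converter.toKafkaRecords("my-topic", null, body);

            // Prints: my-key -> hello world
            for (ProducerRecord<byte[], byte[]> record : records) {
                System.out.println(new String(record.key(), StandardCharsets.UTF_8)
                        + " -> " + new String(record.value(), StandardCharsets.UTF_8));
            }
        }
    }

On the consumer side, `toMessages` performs the reverse mapping: each record's key and value come back as plain JSON strings, while header values remain Base64-encoded, matching what `Accept: application/vnd.kafka.text.v2+json` responses carry.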
30 changes: 30 additions & 0 deletions src/main/resources/openapi.json
@@ -652,6 +652,11 @@
"schema": {
"$ref": "#/components/schemas/ProducerRecordList"
}
},
"application/vnd.kafka.text.v2+json": {
"schema": {
"$ref": "#/components/schemas/ProducerRecordList"
}
}
},
"required": true
@@ -799,6 +804,11 @@
}
}
},
"application/vnd.kafka.text.v2+json": {
"schema": {
"$ref": "#/components/schemas/ConsumerRecordList"
}
},
"application/vnd.kafka.v2+json": {
"schema": {
"$ref": "#/components/schemas/ConsumerRecordList"
@@ -819,6 +829,11 @@
"$ref": "#/components/schemas/Error"
}
},
"application/vnd.kafka.text.v2+json": {
"schema": {
"$ref": "#/components/schemas/Error"
}
},
"application/vnd.kafka.v2+json": {
"schema": {
"$ref": "#/components/schemas/Error"
@@ -847,6 +862,11 @@
"$ref": "#/components/schemas/Error"
}
},
"application/vnd.kafka.text.v2+json": {
"schema": {
"$ref": "#/components/schemas/Error"
}
},
"application/vnd.kafka.v2+json": {
"schema": {
"$ref": "#/components/schemas/Error"
@@ -875,6 +895,11 @@
"$ref": "#/components/schemas/Error"
}
},
"application/vnd.kafka.text.v2+json": {
"schema": {
"$ref": "#/components/schemas/Error"
}
},
"application/vnd.kafka.v2+json": {
"schema": {
"$ref": "#/components/schemas/Error"
@@ -1041,6 +1066,11 @@
"schema": {
"$ref": "#/components/schemas/ProducerRecordToPartitionList"
}
},
"application/vnd.kafka.text.v2+json": {
"schema": {
"$ref": "#/components/schemas/ProducerRecordToPartitionList"
}
}
},
"required": true
