Skip to content

Commit

Permalink
Handling Content-Type from producer on each HTTP request (#876)
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Patierno <[email protected]>
  • Loading branch information
ppatierno authored Mar 2, 2024
1 parent a4ac00a commit afbf0a3
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class HttpAdminBridgeEndpoint extends HttpBridgeEndpoint {
* @param context the HTTP bridge context
*/
public HttpAdminBridgeEndpoint(BridgeConfig bridgeConfig, HttpBridgeContext context) {
super(bridgeConfig, null);
super(bridgeConfig);
this.name = "kafka-bridge-admin";
this.httpBridgeContext = context;
this.kafkaBridgeAdmin = new KafkaBridgeAdmin(bridgeConfig.getKafkaConfig());
Expand Down Expand Up @@ -343,4 +343,4 @@ private static ObjectNode createPartitionMetadata(TopicPartitionInfo partitionIn
root.set("replicas", replicasArray);
return root;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package io.strimzi.kafka.bridge.http;

import io.strimzi.kafka.bridge.EmbeddedFormat;
import io.strimzi.kafka.bridge.Handler;
import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.vertx.ext.web.RoutingContext;
Expand All @@ -15,19 +14,16 @@
*/
public abstract class HttpBridgeEndpoint {
protected String name;
protected final EmbeddedFormat format;
protected final BridgeConfig bridgeConfig;
private Handler<HttpBridgeEndpoint> closeHandler;

/**
* Constructor
*
* @param bridgeConfig the bridge configuration
* @param format the embedded format for consumed messages
*/
public HttpBridgeEndpoint(BridgeConfig bridgeConfig, EmbeddedFormat format) {
public HttpBridgeEndpoint(BridgeConfig bridgeConfig) {
this.bridgeConfig = bridgeConfig;
this.format = format;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class HttpSinkBridgeEndpoint<K, V> extends HttpBridgeEndpoint {
private MessageConverter<K, V, byte[], byte[]> messageConverter;
private final HttpBridgeContext<K, V> httpBridgeContext;
private final KafkaBridgeConsumer<K, V> kafkaBridgeConsumer;
private final EmbeddedFormat format;
private ConsumerInstanceId consumerInstanceId;
private boolean subscribed;
private boolean assigned;
Expand All @@ -85,9 +86,10 @@ public class HttpSinkBridgeEndpoint<K, V> extends HttpBridgeEndpoint {
*/
public HttpSinkBridgeEndpoint(BridgeConfig bridgeConfig, HttpBridgeContext<K, V> context, EmbeddedFormat format,
Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
super(bridgeConfig, format);
super(bridgeConfig);
this.httpBridgeContext = context;
this.kafkaBridgeConsumer = new KafkaBridgeConsumer<>(bridgeConfig.getKafkaConfig(), keyDeserializer, valueDeserializer);
this.format = format;
this.subscribed = false;
this.assigned = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,18 @@ public class HttpSourceBridgeEndpoint<K, V> extends HttpBridgeEndpoint {
private MessageConverter<K, V, byte[], byte[]> messageConverter;
private boolean closing;
private final KafkaBridgeProducer<K, V> kafkaBridgeProducer;
private String contentType;

HttpSourceBridgeEndpoint(BridgeConfig bridgeConfig, EmbeddedFormat format,
Serializer<K> keySerializer, Serializer<V> valueSerializer) {
super(bridgeConfig, format);
super(bridgeConfig);
this.kafkaBridgeProducer = new KafkaBridgeProducer<>(bridgeConfig.getKafkaConfig(), keySerializer, valueSerializer);
}

@Override
public void open() {
this.name = this.bridgeConfig.getBridgeID() == null ? "kafka-bridge-producer-" + UUID.randomUUID() : this.bridgeConfig.getBridgeID() + "-" + UUID.randomUUID();
this.closing = false;
this.messageConverter = this.buildMessageConverter();
this.kafkaBridgeProducer.create();
}

Expand Down Expand Up @@ -106,6 +106,14 @@ public void handle(RoutingContext routingContext, Handler<HttpBridgeEndpoint> ha
SpanHandle<K, V> span = tracing.span(routingContext, operationName);

try {
String requestContentType = routingContext.request().getHeader("Content-Type") != null ?
routingContext.request().getHeader("Content-Type") : BridgeContentType.KAFKA_JSON_BINARY;
// create a new message converter only if it's needed because the Content-Type is different from the previous request
if (!requestContentType.equals(contentType)) {
contentType = requestContentType;
messageConverter = this.buildMessageConverter(contentType);
}

if (messageConverter == null) {
span.finish(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
HttpBridgeError error = new HttpBridgeError(
Expand Down Expand Up @@ -214,11 +222,11 @@ private int handleError(Throwable ex) {
}

@SuppressWarnings("unchecked")
private MessageConverter<K, V, byte[], byte[]> buildMessageConverter() {
switch (this.format) {
case JSON:
private MessageConverter<K, V, byte[], byte[]> buildMessageConverter(String contentType) {
switch (contentType) {
case BridgeContentType.KAFKA_JSON_JSON:
return (MessageConverter<K, V, byte[], byte[]>) new HttpJsonMessageConverter();
case BINARY:
case BridgeContentType.KAFKA_JSON_BINARY:
return (MessageConverter<K, V, byte[], byte[]>) new HttpBinaryMessageConverter();
}
return null;
Expand Down

0 comments on commit afbf0a3

Please sign in to comment.