diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java index 9a0ef3d4..033be8a3 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java @@ -52,7 +52,7 @@ public class HttpAdminBridgeEndpoint extends HttpBridgeEndpoint { * @param context the HTTP bridge context */ public HttpAdminBridgeEndpoint(BridgeConfig bridgeConfig, HttpBridgeContext context) { - super(bridgeConfig, null); + super(bridgeConfig); this.name = "kafka-bridge-admin"; this.httpBridgeContext = context; this.kafkaBridgeAdmin = new KafkaBridgeAdmin(bridgeConfig.getKafkaConfig()); @@ -343,4 +343,4 @@ private static ObjectNode createPartitionMetadata(TopicPartitionInfo partitionIn root.set("replicas", replicasArray); return root; } -} \ No newline at end of file +} diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridgeEndpoint.java index e76ada16..8ac367a3 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridgeEndpoint.java @@ -5,7 +5,6 @@ package io.strimzi.kafka.bridge.http; -import io.strimzi.kafka.bridge.EmbeddedFormat; import io.strimzi.kafka.bridge.Handler; import io.strimzi.kafka.bridge.config.BridgeConfig; import io.vertx.ext.web.RoutingContext; @@ -15,7 +14,6 @@ */ public abstract class HttpBridgeEndpoint { protected String name; - protected final EmbeddedFormat format; protected final BridgeConfig bridgeConfig; private Handler closeHandler; @@ -23,11 +21,9 @@ public abstract class HttpBridgeEndpoint { * Constructor * * @param bridgeConfig the bridge configuration - * @param format the embedded format for consumed messages */ - public HttpBridgeEndpoint(BridgeConfig bridgeConfig, EmbeddedFormat format) { + public HttpBridgeEndpoint(BridgeConfig bridgeConfig) { this.bridgeConfig = bridgeConfig; - this.format = format; } /** diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java index 7987aeee..6c97ad4d 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java @@ -70,6 +70,7 @@ public class HttpSinkBridgeEndpoint extends HttpBridgeEndpoint { private MessageConverter messageConverter; private final HttpBridgeContext httpBridgeContext; private final KafkaBridgeConsumer kafkaBridgeConsumer; + private final EmbeddedFormat format; private ConsumerInstanceId consumerInstanceId; private boolean subscribed; private boolean assigned; @@ -85,9 +86,10 @@ public class HttpSinkBridgeEndpoint extends HttpBridgeEndpoint { */ public HttpSinkBridgeEndpoint(BridgeConfig bridgeConfig, HttpBridgeContext context, EmbeddedFormat format, Deserializer keyDeserializer, Deserializer valueDeserializer) { - super(bridgeConfig, format); + super(bridgeConfig); this.httpBridgeContext = context; this.kafkaBridgeConsumer = new KafkaBridgeConsumer<>(bridgeConfig.getKafkaConfig(), keyDeserializer, valueDeserializer); + this.format = format; this.subscribed = false; this.assigned = false; } diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java index 93a21475..49282f36 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java @@ -48,10 +48,11 @@ public class HttpSourceBridgeEndpoint extends HttpBridgeEndpoint { private MessageConverter messageConverter; private boolean closing; private final KafkaBridgeProducer kafkaBridgeProducer; + private String contentType; HttpSourceBridgeEndpoint(BridgeConfig bridgeConfig, EmbeddedFormat format, Serializer keySerializer, Serializer valueSerializer) { - super(bridgeConfig, format); + super(bridgeConfig); this.kafkaBridgeProducer = new KafkaBridgeProducer<>(bridgeConfig.getKafkaConfig(), keySerializer, valueSerializer); } @@ -59,7 +60,6 @@ public class HttpSourceBridgeEndpoint extends HttpBridgeEndpoint { public void open() { this.name = this.bridgeConfig.getBridgeID() == null ? "kafka-bridge-producer-" + UUID.randomUUID() : this.bridgeConfig.getBridgeID() + "-" + UUID.randomUUID(); this.closing = false; - this.messageConverter = this.buildMessageConverter(); this.kafkaBridgeProducer.create(); } @@ -106,6 +106,14 @@ public void handle(RoutingContext routingContext, Handler ha SpanHandle span = tracing.span(routingContext, operationName); try { + String requestContentType = routingContext.request().getHeader("Content-Type") != null ? + routingContext.request().getHeader("Content-Type") : BridgeContentType.KAFKA_JSON_BINARY; + // create a new message converter only if it's needed because the Content-Type is different from the previous request + if (!requestContentType.equals(contentType)) { + contentType = requestContentType; + messageConverter = this.buildMessageConverter(contentType); + } + if (messageConverter == null) { span.finish(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()); HttpBridgeError error = new HttpBridgeError( @@ -214,11 +222,11 @@ private int handleError(Throwable ex) { } @SuppressWarnings("unchecked") - private MessageConverter buildMessageConverter() { - switch (this.format) { - case JSON: + private MessageConverter buildMessageConverter(String contentType) { + switch (contentType) { + case BridgeContentType.KAFKA_JSON_JSON: return (MessageConverter) new HttpJsonMessageConverter(); - case BINARY: + case BridgeContentType.KAFKA_JSON_BINARY: return (MessageConverter) new HttpBinaryMessageConverter(); } return null;