diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java index 5d0e8e115..5cfd0c714 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java @@ -144,7 +144,7 @@ public void doGetTopic(RoutingContext routingContext) { if (configEntries.size() > 0) { ObjectNode configs = JsonUtils.createObjectNode(); configEntries.forEach(configEntry -> configs.put(configEntry.name(), configEntry.value())); - root.put("configs", configs); + root.set("configs", configs); } TopicDescription description = topicDescriptions.get(topicName); if (description != null) { @@ -152,7 +152,7 @@ public void doGetTopic(RoutingContext routingContext) { partitionsArray.add(createPartitionMetadata(partitionInfo)); }); } - root.put("partitions", partitionsArray); + root.set("partitions", partitionsArray); HttpUtils.sendResponse(routingContext, HttpResponseStatus.OK.code(), BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(root)); } else if (ex.getCause() instanceof UnknownTopicOrPartitionException) { HttpBridgeError error = new HttpBridgeError( @@ -339,7 +339,7 @@ private static ObjectNode createPartitionMetadata(TopicPartitionInfo partitionIn replica.put("in_sync", insyncSet.contains(node.id())); replicasArray.add(replica); }); - root.put("replicas", replicasArray); + root.set("replicas", replicasArray); return root; } } \ No newline at end of file diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java index 0397d8f40..ed94bb99a 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSinkBridgeEndpoint.java @@ -481,12 +481,12 @@ private void doListSubscriptions(RoutingContext routingContext) { } for (Map.Entry part: partitions.entrySet()) { ObjectNode topic = JsonUtils.createObjectNode(); - topic.put(part.getKey(), part.getValue()); + topic.set(part.getKey(), part.getValue()); partitionsArray.add(topic); } ArrayNode topicsArray = JsonUtils.createArrayNode(topics); - root.put("topics", topicsArray); - root.put("partitions", partitionsArray); + root.set("topics", topicsArray); + root.set("partitions", partitionsArray); HttpUtils.sendResponse(routingContext, HttpResponseStatus.OK.code(), BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(root)); } else { diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java index 0ec69f2b5..92ebc879e 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java @@ -196,7 +196,7 @@ private ObjectNode buildOffsets(List> results) { } offsets.add(offset); } - jsonResponse.put("offsets", offsets); + jsonResponse.set("offsets", offsets); return jsonResponse; } diff --git a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java index 6156456a0..5a6fc0d9a 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpBinaryMessageConverter.java @@ -110,7 +110,7 @@ public byte[] toMessages(ConsumerRecords records) { headers.add(header); } if (!headers.isEmpty()) { - jsonObject.put("headers", headers); + jsonObject.set("headers", headers); } jsonArray.add(jsonObject); } diff --git a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java index 7f8f8a7ac..7077d35dd 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/converter/HttpJsonMessageConverter.java @@ -91,9 +91,9 @@ public byte[] toMessages(ConsumerRecords records) { ObjectNode jsonObject = JsonUtils.createObjectNode(); jsonObject.put("topic", record.topic()); - jsonObject.put("key", record.key() != null ? + jsonObject.set("key", record.key() != null ? JsonUtils.bytesToJson(record.key()) : null); - jsonObject.put("value", record.value() != null ? + jsonObject.set("value", record.value() != null ? JsonUtils.bytesToJson(record.value()) : null); jsonObject.put("partition", record.partition()); jsonObject.put("offset", record.offset()); @@ -109,7 +109,7 @@ public byte[] toMessages(ConsumerRecords records) { headers.add(header); } if (!headers.isEmpty()) { - jsonObject.put("headers", headers); + jsonObject.set("headers", headers); } jsonArray.add(jsonObject); } diff --git a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java index 7210930d1..1064ba8e8 100644 --- a/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java +++ b/src/main/java/io/strimzi/kafka/bridge/tracing/OpenTelemetryHandle.java @@ -111,7 +111,7 @@ private static TextMapPropagator propagator() { return GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); } - private static final TextMapGetter ROUTING_CONTEXT_GETTER = new TextMapGetter() { + private static final TextMapGetter ROUTING_CONTEXT_GETTER = new TextMapGetter<>() { @Override public Iterable keys(RoutingContext rc) { return rc.request().headers().names(); @@ -126,7 +126,7 @@ public String get(RoutingContext rc, String key) { } }; - private static final TextMapGetter> MG = new TextMapGetter>() { + private static final TextMapGetter> MG = new TextMapGetter<>() { @Override public Iterable keys(Map map) { return map.keySet();