diff --git a/src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java b/src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java index fe56c8323..650661a70 100644 --- a/src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java +++ b/src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java @@ -9,6 +9,7 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.TopicPartition; @@ -16,6 +17,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; @@ -84,6 +86,29 @@ public CompletionStage> listTopics() { return promise; } + /** + * Creates a topic with given name + * + * @param topicName topic name to create + * @return a CompletionStage Void + */ + public CompletionStage createTopic(String topicName) { + log.trace("Create topic thread {}", Thread.currentThread()); + log.info("Create topic {}", topicName); + CompletableFuture promise = new CompletableFuture<>(); + this.adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, 2, (short) 1))) + .all() + .whenComplete((topic, exception) -> { + log.trace("Create topic callback thread {}", Thread.currentThread()); + if (exception == null) { + promise.complete(topic); + } else { + promise.completeExceptionally(exception); + } + }); + return promise; + } + /** * Returns the description of the specified topics. * diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java index 9e7ce581a..9b9851769 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java @@ -78,6 +78,10 @@ public void handle(RoutingContext routingContext, Handler ha doGetTopic(routingContext); break; + case CREATE_TOPIC: + doCreateTopic(routingContext); + break; + case LIST_PARTITIONS: doListPartitions(routingContext); break; @@ -170,6 +174,32 @@ public void doGetTopic(RoutingContext routingContext) { }); } + /** + * Create a topic described in the HTTP request + * + * @param routingContext the routing context + */ + public void doCreateTopic(RoutingContext routingContext) { + String topicName = routingContext.pathParam("topicname"); + + this.kafkaBridgeAdmin.createTopic(topicName) + .whenComplete(((topic, exception) -> { + log.trace("Create topic handler thread {}", Thread.currentThread()); + if (exception == null) { + ArrayNode root = JsonUtils.createArrayNode(); + HttpUtils.sendResponse(routingContext, HttpResponseStatus.OK.code(), + BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(root)); + } else { + HttpBridgeError error = new HttpBridgeError( + HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), + exception.getMessage() + ); + HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), + BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson())); + } + })); + } + /** * Get partitions information related to the topic in the HTTP request * diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java index 9d7ea8b72..1ad811e35 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java @@ -159,6 +159,7 @@ public void start(Promise startPromise) { routerBuilder.operation(this.SEEK_TO_END.getOperationId().toString()).handler(this.SEEK_TO_END); routerBuilder.operation(this.LIST_TOPICS.getOperationId().toString()).handler(this.LIST_TOPICS); routerBuilder.operation(this.GET_TOPIC.getOperationId().toString()).handler(this.GET_TOPIC); + routerBuilder.operation(this.CREATE_TOPIC.getOperationId().toString()).handler(this.CREATE_TOPIC); routerBuilder.operation(this.LIST_PARTITIONS.getOperationId().toString()).handler(this.LIST_PARTITIONS); routerBuilder.operation(this.GET_PARTITION.getOperationId().toString()).handler(this.GET_PARTITION); routerBuilder.operation(this.GET_OFFSETS.getOperationId().toString()).handler(this.GET_OFFSETS); @@ -378,6 +379,11 @@ private void getTopic(RoutingContext routingContext) { processAdminClient(routingContext); } + private void createTopic(RoutingContext routingContext) { + this.httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.CREATE_TOPIC); + processAdminClient(routingContext); + } + private void listPartitions(RoutingContext routingContext) { this.httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.LIST_PARTITIONS); processAdminClient(routingContext); @@ -721,6 +727,14 @@ public void process(RoutingContext routingContext) { } }; + final HttpOpenApiOperation CREATE_TOPIC = new HttpOpenApiOperation(HttpOpenApiOperations.CREATE_TOPIC) { + + @Override + public void process(RoutingContext routingContext) { + createTopic(routingContext); + } + }; + final HttpOpenApiOperation LIST_PARTITIONS = new HttpOpenApiOperation(HttpOpenApiOperations.LIST_PARTITIONS) { @Override diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpOpenApiOperations.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpOpenApiOperations.java index d12a43a91..72775af36 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpOpenApiOperations.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpOpenApiOperations.java @@ -28,6 +28,8 @@ public enum HttpOpenApiOperations { LIST_TOPICS("listTopics"), /** get information for a specific topic */ GET_TOPIC("getTopic"), + /** creates a topic with specified name */ + CREATE_TOPIC("createTopic"), /** list partitions for a specific topic */ LIST_PARTITIONS("listPartitions"), /** get partition information for a specific topic */ diff --git a/src/main/resources/openapi.json b/src/main/resources/openapi.json index f6a0862d8..f99f5a8d8 100644 --- a/src/main/resources/openapi.json +++ b/src/main/resources/openapi.json @@ -748,6 +748,56 @@ } ] }, + "/create-topic/{topicname}": { + "post": { + "tags": [ + "Topics" + ], + "description": "Creates a topic with given topic name.", + "operationId": "createTopic", + "responses": { + "200": { + "description": "Topic metadata", + "content": { + "application/vnd.kafka.v2+json": { + "schema": { + "$ref": "#/components/schemas/TopicMetadata" + } + } + } + }, + "404": { + "description": "The specified topic was not found.", + "content": { + "application/vnd.kafka.v2+json": { + "schema": { + "$ref": "#/components/schemas/Error" + }, + "examples": { + "response": { + "value": { + "error_code": 404, + "message": "The specified topic was not found." + } + } + } + } + } + } + } + }, + "parameters": [ + { + "name": "topicname", + "in": "path", + "description": "Name of the topic to send records to or retrieve metadata from.", + "required": true, + "schema": { + "type": "string" + } + } + ] + }, "/consumers/{groupid}/instances/{name}/records": { "get": { "tags": [ diff --git a/src/test/java/io/strimzi/kafka/bridge/http/AdminClientIT.java b/src/test/java/io/strimzi/kafka/bridge/http/AdminClientIT.java index df6cdd782..9b6905694 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/AdminClientIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/AdminClientIT.java @@ -215,4 +215,19 @@ void setupTopic(VertxTestContext context, String topic, int partitions, int coun }); producer.get(TEST_TIMEOUT, TimeUnit.SECONDS); } + + @Test + void createTopicTest(VertxTestContext context) { + baseService() + .postRequest("/create-topic/" + topic) + .as(BodyCodec.jsonArray()) + .send(ar -> { + context.verify(() -> { + assertThat(ar.succeeded(), is(true)); + HttpResponse response = ar.result(); + assertThat(response.statusCode(), is(HttpResponseStatus.OK.code())); + }); + context.completeNow(); + }); + } }