diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f325432..2e49c4b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,8 @@ ## 0.31.0 -* Dependency updates (Kafka 3.9.0, Vert.x 4.5.11, Netty 4.1.115.Final) +* Dependency updates (Kafka 3.9.0, Vert.x 4.5.10) +* Added support for creating a new topic via endpoint. ## 0.30.0 diff --git a/documentation/book/api/.openapi-generator/openapi.json-generate-apidoc.sha256 b/documentation/book/api/.openapi-generator/openapi.json-generate-apidoc.sha256 index ec32758c..f0bdd7ef 100644 --- a/documentation/book/api/.openapi-generator/openapi.json-generate-apidoc.sha256 +++ b/documentation/book/api/.openapi-generator/openapi.json-generate-apidoc.sha256 @@ -1 +1 @@ -e4a70d99cdac5e8dddb45ce30c39817fa9402463074478e0fd3a05d6a7f849d5 \ No newline at end of file +cf88d8909114896517ba4027596382126bca594d55dd9a924330232407e603b2 \ No newline at end of file diff --git a/documentation/book/api/index.adoc b/documentation/book/api/index.adoc index 8f053f1c..af76517a 100644 --- a/documentation/book/api/index.adoc +++ b/documentation/book/api/index.adoc @@ -2289,6 +2289,87 @@ endif::internal-generation[] === Topics +[.createTopic] +==== createTopic + +`POST /admin/topics` + + + +===== Description + +Creates a topic with given name, partitions count, and replication factor. + + +// markup not found, no include::{specDir}admin/topics/POST/spec.adoc[opts=optional] + + + +===== Parameters + + + +[cols="2,3,1,1,1"] +.Body Parameter +|=== +|Name| Description| Required| Default| Pattern + +| NewTopic +| Creates a topic with given name, partitions count, and replication factor. <> +| X +| +| + +|=== + + + + + +===== Return Type + + + + +- + + +===== Responses + +.HTTP Response Codes +[cols="2,3,1"] +|=== +| Code | Message | Datatype + + +| 201 +| Created +| <<>> + +|=== + +===== Samples + + +include::{snippetDir}admin/topics/POST/http-request.adoc[opts=optional] + + +// markup not found, no include::{snippetDir}admin/topics/POST/http-response.adoc[opts=optional] + + + +// file not found, no * wiremock data link :admin/topics/POST/POST.json[] + + +ifdef::internal-generation[] +===== Implementation + +// markup not found, no include::{specDir}admin/topics/POST/implementation.adoc[opts=optional] + + +endif::internal-generation[] + + [.getOffsets] ==== getOffsets @@ -3209,6 +3290,42 @@ Information about Kafka Bridge instance. +[#NewTopic] +=== _NewTopic_ NewTopic + + + + +[.fields-NewTopic] +[cols="2,1,1,2,4,1"] +|=== +| Field Name| Required| Nullable | Type| Description | Format + +| topic_name +| X +| +| String +| Name of the topic to create. +| + +| partitions_count +| +| X +| Integer +| Number of partitions for the topic. +| + +| replication_factor +| +| X +| Integer +| Number of replicas for each partition. +| + +|=== + + + [#OffsetCommitSeek] === _OffsetCommitSeek_ OffsetCommitSeek diff --git a/documentation/book/api/snippet/admin/topics/POST/http-request.adoc b/documentation/book/api/snippet/admin/topics/POST/http-request.adoc new file mode 100644 index 00000000..43e8dfb9 --- /dev/null +++ b/documentation/book/api/snippet/admin/topics/POST/http-request.adoc @@ -0,0 +1,11 @@ +==== Example HTTP request + +===== Request body +[source,json] +---- +{ + "topic_name" : "my-topic", + "partitions_count" : 1, + "replication_factor" : 2, +} +---- \ No newline at end of file diff --git a/pom.xml b/pom.xml index e9b1b296..c401d8f4 100644 --- a/pom.xml +++ b/pom.xml @@ -100,8 +100,8 @@ 11 11 2.17.2 - 4.5.11 - 4.5.11 + 4.5.10 + 4.5.10 4.1.115.Final 3.9.0 1.2.0 diff --git a/src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java b/src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java index 51f7d272..4947c63f 100644 --- a/src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java +++ b/src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java @@ -9,6 +9,7 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.TopicPartition; @@ -16,8 +17,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -84,6 +87,31 @@ public CompletionStage> listTopics() { return promise; } + /** + * Creates a topic with given name and number of partitions (optional) and replication factor (optional). + * + * @param topicName topic name to create + * @param partitions number of partitions + * @param replicationFactor replication factor + * @return a CompletionStage Void + */ + public CompletionStage createTopic(String topicName, Optional partitions, Optional replicationFactor) { + LOGGER.trace("Create topic thread {}", Thread.currentThread()); + LOGGER.info("Create topic {}, partitions {}, replicationFactor {}", topicName, partitions, replicationFactor); + CompletableFuture promise = new CompletableFuture<>(); + this.adminClient.createTopics(Collections.singletonList(new NewTopic(topicName, partitions, replicationFactor))) + .all() + .whenComplete((topic, exception) -> { + LOGGER.trace("Create topic callback thread {}", Thread.currentThread()); + if (exception == null) { + promise.complete(topic); + } else { + promise.completeExceptionally(exception); + } + }); + return promise; + } + /** * Returns the description of the specified topics. * diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java index 033be8a3..8719c5c6 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpAdminBridgeEndpoint.java @@ -15,6 +15,7 @@ import io.strimzi.kafka.bridge.config.BridgeConfig; import io.strimzi.kafka.bridge.http.converter.JsonUtils; import io.strimzi.kafka.bridge.http.model.HttpBridgeError; +import io.vertx.core.json.JsonObject; import io.vertx.ext.web.RoutingContext; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; @@ -32,6 +33,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -81,6 +83,10 @@ public void handle(RoutingContext routingContext, Handler ha doGetTopic(routingContext); break; + case CREATE_TOPIC: + doCreateTopic(routingContext); + break; + case LIST_PARTITIONS: doListPartitions(routingContext); break; @@ -173,6 +179,47 @@ public void doGetTopic(RoutingContext routingContext) { }); } + /** + * Create a topic with described name, partitions count, and replication factor in the body of the HTTP request + * + * @param routingContext the routing context + */ + public void doCreateTopic(RoutingContext routingContext) { + JsonObject jsonBody = routingContext.body().asJsonObject(); + + if (jsonBody.isEmpty()) { + HttpBridgeError error = new HttpBridgeError( + HttpResponseStatus.UNPROCESSABLE_ENTITY.code(), + "Request body must be a JSON object" + ); + HttpUtils.sendResponse(routingContext, HttpResponseStatus.UNPROCESSABLE_ENTITY.code(), + BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson())); + return; + } + + String topicName = jsonBody.getString("topic_name"); + Optional partitionsCount = Optional.ofNullable(jsonBody.getInteger("partitions_count")); + Optional replicationFactor = Optional.ofNullable(jsonBody.getInteger("replication_factor")) + .map(Integer::shortValue); + + this.kafkaBridgeAdmin.createTopic(topicName, partitionsCount, replicationFactor) + .whenComplete(((topic, exception) -> { + LOGGER.trace("Create topic handler thread {}", Thread.currentThread()); + if (exception == null) { + JsonNode root = JsonUtils.createObjectNode(); + HttpUtils.sendResponse(routingContext, HttpResponseStatus.CREATED.code(), + BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(root)); + } else { + HttpBridgeError error = new HttpBridgeError( + HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), + exception.getMessage() + ); + HttpUtils.sendResponse(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), + BridgeContentType.KAFKA_JSON, JsonUtils.jsonToBytes(error.toJson())); + } + })); + } + /** * Get partitions information related to the topic in the HTTP request * diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java index af2fc245..5c4747e3 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java @@ -157,6 +157,7 @@ public void start(Promise startPromise) { routerBuilder.operation(this.SEEK_TO_END.getOperationId().toString()).handler(this.SEEK_TO_END); routerBuilder.operation(this.LIST_TOPICS.getOperationId().toString()).handler(this.LIST_TOPICS); routerBuilder.operation(this.GET_TOPIC.getOperationId().toString()).handler(this.GET_TOPIC); + routerBuilder.operation(this.CREATE_TOPIC.getOperationId().toString()).handler(this.CREATE_TOPIC); routerBuilder.operation(this.LIST_PARTITIONS.getOperationId().toString()).handler(this.LIST_PARTITIONS); routerBuilder.operation(this.GET_PARTITION.getOperationId().toString()).handler(this.GET_PARTITION); routerBuilder.operation(this.GET_OFFSETS.getOperationId().toString()).handler(this.GET_OFFSETS); @@ -381,6 +382,11 @@ private void getTopic(RoutingContext routingContext) { processAdminClient(routingContext); } + private void createTopic(RoutingContext routingContext) { + this.httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.CREATE_TOPIC); + processAdminClient(routingContext); + } + private void listPartitions(RoutingContext routingContext) { this.httpBridgeContext.setOpenApiOperation(HttpOpenApiOperations.LIST_PARTITIONS); processAdminClient(routingContext); @@ -726,6 +732,14 @@ public void process(RoutingContext routingContext) { } }; + final HttpOpenApiOperation CREATE_TOPIC = new HttpOpenApiOperation(HttpOpenApiOperations.CREATE_TOPIC) { + + @Override + public void process(RoutingContext routingContext) { + createTopic(routingContext); + } + }; + final HttpOpenApiOperation LIST_PARTITIONS = new HttpOpenApiOperation(HttpOpenApiOperations.LIST_PARTITIONS) { @Override diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpOpenApiOperations.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpOpenApiOperations.java index eef44825..869c9452 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpOpenApiOperations.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpOpenApiOperations.java @@ -28,6 +28,8 @@ public enum HttpOpenApiOperations { LIST_TOPICS("listTopics"), /** get information for a specific topic */ GET_TOPIC("getTopic"), + /** creates a topic with specified name */ + CREATE_TOPIC("createTopic"), /** list partitions for a specific topic */ LIST_PARTITIONS("listPartitions"), /** get partition information for a specific topic */ diff --git a/src/main/resources/openapi.json b/src/main/resources/openapi.json index 504a2e87..e14b1a40 100644 --- a/src/main/resources/openapi.json +++ b/src/main/resources/openapi.json @@ -753,6 +753,31 @@ } ] }, + "/admin/topics": { + "post": { + "tags": [ + "Topics" + ], + "description": "Creates a topic with given name, partitions count, and replication factor.", + "operationId": "createTopic", + "requestBody": { + "description": "Creates a topic with given name, partitions count, and replication factor.", + "content": { + "application/vnd.kafka.json.v2+json": { + "schema": { + "$ref": "#/components/schemas/NewTopic" + } + } + }, + "required": true + }, + "responses": { + "201": { + "description": "Created" + } + } + } + }, "/consumers/{groupid}/instances/{name}/records": { "get": { "tags": [ @@ -2193,6 +2218,33 @@ } ], "nullable" : true + }, + "NewTopic": { + "title": "NewTopic", + "type": "object", + "properties": { + "topic_name": { + "description": "Name of the topic to create.", + "type": "string" + }, + "partitions_count": { + "description": "Number of partitions for the topic.", + "type": "integer", + "nullable": true + }, + "replication_factor": { + "description": "Number of replicas for each partition.", + "type": "integer", + "nullable": true + } + }, + "required": ["topic_name"], + "additionalProperties": false, + "example": { + "topic_name": "my-topic", + "partitions_count": 3, + "replication_factor": 2 + } } } }, diff --git a/src/main/resources/openapiv2.json b/src/main/resources/openapiv2.json index a5e4eab7..e9d59a0a 100644 --- a/src/main/resources/openapiv2.json +++ b/src/main/resources/openapiv2.json @@ -686,6 +686,36 @@ } ] }, + "/admin/topics": { + "post": { + "tags": [ + "Topics" + ], + "description": "Creates a topic with given name, partitions count, and replication factor.", + "operationId": "createTopic", + "consumes": [ + "application/vnd.kafka.v2+json" + ], + "produces": [ + "application/vnd.kafka.v2+json" + ], + "parameters": [ + { + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/NewTopic" + } + } + ], + "responses": { + "201": { + "description": "Created" + } + } + } + }, "/consumers/{groupid}/instances/{name}/records": { "get": { "tags": [ @@ -2027,6 +2057,31 @@ "string", "null" ] + }, + "NewTopic": { + "title": "NewTopic", + "type": "object", + "properties": { + "topic_name": { + "description": "Name of the topic to create.", + "type": "string" + }, + "partitions_count": { + "description": "Number of partitions for the topic.", + "type": "integer" + }, + "replication_factor": { + "description": "Number of replicas for each partition.", + "type": "integer" + } + }, + "required": ["topic_name"], + "additionalProperties": false, + "example": { + "topic_name": "my-topic", + "partitions_count": 3, + "replication_factor": 2 + } } }, "tags": [ diff --git a/src/test/java/io/strimzi/kafka/bridge/http/AdminClientIT.java b/src/test/java/io/strimzi/kafka/bridge/http/AdminClientIT.java index df6cdd78..bc8ae7e5 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/AdminClientIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/AdminClientIT.java @@ -215,4 +215,62 @@ void setupTopic(VertxTestContext context, String topic, int partitions, int coun }); producer.get(TEST_TIMEOUT, TimeUnit.SECONDS); } + + @Test + void createTopicBlankBodyTest(VertxTestContext context) { + JsonObject jsonObject = new JsonObject(); + + // create topic test without name, partitions and replication factor + baseService() + .postRequest("/admin/topics") + .as(BodyCodec.jsonObject()) + .sendJsonObject(jsonObject, ar -> { + context.verify(() -> { + assertThat(ar.succeeded(), is(true)); + HttpResponse response = ar.result(); + assertThat(response.statusCode(), is(HttpResponseStatus.BAD_REQUEST.code())); + }); + context.completeNow(); + }); + } + + @Test + void createTopicTest(VertxTestContext context) { + JsonObject jsonObject = new JsonObject(); + + // create topic test without partitions and replication factor + jsonObject.put("topic_name", topic); + baseService() + .postRequest("/admin/topics") + .as(BodyCodec.jsonObject()) + .sendJsonObject(jsonObject, ar -> { + context.verify(() -> { + assertThat(ar.succeeded(), is(true)); + HttpResponse response = ar.result(); + assertThat(response.statusCode(), is(HttpResponseStatus.CREATED.code())); + }); + context.completeNow(); + }); + } + + @Test + void createTopicAllParametersTest(VertxTestContext context) { + JsonObject jsonObject = new JsonObject(); + + // create topic test with 1 partition and 1 replication factor + jsonObject.put("topic_name", topic); + jsonObject.put("partitions_count", 1); + jsonObject.put("replication_factor", 1); + baseService() + .postRequest("/admin/topics") + .as(BodyCodec.jsonObject()) + .sendJsonObject(jsonObject, ar -> { + context.verify(() -> { + assertThat(ar.succeeded(), is(true)); + HttpResponse response = ar.result(); + assertThat(response.statusCode(), is(HttpResponseStatus.CREATED.code())); + }); + context.completeNow(); + }); + } } diff --git a/src/test/java/io/strimzi/kafka/bridge/http/OtherServicesIT.java b/src/test/java/io/strimzi/kafka/bridge/http/OtherServicesIT.java index 7586ea7c..1237f7ac 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/OtherServicesIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/OtherServicesIT.java @@ -151,6 +151,8 @@ void openapiTest(VertxTestContext context) { assertThat(paths.containsKey("/topics/{topicname}/partitions/{partitionid}/offsets"), is(true)); assertThat(paths.containsKey("/topics/{topicname}/partitions"), is(true)); assertThat(bridgeResponse.getJsonObject("paths").getJsonObject("/topics/{topicname}/partitions/{partitionid}").getJsonObject("post").getString("operationId"), is(HttpOpenApiOperations.SEND_TO_PARTITION.toString())); + assertThat(paths.containsKey("/admin/topics"), is(true)); + assertThat(bridgeResponse.getJsonObject("paths").getJsonObject("/admin/topics").getJsonObject("post").getString("operationId"), is(HttpOpenApiOperations.CREATE_TOPIC.toString())); assertThat(paths.containsKey("/healthy"), is(true)); assertThat(bridgeResponse.getJsonObject("paths").getJsonObject("/healthy").getJsonObject("get").getString("operationId"), is(HttpOpenApiOperations.HEALTHY.toString())); assertThat(paths.containsKey("/ready"), is(true)); @@ -160,7 +162,7 @@ void openapiTest(VertxTestContext context) { assertThat(paths.containsKey("/"), is(true)); assertThat(bridgeResponse.getJsonObject("paths").getJsonObject("/").getJsonObject("get").getString("operationId"), is(HttpOpenApiOperations.INFO.toString())); assertThat(paths.containsKey("/karel"), is(false)); - assertThat(bridgeResponse.getJsonObject("definitions").getMap().size(), is(27)); + assertThat(bridgeResponse.getJsonObject("definitions").getMap().size(), is(28)); assertThat(bridgeResponse.getJsonArray("tags").size(), is(4)); }); context.completeNow();