diff --git a/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/config/GlobalExceptionHandler.java b/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/config/GlobalExceptionHandler.java new file mode 100644 index 00000000..1dd8d46a --- /dev/null +++ b/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/config/GlobalExceptionHandler.java @@ -0,0 +1,61 @@ +package com.example.springbootkafkasample.config; + +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; +import org.springframework.http.HttpStatus; +import org.springframework.http.HttpStatusCode; +import org.springframework.http.ProblemDetail; +import org.springframework.http.converter.HttpMessageNotReadableException; +import org.springframework.validation.FieldError; +import org.springframework.web.bind.MethodArgumentNotValidException; +import org.springframework.web.bind.annotation.ControllerAdvice; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.ResponseStatus; + +@Order(Ordered.HIGHEST_PRECEDENCE) +@ControllerAdvice +class GlobalExceptionHandler { + + @ExceptionHandler(MethodArgumentNotValidException.class) + @ResponseStatus(HttpStatus.BAD_REQUEST) + ProblemDetail onException(MethodArgumentNotValidException methodArgumentNotValidException) { + ProblemDetail problemDetail = + ProblemDetail.forStatusAndDetail(HttpStatusCode.valueOf(400), "Invalid request content."); + problemDetail.setTitle("Constraint Violation"); + List validationErrorsList = methodArgumentNotValidException.getAllErrors().stream() + .map(objectError -> { + FieldError fieldError = (FieldError) objectError; + return new ApiValidationError( + fieldError.getObjectName(), + fieldError.getField(), + fieldError.getRejectedValue(), + Objects.requireNonNull(fieldError.getDefaultMessage(), "")); + }) + .sorted(Comparator.comparing(ApiValidationError::field)) + .toList(); + problemDetail.setProperty("violations", validationErrorsList); + return problemDetail; + } + + @ExceptionHandler(IllegalArgumentException.class) + ProblemDetail handleIllegalArgumentException(IllegalArgumentException ex) { + ProblemDetail problemDetail = ProblemDetail.forStatusAndDetail(HttpStatusCode.valueOf(400), ex.getMessage()); + problemDetail.setTitle("Bad Request"); + return problemDetail; + } + + @ExceptionHandler(HttpMessageNotReadableException.class) + public ProblemDetail handleInvalidEnum(HttpMessageNotReadableException ex) { + if (ex.getMessage() + .contains("Cannot deserialize value of type `com.example.springbootkafkasample.dto.Operation`")) { + return ProblemDetail.forStatusAndDetail( + HttpStatusCode.valueOf(400), "Invalid operation value. Allowed values are: START, STOP."); + } + return ProblemDetail.forStatusAndDetail(HttpStatusCode.valueOf(400), "Invalid request."); + } + + record ApiValidationError(String object, String field, Object rejectedValue, String message) {} +} diff --git a/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/config/KafkaConfig.java b/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/config/KafkaConfig.java index 7ca55746..485f5c06 100644 --- a/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/config/KafkaConfig.java +++ b/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/config/KafkaConfig.java @@ -8,6 +8,7 @@ import org.springframework.kafka.annotation.KafkaListenerConfigurer; import org.springframework.kafka.config.KafkaListenerEndpointRegistrar; import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.lang.NonNull; import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean; @Configuration @@ -23,7 +24,7 @@ public KafkaConfig(LocalValidatorFactoryBean validator) { } @Override - public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { + public void configureKafkaListeners(@NonNull KafkaListenerEndpointRegistrar registrar) { registrar.setValidator(this.validator); } diff --git a/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/controller/MessageRestController.java b/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/controller/MessageRestController.java index 9c7c6143..ecb03170 100644 --- a/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/controller/MessageRestController.java +++ b/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/controller/MessageRestController.java @@ -1,10 +1,19 @@ package com.example.springbootkafkasample.controller; +import com.example.springbootkafkasample.dto.KafkaListenerRequest; import com.example.springbootkafkasample.dto.MessageDTO; import com.example.springbootkafkasample.dto.TopicInfo; import com.example.springbootkafkasample.service.MessageService; import com.example.springbootkafkasample.service.sender.Sender; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.responses.ApiResponses; +import jakarta.validation.Valid; import java.util.List; +import java.util.Map; +import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; @@ -22,6 +31,28 @@ public MessageRestController(Sender sender, MessageService messageService) { this.messageService = messageService; } + @GetMapping("/listeners") + public ResponseEntity> getListeners() { + return ResponseEntity.ok(messageService.getListenersState()); + } + + @Operation(summary = "Update the state of a Kafka listener") + @ApiResponses( + value = { + @ApiResponse( + responseCode = "200", + description = "Listener state updated successfully", + content = + @Content(mediaType = "application/json", schema = @Schema(implementation = Map.class))), + @ApiResponse(responseCode = "400", description = "Invalid request", content = @Content), + @ApiResponse(responseCode = "404", description = "Listener not found", content = @Content) + }) + @PostMapping("/listeners") + public ResponseEntity> updateListenerState( + @RequestBody @Valid final KafkaListenerRequest kafkaListenerRequest) { + return ResponseEntity.ok(messageService.updateListenerState(kafkaListenerRequest)); + } + @PostMapping("/messages") public void sendMessage(@RequestBody MessageDTO messageDTO) { this.sender.send(messageDTO); diff --git a/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/dto/KafkaListenerRequest.java b/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/dto/KafkaListenerRequest.java new file mode 100644 index 00000000..493cbc3b --- /dev/null +++ b/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/dto/KafkaListenerRequest.java @@ -0,0 +1,8 @@ +package com.example.springbootkafkasample.dto; + +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; + +public record KafkaListenerRequest( + @NotBlank(message = "Container ID must not be blank") String containerId, + @NotNull(message = "Operation must not be null") Operation operation) {} diff --git a/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/dto/Operation.java b/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/dto/Operation.java new file mode 100644 index 00000000..7cda525e --- /dev/null +++ b/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/dto/Operation.java @@ -0,0 +1,6 @@ +package com.example.springbootkafkasample.dto; + +public enum Operation { + START, + STOP +} diff --git a/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/service/MessageService.java b/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/service/MessageService.java index c21ed138..6ec2cbfb 100644 --- a/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/service/MessageService.java +++ b/kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/service/MessageService.java @@ -1,5 +1,7 @@ package com.example.springbootkafkasample.service; +import com.example.springbootkafkasample.dto.KafkaListenerRequest; +import com.example.springbootkafkasample.dto.Operation; import com.example.springbootkafkasample.dto.TopicInfo; import java.util.ArrayList; import java.util.Comparator; @@ -8,20 +10,25 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.TopicDescription; import org.springframework.kafka.KafkaException; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.stereotype.Service; @Service public class MessageService { private final KafkaAdmin kafkaAdmin; + private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; - public MessageService(KafkaAdmin kafkaAdmin) { + public MessageService(KafkaAdmin kafkaAdmin, KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) { this.kafkaAdmin = kafkaAdmin; + this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry; } public List getTopicsWithPartitions(boolean showInternalTopics) { @@ -57,4 +64,29 @@ public List getTopicsWithPartitions(boolean showInternalTopics) { topicPartitionCounts.sort(Comparator.comparing(TopicInfo::topicName)); return topicPartitionCounts; } + + public Map getListenersState() { + return kafkaListenerEndpointRegistry.getListenerContainers().stream() + .collect( + Collectors.toMap(MessageListenerContainer::getListenerId, MessageListenerContainer::isRunning)); + } + + public Map updateListenerState(KafkaListenerRequest kafkaListenerRequest) { + MessageListenerContainer listenerContainer = + kafkaListenerEndpointRegistry.getListenerContainer(kafkaListenerRequest.containerId()); + if (listenerContainer == null) { + throw new IllegalArgumentException( + "Listener container with ID '" + kafkaListenerRequest.containerId() + "' not found"); + } + if (kafkaListenerRequest.operation().equals(Operation.START)) { + if (!listenerContainer.isRunning()) { + listenerContainer.start(); + } + } else if (kafkaListenerRequest.operation().equals(Operation.STOP)) { + if (listenerContainer.isRunning()) { + listenerContainer.stop(); + } + } + return getListenersState(); + } } diff --git a/kafka-spring-boot/boot-kafka-sample/src/main/resources/application.properties b/kafka-spring-boot/boot-kafka-sample/src/main/resources/application.properties index 321460de..f36e91c1 100644 --- a/kafka-spring-boot/boot-kafka-sample/src/main/resources/application.properties +++ b/kafka-spring-boot/boot-kafka-sample/src/main/resources/application.properties @@ -8,4 +8,6 @@ spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.spring spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.UUIDSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer -spring.threads.virtual.enabled=true \ No newline at end of file + +spring.threads.virtual.enabled=true +spring.mvc.problemdetails.enabled=true diff --git a/kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleApplicationTests.java b/kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleApplicationTests.java index bce430be..7702cc44 100644 --- a/kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleApplicationTests.java +++ b/kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleApplicationTests.java @@ -2,15 +2,12 @@ import static com.example.springbootkafkasample.config.Initializer.TOPIC_TEST_1; import static com.example.springbootkafkasample.config.Initializer.TOPIC_TEST_2; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import com.example.springbootkafkasample.dto.MessageDTO; import com.example.springbootkafkasample.service.listener.Receiver2; -import java.time.Duration; import java.util.UUID; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -67,15 +64,6 @@ void setUp() { } } - @Test - void sendAndReceiveData() throws InterruptedException { - template.send(TOPIC_TEST_1, UUID.randomUUID(), new MessageDTO(TOPIC_TEST_1, "foo")); - // 4 from topic1 and 3 from topic2 on startUp, plus 1 from test - await().pollInterval(Duration.ofSeconds(1)) - .atMost(Duration.ofSeconds(45)) - .untilAsserted(() -> assertThat(receiver2.getLatch().getCount()).isEqualTo(9)); - } - @Test void topicsWithPartitionsCount() throws Exception { diff --git a/kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleIntegrationTest.java b/kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleIntegrationTest.java index 13c7eedf..87470ae9 100644 --- a/kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleIntegrationTest.java +++ b/kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleIntegrationTest.java @@ -9,10 +9,14 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; +import com.example.springbootkafkasample.common.ContainerConfig; +import com.example.springbootkafkasample.dto.KafkaListenerRequest; import com.example.springbootkafkasample.dto.MessageDTO; +import com.example.springbootkafkasample.dto.Operation; import com.example.springbootkafkasample.service.listener.Receiver2; import com.fasterxml.jackson.databind.ObjectMapper; import java.time.Duration; +import org.hamcrest.CoreMatchers; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; @@ -23,7 +27,7 @@ import org.springframework.http.MediaType; import org.springframework.test.web.servlet.MockMvc; -@SpringBootTest(classes = TestBootKafkaSampleApplication.class) +@SpringBootTest(classes = ContainerConfig.class) @AutoConfigureMockMvc @TestMethodOrder(MethodOrderer.OrderAnnotation.class) class KafkaSampleIntegrationTest { @@ -40,6 +44,7 @@ class KafkaSampleIntegrationTest { @Test @Order(1) void sendAndReceiveMessage() throws Exception { + long initialCount = receiver2.getLatch().getCount(); this.mockMvc .perform(post("/messages") .content(this.objectMapper.writeValueAsString(new MessageDTO("test_1", "junitTest"))) @@ -48,8 +53,8 @@ void sendAndReceiveMessage() throws Exception { // 4 from topic1 and 3 from topic2 on startUp, plus 1 from test await().pollInterval(Duration.ofSeconds(1)) - .atMost(Duration.ofSeconds(30)) - .untilAsserted(() -> assertThat(receiver2.getLatch().getCount()).isEqualTo(7)); + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(receiver2.getLatch().getCount()).isEqualTo(initialCount - 1)); assertThat(receiver2.getDeadLetterLatch().getCount()).isEqualTo(1); } @@ -97,4 +102,91 @@ void topicsWithPartitionsCount() throws Exception { .andExpect(jsonPath("$[6].partitionCount").value(32)) .andExpect(jsonPath("$[6].replicationCount").value(1)); } + + @Test + void getListOfContainers() throws Exception { + this.mockMvc + .perform(get("/listeners")) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON)) + .andExpect(jsonPath("$.size()").value(5)) + .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt']") + .value(true)) + .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#0']") + .value(true)) + .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1']") + .value(true)) + .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-0']") + .value(true)) + .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-1']") + .value(true)); + } + + @Test + void stopAndStartContainers() throws Exception { + this.mockMvc + .perform(post("/listeners") + .content(this.objectMapper.writeValueAsString(new KafkaListenerRequest( + "org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", Operation.STOP))) + .contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON)) + .andExpect(jsonPath("$.size()").value(5)) + .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt']") + .value(false)) + .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#0']") + .value(true)) + .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1']") + .value(true)) + .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-0']") + .value(true)) + .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-1']") + .value(true)); + this.mockMvc + .perform(post("/listeners") + .content(this.objectMapper.writeValueAsString(new KafkaListenerRequest( + "org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", Operation.START))) + .contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON)) + .andExpect(jsonPath("$.size()").value(5)) + .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt']") + .value(true)) + .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#0']") + .value(true)) + .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1']") + .value(true)) + .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-0']") + .value(true)) + .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-1']") + .value(true)); + } + + @Test + void invalidContainerOperation() throws Exception { + this.mockMvc + .perform(post("/listeners") + .content(objectMapper.writeValueAsString( + new KafkaListenerRequest("invalid-container-id", Operation.STOP))) + .contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isBadRequest()) + .andExpect(content().contentType(MediaType.APPLICATION_PROBLEM_JSON_VALUE)); + } + + @Test + void whenInvalidOperation_thenReturnsBadRequest() throws Exception { + String invalidRequest = "{ \"containerId\": \"myListener\", \"operation\": \"INVALID\" }"; + + mockMvc.perform(post("/listeners") + .contentType(MediaType.APPLICATION_JSON) + .content(invalidRequest)) + .andExpect(status().isBadRequest()) + .andExpect(content().contentType(MediaType.APPLICATION_PROBLEM_JSON_VALUE)) + .andExpect(jsonPath("$.type", CoreMatchers.is("about:blank"))) + .andExpect(jsonPath("$.title", CoreMatchers.is("Bad Request"))) + .andExpect(jsonPath("$.status", CoreMatchers.is(400))) + .andExpect(jsonPath( + "$.detail", CoreMatchers.is("Invalid operation value. Allowed values are: START, STOP."))) + .andExpect(jsonPath("$.instance", CoreMatchers.is("/listeners"))); + } } diff --git a/kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/TestBootKafkaSampleApplication.java b/kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/TestBootKafkaSampleApplication.java index 1b32d491..9e8785f7 100644 --- a/kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/TestBootKafkaSampleApplication.java +++ b/kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/TestBootKafkaSampleApplication.java @@ -1,27 +1,13 @@ package com.example.springbootkafkasample; +import com.example.springbootkafkasample.common.ContainerConfig; import org.springframework.boot.SpringApplication; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.boot.testcontainers.service.connection.ServiceConnection; -import org.springframework.context.annotation.Bean; -import org.testcontainers.kafka.KafkaContainer; -import org.testcontainers.utility.DockerImageName; -@TestConfiguration(proxyBeanMethods = false) public class TestBootKafkaSampleApplication { - @Bean - @ServiceConnection - KafkaContainer kafkaContainer() { - KafkaContainer kafkaContainer = - new KafkaContainer(DockerImageName.parse("apache/kafka-native").withTag("3.8.0")); - kafkaContainer.addEnv("KAFKA_NUM_PARTITIONS", "32"); - return kafkaContainer; - } - public static void main(String[] args) { SpringApplication.from(BootKafkaSampleApplication::main) - .with(TestBootKafkaSampleApplication.class) + .with(ContainerConfig.class) .run(args); } } diff --git a/kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/common/ContainerConfig.java b/kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/common/ContainerConfig.java new file mode 100644 index 00000000..df0ccad8 --- /dev/null +++ b/kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/common/ContainerConfig.java @@ -0,0 +1,20 @@ +package com.example.springbootkafkasample.common; + +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.testcontainers.service.connection.ServiceConnection; +import org.springframework.context.annotation.Bean; +import org.testcontainers.kafka.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +@TestConfiguration(proxyBeanMethods = false) +public class ContainerConfig { + + @Bean + @ServiceConnection + KafkaContainer kafkaContainer() { + KafkaContainer kafkaContainer = + new KafkaContainer(DockerImageName.parse("apache/kafka-native").withTag("3.8.1")); + kafkaContainer.addEnv("KAFKA_NUM_PARTITIONS", "32"); + return kafkaContainer; + } +}