feat : enable and disable kafka listener #600

Merged 3 commits on Dec 2, 2024
@@ -0,0 +1,61 @@
package com.example.springbootkafkasample.config;

import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.HttpStatusCode;
import org.springframework.http.ProblemDetail;
import org.springframework.http.converter.HttpMessageNotReadableException;
import org.springframework.validation.FieldError;
import org.springframework.web.bind.MethodArgumentNotValidException;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseStatus;

@Order(Ordered.HIGHEST_PRECEDENCE)
@ControllerAdvice
class GlobalExceptionHandler {
Contributor:

🛠️ Refactor suggestion

Make the GlobalExceptionHandler class public to ensure Spring can detect it

The GlobalExceptionHandler class is currently package-private. To allow Spring's @ControllerAdvice to automatically detect and apply this global exception handler across the application, the class should be declared as public.

Apply this diff to update the class access modifier:

-class GlobalExceptionHandler {
+public class GlobalExceptionHandler {

@ExceptionHandler(MethodArgumentNotValidException.class)
@ResponseStatus(HttpStatus.BAD_REQUEST)
ProblemDetail onException(MethodArgumentNotValidException methodArgumentNotValidException) {
Contributor:

🛠️ Refactor suggestion

Declare the onException method as public for proper accessibility

The onException method is package-private. Exception handler methods annotated with @ExceptionHandler should be public to ensure they are accessible to Spring's exception handling mechanism.

Apply this diff to change the method's access modifier:

-ProblemDetail onException(MethodArgumentNotValidException methodArgumentNotValidException) {
+public ProblemDetail onException(MethodArgumentNotValidException methodArgumentNotValidException) {

ProblemDetail problemDetail =
ProblemDetail.forStatusAndDetail(HttpStatusCode.valueOf(400), "Invalid request content.");
problemDetail.setTitle("Constraint Violation");
List<ApiValidationError> validationErrorsList = methodArgumentNotValidException.getAllErrors().stream()
.map(objectError -> {
FieldError fieldError = (FieldError) objectError;
return new ApiValidationError(
fieldError.getObjectName(),
fieldError.getField(),
fieldError.getRejectedValue(),
Objects.requireNonNull(fieldError.getDefaultMessage(), ""));
})
.sorted(Comparator.comparing(ApiValidationError::field))
.toList();
problemDetail.setProperty("violations", validationErrorsList);
return problemDetail;
}

@ExceptionHandler(IllegalArgumentException.class)
ProblemDetail handleIllegalArgumentException(IllegalArgumentException ex) {
Contributor:

🛠️ Refactor suggestion

Declare the handleIllegalArgumentException method as public

Similar to the previous method, handleIllegalArgumentException is package-private. It should be declared as public to be recognized by Spring's exception handling.

Apply this diff to change the method's access modifier:

-ProblemDetail handleIllegalArgumentException(IllegalArgumentException ex) {
+public ProblemDetail handleIllegalArgumentException(IllegalArgumentException ex) {

ProblemDetail problemDetail = ProblemDetail.forStatusAndDetail(HttpStatusCode.valueOf(400), ex.getMessage());
problemDetail.setTitle("Bad Request");
return problemDetail;
}

@ExceptionHandler(HttpMessageNotReadableException.class)
public ProblemDetail handleInvalidEnum(HttpMessageNotReadableException ex) {
if (ex.getMessage()
.contains("Cannot deserialize value of type `com.example.springbootkafkasample.dto.Operation`")) {
return ProblemDetail.forStatusAndDetail(
HttpStatusCode.valueOf(400), "Invalid operation value. Allowed values are: START, STOP.");
}
return ProblemDetail.forStatusAndDetail(HttpStatusCode.valueOf(400), "Invalid request.");
}

record ApiValidationError(String object, String field, Object rejectedValue, String message) {}
}
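For illustration, here is a hypothetical MockMvc snippet (not part of this PR) showing how the onException handler above surfaces a bean-validation failure on the new /listeners endpoint; the endpoint, DTO, and expected values are assumed from the rest of this diff:

// Hypothetical sketch: a blank containerId trips @NotBlank, Spring raises
// MethodArgumentNotValidException, and onException builds the ProblemDetail asserted below.
mockMvc.perform(post("/listeners")
                .contentType(MediaType.APPLICATION_JSON)
                .content("{ \"containerId\": \"\", \"operation\": \"STOP\" }"))
        .andExpect(status().isBadRequest())
        .andExpect(jsonPath("$.title").value("Constraint Violation"))
        .andExpect(jsonPath("$.detail").value("Invalid request content."))
        .andExpect(jsonPath("$.violations[0].field").value("containerId"))
        .andExpect(jsonPath("$.violations[0].message").value("Container ID must not be blank"));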
@@ -8,6 +8,7 @@
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.lang.NonNull;
Contributor:

💡 Codebase verification

Addition of @NonNull is safe but needs to be applied consistently

The verification revealed another implementation of KafkaListenerConfigurer in the multi-producers-consumers module that doesn't use @NonNull. While the annotation addition is good practice, it should be applied consistently across all implementations. The other implementation should be updated to maintain consistency in the null-safety contract.

  • kafka-spring-boot/boot-multiple-producers-consumers/src/main/java/com/example/springbootkafka/multi/config/KafkaConfig.java: Add @NonNull to the configureKafkaListeners parameter to match the contract
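For illustration, a minimal sketch of that follow-up change in the multi-producers-consumers module (the method body is assumed to mirror the implementation below; only the annotation and its import are new):

@Override
public void configureKafkaListeners(@NonNull KafkaListenerEndpointRegistrar registrar) {
    // Same null-safety contract as this module's KafkaConfig; also requires
    // importing org.springframework.lang.NonNull in that class.
    registrar.setValidator(this.validator);
}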
🔗 Analysis chain

LGTM! Good addition of null-safety.

The addition of @NonNull annotation improves the contract of the configureKafkaListeners method by making null-safety explicit. This is aligned with Spring Framework's null-safety initiative.

Let's verify that this change doesn't conflict with any custom implementations:

Also applies to: 27-27

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any custom implementations of KafkaListenerConfigurer
# that might be affected by the stricter null check

# Search for KafkaListenerConfigurer implementations
ast-grep --pattern 'class $_ implements $$$KafkaListenerConfigurer$$$'

# Search for direct calls to configureKafkaListeners
rg -l "configureKafkaListeners.*registrar"

Length of output: 349


Script:

#!/bin/bash
# Let's check the implementation in the other KafkaConfig file to see if it also has @NonNull
# and verify there are no conflicts

# Check the implementation in boot-multiple-producers-consumers
cat kafka-spring-boot/boot-multiple-producers-consumers/src/main/java/com/example/springbootkafka/multi/config/KafkaConfig.java

# Also check if there are any other files importing KafkaListenerConfigurer
# to ensure we haven't missed any implementations
rg -l "import.*KafkaListenerConfigurer"

Length of output: 5852

import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;

@Configuration
@@ -23,7 +24,7 @@ public KafkaConfig(LocalValidatorFactoryBean validator) {
}

@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
public void configureKafkaListeners(@NonNull KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}

@@ -1,10 +1,19 @@
package com.example.springbootkafkasample.controller;

import com.example.springbootkafkasample.dto.KafkaListenerRequest;
import com.example.springbootkafkasample.dto.MessageDTO;
import com.example.springbootkafkasample.dto.TopicInfo;
import com.example.springbootkafkasample.service.MessageService;
import com.example.springbootkafkasample.service.sender.Sender;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import jakarta.validation.Valid;
import java.util.List;
import java.util.Map;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@@ -22,6 +31,28 @@ public MessageRestController(Sender sender, MessageService messageService) {
this.messageService = messageService;
}

@GetMapping("/listeners")
public ResponseEntity<Map<String, Boolean>> getListeners() {
return ResponseEntity.ok(messageService.getListenersState());
}

@Operation(summary = "Update the state of a Kafka listener")
@ApiResponses(
value = {
@ApiResponse(
responseCode = "200",
description = "Listener state updated successfully",
content =
@Content(mediaType = "application/json", schema = @Schema(implementation = Map.class))),
@ApiResponse(responseCode = "400", description = "Invalid request", content = @Content),
@ApiResponse(responseCode = "404", description = "Listener not found", content = @Content)
})
@PostMapping("/listeners")
public ResponseEntity<Map<String, Boolean>> updateListenerState(
@RequestBody @Valid final KafkaListenerRequest kafkaListenerRequest) {
return ResponseEntity.ok(messageService.updateListenerState(kafkaListenerRequest));
}
Comment on lines +50 to +54
Contributor:

⚠️ Potential issue

Validate request body and handle validation errors

Ensure that KafkaListenerRequest is validated and any constraint violations return meaningful error responses. Currently, validation annotations are used, but there is no exception handler for MethodArgumentNotValidException.

Consider adding an exception handler:

// Requires imports: org.springframework.context.support.DefaultMessageSourceResolvable, java.util.stream.Collectors
@ExceptionHandler(MethodArgumentNotValidException.class)
public ResponseEntity<String> handleValidationExceptions(MethodArgumentNotValidException ex) {
    String errorMessage = ex.getBindingResult().getAllErrors().stream()
        .map(DefaultMessageSourceResolvable::getDefaultMessage)
        .collect(Collectors.joining(", "));
    return ResponseEntity.badRequest().body(errorMessage);
}


@PostMapping("/messages")
public void sendMessage(@RequestBody MessageDTO messageDTO) {
this.sender.send(messageDTO);
@@ -0,0 +1,8 @@
package com.example.springbootkafkasample.dto;

import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;

public record KafkaListenerRequest(
@NotBlank(message = "Container ID must not be blank") String containerId,
@NotNull(message = "Operation must not be null") Operation operation) {}
Comment on lines +1 to +8
coderabbitai bot (Contributor), Dec 2, 2024:

💡 Codebase verification

Security controls needed for Kafka listener management endpoint

Based on the verification results:

  1. The Operation enum is simple with just START/STOP operations
  2. The endpoint /listeners is exposed without any security controls
  3. Basic validation exists for containerId and invalid container handling
  4. No audit logging is implemented for listener state changes

Required security improvements:

  • Add Spring Security with role-based access control to /listeners endpoint
  • Implement audit logging for listener state changes
  • Consider rate limiting to prevent potential DoS attacks
  • Add request origin validation if the endpoint is exposed externally
🔗 Analysis chain

Verify security implications and Operation enum usage

The ability to start/stop Kafka listeners could have security implications. Consider:

  1. Adding role-based access control
  2. Audit logging for listener state changes
  3. Validation that the containerId exists before operations

Let's verify the Operation enum implementation and usage:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Find and display the Operation enum definition
echo "Looking for Operation enum definition:"
ast-grep --pattern 'enum Operation {
  $$$
}'

# Find all usages of KafkaListenerRequest to verify security measures
echo -e "\nChecking for security controls in endpoints using KafkaListenerRequest:"
rg -l "KafkaListenerRequest" | xargs rg -A 5 "@(PreAuthorize|Secured|RolesAllowed)"

Length of output: 377


Script:

#!/bin/bash
# Let's try a different approach to find the Operation enum
echo "Looking for Operation enum definition:"
fd -e java | xargs rg -l "enum Operation"

echo -e "\nChecking Operation enum content:"
fd -e java | xargs rg -A 5 "enum Operation"

echo -e "\nChecking KafkaListenerRequest usage in controllers/services:"
fd -e java | xargs rg -B 2 -A 5 "KafkaListenerRequest"

echo -e "\nChecking for any security configurations:"
fd -e java -e xml -e yml -e yaml -e properties | xargs rg "security|Security|@Secured|@PreAuthorize|WebSecurity"

Length of output: 13815

Owner Author:

@coderabbitai, please help me implement the above security improvements.

Contributor:

@rajadilipkolli, I'd be happy to help you implement the security improvements. Here are the steps to enhance the security of your /listeners endpoint:


1. Add Spring Security with Role-Based Access Control

a. Add Spring Security Dependency

Include the Spring Security starter dependency in your pom.xml:

<!-- Add Spring Security starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-security</artifactId>
</dependency>

b. Create a Security Configuration Class

Create a new class SecurityConfig in the config package to configure security settings:

package com.example.springbootkafkasample.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityCustomizer;
import org.springframework.security.web.SecurityFilterChain;
import static org.springframework.security.config.Customizer.withDefaults;

@Configuration
@EnableMethodSecurity
public class SecurityConfig {

    @Bean
    public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
        http
            .authorizeHttpRequests(authorize -> authorize
                .requestMatchers("/listeners/**").hasRole("ADMIN")
                .anyRequest().permitAll()
            )
            .httpBasic(withDefaults())
            // CSRF disabled for simplicity; consider enabling it in production.
            .csrf(csrf -> csrf.disable());

        return http.build();
    }

    @Bean
    public WebSecurityCustomizer webSecurityCustomizer() {
        return (web) -> web.debug(false);
    }
}

c. Configure an In-Memory User for Testing

For demonstration purposes, you can set up an in-memory user with the ADMIN role:

import org.springframework.context.annotation.Bean;
import org.springframework.security.provisioning.InMemoryUserDetailsManager;
import org.springframework.security.core.userdetails.User;

@Bean
public InMemoryUserDetailsManager userDetailsService() {
    return new InMemoryUserDetailsManager(
        User.withUsername("admin")
            .password("{noop}adminpass") // {noop} specifies that no encoding is required
            .roles("ADMIN")
            .build()
    );
}

d. Secure the Controller Methods

Ensure that only users with the ADMIN role can access the /listeners endpoints. You can use method-level security annotations:

package com.example.springbootkafkasample.controller;

import org.springframework.security.access.prepost.PreAuthorize;

//...

@RestController
@RequestMapping("/listeners")
public class MessageRestController {

    //...

    @PreAuthorize("hasRole('ADMIN')")
    @GetMapping
    public ResponseEntity<Map<String, Boolean>> getListeners() {
        // Existing code
    }

    @PreAuthorize("hasRole('ADMIN')")
    @PostMapping
    public ResponseEntity<Map<String, Boolean>> updateListenerState(@RequestBody KafkaListenerRequest kafkaListenerRequest) {
        // Existing code
    }
}

2. Implement Audit Logging for Listener State Changes

a. Create an Audit Aspect

Use Spring AOP to log listener state changes:

package com.example.springbootkafkasample.aspect;

import com.example.springbootkafkasample.dto.KafkaListenerRequest;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Aspect
@Component
public class AuditAspect {

    private static final Logger logger = LoggerFactory.getLogger(AuditAspect.class);

    @AfterReturning(pointcut = "execution(* com.example.springbootkafkasample.service.MessageService.updateListenerState(..)) && args(request)", returning = "result")
    public void logListenerStateChange(JoinPoint joinPoint, KafkaListenerRequest request, Object result) {
        String containerId = request.containerId();
        String operation = request.operation().name();
        logger.info("Listener with containerId '{}' performed operation '{}'", containerId, operation);
    }
}

b. Enable AspectJ Auto-Proxy Support

Make sure AspectJ proxy support is enabled:

import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;

@Configuration
@EnableAspectJAutoProxy
public class AppConfig {
    // Other beans if necessary
}

3. Implement Rate Limiting to Prevent DoS Attacks

a. Add Bucket4J Dependency

Include the Bucket4J library for rate limiting:

<!-- Add Bucket4J dependency -->
<dependency>
    <groupId>com.giffing.bucket4j.spring.boot.starter</groupId>
    <artifactId>bucket4j-spring-boot-starter</artifactId>
    <version>0.7.0</version>
</dependency>

b. Configure Rate Limiting

Create a rate-limiting filter:

package com.example.springbootkafkasample.config;

import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.Refill;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class RateLimitingFilter extends OncePerRequestFilter {

    private final Map<String, Bucket> cache = new ConcurrentHashMap<>();

    @Override
    protected void doFilterInternal(HttpServletRequest request,
                                    HttpServletResponse response,
                                    FilterChain filterChain) throws ServletException, IOException {
        String clientIP = request.getRemoteAddr();
        Bucket bucket = cache.computeIfAbsent(clientIP, this::newBucket);

        if (bucket.tryConsume(1)) {
            filterChain.doFilter(request, response);
        } else {
            response.setStatus(429); // HTTP 429 Too Many Requests
            response.getWriter().write("Too many requests");
        }
    }

    private Bucket newBucket(String key) {
        return Bucket.builder()
                .addLimit(Bandwidth.classic(5, Refill.greedy(5, Duration.ofMinutes(1))))
                .build();
    }
}

c. Register the Filter

Add the filter to your security configuration:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.web.authentication.UsernamePasswordAuthenticationFilter;

// In your SecurityConfig class

@Autowired
private RateLimitingFilter rateLimitingFilter;

@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
    http
        // Existing configurations
        .addFilterBefore(rateLimitingFilter, UsernamePasswordAuthenticationFilter.class);
    return http.build();
}

4. Add Request Origin Validation

If your endpoint is exposed externally, validate the request origin.

a. Configure CORS in Security Configuration

// In your SecurityConfig class

@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
    http
        // Existing configurations
        .cors().configurationSource(request -> {
            CorsConfiguration config = new CorsConfiguration();
            config.setAllowedOrigins(List.of("https://your-trusted-domain.com"));
            config.setAllowedMethods(List.of("GET", "POST"));
            return config;
        })
        .and()
        .csrf().disable();
    return http.build();
}

b. Set Up Additional Origin Checks (Optional)

For extra security, you can validate the Origin header in your controller:

// In your MessageRestController

@PostMapping
public ResponseEntity<Map<String, Boolean>> updateListenerState(
        @RequestBody KafkaListenerRequest kafkaListenerRequest,
        @RequestHeader(value = "Origin", required = false) String origin) {

    if (origin == null || !origin.equals("https://your-trusted-domain.com")) {
        return ResponseEntity.status(HttpStatus.FORBIDDEN).build();
    }

    // Existing code
}

Note: Remember to replace placeholders like "https://your-trusted-domain.com", usernames, passwords, and other configurations with values appropriate for your environment.

Let me know if you have any questions or need further assistance!

@@ -0,0 +1,6 @@
package com.example.springbootkafkasample.dto;

public enum Operation {
START,
STOP
}
@@ -1,5 +1,7 @@
package com.example.springbootkafkasample.service;

import com.example.springbootkafkasample.dto.KafkaListenerRequest;
import com.example.springbootkafkasample.dto.Operation;
import com.example.springbootkafkasample.dto.TopicInfo;
import java.util.ArrayList;
import java.util.Comparator;
@@ -8,6 +10,25 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;

@Service
public class MessageService {

private final KafkaAdmin kafkaAdmin;
private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

public MessageService(KafkaAdmin kafkaAdmin) {
public MessageService(KafkaAdmin kafkaAdmin, KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
this.kafkaAdmin = kafkaAdmin;
this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
}

public List<TopicInfo> getTopicsWithPartitions(boolean showInternalTopics) {
@@ -57,4 +64,29 @@ public List<TopicInfo> getTopicsWithPartitions(boolean showInternalTopics) {
topicPartitionCounts.sort(Comparator.comparing(TopicInfo::topicName));
return topicPartitionCounts;
}

public Map<String, Boolean> getListenersState() {
return kafkaListenerEndpointRegistry.getListenerContainers().stream()
.collect(
Collectors.toMap(MessageListenerContainer::getListenerId, MessageListenerContainer::isRunning));
}

public Map<String, Boolean> updateListenerState(KafkaListenerRequest kafkaListenerRequest) {
MessageListenerContainer listenerContainer =
kafkaListenerEndpointRegistry.getListenerContainer(kafkaListenerRequest.containerId());
if (listenerContainer == null) {
throw new IllegalArgumentException(
"Listener container with ID '" + kafkaListenerRequest.containerId() + "' not found");
}
if (kafkaListenerRequest.operation().equals(Operation.START)) {
if (!listenerContainer.isRunning()) {
listenerContainer.start();
}
} else if (kafkaListenerRequest.operation().equals(Operation.STOP)) {
if (listenerContainer.isRunning()) {
listenerContainer.stop();
}
}
return getListenersState();
}
}
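A note on the containerId values this method expects: Spring Kafka generates listener IDs such as org.springframework.kafka.KafkaListenerEndpointContainer#0 by default (as the tests below show). A hypothetical sketch, not part of this PR, of assigning a friendlier ID through the @KafkaListener id attribute (listener name, topic, and group are made up):

@KafkaListener(id = "ordersListener", topics = "orders", groupId = "orders-group")
public void onMessage(MessageDTO message) {
    // With an explicit id, clients can call POST /listeners with
    // {"containerId": "ordersListener", "operation": "STOP"} instead of the generated name.
}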
@@ -8,4 +8,6 @@ spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.spring

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.UUIDSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.threads.virtual.enabled=true

spring.threads.virtual.enabled=true
spring.mvc.problemdetails.enabled=true
@@ -2,15 +2,12 @@

import static com.example.springbootkafkasample.config.Initializer.TOPIC_TEST_1;
import static com.example.springbootkafkasample.config.Initializer.TOPIC_TEST_2;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import com.example.springbootkafkasample.dto.MessageDTO;
import com.example.springbootkafkasample.service.listener.Receiver2;
import java.time.Duration;
import java.util.UUID;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -67,15 +64,6 @@ void setUp() {
}
}

@Test
void sendAndReceiveData() throws InterruptedException {
template.send(TOPIC_TEST_1, UUID.randomUUID(), new MessageDTO(TOPIC_TEST_1, "foo"));
// 4 from topic1 and 3 from topic2 on startUp, plus 1 from test
await().pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(45))
.untilAsserted(() -> assertThat(receiver2.getLatch().getCount()).isEqualTo(9));
}

@Test
void topicsWithPartitionsCount() throws Exception {

@@ -9,10 +9,14 @@
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import com.example.springbootkafkasample.common.ContainerConfig;
import com.example.springbootkafkasample.dto.KafkaListenerRequest;
import com.example.springbootkafkasample.dto.MessageDTO;
import com.example.springbootkafkasample.dto.Operation;
import com.example.springbootkafkasample.service.listener.Receiver2;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
@@ -23,7 +27,7 @@
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MockMvc;

@SpringBootTest(classes = TestBootKafkaSampleApplication.class)
@SpringBootTest(classes = ContainerConfig.class)
@AutoConfigureMockMvc
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class KafkaSampleIntegrationTest {
@@ -40,6 +44,7 @@ class KafkaSampleIntegrationTest {
@Test
@Order(1)
void sendAndReceiveMessage() throws Exception {
long initialCount = receiver2.getLatch().getCount();
this.mockMvc
.perform(post("/messages")
.content(this.objectMapper.writeValueAsString(new MessageDTO("test_1", "junitTest")))
@@ -48,8 +53,8 @@ void sendAndReceiveMessage() throws Exception {

// 4 from topic1 and 3 from topic2 on startUp, plus 1 from test
await().pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(30))
.untilAsserted(() -> assertThat(receiver2.getLatch().getCount()).isEqualTo(7));
.atMost(Duration.ofSeconds(15))
.untilAsserted(() -> assertThat(receiver2.getLatch().getCount()).isEqualTo(initialCount - 1));
assertThat(receiver2.getDeadLetterLatch().getCount()).isEqualTo(1);
}

@@ -97,4 +102,91 @@ void topicsWithPartitionsCount() throws Exception {
.andExpect(jsonPath("$[6].partitionCount").value(32))
.andExpect(jsonPath("$[6].replicationCount").value(1));
}

@Test
void getListOfContainers() throws Exception {
this.mockMvc
.perform(get("/listeners"))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andExpect(jsonPath("$.size()").value(5))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-1']")
.value(true));
}

@Test
void stopAndStartContainers() throws Exception {
this.mockMvc
.perform(post("/listeners")
.content(this.objectMapper.writeValueAsString(new KafkaListenerRequest(
"org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", Operation.STOP)))
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andExpect(jsonPath("$.size()").value(5))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt']")
.value(false))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-1']")
.value(true));
this.mockMvc
.perform(post("/listeners")
.content(this.objectMapper.writeValueAsString(new KafkaListenerRequest(
"org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", Operation.START)))
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andExpect(jsonPath("$.size()").value(5))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-0']")
.value(true))
.andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-1']")
.value(true));
}
Comment on lines +125 to +163
Contributor:

🛠️ Refactor suggestion

Reduce assertion duplication by extracting common verification logic

The test has duplicated assertions for verifying container states. Consider extracting a helper method to reduce duplication and improve maintainability.

+    private void assertContainerStates(ResultActions result, String targetContainer, boolean expectedState) 
+            throws Exception {
+        result
+            .andExpect(status().isOk())
+            .andExpect(content().contentType(MediaType.APPLICATION_JSON))
+            .andExpect(jsonPath("$.size()").value(EXPECTED_CONTAINER_COUNT))
+            .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt']")
+                    .value(targetContainer.contains("#1-dlt") ? expectedState : true))
+            .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#0']")
+                    .value(true))
+            // ... repeat for other containers
+    }
+
     @Test
     void stopAndStartContainers() throws Exception {
-        this.mockMvc
-                .perform(post("/listeners")
-                        .content(this.objectMapper.writeValueAsString(new KafkaListenerRequest(
-                                "org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", Operation.STOP)))
-                        .contentType(MediaType.APPLICATION_JSON))
-                .andExpect(/* ... duplicated assertions ... */);
+        assertContainerStates(
+            this.mockMvc
+                    .perform(post("/listeners")
+                            .content(this.objectMapper.writeValueAsString(new KafkaListenerRequest(
+                                    "org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", Operation.STOP)))
+                            .contentType(MediaType.APPLICATION_JSON)),
+            "#1-dlt",
+            false
+        );

Committable suggestion skipped: line range outside the PR's diff.


@Test
void invalidContainerOperation() throws Exception {
this.mockMvc
.perform(post("/listeners")
.content(objectMapper.writeValueAsString(
new KafkaListenerRequest("invalid-container-id", Operation.STOP)))
.contentType(MediaType.APPLICATION_JSON))
.andExpect(status().isBadRequest())
.andExpect(content().contentType(MediaType.APPLICATION_PROBLEM_JSON_VALUE));
}

@Test
void whenInvalidOperation_thenReturnsBadRequest() throws Exception {
String invalidRequest = "{ \"containerId\": \"myListener\", \"operation\": \"INVALID\" }";

mockMvc.perform(post("/listeners")
.contentType(MediaType.APPLICATION_JSON)
.content(invalidRequest))
.andExpect(status().isBadRequest())
.andExpect(content().contentType(MediaType.APPLICATION_PROBLEM_JSON_VALUE))
.andExpect(jsonPath("$.type", CoreMatchers.is("about:blank")))
.andExpect(jsonPath("$.title", CoreMatchers.is("Bad Request")))
.andExpect(jsonPath("$.status", CoreMatchers.is(400)))
.andExpect(jsonPath(
"$.detail", CoreMatchers.is("Invalid operation value. Allowed values are: START, STOP.")))
.andExpect(jsonPath("$.instance", CoreMatchers.is("/listeners")));
}
}