feat: enable and disable kafka listener #600
Conversation
Walkthrough

The pull request introduces new functionality to enable and disable Kafka listeners at runtime.
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (5)
kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/TestBootKafkaSampleApplication.java (1)
9-11: LGTM: Clean integration with ContainerConfig

The integration with ContainerConfig through SpringApplication.from() is the correct approach for test configuration.
This change improves the test architecture by:
- Separating container configuration concerns
- Making the test setup more maintainable and reusable
- Providing a centralized place for Kafka test configurations
kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/controller/MessageRestController.java (2)
28-31: Add API documentation for better developer experience.

Consider adding OpenAPI/Swagger annotations to document:
- Purpose of the endpoint
- Response structure (what the boolean values represent)
- Possible error scenarios
Example addition:
```diff
+ @Operation(summary = "Get status of all Kafka listeners")
+ @ApiResponse(responseCode = "200", description = "Map of listener IDs to their active states")
  @GetMapping("/listeners")
  public ResponseEntity<Map<String, Boolean>> getListeners() {
      return ResponseEntity.ok(messageService.getListeners());
  }
```
28-37: Consider security implications of listener management endpoints.

These endpoints provide powerful functionality to control Kafka listeners, which could pose operational risks:
- Consider adding authentication/authorization (e.g., Spring Security)
- Implement rate limiting to prevent potential DoS attacks
- Add audit logging for listener state changes
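To make the rate-limiting point concrete, a minimal fixed-window counter could gate the listener endpoints. This is an illustrative sketch only (the class and limit are hypothetical, not part of the PR); a production setup would more likely use a library such as Bucket4j or a gateway-level limiter:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical fixed-window limiter sketch for the /listeners endpoints;
// not from the PR -- real deployments should prefer a dedicated library.
public class ListenerRateLimiter {
    private final int maxPerWindow;
    private final AtomicInteger used = new AtomicInteger();

    ListenerRateLimiter(int maxPerWindow) {
        this.maxPerWindow = maxPerWindow;
    }

    // Returns true while the current window still has capacity.
    boolean tryAcquire() {
        return used.incrementAndGet() <= maxPerWindow;
    }

    public static void main(String[] args) {
        ListenerRateLimiter limiter = new ListenerRateLimiter(3);
        int allowed = 0;
        for (int i = 0; i < 5; i++) {
            if (limiter.tryAcquire()) {
                allowed++;
            }
        }
        System.out.println(allowed); // 3 of 5 requests pass the window limit
    }
}
```

A filter or interceptor calling `tryAcquire()` before delegating to the controller (and resetting the counter on a timer) would then reject excess start/stop requests.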
kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleIntegrationTest.java (1)
124-162: LGTM with suggestions for enhanced test coverage.

The test effectively verifies the basic START/STOP operations. Consider enhancing it with:
- Error scenarios (e.g., invalid container ID, invalid operation)
- Concurrent operations on multiple containers
- Operation on already stopped/started containers
Example test case for error scenario:
```java
@Test
void invalidContainerOperation() throws Exception {
    this.mockMvc
            .perform(post("/listeners")
                    .content(objectMapper.writeValueAsString(new KafkaListenerRequest(
                            "invalid-container-id", Operation.STOP)))
                    .contentType(MediaType.APPLICATION_JSON))
            .andExpect(status().isBadRequest());
}
```

kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/service/MessageService.java (1)
69-73: Consider adding error handling for potential Kafka exceptions.

While the implementation is clean and functional, consider adding a try-catch block to handle a potential KafkaException that might occur when accessing listener containers. This would make the method more robust and provide better error feedback.

```diff
 public Map<String, Boolean> getListeners() {
+    try {
         return kafkaListenerEndpointRegistry.getListenerContainers().stream()
                 .collect(Collectors.toMap(
                         MessageListenerContainer::getListenerId, MessageListenerContainer::isRunning));
+    } catch (KafkaException ke) {
+        throw new KafkaException("Failed to retrieve listener states", ke);
+    }
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (8)
- kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/controller/MessageRestController.java (2 hunks)
- kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/dto/KafkaListenerRequest.java (1 hunks)
- kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/dto/Operation.java (1 hunks)
- kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/service/MessageService.java (2 hunks)
- kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleApplicationTests.java (0 hunks)
- kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleIntegrationTest.java (5 hunks)
- kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/TestBootKafkaSampleApplication.java (1 hunks)
- kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/common/ContainerConfig.java (1 hunks)
💤 Files with no reviewable changes (1)
- kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleApplicationTests.java
✅ Files skipped from review due to trivial changes (2)
- kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/dto/Operation.java
- kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/dto/KafkaListenerRequest.java
🔇 Additional comments (9)
kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/TestBootKafkaSampleApplication.java (1)
3-3: Verify ContainerConfig implementation
The migration to a separate ContainerConfig class improves modularity, but we should ensure it's properly implemented.
✅ Verification successful
ContainerConfig implementation is properly set up
The ContainerConfig class is correctly implemented with:
- Proper @Bean and @ServiceConnection annotations
- KafkaContainer configuration using apache/kafka-native:3.8.1
- Additional Kafka configuration for partitions
- Consistent with similar container configurations across the codebase
The import and usage in TestBootKafkaSampleApplication.java follows Spring Boot's test container integration best practices.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Description: Verify ContainerConfig implementation and its Kafka container setup
# Expected: Find ContainerConfig with proper Kafka container configuration

# Check ContainerConfig implementation
ast-grep --pattern 'class ContainerConfig {
  $$$
  @Bean
  $$$
  KafkaContainer $$$() {
    $$$
  }
}'

# Verify test coverage for new listener management features
rg -l "KafkaListenerContainerFactory|KafkaListenerEndpointRegistry" --type java
```
Length of output: 1088
Script:
```shell
#!/bin/bash
# Let's check the actual ContainerConfig implementation since it wasn't found in the first search
fd ContainerConfig.java

# If found, let's see its content
rg -A 10 "class ContainerConfig" --type java

# Let's also check the test class to understand the usage
cat "kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/TestBootKafkaSampleApplication.java"
```
Length of output: 8932
kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/common/ContainerConfig.java (2)
9-10: LGTM! Well-structured test configuration.

The class is properly configured with @TestConfiguration(proxyBeanMethods = false), which is the recommended approach for test configurations as it prevents caching of bean instances between tests.
12-19: Verify partition configuration and consider security settings.
The Kafka container setup looks good, but there are a few considerations:
- The partition count of 32 seems high for a test environment. Consider if this many partitions are necessary for your test scenarios.
- The container lacks security configurations which might not match your production environment.
Consider these improvements:
- Add security configurations if they match your production setup:
```diff
 KafkaContainer kafkaContainer() {
     KafkaContainer kafkaContainer =
             new KafkaContainer(DockerImageName.parse("apache/kafka-native").withTag("3.8.1"));
     kafkaContainer.addEnv("KAFKA_NUM_PARTITIONS", "32");
+    // Consider adding security configurations
+    kafkaContainer.addEnv("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT");
     return kafkaContainer;
 }
```
Let's verify the Kafka configuration in the codebase:
kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/controller/MessageRestController.java (1)
3-3: LGTM! Import statements are clean and necessary.
The new imports are properly organized and directly support the added functionality.
Also applies to: 9-9, 10-10
kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleIntegrationTest.java (3)
105-122: LGTM! Well-structured test for listener containers.

The test thoroughly verifies:
The test thoroughly verifies:
- Total number of containers (5)
- Existence and state of each container type (main, retry, DLT)
- All containers are running by default
46-46: Verify the expected message count in assertion.
There appears to be an inconsistency between the comment and the assertion:
- Comment suggests: 4 (topic1) + 3 (topic2) + 1 (test) messages
- Assertion checks for: initialCount + 3
Please verify if the assertion initialCount + 3
correctly reflects the expected behavior.
Also applies to: 56-56
12-15: Verify the test configuration change impact.

The switch from TestBootKafkaSampleApplication to ContainerConfig indicates a more focused test configuration. Let's verify that all necessary configurations are properly migrated.
Also applies to: 29-29
✅ Verification successful
Let me analyze the results. I can see that:
- ContainerConfig is a focused configuration class that provides a Kafka container with specific settings (32 partitions).
- TestBootKafkaSampleApplication actually uses ContainerConfig in its configuration, showing they work together.
- The change in imports and configuration is consistent with the codebase structure.
Let me verify one more aspect to ensure there are no missing configurations:
Let me check the actual content of these test files to ensure the configuration change is properly handled:
Based on the comprehensive analysis of the test files and configurations, I can now provide the final response:
Configuration change is properly implemented
The switch from TestBootKafkaSampleApplication to ContainerConfig is correct and well-structured:
- ContainerConfig provides the necessary Kafka container configuration with specific settings (32 partitions)
- TestBootKafkaSampleApplication actually uses ContainerConfig in its bootstrap
- All Kafka-related test components (KafkaTemplate, KafkaListener) are properly configured and accessible in the test classes
- The new imports align with the actual usage in the integration tests
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Description: Check if ContainerConfig contains all necessary configurations from TestBootKafkaSampleApplication

# Find and compare the configuration classes
echo "=== ContainerConfig ==="
fd -e java -E test --exec rg -l "class ContainerConfig"
rg -A 10 "class ContainerConfig"
echo "=== TestBootKafkaSampleApplication ==="
fd -e java -E test --exec rg -l "class TestBootKafkaSampleApplication"
rg -A 10 "class TestBootKafkaSampleApplication"
```
Length of output: 9678
Script:
```shell
#!/bin/bash
# Check for any Kafka-related configurations in the test application
rg -l "KafkaTemplate|KafkaListener|@Bean.*Kafka" kafka-spring-boot/boot-kafka-sample/src/test/java/
```
Length of output: 334
Script:
```shell
#!/bin/bash
# Check the Kafka-related configurations in both test files
echo "=== KafkaSampleApplicationTests.java ==="
rg -A 5 "KafkaTemplate|KafkaListener|@Bean.*Kafka" kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleApplicationTests.java
echo "=== KafkaSampleIntegrationTest.java ==="
rg -A 5 "KafkaTemplate|KafkaListener|@Bean.*Kafka" kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleIntegrationTest.java
```
Length of output: 4815
kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/service/MessageService.java (2)
3-32: LGTM! Clean dependency injection and proper imports.
The new dependencies and imports are properly organized, and the constructor injection follows best practices.
75-88: Verify the impact on existing Kafka listeners.
Let's verify if there are any existing Kafka listeners in the codebase that might be affected by this new functionality.
✅ Verification successful
Let me gather more information about the listener configurations to understand their setup better.
Let me check one final aspect about the container configurations to ensure we have a complete picture.
The new listener management functionality is safe to implement
Based on the analysis of the codebase:
- The project has two Kafka listeners (Receiver1 and Receiver2) with proper configurations and error handling
- The listeners are independent and use standard Spring Kafka configurations
- The new functionality using KafkaListenerEndpointRegistry is the recommended way to manage listener lifecycles
- The implementation follows Spring Kafka's best practices for listener container management
The changes provide a controlled way to start/stop listeners without impacting their core functionality or configurations.
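The lifecycle logic described above can be modeled in plain Java: a map stands in for KafkaListenerEndpointRegistry's container lookup, and START/STOP flips the running flag, throwing for unknown IDs. This is a sketch of the behavior the review discusses, not the PR's actual Spring wiring; the class name is illustrative, while the container IDs come from the integration tests:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of the start/stop semantics; the map stands in for
// KafkaListenerEndpointRegistry. Container IDs are taken from the tests.
public class ListenerStateModel {
    enum Operation { START, STOP }

    private final Map<String, Boolean> containers = new LinkedHashMap<>();

    ListenerStateModel() {
        // All containers start in the running state, as the tests assert.
        containers.put("org.springframework.kafka.KafkaListenerEndpointContainer#0", true);
        containers.put("org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", true);
    }

    Map<String, Boolean> updateListenerState(String containerId, Operation operation) {
        if (!containers.containsKey(containerId)) {
            throw new IllegalArgumentException("Listener container not found: " + containerId);
        }
        containers.put(containerId, operation == Operation.START);
        return containers;
    }

    public static void main(String[] args) {
        ListenerStateModel model = new ListenerStateModel();
        Map<String, Boolean> states = model.updateListenerState(
                "org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", Operation.STOP);
        System.out.println(states.get("org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt"));
    }
}
```

In the real service the same shape maps onto KafkaListenerEndpointRegistry's container lookup followed by start() or stop() on the resolved MessageListenerContainer.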
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Search for @KafkaListener annotations to identify existing listeners
ast-grep --pattern '@KafkaListener($$$)'

# Search for MessageListenerContainer usage
rg -l "MessageListenerContainer"
```
Length of output: 2603
Script:
```shell
#!/bin/bash
# Check the listener configurations to understand their setup
rg -A 5 "@KafkaListener" kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/service/listener/

# Check if there are any custom container factory configurations
rg -l "KafkaListenerContainerFactory"

# Look for any error handler configurations
ast-grep --pattern 'errorHandler = "$_"'
```
Length of output: 2734
Script:
```shell
#!/bin/bash
# Check for container factory configurations in the kafka-sample project
rg -A 10 "KafkaListenerContainerFactory" kafka-spring-boot/boot-kafka-sample/

# Look for any container-related configurations
rg -A 5 "containerFactory" kafka-spring-boot/boot-kafka-sample/

# Check for any custom container configurations
rg -l "KafkaListenerEndpointRegistry" kafka-spring-boot/boot-kafka-sample/
```
Length of output: 445
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (2)
kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleIntegrationTest.java (2)
104-121: Consider extracting magic number and documenting container roles

The test verifies the presence of 5 containers, but this number is hardcoded. Consider:
- Extracting this as a constant with a descriptive name
- Adding a comment explaining the purpose of each container
```diff
+ private static final int EXPECTED_CONTAINER_COUNT = 5;
+ // Containers: two main listeners (#0, #1), two retry listeners, and one DLT listener
  @Test
  void getListOfContainers() throws Exception {
      this.mockMvc
              .perform(get("/listeners"))
              .andExpect(status().isOk())
              .andExpect(content().contentType(MediaType.APPLICATION_JSON))
-             .andExpect(jsonPath("$.size()").value(5))
+             .andExpect(jsonPath("$.size()").value(EXPECTED_CONTAINER_COUNT))
```
104-161: Consider adding error scenario tests

The current tests cover the happy path, but consider adding tests for:
- Attempting to stop/start a non-existent container
- Invalid operations
- Malformed requests
Would you like me to provide example test cases for these scenarios?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleIntegrationTest.java (4 hunks)
🔇 Additional comments (2)
kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleIntegrationTest.java (2)
12-15: LGTM: Good move towards isolated testing configuration

The switch to ContainerConfig in @SpringBootTest improves test isolation and maintainability by separating test configuration from the main application configuration.
Also applies to: 29-29
54-55: Verify if 15 seconds timeout is sufficient
While reducing the timeout from 30s to 15s improves test execution time, please ensure this is sufficient across all environments (CI/CD, different hardware specs, etc.) to avoid flaky tests.
✅ Verification successful
15 seconds timeout appears sufficient
The timeout of 15 seconds is adequate for this test scenario because:
- The test is using a local Kafka setup (via ContainerConfig)
- The message processing is simple (just logging)
- The retry configuration in Receiver2 has small delays (1 second initial with 2.0 multiplier)
- The test is waiting for a specific latch count (7) which accounts for startup messages (4 from topic1 + 3 from topic2) plus the test message
- The polling interval is 1 second, giving multiple chances to check the condition
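A quick back-of-the-envelope check supports the timeout reasoning above. Assuming a 1-second initial delay, a 2.0 multiplier, and two retry attempts (inferred from the two retry containers, #1-retry-0 and #1-retry-1; the exact attempt count is an assumption, not confirmed by the diff), the cumulative retry delay is small relative to the 15-second budget:

```java
// Cumulative exponential-backoff delay under the assumed Receiver2 settings:
// 1 s initial delay, 2.0 multiplier, two retry attempts.
public class BackoffBudget {
    public static void main(String[] args) {
        long initialDelayMs = 1000;  // assumed from the review's description
        double multiplier = 2.0;
        int retryAttempts = 2;       // one per retry container, an assumption

        long totalMs = 0;
        double delay = initialDelayMs;
        for (int i = 0; i < retryAttempts; i++) {
            totalMs += (long) delay;
            delay *= multiplier;
        }
        System.out.println(totalMs); // total backoff wait in milliseconds
    }
}
```

Roughly 3 seconds of backoff plus message processing still leaves comfortable headroom inside the 15-second latch timeout.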
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Check recent CI runs to verify if tests are stable with 15s timeout
gh run list --limit 10 --json conclusion,name | jq '.[] | select(.name | contains("test")) | .conclusion'
```
Length of output: 107
Script:
```shell
#!/bin/bash
# Let's check the test file to understand the test setup and timing requirements
cat "kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleIntegrationTest.java"
```
Length of output: 8717
Script:
```shell
#!/bin/bash
# Let's check the Receiver2 class to understand the latch setup and timing requirements
fd -e java -E test --exec grep -l "Receiver2" {}
```
Length of output: 168
Script:
```shell
#!/bin/bash
# Let's examine the Receiver2 class implementation
cat "./kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/service/listener/Receiver2.java"
```
Length of output: 2278
```java
@Test
void stopAndStartContainers() throws Exception {
    this.mockMvc
            .perform(post("/listeners")
                    .content(this.objectMapper.writeValueAsString(new KafkaListenerRequest(
                            "org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", Operation.STOP)))
                    .contentType(MediaType.APPLICATION_JSON))
            .andExpect(status().isOk())
            .andExpect(content().contentType(MediaType.APPLICATION_JSON))
            .andExpect(jsonPath("$.size()").value(5))
            .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt']")
                    .value(false))
            .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#0']")
                    .value(true))
            .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1']")
                    .value(true))
            .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-0']")
                    .value(true))
            .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-1']")
                    .value(true));
    this.mockMvc
            .perform(post("/listeners")
                    .content(this.objectMapper.writeValueAsString(new KafkaListenerRequest(
                            "org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", Operation.START)))
                    .contentType(MediaType.APPLICATION_JSON))
            .andExpect(status().isOk())
            .andExpect(content().contentType(MediaType.APPLICATION_JSON))
            .andExpect(jsonPath("$.size()").value(5))
            .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt']")
                    .value(true))
            .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#0']")
                    .value(true))
            .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1']")
                    .value(true))
            .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-0']")
                    .value(true))
            .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-retry-1']")
                    .value(true));
}
```
🛠️ Refactor suggestion
Reduce assertion duplication by extracting common verification logic

The test has duplicated assertions for verifying container states. Consider extracting a helper method to reduce duplication and improve maintainability.
```diff
+ private void assertContainerStates(ResultActions result, String targetContainer, boolean expectedState)
+         throws Exception {
+     result
+         .andExpect(status().isOk())
+         .andExpect(content().contentType(MediaType.APPLICATION_JSON))
+         .andExpect(jsonPath("$.size()").value(EXPECTED_CONTAINER_COUNT))
+         .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt']")
+             .value(targetContainer.contains("#1-dlt") ? expectedState : true))
+         .andExpect(jsonPath("$.['org.springframework.kafka.KafkaListenerEndpointContainer#0']")
+             .value(true))
+         // ... repeat for other containers
+ }
+
  @Test
  void stopAndStartContainers() throws Exception {
-     this.mockMvc
-         .perform(post("/listeners")
-             .content(this.objectMapper.writeValueAsString(new KafkaListenerRequest(
-                 "org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", Operation.STOP)))
-             .contentType(MediaType.APPLICATION_JSON))
-         .andExpect(/* ... duplicated assertions ... */);
+     assertContainerStates(
+         this.mockMvc
+             .perform(post("/listeners")
+                 .content(this.objectMapper.writeValueAsString(new KafkaListenerRequest(
+                     "org.springframework.kafka.KafkaListenerEndpointContainer#1-dlt", Operation.STOP)))
+                 .contentType(MediaType.APPLICATION_JSON)),
+         "#1-dlt",
+         false
+     );
```
Committable suggestion skipped: line range outside the PR's diff.
Actionable comments posted: 6
🧹 Outside diff range and nitpick comments (5)
kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/service/MessageService.java (1)
74-91: Enhance exception handling for clarity

The updateListenerState method throws IllegalArgumentException when a listener container is not found. Consider creating a custom exception or using a more specific exception to provide clearer context to the clients and improve error handling.

kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleIntegrationTest.java (1)
125-163: Refactor tests to reduce duplication

The stopAndStartContainers test contains repeated assertions for container states. Extract common assertion logic into a helper method to improve readability and maintainability.

Example:
```java
private void assertContainerStates(MockMvc mockMvc, String containerId, boolean expectedState) throws Exception {
    mockMvc.perform(get("/listeners"))
            .andExpect(status().isOk())
            .andExpect(jsonPath("$['" + containerId + "']").value(expectedState));
}
```

kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/config/GlobalExceptionHandler.java (2)
24-24: Ensure consistent naming for exception handler methods

For clarity and consistency, consider renaming the onException method to follow the handleExceptionType naming convention, matching the other methods. Apply this diff to rename the method:

```diff
-public ProblemDetail onException(MethodArgumentNotValidException methodArgumentNotValidException) {
+public ProblemDetail handleMethodArgumentNotValidException(MethodArgumentNotValidException ex) {
```

Also, update the parameter name for consistency:

```diff
-public ProblemDetail handleMethodArgumentNotValidException(MethodArgumentNotValidException methodArgumentNotValidException) {
+public ProblemDetail handleMethodArgumentNotValidException(MethodArgumentNotValidException ex) {
```

And ensure all references within the method use the new parameter name ex.

Also applies to: 44-44, 51-51
51-57: Avoid relying on exception messages for control flow; inspect the exception cause instead

The handleInvalidEnum method checks the exception message to identify deserialization issues with the Operation enum. Parsing exception messages is fragile and may break with changes in the message wording or localization. Instead, inspect the root cause of the exception to determine the specific error.

Refactor the method to examine the root cause:

```diff
 public ProblemDetail handleInvalidEnum(HttpMessageNotReadableException ex) {
-    if (ex.getMessage()
-            .contains("Cannot deserialize value of type `com.example.springbootkafkasample.dto.Operation`")) {
+    Throwable rootCause = ex.getRootCause();
+    if (rootCause instanceof InvalidFormatException ife && ife.getTargetType() == Operation.class) {
         return ProblemDetail.forStatusAndDetail(
                 HttpStatusCode.valueOf(400), "Invalid operation value. Allowed values are: START, STOP.");
     }
     return ProblemDetail.forStatusAndDetail(HttpStatusCode.valueOf(400), "Invalid request.");
 }
```

Make sure to import the necessary classes:

```diff
+import com.fasterxml.jackson.databind.exc.InvalidFormatException;
+import com.example.springbootkafkasample.dto.Operation;
```

This approach is more robust and less prone to errors due to message changes.
kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/dto/KafkaListenerRequest.java (1)
6-8: Add Javadoc documentation for API clarity

The record is well-structured with appropriate validation constraints. Consider adding Javadoc documentation to describe:
- The purpose of this DTO
- Expected format/pattern for containerId, if any
- Valid operations and their effects

```diff
+/**
+ * Request DTO for Kafka listener operations.
+ *
+ * @param containerId The unique identifier of the Kafka listener container
+ * @param operation The operation to perform (START/STOP)
+ */
 public record KafkaListenerRequest(
         @NotBlank(message = "Container ID must not be blank") String containerId,
         @NotNull(message = "Operation must not be null") Operation operation) {}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (7)
- kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/config/GlobalExceptionHandler.java (1 hunks)
- kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/config/KafkaConfig.java (2 hunks)
- kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/controller/MessageRestController.java (2 hunks)
- kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/dto/KafkaListenerRequest.java (1 hunks)
- kafka-spring-boot/boot-kafka-sample/src/main/java/com/example/springbootkafkasample/service/MessageService.java (3 hunks)
- kafka-spring-boot/boot-kafka-sample/src/main/resources/application.properties (1 hunks)
- kafka-spring-boot/boot-kafka-sample/src/test/java/com/example/springbootkafkasample/KafkaSampleIntegrationTest.java (5 hunks)
✅ Files skipped from review due to trivial changes (1)
- kafka-spring-boot/boot-kafka-sample/src/main/resources/application.properties
```diff
@@ -8,6 +8,7 @@
 import org.springframework.kafka.annotation.KafkaListenerConfigurer;
 import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
 import org.springframework.kafka.listener.KafkaListenerErrorHandler;
+import org.springframework.lang.NonNull;
```
💡 Codebase verification
Addition of @NonNull is safe but needs to be applied consistently

The verification revealed another implementation of KafkaListenerConfigurer in the multi-producers-consumers module that doesn't use @NonNull. While the annotation addition is good practice, it should be applied consistently across all implementations. The other implementation should be updated to maintain consistency in the null-safety contract.
kafka-spring-boot/boot-multiple-producers-consumers/src/main/java/com/example/springbootkafka/multi/config/KafkaConfig.java: Add @NonNull to the configureKafkaListeners parameter to match the contract
🔗 Analysis chain
LGTM! Good addition of null-safety.
The addition of the @NonNull annotation improves the contract of the configureKafkaListeners method by making null-safety explicit. This is aligned with Spring Framework's null-safety initiative.
Let's verify that this change doesn't conflict with any custom implementations:
Also applies to: 27-27
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Description: Check for any custom implementations of KafkaListenerConfigurer
# that might be affected by the stricter null check

# Search for KafkaListenerConfigurer implementations
ast-grep --pattern 'class $_ implements $$$KafkaListenerConfigurer$$$'

# Search for direct calls to configureKafkaListeners
rg -l "configureKafkaListeners.*registrar"
```
Length of output: 349
Script:
#!/bin/bash
# Let's check the implementation in the other KafkaConfig file to see if it also has @NonNull
# and verify there are no conflicts
# Check the implementation in boot-multiple-producers-consumers
cat kafka-spring-boot/boot-multiple-producers-consumers/src/main/java/com/example/springbootkafka/multi/config/KafkaConfig.java
# Also check if there are any other files importing KafkaListenerConfigurer
# to ensure we haven't missed any implementations
rg -l "import.*KafkaListenerConfigurer"
Length of output: 5852
```java
@PostMapping("/listeners")
public ResponseEntity<Map<String, Boolean>> updateListenerState(
        @RequestBody @Valid final KafkaListenerRequest kafkaListenerRequest) {
    return ResponseEntity.ok(messageService.updateListenerState(kafkaListenerRequest));
}
```
Validate request body and handle validation errors

Ensure that KafkaListenerRequest is validated and any constraint violations return meaningful error responses. Currently, validation annotations are used, but there is no exception handler for MethodArgumentNotValidException.
Consider adding an exception handler:
@ExceptionHandler(MethodArgumentNotValidException.class)
public ResponseEntity<String> handleValidationExceptions(MethodArgumentNotValidException ex) {
String errorMessage = ex.getBindingResult().getAllErrors().stream()
.map(DefaultMessageSourceResolvable::getDefaultMessage)
.collect(Collectors.joining(", "));
return ResponseEntity.badRequest().body(errorMessage);
}
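The message-joining step in the suggested handler can be exercised in isolation. A minimal stdlib sketch — the plain string list here stands in for the messages extracted from ex.getBindingResult().getAllErrors(), which requires a running Spring context:

```java
import java.util.List;
import java.util.stream.Collectors;

public class ValidationMessages {

    // Mirrors the handler body: join all default messages with ", "
    static String joinMessages(List<String> defaultMessages) {
        return defaultMessages.stream().collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        // prints "Container ID must not be blank, Operation must not be null"
        System.out.println(joinMessages(List.of(
                "Container ID must not be blank",
                "Operation must not be null")));
    }
}
```

With both record fields invalid, the client would receive one 400 body listing both constraint messages rather than a generic stack trace.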
}
@ExceptionHandler(IllegalArgumentException.class)
ProblemDetail handleIllegalArgumentException(IllegalArgumentException ex) {
🛠️ Refactor suggestion
Declare the handleIllegalArgumentException method as public

Similar to the previous method, handleIllegalArgumentException is package-private. It should be declared as public to be recognized by Spring's exception handling.

Apply this diff to change the method's access modifier:

-ProblemDetail handleIllegalArgumentException(IllegalArgumentException ex) {
+public ProblemDetail handleIllegalArgumentException(IllegalArgumentException ex) {
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
-ProblemDetail handleIllegalArgumentException(IllegalArgumentException ex) {
+public ProblemDetail handleIllegalArgumentException(IllegalArgumentException ex) {
@ExceptionHandler(MethodArgumentNotValidException.class)
@ResponseStatus(HttpStatus.BAD_REQUEST)
ProblemDetail onException(MethodArgumentNotValidException methodArgumentNotValidException) {
🛠️ Refactor suggestion
Declare the onException method as public for proper accessibility

The onException method is package-private. Exception handler methods annotated with @ExceptionHandler should be public to ensure they are accessible to Spring's exception handling mechanism.

Apply this diff to change the method's access modifier:

-ProblemDetail onException(MethodArgumentNotValidException methodArgumentNotValidException) {
+public ProblemDetail onException(MethodArgumentNotValidException methodArgumentNotValidException) {
-ProblemDetail onException(MethodArgumentNotValidException methodArgumentNotValidException) {
+public ProblemDetail onException(MethodArgumentNotValidException methodArgumentNotValidException) {
@Order(Ordered.HIGHEST_PRECEDENCE)
@ControllerAdvice
class GlobalExceptionHandler {
🛠️ Refactor suggestion
Make the GlobalExceptionHandler class public to ensure Spring can detect it

The GlobalExceptionHandler class is currently package-private. To allow Spring's @ControllerAdvice to automatically detect and apply this global exception handler across the application, the class should be declared as public.

Apply this diff to update the class access modifier:

-class GlobalExceptionHandler {
+public class GlobalExceptionHandler {
-class GlobalExceptionHandler {
+public class GlobalExceptionHandler {
package com.example.springbootkafkasample.dto;

import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;

public record KafkaListenerRequest(
        @NotBlank(message = "Container ID must not be blank") String containerId,
        @NotNull(message = "Operation must not be null") Operation operation) {}
💡 Codebase verification
Security controls needed for Kafka listener management endpoint
Based on the verification results:
- The Operation enum is simple with just START/STOP operations
- The endpoint /listeners is exposed without any security controls
- Basic validation exists for containerId and invalid container handling
- No audit logging is implemented for listener state changes

Required security improvements:
- Add Spring Security with role-based access control to the /listeners endpoint
- Implement audit logging for listener state changes
- Consider rate limiting to prevent potential DoS attacks
- Add request origin validation if the endpoint is exposed externally
🔗 Analysis chain
Verify security implications and Operation enum usage
The ability to start/stop Kafka listeners could have security implications. Consider:
- Adding role-based access control
- Audit logging for listener state changes
- Validation that the containerId exists before operations
Let's verify the Operation enum implementation and usage:
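The third bullet — rejecting unknown container ids before touching the registry — reduces to a small guard. A hypothetical stdlib sketch (names like ListenerStateGuard are illustrative, not the project's actual service; in the real code the map would be backed by KafkaListenerEndpointRegistry):

```java
import java.util.Map;

public class ListenerStateGuard {

    // containerId -> currently running?
    private final Map<String, Boolean> runningByContainerId;

    ListenerStateGuard(Map<String, Boolean> runningByContainerId) {
        this.runningByContainerId = runningByContainerId;
    }

    // Fail fast on unknown ids instead of silently doing nothing
    boolean setRunning(String containerId, boolean running) {
        if (!runningByContainerId.containsKey(containerId)) {
            throw new IllegalArgumentException(
                    "Listener container with id '" + containerId + "' not found");
        }
        runningByContainerId.put(containerId, running);
        return runningByContainerId.get(containerId);
    }
}
```

Throwing IllegalArgumentException here lines up with the handleIllegalArgumentException advice above, so an unknown id surfaces as a 4xx ProblemDetail instead of a 500.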
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Find and display the Operation enum definition
echo "Looking for Operation enum definition:"
ast-grep --pattern 'enum Operation {
$$$
}'
# Find all usages of KafkaListenerRequest to verify security measures
echo -e "\nChecking for security controls in endpoints using KafkaListenerRequest:"
rg -l "KafkaListenerRequest" | xargs rg -A 5 "@(PreAuthorize|Secured|RolesAllowed)"
Length of output: 377
Script:
#!/bin/bash
# Let's try a different approach to find the Operation enum
echo "Looking for Operation enum definition:"
fd -e java | xargs rg -l "enum Operation"
echo -e "\nChecking Operation enum content:"
fd -e java | xargs rg -A 5 "enum Operation"
echo -e "\nChecking KafkaListenerRequest usage in controllers/services:"
fd -e java | xargs rg -B 2 -A 5 "KafkaListenerRequest"
echo -e "\nChecking for any security configurations:"
fd -e java -e xml -e yml -e yaml -e properties | xargs rg "security|Security|@Secured|@PreAuthorize|WebSecurity"
Length of output: 13815
@coderabbitai, please help me implement the above security improvements
@rajadilipkolli, I'd be happy to help you implement the security improvements. Here are the steps to enhance the security of your /listeners endpoint:
1. Add Spring Security with Role-Based Access Control
a. Add Spring Security Dependency
Include the Spring Security starter dependency in your pom.xml:
<!-- Add Spring Security starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
b. Create a Security Configuration Class
Create a new class SecurityConfig in the config package to configure security settings:
package com.example.springbootkafkasample.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityCustomizer;
import org.springframework.security.web.SecurityFilterChain;

import static org.springframework.security.config.Customizer.withDefaults;
@Configuration
@EnableMethodSecurity
public class SecurityConfig {
@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
http
.authorizeHttpRequests(authorize -> authorize
.requestMatchers("/listeners/**").hasRole("ADMIN")
.anyRequest().permitAll()
)
.httpBasic(withDefaults())
.csrf().disable(); // Disable CSRF for simplicity; consider enabling it in production.
return http.build();
}
@Bean
public WebSecurityCustomizer webSecurityCustomizer() {
return (web) -> web.debug(false);
}
}
c. Configure an In-Memory User for Testing
For demonstration purposes, you can set up an in-memory user with the ADMIN role:
import org.springframework.context.annotation.Bean;
import org.springframework.security.provisioning.InMemoryUserDetailsManager;
import org.springframework.security.core.userdetails.User;
@Bean
public InMemoryUserDetailsManager userDetailsService() {
return new InMemoryUserDetailsManager(
User.withUsername("admin")
.password("{noop}adminpass") // {noop} specifies that no encoding is required
.roles("ADMIN")
.build()
);
}
d. Secure the Controller Methods
Ensure that only users with the ADMIN role can access the /listeners endpoints. You can use method-level security annotations:
package com.example.springbootkafkasample.controller;
import org.springframework.security.access.prepost.PreAuthorize;
//...
@RestController
@RequestMapping("/listeners")
public class MessageRestController {
//...
@PreAuthorize("hasRole('ADMIN')")
@GetMapping
public ResponseEntity<Map<String, Boolean>> getListeners() {
// Existing code
}
@PreAuthorize("hasRole('ADMIN')")
@PostMapping
public ResponseEntity<Map<String, Boolean>> updateListenerState(@RequestBody KafkaListenerRequest kafkaListenerRequest) {
// Existing code
}
}
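With HTTP Basic enabled, a client authenticates by sending Base64(user:password) in the Authorization header. A quick stdlib sketch of that encoding, assuming the demo admin/adminpass credentials above (never hard-code real credentials like this):

```java
import java.util.Base64;

public class BasicAuthHeader {

    // Build the value for the "Authorization" request header
    static String basicAuth(String user, String password) {
        String token = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes());
        return "Basic " + token;
    }

    public static void main(String[] args) {
        // prints "Basic YWRtaW46YWRtaW5wYXNz"
        System.out.println(basicAuth("admin", "adminpass"));
    }
}
```

That header is what a curl call against the secured /listeners endpoint would carry (curl -u admin:adminpass generates it for you).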
2. Implement Audit Logging for Listener State Changes
a. Create an Audit Aspect
Use Spring AOP to log listener state changes:
package com.example.springbootkafkasample.aspect;
import com.example.springbootkafkasample.dto.KafkaListenerRequest;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Aspect
@Component
public class AuditAspect {
private static final Logger logger = LoggerFactory.getLogger(AuditAspect.class);
@AfterReturning(pointcut = "execution(* com.example.springbootkafkasample.service.MessageService.updateListenerState(..)) && args(request)", returning = "result")
public void logListenerStateChange(JoinPoint joinPoint, KafkaListenerRequest request, Object result) {
String containerId = request.containerId();
String operation = request.operation().name();
logger.info("Listener with containerId '{}' performed operation '{}'", containerId, operation);
}
}
b. Enable AspectJ Auto-Proxy Support
Make sure AspectJ proxy support is enabled:
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
@Configuration
@EnableAspectJAutoProxy
public class AppConfig {
// Other beans if necessary
}
3. Implement Rate Limiting to Prevent DoS Attacks
a. Add Bucket4J Dependency
Include the Bucket4J library for rate limiting:
<!-- Add Bucket4J dependency -->
<dependency>
<groupId>com.giffing.bucket4j.spring.boot.starter</groupId>
<artifactId>bucket4j-spring-boot-starter</artifactId>
<version>0.7.0</version>
</dependency>
b. Configure Rate Limiting
Create a rate-limiting filter:
package com.example.springbootkafkasample.config;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.Refill;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class RateLimitingFilter extends OncePerRequestFilter {
private final Map<String, Bucket> cache = new ConcurrentHashMap<>();
@Override
protected void doFilterInternal(HttpServletRequest request,
HttpServletResponse response,
FilterChain filterChain) throws ServletException, IOException {
String clientIP = request.getRemoteAddr();
Bucket bucket = cache.computeIfAbsent(clientIP, this::newBucket);
if (bucket.tryConsume(1)) {
filterChain.doFilter(request, response);
} else {
response.setStatus(HttpServletResponse.SC_TOO_MANY_REQUESTS);
response.getWriter().write("Too many requests");
}
}
private Bucket newBucket(String key) {
return Bucket.builder()
.addLimit(Bandwidth.classic(5, Refill.greedy(5, Duration.ofMinutes(1))))
.build();
}
}
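For intuition, the bandwidth configured above — 5 tokens with a greedy refill of 5 per minute — behaves roughly like the stdlib-only sketch below. This is an approximation of the token-bucket idea, not a replacement for Bucket4j (which adds thread-safe lock-free consumption, multiple limits, and distributed backends):

```java
public class TinyTokenBucket {

    private final long capacity;
    private final double refillPerNano; // tokens added per elapsed nanosecond
    private double tokens;
    private long lastRefill;

    TinyTokenBucket(long capacity, long refillTokens, long refillPeriodNanos) {
        this.capacity = capacity;
        this.refillPerNano = (double) refillTokens / refillPeriodNanos;
        this.tokens = capacity;          // bucket starts full
        this.lastRefill = System.nanoTime();
    }

    // Equivalent in spirit to Bucket4j's bucket.tryConsume(n)
    synchronized boolean tryConsume(long n) {
        long now = System.nanoTime();
        tokens = Math.min(capacity, tokens + (now - lastRefill) * refillPerNano);
        lastRefill = now;
        if (tokens < n) {
            return false;                // over the limit: caller returns 429
        }
        tokens -= n;
        return true;
    }
}
```

With capacity 5 and a 5-per-minute refill, the first five requests from an IP pass and the sixth is rejected until tokens trickle back, matching the filter's 429 path.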
c. Register the Filter
Add the filter to your security configuration:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.web.authentication.UsernamePasswordAuthenticationFilter;
// In your SecurityConfig class
@Autowired
private RateLimitingFilter rateLimitingFilter;
@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
http
// Existing configurations
.addFilterBefore(rateLimitingFilter, UsernamePasswordAuthenticationFilter.class);
return http.build();
}
4. Add Request Origin Validation
If your endpoint is exposed externally, validate the request origin.
a. Configure CORS in Security Configuration
// In your SecurityConfig class
@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
http
// Existing configurations
.cors().configurationSource(request -> {
CorsConfiguration config = new CorsConfiguration();
config.setAllowedOrigins(List.of("https://your-trusted-domain.com"));
config.setAllowedMethods(List.of("GET", "POST"));
return config;
})
.and()
.csrf().disable();
return http.build();
}
b. Set Up Additional Origin Checks (Optional)
For extra security, you can validate the Origin header in your controller:
// In your MessageRestController
@PostMapping
public ResponseEntity<Map<String, Boolean>> updateListenerState(
@RequestBody KafkaListenerRequest kafkaListenerRequest,
@RequestHeader(value = "Origin", required = false) String origin) {
if (origin == null || !origin.equals("https://your-trusted-domain.com")) {
return ResponseEntity.status(HttpStatus.FORBIDDEN).build();
}
// Existing code
}
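The guard in that snippet reduces to a small predicate, shown here with the same placeholder domain (an exact-match allow-list; a production check might compare against a configured set of origins instead):

```java
public class OriginCheck {

    // Placeholder — substitute your real trusted origin(s)
    static final String TRUSTED_ORIGIN = "https://your-trusted-domain.com";

    // Mirrors the controller guard: absent or unexpected Origin -> forbidden
    static boolean isTrustedOrigin(String origin) {
        return origin != null && origin.equals(TRUSTED_ORIGIN);
    }
}
```

Note that the Origin header is advisory for non-browser clients (anyone with curl can forge it), so this complements, rather than replaces, the authentication above.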
Note: Remember to replace placeholders like "https://your-trusted-domain.com", usernames, passwords, and other configurations with values appropriate for your environment.
Let me know if you have any questions or need further assistance!
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Tests