Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat : Implement Notify and Listen Support #1639

Merged
merged 5 commits into from
Jan 12, 2025
Merged

Conversation

rajadilipkolli
Copy link
Owner

@rajadilipkolli rajadilipkolli commented Jan 12, 2025

Summary by CodeRabbit

Release Notes

  • New Features

    • Added PostgreSQL Notify and Listen support using reactive Spring Boot
    • Implemented real-time notification streaming via WebSocket and Server-Sent Events (SSE)
    • Created endpoints for publishing and receiving database notifications
  • Documentation

    • Added comprehensive README with project setup and usage instructions
    • Included Docker Compose configurations for PostgreSQL and application deployment
  • Infrastructure

    • Configured Maven wrapper and project dependencies
    • Set up Docker containers for PostgreSQL and pgAdmin
    • Added integration test configurations

@rajadilipkolli rajadilipkolli self-assigned this Jan 12, 2025
Copy link
Contributor

coderabbitai bot commented Jan 12, 2025

Walkthrough

This pull request introduces a new module for PostgreSQL Notify and Listen support using reactive Spring Boot with R2DBC. The project demonstrates real-time event notifications between a PostgreSQL database and a Spring Boot application through pub/sub capabilities. It includes Docker Compose configurations for setting up PostgreSQL and pgAdmin, a Maven project structure, and a comprehensive implementation of reactive notification mechanisms.

Changes

File Change Summary
SUMMARY.md Added documentation link for PostgreSQL Notify and Listen support
pom.xml Added new module r2dbc/boot-r2dbc-notify-listen
README.md New documentation explaining project purpose, architecture, and usage
docker/docker-compose*.yml Added configurations for PostgreSQL, pgAdmin, and application services
src/main/java/com/example/demo/... Added multiple classes for notification handling, routing, and configuration
src/main/resources/application.properties Added notification-related configuration properties

Sequence Diagram

sequenceDiagram
    participant Client
    participant NotificationController
    participant NotificationListener
    participant PostgreSQL

    Client->>NotificationController: Subscribe to notifications
    NotificationController->>NotificationListener: Request notification stream
    NotificationListener->>PostgreSQL: LISTEN on channel
    PostgreSQL-->>NotificationListener: Notification event
    NotificationListener->>Client: Stream notification
Loading

Possibly related PRs

Suggested labels

component: r2dbc

Poem

🐰 Hop, hop, hooray! A new module's in play,
PostgreSQL notifications dance today,
Reactive streams flow with grace,
R2DBC sets a blazing pace,
Notifications leap from database to display! 🎉

Finishing Touches

  • 📝 Generate Docstrings (Beta)

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.


@Bean
public RouterFunction<ServerResponse> routes(Notifier notifier) {
return route().GET("/hello", request -> notifier.send()
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coderabbitai, can you please help me to write Integration Test to check notify and listen

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rajadilipkolli Here's an integration test for the notify and listen functionality:

package com.example.demo;

import com.example.demo.common.AbstractIntegrationTest;
import com.example.demo.listener.Listener;
import com.example.demo.listener.Notifier;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.web.reactive.server.WebTestClient;
import java.time.Duration;

class NotifyListenIntegrationTest extends AbstractIntegrationTest {

    @Autowired
    private Notifier notifier;

    @Test
    void shouldSendAndReceiveNotification() {
        // when triggering notification through HTTP endpoint
        webTestClient.get()
                .uri("/hello")
                .exchange()
                // then
                .expectStatus().isNoContent()
                .expectBody().isEmpty();

        // Give some time for the notification to be processed
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Test
    void shouldSendNotificationDirectly() {
        // when sending notification directly
        notifier.send()
                .block(Duration.ofSeconds(5));

        // Give some time for the notification to be processed
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

This test class includes two test methods:

  1. shouldSendAndReceiveNotification() - Tests the notification through the HTTP endpoint
  2. shouldSendNotificationDirectly() - Tests the notification by directly using the Notifier

The tests verify that:

  • The endpoint returns a 204 No Content response
  • The notification is sent successfully

Note: The actual notification reception can be verified in the logs as the Listener is configured to log received notifications. You'll see log messages like:

Received notification: with name :mymessage , processId :<pid> and parameter :Hello world at <timestamp>

Let me know if you need any clarification or have questions about the test implementation!

🔗 Analysis chain

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Find Notifier implementation
ast-grep --pattern 'class Notifier {
  $$$
}'

# Find any listener implementations
rg -l "implements.*Listener|@EventListener" --type java

# Find existing test files
fd -e java -p "Test\.java$" r2dbc/boot-r2dbc-notify-listen/src/test/

# Look for test configuration files
fd -e java -p "Config.*Test\.java$" r2dbc/boot-r2dbc-notify-listen/src/test/

Length of output: 1015


Script:

#!/bin/bash
# Check the Notifier implementation in this specific module
rg -p "class Notifier" --type java r2dbc/boot-r2dbc-notify-listen/src/main/java/

# Look for listener implementations in this module
rg -p "class.*Listener|interface.*Listener" --type java r2dbc/boot-r2dbc-notify-listen/src/main/java/

# Check the existing test implementations
cat r2dbc/boot-r2dbc-notify-listen/src/test/java/com/example/demo/ApplicationTest.java
cat r2dbc/boot-r2dbc-notify-listen/src/test/java/com/example/demo/common/AbstractIntegrationTest.java

Length of output: 1624


Script:

#!/bin/bash
# Get the implementation of Notifier and Listener
cat r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/listener/Notifier.java
cat r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/listener/Listener.java
cat r2dbc/boot-r2dbc-notify-listen/src/test/java/com/example/demo/common/ContainerConfig.java

Length of output: 3623

@rajadilipkolli
Copy link
Owner Author

@coderabbitai, can you please help me to fill ReadMe file with contents of this repo and sequence diagram

Copy link
Contributor

coderabbitai bot commented Jan 12, 2025

Tip

For best results, initiate chat on the files or code changes.

@rajadilipkolli Here's the README content with a sequence diagram for the boot-r2dbc-notify-listen project:

# Spring Boot R2DBC Notify/Listen Example

This project demonstrates how to implement PostgreSQL's NOTIFY/LISTEN feature using Spring Boot and R2DBC. It showcases real-time notifications between database and application using PostgreSQL's pub/sub capabilities.

## Features

- Reactive PostgreSQL connectivity using R2DBC
- Implementation of PostgreSQL NOTIFY/LISTEN mechanism
- Real-time event notifications
- Reactive endpoints for publishing and receiving notifications

## Prerequisites

- JDK 21
- Maven
- Docker and Docker Compose
- PostgreSQL (or use provided Docker Compose)

## Getting Started

1. Start PostgreSQL using Docker Compose:
   ```bash
   docker-compose up -d
  1. Build the application:

    ./mvnw clean package
  2. Run the application:

    ./mvnw spring-boot:run

Architecture

sequenceDiagram
    participant Client
    participant Application
    participant Listener
    participant PostgreSQL

    Note over Application: Application starts
    Application->>PostgreSQL: Connect to database
    Application->>PostgreSQL: LISTEN channel_name
    
    Note over Client: Trigger notification
    Client->>Application: POST /api/notify
    Application->>PostgreSQL: NOTIFY channel_name, 'message'
    PostgreSQL-->>Listener: Notification event
    Listener-->>Application: Process notification
    Application-->>Client: Notification processed

    Note over Client: Subscribe to events
    Client->>Application: WebSocket connection
    Application-->>Client: Stream notifications
Loading

Usage

  1. To publish a notification:

    curl -X POST http://localhost:8080/api/notify \
         -H "Content-Type: application/json" \
         -d '{"channel": "events", "message": "Hello World!"}'
  2. To subscribe to notifications:

    • Connect to WebSocket endpoint: ws://localhost:8080/notifications
    • Or use Server-Sent Events endpoint: http://localhost:8080/notifications/sse

Configuration

Key configuration properties in application.properties:

# R2DBC PostgreSQL Configuration
spring.r2dbc.url=r2dbc:postgresql://localhost:5432/postgres
spring.r2dbc.username=postgres
spring.r2dbc.password=postgres

# Notification Channel Configuration
app.notification.channel=events

How It Works

  1. The application establishes a connection to PostgreSQL using R2DBC
  2. It sets up LISTEN on specified channels
  3. When a notification is published:
    • Application executes NOTIFY command
    • PostgreSQL broadcasts to all listeners
    • Listeners receive and process notifications
  4. Connected clients receive notifications via WebSocket/SSE

Contributing

Feel free to submit issues and enhancement requests!


This README provides:
1. Clear overview of the project
2. Setup instructions
3. Sequence diagram showing the notification flow
4. Usage examples
5. Configuration details
6. Explanation of how the system works

Would you like me to modify any part of this documentation?

<details>
<summary>🔗 Analysis chain</summary>








</details>

<details>
<summary>🏁 Scripts executed</summary>

The following scripts were executed for the analysis:

Script:

#!/bin/bash

Check if README.md exists and its current content

echo "=== Current README.md content ==="
cat README.md || echo "README.md not found"

echo -e "\n=== Project Structure ==="
tree -L 3 -I 'node_modules|target|.git|.idea|.mvn'

echo -e "\n=== Main Application File ==="
fd -e java "Application.java$"

echo -e "\n=== Controller Files ==="
fd -e java "Controller.java$"

echo -e "\n=== Service Files ==="
fd -e java "Service.java$"

echo -e "\n=== Configuration Files ==="
fd "application.yml$|application.properties$"

echo -e "\n=== POM File ==="
cat pom.xml


Length of output: 55442

</details>

@rajadilipkolli
Copy link
Owner Author

@coderabbitai, can you please help me to expose an api which satisfy below CRUL

curl -X POST http://localhost:8080/api/notify
-H "Content-Type: application/json"
-d '{"channel": "events", "message": "Hello World!"}'

Copy link
Contributor

coderabbitai bot commented Jan 12, 2025

Tip

For best results, initiate chat on the files or code changes.

@rajadilipkolli Here's the implementation for the notification endpoint:

// src/main/java/com/example/notify/model/NotificationRequest.java
package com.example.notify.model;

import jakarta.validation.constraints.NotBlank;

public record NotificationRequest(
    @NotBlank String channel,
    @NotBlank String message
) {}
// src/main/java/com/example/notify/controller/NotificationController.java
package com.example.notify.controller;

import com.example.notify.model.NotificationRequest;
import com.example.notify.service.NotificationService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import jakarta.validation.Valid;

@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class NotificationController {

    private final NotificationService notificationService;

    @PostMapping("/notify")
    public Mono<ResponseEntity<Void>> notify(@RequestBody @Valid NotificationRequest request) {
        return notificationService.notify(request.channel(), request.message())
                .map(result -> ResponseEntity.ok().build());
    }
}
// src/main/java/com/example/notify/service/NotificationService.java
package com.example.notify.service;

import lombok.RequiredArgsConstructor;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
@RequiredArgsConstructor
public class NotificationService {

    private final DatabaseClient databaseClient;

    public Mono<Void> notify(String channel, String message) {
        return databaseClient.sql("SELECT pg_notify(:channel, :message)")
                .bind("channel", channel)
                .bind("message", message)
                .fetch()
                .first()
                .then();
    }
}

This implementation:

  1. Creates a NotificationRequest record to validate and deserialize the JSON request body
  2. Implements a reactive REST controller with POST endpoint at /api/notify
  3. Uses NotificationService to execute PostgreSQL's pg_notify function
  4. Uses R2DBC's DatabaseClient for reactive database operations

You can now use the endpoint as shown in your CURL command:

curl -X POST http://localhost:8080/api/notify \
     -H "Content-Type: application/json" \
     -d '{"channel": "events", "message": "Hello World!"}'

Would you like me to add any additional features or make any modifications?

🔗 Analysis chain
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check project structure
tree -L 3 -I 'node_modules|target|.git|.idea|.mvn'

# Find and show any existing controller files
echo -e "\n=== Existing Controllers ==="
fd -e java "Controller.java$" --exec cat {}

# Find and show any existing service files
echo -e "\n=== Existing Services ==="
fd -e java "Service.java$" --exec cat {}

# Find and show configuration files
echo -e "\n=== Configuration Files ==="
fd "application.yml$|application.properties$" --exec cat {}

Length of output: 67529

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

♻️ Duplicate comments (1)
r2dbc/boot-r2dbc-notify-listen/src/main/resources/application.properties (1)

3-5: ⚠️ Potential issue

Credentials should not be hard-coded.

Database credentials should not be stored in the properties file. Use environment variables or a secure configuration service.

-spring.r2dbc.url=r2dbc:postgresql://localhost/test
-spring.r2dbc.username=user
-spring.r2dbc.password=password
+spring.r2dbc.url=${R2DBC_URL:r2dbc:postgresql://localhost/test}
+spring.r2dbc.username=${R2DBC_USERNAME}
+spring.r2dbc.password=${R2DBC_PASSWORD}
🧹 Nitpick comments (9)
r2dbc/boot-r2dbc-notify-listen/docker/docker-compose-app.yml (1)

1-1: Fix line endings in the YAML file.

The YAML file contains incorrect line endings. Ensure consistent Unix-style line endings (LF) are used.

🧰 Tools
🪛 yamllint (1.35.1)

[error] 1-1: wrong new line character: expected \n

(new-lines)

r2dbc/boot-r2dbc-notify-listen/docker/docker-compose.yml (2)

3-14: Add volume persistence and health check for PostgreSQL.

Consider adding:

  1. Volume persistence to prevent data loss
  2. Health check to ensure container readiness
   postgresqldb:
     image: postgres:17.2-alpine
     hostname: postgresqldb
     extra_hosts: [ 'host.docker.internal:host-gateway' ]
+    volumes:
+      - postgres_data:/var/lib/postgresql/data
+    healthcheck:
+      test: ["CMD-SHELL", "pg_isready -U appuser -d appdb"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
     environment:

Add this at the end of the file:

+volumes:
+  postgres_data:

16-37: Add health check dependency for pgAdmin service.

The pgAdmin service should wait for PostgreSQL to be healthy before starting.

   depends_on:
-      postgresqldb:
-        condition: service_started
+      postgresqldb:
+        condition: service_healthy
r2dbc/boot-r2dbc-notify-listen/README.md (2)

82-92: Add security considerations to configuration section.

Consider adding a section about securing credentials and using environment variables instead of hardcoding them in properties.

Add this section:

# Security Considerations

- Use environment variables or external configuration for sensitive data
- Example secure configuration:
  ```properties
  spring.r2dbc.url=${DB_URL}
  spring.r2dbc.username=${DB_USERNAME}
  spring.r2dbc.password=${DB_PASSWORD}

---

`103-107`: **Add troubleshooting section to the documentation.**

Consider adding a troubleshooting section to help users resolve common issues.

Add this section:
```markdown
## Troubleshooting

Common issues and their solutions:
1. Connection refused
   - Ensure PostgreSQL is running and accessible
   - Verify port mappings in docker-compose
2. Notification not received
   - Check channel name matches
   - Verify WebSocket/SSE connection is established
🧰 Tools
🪛 LanguageTool

[style] ~106-~106: The phrase ‘feel free to’ is used quite frequently. Consider using a less frequent alternative to set your writing apart from others and make it sound more professional.
Context: ...via WebSocket/SSE --- ## Contributing Feel free to submit issues and enhancement requests!...

(FEEL_FREE_TO_STYLE_ME)

r2dbc/boot-r2dbc-notify-listen/src/test/java/com/example/demo/TestNotifyListenPGApplication.java (1)

6-13: Consider making the class public for better test reusability.

The implementation looks good and follows Spring Boot's best practices for test configuration. However, consider making the class public if you plan to reuse this test configuration across different test packages.

-class TestNotifyListenPGApplication {
+public class TestNotifyListenPGApplication {
r2dbc/boot-r2dbc-notify-listen/src/test/java/com/example/demo/common/AbstractIntegrationTest.java (1)

12-16: Add test lifecycle management and NOTIFY/LISTEN test utilities.

Consider enhancing the base class with:

  1. Test cleanup methods
  2. Notification verification utilities
  3. Async test support
public abstract class AbstractIntegrationTest {
    @Autowired
    protected WebTestClient webTestClient;
    
    @Autowired
    protected ConnectionFactory connectionFactory;
    
    protected StepVerifier.Step<Notification> verifyNotification(String channel) {
        return StepVerifier.create(
            Mono.from(connectionFactory.create())
                .flatMapMany(connection -> 
                    connection.createStatement("LISTEN " + channel)
                        .execute()
                        .flatMap(result -> Mono.from(result.getRowsUpdated()))
                        .thenMany(connection.getNotifications())
                )
        );
    }
    
    @AfterEach
    void cleanup() {
        // Add cleanup logic
    }
}
r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/config/PostgresqlConnectionConfig.java (1)

28-38: Enhance error handling in connection unwrapping

The error handling could be more informative by including the actual connection type in the error message.

Consider enhancing the error message:

-return Mono.error(new IllegalStateException("Connection is not a PostgresqlConnection"));
+return Mono.error(new IllegalStateException(
+    String.format("Expected PostgresqlConnection but got %s", connection.getClass().getName())
+));
r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/notifier/Notifier.java (1)

20-22: Fix constructor parameter name inconsistency

The constructor parameter name receiver is misleading as it's assigned to a field named sender.

Align the parameter name with its usage:

-public Notifier(PostgresqlConnection receiver) {
+public Notifier(PostgresqlConnection sender) {
     this.sender = sender;
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f086b54 and 649bb6b.

📒 Files selected for processing (20)
  • SUMMARY.md (1 hunks)
  • pom.xml (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/.mvn/wrapper/maven-wrapper.properties (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/README.md (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/docker/docker-compose-app.yml (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/docker/docker-compose.yml (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/docker/docker_pgadmin_servers.json (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/mvnw (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/mvnw.cmd (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/pom.xml (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/NotifyListenPGApplication.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/config/PostgresqlConnectionConfig.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/config/WebRouterConfig.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/listener/Listener.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/notifier/Notifier.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/resources/application.properties (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/test/java/com/example/demo/ApplicationTest.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/test/java/com/example/demo/TestNotifyListenPGApplication.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/test/java/com/example/demo/common/AbstractIntegrationTest.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/test/java/com/example/demo/common/ContainerConfig.java (1 hunks)
✅ Files skipped from review due to trivial changes (4)
  • r2dbc/boot-r2dbc-notify-listen/src/test/java/com/example/demo/ApplicationTest.java
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/NotifyListenPGApplication.java
  • r2dbc/boot-r2dbc-notify-listen/docker/docker_pgadmin_servers.json
  • r2dbc/boot-r2dbc-notify-listen/.mvn/wrapper/maven-wrapper.properties
🧰 Additional context used
🪛 LanguageTool
SUMMARY.md

[grammar] ~46-~46: It looks like there is a word missing here. Did you mean “Listen to support”?
Context: ...n/README.md) * [PostgreSQL Notify and Listen support using reactive spring boot](r2dbc/boot-...

(LISTEN_TO_ME)

r2dbc/boot-r2dbc-notify-listen/README.md

[grammar] ~9-~9: It looks like there is a word missing here. Did you mean “LISTEN to mechanism”?
Context: ...C - Implementation of PostgreSQL NOTIFY/LISTEN mechanism - Real-time event notifications - React...

(LISTEN_TO_ME)


[duplication] ~100-~100: Possible typo: you repeated a word.
Context: ...mand - PostgreSQL broadcasts to all listeners - Listeners receive and process notifications 4. Co...

(ENGLISH_WORD_REPEAT_RULE)


[style] ~106-~106: The phrase ‘feel free to’ is used quite frequently. Consider using a less frequent alternative to set your writing apart from others and make it sound more professional.
Context: ...via WebSocket/SSE --- ## Contributing Feel free to submit issues and enhancement requests!...

(FEEL_FREE_TO_STYLE_ME)

🪛 yamllint (1.35.1)
r2dbc/boot-r2dbc-notify-listen/docker/docker-compose-app.yml

[error] 1-1: wrong new line character: expected \n

(new-lines)

r2dbc/boot-r2dbc-notify-listen/docker/docker-compose.yml

[error] 1-1: wrong new line character: expected \n

(new-lines)


[error] 42-42: no new line character at the end of file

(new-line-at-end-of-file)

🔇 Additional comments (8)
r2dbc/boot-r2dbc-notify-listen/mvnw.cmd (1)

1-149: LGTM! Maven wrapper script is properly implemented.

The script follows best practices with proper error handling, platform-specific adaptations, and comprehensive documentation.

r2dbc/boot-r2dbc-notify-listen/mvnw (1)

1-259: LGTM! Maven wrapper script is secure and well-implemented.

The script includes proper error handling, security measures (SHA-256 validation), and follows shell scripting best practices.

r2dbc/boot-r2dbc-notify-listen/docker/docker-compose-app.yml (1)

7-8: Review debug port exposure for production deployment.

Port 18787 appears to be a debug port. While useful in development, exposing debug ports in production could pose security risks.

Consider adding a conditional configuration to expose this port only in development environments.

r2dbc/boot-r2dbc-notify-listen/src/main/resources/application.properties (1)

21-22: Review DEBUG logging configuration.

DEBUG level logging is enabled for application and reactor packages. This could impact performance in production environments.

Consider making the log levels configurable per environment:

-logging.level.com.example.demo=DEBUG
-logging.level.reactor.util.Loggers=DEBUG
+logging.level.com.example.demo=${LOG_LEVEL:INFO}
+logging.level.reactor.util.Loggers=${REACTOR_LOG_LEVEL:INFO}
r2dbc/boot-r2dbc-notify-listen/pom.xml (1)

5-10: Verify stability of Spring Boot 3.4.1 milestone version.

The project uses Spring Boot 3.4.1 which appears to be a milestone version (based on the milestones repository). Consider using the latest stable version for production use.

Run this script to check the latest stable version:

pom.xml (1)

53-53: LGTM!

The new module is correctly added to the parent POM.

SUMMARY.md (1)

46-46: LGTM!

The new documentation entry is correctly added to the table of contents.

🧰 Tools
🪛 LanguageTool

[grammar] ~46-~46: It looks like there is a word missing here. Did you mean “Listen to support”?
Context: ...n/README.md) * [PostgreSQL Notify and Listen support using reactive spring boot](r2dbc/boot-...

(LISTEN_TO_ME)

r2dbc/boot-r2dbc-notify-listen/src/test/java/com/example/demo/common/ContainerConfig.java (1)

15-15: Consider version pinning strategy.

The PostgreSQL version 17.2-alpine is very recent. Consider:

  1. Documenting the minimum required version for NOTIFY/LISTEN support
  2. Adding version compatibility tests
  3. Using a version constant for easier updates

Run this script to check PostgreSQL version compatibility:

Comment on lines +10 to +12
@SpringBootTest(webEnvironment = RANDOM_PORT, classes = ContainerConfig.class)
@AutoConfigureWebTestClient
public abstract class AbstractIntegrationTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding NOTIFY/LISTEN specific test configurations.

The current setup only includes web test client configuration. For comprehensive NOTIFY/LISTEN testing, consider:

  1. Adding R2DBC connection configuration
  2. Including test utilities for notification verification

Add these annotations:

 @SpringBootTest(webEnvironment = RANDOM_PORT, classes = ContainerConfig.class)
 @AutoConfigureWebTestClient
+@AutoConfigureR2dbc
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@SpringBootTest(webEnvironment = RANDOM_PORT, classes = ContainerConfig.class)
@AutoConfigureWebTestClient
public abstract class AbstractIntegrationTest {
@SpringBootTest(webEnvironment = RANDOM_PORT, classes = ContainerConfig.class)
@AutoConfigureWebTestClient
@AutoConfigureR2dbc
public abstract class AbstractIntegrationTest {

Comment on lines +20 to +26
@Bean
PostgresqlConnection postgresqlConnection() {
return Mono.from(connectionFactory.create())
.flatMap(this::unwrapToPostgresqlConnection)
.blockOptional()
.orElseThrow(() -> new IllegalStateException("Failed to create PostgresqlConnection"));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid blocking operations in connection creation

The use of blockOptional() in a reactive pipeline introduces blocking behavior, which goes against reactive programming principles. Consider restructuring to maintain reactive flow.

Replace with a reactive approach:

 @Bean
-PostgresqlConnection postgresqlConnection() {
-    return Mono.from(connectionFactory.create())
-            .flatMap(this::unwrapToPostgresqlConnection)
-            .blockOptional()
-            .orElseThrow(() -> new IllegalStateException("Failed to create PostgresqlConnection"));
+Mono<PostgresqlConnection> postgresqlConnection() {
+    return Mono.from(connectionFactory.create())
+            .flatMap(this::unwrapToPostgresqlConnection)
+            .switchIfEmpty(Mono.error(new IllegalStateException("Failed to create PostgresqlConnection")));
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@Bean
PostgresqlConnection postgresqlConnection() {
return Mono.from(connectionFactory.create())
.flatMap(this::unwrapToPostgresqlConnection)
.blockOptional()
.orElseThrow(() -> new IllegalStateException("Failed to create PostgresqlConnection"));
}
@Bean
Mono<PostgresqlConnection> postgresqlConnection() {
return Mono.from(connectionFactory.create())
.flatMap(this::unwrapToPostgresqlConnection)
.switchIfEmpty(Mono.error(new IllegalStateException("Failed to create PostgresqlConnection")));
}

Comment on lines 31 to 38
receiver.getNotifications()
.delayElements(Duration.ofSeconds(1))
.subscribe(notification -> log.info(
"Received notification: with name :{} , processId :{} and parameter :{}",
notification.getName(),
notification.getProcessId(),
notification.getParameter()));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve notification handling configuration

The delay duration is hardcoded and the subscription lacks error handling. Consider making the delay configurable and adding proper error handling.

Enhance the notification handling:

+@Value("${notification.delay-seconds:1}")
+private int delaySeconds;

 receiver.getNotifications()
-        .delayElements(Duration.ofSeconds(1))
+        .delayElements(Duration.ofSeconds(delaySeconds))
         .subscribe(
             notification -> log.info(
                 "Received notification: with name :{} , processId :{} and parameter :{}",
                 notification.getName(),
                 notification.getProcessId(),
                 notification.getParameter()),
+            error -> log.error("Error processing notification: ", error)
         );

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 23 to 30
@PostConstruct
public void initialize() {
receiver.createStatement("LISTEN mymessage")
.execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.log("listen::")
.subscribe();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling to LISTEN subscription

The subscription lacks error handling which could lead to silent failures.

Add error handling:

 @PostConstruct
 public void initialize() {
     receiver.createStatement("LISTEN mymessage")
             .execute()
             .flatMap(PostgresqlResult::getRowsUpdated)
             .log("listen::")
-            .subscribe();
+            .subscribe(
+                null,
+                error -> log.error("Failed to initialize LISTEN: ", error)
+            );

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 24 to 31
public Mono<Void> send() {
return sender.createStatement(
MessageFormat.format("NOTIFY mymessage, ''Hello world at {0}''", LocalDateTime.now()))
.execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.doOnNext(rows -> log.info("Notification sent, rows updated: {}", rows))
.then();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent SQL injection in notification message

Using MessageFormat with raw string concatenation could lead to SQL injection. Consider using prepared statements.

Use parameterized query:

 public Mono<Void> send() {
-    return sender.createStatement(
-            MessageFormat.format("NOTIFY mymessage, ''Hello world at {0}''", LocalDateTime.now()))
+    return sender.createStatement("NOTIFY mymessage, $1")
+            .bind("$1", "Hello world at " + LocalDateTime.now())
             .execute()
             .flatMap(PostgresqlResult::getRowsUpdated)
             .doOnNext(rows -> log.info("Notification sent, rows updated: {}", rows))
             .then();
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public Mono<Void> send() {
return sender.createStatement(
MessageFormat.format("NOTIFY mymessage, ''Hello world at {0}''", LocalDateTime.now()))
.execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.doOnNext(rows -> log.info("Notification sent, rows updated: {}", rows))
.then();
}
public Mono<Void> send() {
return sender.createStatement("NOTIFY mymessage, $1")
.bind("$1", "Hello world at " + LocalDateTime.now())
.execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.doOnNext(rows -> log.info("Notification sent, rows updated: {}", rows))
.then();
}

Comment on lines 17 to 21
RouterFunction<ServerResponse> routes(Notifier notifier) {
return route().GET("/hello", request -> notifier.send()
.then(Mono.defer(() -> noContent().build())))
.build();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling to route

The route lacks error handling which could result in unhandled exceptions being propagated to the client.

Add error handling:

 @Bean
 RouterFunction<ServerResponse> routes(Notifier notifier) {
     return route().GET("/hello", request -> notifier.send()
-                    .then(Mono.defer(() -> noContent().build())))
+                    .then(Mono.defer(() -> noContent().build()))
+                    .onErrorResume(error -> {
+                        log.error("Failed to send notification: ", error);
+                        return ServerResponse.status(500)
+                            .bodyValue("Failed to send notification");
+                    }))
             .build();
 }

Committable suggestion skipped: line range outside the PR's diff.

@rajadilipkolli
Copy link
Owner Author

@coderabbitai, for below NotificationService , please help me to write listener service to listen the data

// src/main/java/com/example/notify/service/NotificationService.java
package com.example.notify.service;

import lombok.RequiredArgsConstructor;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
@RequiredArgsConstructor
public class NotificationService {

    private final DatabaseClient databaseClient;

    public Mono<Void> notify(String channel, String message) {
        return databaseClient.sql("SELECT pg_notify(:channel, :message)")
                .bind("channel", channel)
                .bind("message", message)
                .fetch()
                .first()
                .then();
    }
}

Copy link
Contributor

coderabbitai bot commented Jan 12, 2025

Tip

For best results, initiate chat on the files or code changes.

@rajadilipkolli Here's the implementation of the notification listener service:

// src/main/java/com/example/notify/event/NotificationEvent.java
package com.example.notify.event;

public record NotificationEvent(
    String channel,
    String message,
    Integer pid
) {}
// src/main/java/com/example/notify/service/NotificationListener.java
package com.example.notify.service;

import com.example.notify.event.NotificationEvent;
import io.r2dbc.postgresql.api.PostgresqlConnection;
import io.r2dbc.postgresql.api.PostgresqlResult;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

@Slf4j
@Service
@RequiredArgsConstructor
public class NotificationListener {

    private final PostgresqlConnection connection;
    private final ApplicationEventPublisher eventPublisher;
    private final Sinks.Many<NotificationEvent> notificationSink = Sinks.many().multicast().onBackpressureBuffer();

    @PostConstruct
    public void initialize() {
        // Subscribe to notifications
        connection.getNotifications()
            .doOnNext(notification -> {
                var event = new NotificationEvent(
                    notification.getName(),
                    notification.getParameter(),
                    notification.getProcessId()
                );
                // Publish Spring event
                eventPublisher.publishEvent(event);
                // Emit to reactive stream
                notificationSink.tryEmitNext(event);
            })
            .doOnError(error -> log.error("Error receiving notification", error))
            .subscribe();
    }

    public Mono<Void> listenTo(String channel) {
        return Mono.from(connection.createStatement("LISTEN " + channel)
            .execute())
            .doOnSuccess(result -> log.info("Listening on channel: {}", channel))
            .doOnError(error -> log.error("Error listening to channel: {}", channel, error))
            .then();
    }

    public Flux<NotificationEvent> getNotificationStream() {
        return notificationSink.asFlux();
    }

    public Flux<NotificationEvent> getNotificationStream(String channel) {
        return notificationSink.asFlux()
            .filter(event -> event.channel().equals(channel));
    }

    @PreDestroy
    public void cleanup() {
        connection.close()
            .subscribe(
                null,
                error -> log.error("Error closing connection", error),
                () -> log.info("Notification listener connection closed")
            );
    }
}

This implementation:

  1. Creates a NotificationEvent record to represent notifications
  2. Uses PostgresqlConnection for low-level notification handling
  3. Publishes notifications as Spring events and through reactive streams
  4. Provides methods to:
    • Listen to specific channels
    • Get all notifications as a Flux
    • Get channel-specific notifications
  5. Handles connection lifecycle with @PostConstruct and @PreDestroy

Usage example:

@RestController
@RequiredArgsConstructor
public class NotificationController {
    private final NotificationListener notificationListener;

    @GetMapping(path = "/notifications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<NotificationEvent> streamNotifications() {
        return notificationListener.getNotificationStream();
    }

    @GetMapping(path = "/notifications/{channel}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<NotificationEvent> streamChannelNotifications(@PathVariable String channel) {
        return notificationListener.listenTo(channel)
            .thenMany(notificationListener.getNotificationStream(channel));
    }
}

Would you like me to add any additional features or make any modifications?


✏️ Learnings added
Learnt from: rajadilipkolli
URL: https://github.com/rajadilipkolli/my-spring-boot-experiments/pull/1639

Timestamp: 2025-01-12T10:45:47.838Z
Learning: The NotificationListener service in Spring Boot R2DBC applications should handle PostgreSQL notifications by:
1. Using PostgresqlConnection for low-level notification handling
2. Publishing notifications as both Spring events and reactive streams
3. Managing connection lifecycle with @PostConstruct and @PreDestroy
4. Providing methods to listen to specific channels and get notifications as Flux

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


spring.r2dbc.url=r2dbc:postgresql://localhost:5432/appdb
spring.r2dbc.username=appuser
spring.r2dbc.password=secret

Check failure

Code scanning / SonarCloud

Credentials should not be hard-coded High

Revoke and change this password, as it is compromised. See more on SonarQube Cloud
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

♻️ Duplicate comments (2)
r2dbc/boot-r2dbc-notify-listen/src/main/resources/application.properties (1)

7-11: ⚠️ Potential issue

Restrict management endpoint exposure

Exposing all management endpoints (management.endpoints.web.exposure.include=*) could pose security risks. Consider exposing only necessary endpoints.

-management.endpoints.web.exposure.include=*
+management.endpoints.web.exposure.include=health,info,metrics
r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/notifier/Notifier.java (1)

29-36: ⚠️ Potential issue

Fix SQL injection vulnerability in notification message

Using MessageFormat with raw string concatenation could lead to SQL injection. Use parameterized queries instead.

 public Mono<Void> send() {
-    return sender.createStatement(
-            MessageFormat.format("NOTIFY mymessage, ''Hello world at {0}''", LocalDateTime.now()))
+    return sender.createStatement("NOTIFY mymessage, $1")
+            .bind("$1", "Hello world at " + LocalDateTime.now())
             .execute()
             .flatMap(PostgresqlResult::getRowsUpdated)
             .doOnNext(rows -> log.info("Notification sent, rows updated: {}", rows))
             .then();
 }
🧹 Nitpick comments (5)
r2dbc/boot-r2dbc-notify-listen/src/main/resources/application.properties (2)

16-17: Configure environment-specific logging levels

DEBUG level logging can impact performance and potentially expose sensitive information in production.

Consider moving these to application-dev.properties:

-logging.level.com.example.demo=DEBUG
-logging.level.reactor.util.Loggers=DEBUG
+logging.level.com.example.demo=${LOG_LEVEL:INFO}
+logging.level.reactor.util.Loggers=${REACTOR_LOG_LEVEL:INFO}

19-19: Move test configuration to test properties

Test-specific configurations should be separated from main application properties.

Move this line to application-test.properties:

-spring.testcontainers.beans.startup=parallel
r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/listener/NotificationListener.java (1)

63-69: Avoid passing null to subscribe() in the cleanup() method

Passing null as a parameter to subscribe() is not recommended and can lead to potential NullPointerException. Instead, use method references or lambda expressions to handle the callbacks appropriately.

Apply this diff to improve the code:

 public void cleanup() {
     connection
             .close()
-            .subscribe(
-                    null,
-                    error -> log.error("Error closing connection", error),
-                    () -> log.info("Notification listener connection closed"));
+            .doOnError(error -> log.error("Error closing connection", error))
+            .doOnSuccess(unused -> log.info("Notification listener connection closed"))
+            .subscribe();
 }
r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/notifier/Notifier.java (2)

24-27: Rename parameter to match field name

The constructor parameter name receiver doesn't match the field name sender, which could be confusing.

-public Notifier(PostgresqlConnection receiver, DatabaseClient databaseClient) {
+public Notifier(PostgresqlConnection sender, DatabaseClient databaseClient) {

21-22: Consider adding connection health check

Add a method to verify the connection status and reconnect if necessary.

private Mono<Void> ensureConnection() {
    return Mono.fromSupplier(() -> !sender.isAutoCommit())
        .flatMap(isValid -> isValid ? Mono.empty() : 
            sender.close()
                .then(Mono.error(new IllegalStateException("Connection lost, please retry")))
        );
}

Then use it in your operations:

 public Mono<Void> send() {
-    return sender.createStatement(
+    return ensureConnection()
+        .then(sender.createStatement(
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 649bb6b and a673f5e.

📒 Files selected for processing (9)
  • r2dbc/boot-r2dbc-notify-listen/pom.xml (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/config/WebRouterConfig.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/listener/NotificationController.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/listener/NotificationEvent.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/listener/NotificationListener.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/notifier/NotificationRequest.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/notifier/Notifier.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/resources/application.properties (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/test/java/com/example/demo/common/ContainerConfig.java (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/listener/NotificationEvent.java
🚧 Files skipped from review as they are similar to previous changes (2)
  • r2dbc/boot-r2dbc-notify-listen/src/test/java/com/example/demo/common/ContainerConfig.java
  • r2dbc/boot-r2dbc-notify-listen/pom.xml
🧰 Additional context used
📓 Learnings (3)
📓 Common learnings
Learnt from: rajadilipkolli
PR: rajadilipkolli/my-spring-boot-experiments#0
File: :0-0
Timestamp: 2025-01-12T10:45:47.987Z
Learning: The NotificationListener service in Spring Boot R2DBC applications should handle PostgreSQL notifications by:
1. Using PostgresqlConnection for low-level notification handling
2. Publishing notifications as both Spring events and reactive streams
3. Managing connection lifecycle with @PostConstruct and @PreDestroy
4. Providing methods to listen to specific channels and get notifications as Flux
r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/listener/NotificationListener.java (1)
Learnt from: rajadilipkolli
PR: rajadilipkolli/my-spring-boot-experiments#0
File: :0-0
Timestamp: 2025-01-12T10:45:47.987Z
Learning: The NotificationListener service in Spring Boot R2DBC applications should handle PostgreSQL notifications by:
1. Using PostgresqlConnection for low-level notification handling
2. Publishing notifications as both Spring events and reactive streams
3. Managing connection lifecycle with @PostConstruct and @PreDestroy
4. Providing methods to listen to specific channels and get notifications as Flux
r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/notifier/Notifier.java (1)
Learnt from: rajadilipkolli
PR: rajadilipkolli/my-spring-boot-experiments#0
File: :0-0
Timestamp: 2025-01-12T10:45:47.987Z
Learning: The NotificationListener service in Spring Boot R2DBC applications should handle PostgreSQL notifications by:
1. Using PostgresqlConnection for low-level notification handling
2. Publishing notifications as both Spring events and reactive streams
3. Managing connection lifecycle with @PostConstruct and @PreDestroy
4. Providing methods to listen to specific channels and get notifications as Flux
🪛 GitHub Check: SonarCloud
r2dbc/boot-r2dbc-notify-listen/src/main/resources/application.properties

[failure] 5-5: Credentials should not be hard-coded

Revoke and change this password, as it is compromised.

See more on SonarQube Cloud

⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Codacy Static Code Analysis
🔇 Additional comments (5)
r2dbc/boot-r2dbc-notify-listen/src/main/resources/application.properties (1)

1-1: LGTM! Descriptive application name.

The application name clearly indicates its purpose and follows naming conventions.

r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/notifier/NotificationRequest.java (1)

5-5: LGTM!

The NotificationRequest record is appropriately defined with validation annotations.

r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/listener/NotificationController.java (1)

18-21: LGTM!

The /notifications endpoint correctly streams all notifications.

r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/config/WebRouterConfig.java (2)

21-22: ⚠️ Potential issue

Add error handling to the /hello endpoint

The route lacks error handling which could result in unhandled exceptions being propagated to the client.

Add error handling:

 return route().GET("/hello", request -> notifier.send()
-                    .then(Mono.defer(() -> noContent().build())))
+                    .then(Mono.defer(() -> noContent().build()))
+                    .onErrorResume(error -> {
+                        log.error("Error sending notification", error);
+                        return ServerResponse.internalServerError()
+                            .bodyValue("Failed to send notification");
+                    }))

Likely invalid or redundant comment.


23-23: ⚠️ Potential issue

Add error handling to the /api/notify endpoint

The notifyData endpoint needs error handling for invalid requests and database errors.

Add error handling:

-.POST("/api/notify", contentType(APPLICATION_JSON).and(accept(APPLICATION_JSON)), notifier::notifyData)
+.POST("/api/notify", contentType(APPLICATION_JSON).and(accept(APPLICATION_JSON)), 
+    request -> notifier.notifyData(request)
+        .onErrorResume(error -> {
+            log.error("Error processing notification", error);
+            return ServerResponse.internalServerError()
+                .bodyValue("Failed to process notification");
+        }))

Likely invalid or redundant comment.

Comment on lines 1 to 19
spring.application.name=notify-listen-postgresql

spring.r2dbc.url=r2dbc:postgresql://localhost:5432/appdb
spring.r2dbc.username=appuser
spring.r2dbc.password=secret

management.endpoints.web.exposure.include=*
management.endpoint.health.probes.add-additional-paths=true
management.endpoint.health.probes.enabled=true
management.endpoint.health.show-components=always
management.endpoint.health.show-details=always

#
# Logging level
#
logging.level.com.example.demo=DEBUG
logging.level.reactor.util.Loggers=DEBUG

spring.testcontainers.beans.startup=parallel
Copy link
Contributor

@coderabbitai coderabbitai bot Jan 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add notification channel configuration

Based on the PR objectives to implement PostgreSQL NOTIFY/LISTEN support, consider adding configuration properties for notification channels.

Add these properties:

# Notification Configuration
app.notification.channel-name=${NOTIFICATION_CHANNEL:default_channel}
app.notification.listener.enabled=true
app.notification.connection-pool-size=10
🧰 Tools
🪛 GitHub Check: SonarCloud

[failure] 5-5: Credentials should not be hard-coded

Revoke and change this password, as it is compromised.

See more on SonarQube Cloud

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coderabbitai, how to configure Notification listener based on above properties

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rajadilipkolli Here's how to configure the Notification listener using the properties:

  1. First, create a configuration properties class:
@ConfigurationProperties(prefix = "app.notification")
@Validated
public record NotificationProperties(
    @NotBlank String channelName,
    @NotNull Boolean listenerEnabled,
    @Min(1) @Max(100) Integer connectionPoolSize
) {}
  1. Enable the properties in your main application class:
@EnableConfigurationProperties(NotificationProperties.class)
@SpringBootApplication
public class NotifyListenPGApplication {
    // ... existing code
}
  1. Modify the NotificationListener to use these properties:
@Service
@ConditionalOnProperty(value = "app.notification.listener.enabled", havingValue = "true")
public class NotificationListener {
    private final PostgresqlConnection connection;
    private final ApplicationEventPublisher eventPublisher;
    private final NotificationProperties properties;
    private final Sinks.Many<NotificationEvent> notificationSink;

    public NotificationListener(PostgresqlConnection connection, 
                              ApplicationEventPublisher eventPublisher,
                              NotificationProperties properties) {
        this.connection = connection;
        this.eventPublisher = eventPublisher;
        this.properties = properties;
        this.notificationSink = Sinks.many().multicast().onBackpressureBuffer();
    }

    @PostConstruct
    public void initialize() {
        // Listen to the configured channel by default
        listenTo(properties.channelName())
            .subscribe(null, 
                error -> log.error("Error listening to channel: {}", error.getMessage()),
                () -> log.info("Listening to channel: {}", properties.channelName())
            );

        connection.getNotifications()
            .delayElements(Duration.ofSeconds(1))
            .map(notification -> new NotificationEvent(
                notification.getName(),
                notification.getParameter(),
                notification.getProcessId()
            ))
            .doOnNext(event -> {
                eventPublisher.publishEvent(event);
                notificationSink.tryEmitNext(event);
            })
            .subscribe();
    }
    // ... rest of the existing code
}
  1. Update the PostgresqlConnectionConfig to use the connection pool size:
@Configuration
public class PostgresqlConnectionConfig {
    private final ConnectionFactory connectionFactory;
    private final NotificationProperties properties;

    public PostgresqlConnectionConfig(ConnectionFactory connectionFactory,
                                    NotificationProperties properties) {
        this.connectionFactory = connectionFactory;
        this.properties = properties;
    }

    @Bean
    public PostgresqlConnection postgresqlConnection() {
        ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder()
            .maxSize(properties.connectionPoolSize())
            .build();
        
        ConnectionPool pool = new ConnectionPool(configuration, connectionFactory);
        
        return Mono.from(pool.create())
            .flatMap(this::unwrapToPostgresqlConnection)
            .block();
    }
    // ... rest of the existing code
}

Now you can control the notification listener's behavior through the properties in your application.properties file. The listener will only be created if app.notification.listener.enabled=true, and it will listen to the channel specified by app.notification.channel-name.


✏️ Learnings added
Learnt from: rajadilipkolli
PR: rajadilipkolli/my-spring-boot-experiments#1639
File: r2dbc/boot-r2dbc-notify-listen/src/main/resources/application.properties:1-19
Timestamp: 2025-01-12T11:22:51.833Z
Learning: The NotificationListener in Spring Boot applications can be configured using properties:
- `app.notification.channel-name`: Specifies the default channel to listen to
- `app.notification.listener.enabled`: Controls whether the listener is enabled
- `app.notification.connection-pool-size`: Controls the R2DBC connection pool size
The configuration is bound using @ConfigurationProperties and @EnableConfigurationProperties annotations.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Comment on lines +46 to +50
public Mono<Void> listenTo(String channel) {
return Mono.from(connection.createStatement("LISTEN " + channel).execute())
.doOnSuccess(result -> log.info("Listening on channel: {}", channel))
.doOnError(error -> log.error("Error listening to channel: {}", channel, error))
.then();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Validate or sanitize the channel parameter to prevent SQL injection

The listenTo method constructs a SQL statement by directly concatenating the channel parameter, which can lead to SQL injection if channel is derived from untrusted input. Since identifiers cannot be parameterized in SQL statements, it's important to validate or sanitize the channel parameter to ensure it contains only allowed characters (e.g., letters, numbers, underscores).

You can apply input validation to the channel parameter before using it in the SQL statement. For example:

 public Mono<Void> listenTo(String channel) {
+    if (!channel.matches("[a-zA-Z0-9_]+")) {
+        return Mono.error(new IllegalArgumentException("Invalid channel name"));
+    }
     return Mono.from(connection.createStatement("LISTEN " + channel).execute())
             .doOnSuccess(result -> log.info("Listening on channel: {}", channel))
             .doOnError(error -> log.error("Error listening to channel: {}", channel, error))
             .then();
 }

This ensures that only valid channel names are accepted, preventing SQL injection.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public Mono<Void> listenTo(String channel) {
return Mono.from(connection.createStatement("LISTEN " + channel).execute())
.doOnSuccess(result -> log.info("Listening on channel: {}", channel))
.doOnError(error -> log.error("Error listening to channel: {}", channel, error))
.then();
public Mono<Void> listenTo(String channel) {
if (!channel.matches("[a-zA-Z0-9_]+")) {
return Mono.error(new IllegalArgumentException("Invalid channel name"));
}
return Mono.from(connection.createStatement("LISTEN " + channel).execute())
.doOnSuccess(result -> log.info("Listening on channel: {}", channel))
.doOnError(error -> log.error("Error listening to channel: {}", channel, error))
.then();

Comment on lines +24 to +25
public Flux<NotificationEvent> streamChannelNotifications(@PathVariable String channel) {
return notificationListener.listenTo(channel).thenMany(notificationListener.getNotificationStream(channel));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Validate the channel path variable to prevent potential injection attacks

The channel path variable in the streamChannelNotifications endpoint is used in the listenTo method, which constructs a SQL statement. To prevent potential injection attacks, it's important to validate or sanitize the channel parameter.

Consider adding validation to ensure channel contains only allowed characters. For example:

+import org.springframework.validation.annotation.Validated;
+import jakarta.validation.constraints.Pattern;

+@Validated
 public class NotificationController {

     //...

     @GetMapping(path = "/notifications/{channel}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
-    public Flux<NotificationEvent> streamChannelNotifications(@PathVariable String channel) {
+    public Flux<NotificationEvent> streamChannelNotifications(@PathVariable @Pattern(regexp = "[a-zA-Z0-9_]+") String channel) {
         return notificationListener.listenTo(channel).thenMany(notificationListener.getNotificationStream(channel));
     }
 }

Alternatively, you can perform manual validation:

 public Flux<NotificationEvent> streamChannelNotifications(@PathVariable String channel) {
+    if (!channel.matches("[a-zA-Z0-9_]+")) {
+        return Flux.error(new IllegalArgumentException("Invalid channel name"));
+    }
     return notificationListener.listenTo(channel).thenMany(notificationListener.getNotificationStream(channel));
 }

This ensures that only valid channel names are accepted.

Committable suggestion skipped: line range outside the PR's diff.


public Mono<ServerResponse> notifyData(ServerRequest req) {
return req.bodyToMono(NotificationRequest.class)
.flatMap(notificationRequest -> databaseClient
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coderabbitai, please add a check that if the channel is being listened to else register it for listening

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rajadilipkolli Here's the updated Notifier class with channel registration check:

@Component
public class Notifier {

    private static final Logger log = LoggerFactory.getLogger(Notifier.class);

    private final PostgresqlConnection sender;
    private final DatabaseClient databaseClient;
    private final NotificationListener notificationListener;

    public Notifier(PostgresqlConnection receiver, DatabaseClient databaseClient, NotificationListener notificationListener) {
        this.sender = receiver;
        this.databaseClient = databaseClient;
        this.notificationListener = notificationListener;
    }

    public Mono<Void> send() {
        return notificationListener.listenTo("mymessage")
                .then(sender.createStatement(
                        MessageFormat.format("NOTIFY mymessage, ''Hello world at {0}''", LocalDateTime.now()))
                .execute()
                .flatMap(PostgresqlResult::getRowsUpdated)
                .doOnNext(rows -> log.info("Notification sent, rows updated: {}", rows))
                .then());
    }

    @PreDestroy
    public void destroy() {
        sender.close().doOnSuccess(v -> log.info("Notifier connection closed")).subscribe();
    }

    public Mono<ServerResponse> notifyData(ServerRequest req) {
        return req.bodyToMono(NotificationRequest.class)
                .flatMap(notificationRequest -> notificationListener
                        .listenTo(notificationRequest.channel())
                        .then(databaseClient
                                .sql("NOTIFY " + notificationRequest.channel() + ", '" + notificationRequest.message() + "'")
                                .fetch()
                                .rowsUpdated()
                                .flatMap(rowsUpdated -> ServerResponse.ok().bodyValue(rowsUpdated))));
    }
}

The changes ensure that:

  1. The channel is registered for listening before sending notifications
  2. Both send() and notifyData() methods now check and register channels
  3. The notification is sent only after successful channel registration

✏️ Learnings added
Learnt from: rajadilipkolli
PR: rajadilipkolli/my-spring-boot-experiments#1639
File: r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/notifier/Notifier.java:45-45
Timestamp: 2025-01-12T11:55:37.786Z
Learning: In PostgreSQL NOTIFY/LISTEN implementation, always ensure that a channel is being listened to before sending notifications to it. Use `listenTo` method to register the channel if it's not already being listened to.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Copy link

Quality Gate Failed Quality Gate failed

Failed conditions
E Security Rating on New Code (required ≥ A)

See analysis details on SonarQube Cloud

Catch issues before they fail your Quality Gate with our IDE extension SonarQube for IDE

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

♻️ Duplicate comments (1)
r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/listener/NotificationListener.java (1)

56-61: ⚠️ Potential issue

Validate channel name to prevent SQL injection.

The listenTo method is vulnerable to SQL injection. Add validation for the channel parameter.

 public Mono<Void> listenTo(String channel) {
+    if (!channel.matches("[a-zA-Z0-9_]+")) {
+        return Mono.error(new IllegalArgumentException("Invalid channel name"));
+    }
     return Mono.from(connection.createStatement("LISTEN " + channel).execute())
             .doOnSuccess(result -> log.info("Listening on channel: {}", channel))
             .doOnError(error -> log.error("Error listening to channel: {}", channel, error))
             .then();
 }
🧹 Nitpick comments (5)
r2dbc/boot-r2dbc-notify-listen/src/main/resources/application-local.properties (2)

1-1: Consider adding connection pool configuration.

For better performance and resource management, consider adding R2DBC connection pool settings:

spring.r2dbc.pool.initial-size=5
spring.r2dbc.pool.max-size=10
spring.r2dbc.pool.max-idle-time=30m

1-3: Document these properties in README.

Ensure these configuration properties are documented in the README.md, including:

  • Required/optional status
  • Default values
  • Environment variable overrides
  • Production deployment considerations
r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/listener/NotificationProperties.java (1)

10-13: Add Javadoc to document configuration properties.

Consider adding Javadoc to document the purpose and constraints of each property:

 @ConfigurationProperties(prefix = "app.notification")
 @Validated
+/**
+ * Configuration properties for PostgreSQL notification listener.
+ *
+ * @param channelName Name of the notification channel to listen to
+ * @param listenerEnabled Flag to enable/disable the notification listener
+ * @param connectionPoolSize Size of the R2DBC connection pool (1-100)
+ */
 public record NotificationProperties(
         @NotBlank String channelName,
         @NotNull Boolean listenerEnabled,
         @Min(1) @Max(100) Integer connectionPoolSize) {}
r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/listener/NotificationListener.java (2)

34-54: Improve error handling in initialize method.

The current implementation has several areas for improvement:

  1. Subscription error handling is minimal
  2. No retry mechanism for connection issues
  3. No backpressure strategy for notification processing

Consider this improved implementation:

 @PostConstruct
 public void initialize() {
     // Listen to the configured channel by default
     listenTo(properties.channelName())
-            .doOnError(error -> log.error("Error listening to channel: {}", error.getMessage()))
-            .doOnSuccess(notificationEvent -> log.info("Listening to channel: {}", notificationEvent))
-            .subscribe();
+            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
+            .doOnError(error -> log.error("Failed to listen to channel after retries: {}", error.getMessage()))
+            .doOnSuccess(notificationEvent -> log.info("Listening to channel: {}", notificationEvent))
+            .subscribe(null, error -> log.error("Fatal error in channel subscription", error));

     // Subscribe to notifications
     connection
             .getNotifications()
+            .limitRate(100)
             .doOnNext(notification -> {
                 var event = new NotificationEvent(
                         notification.getName(), notification.getParameter(), notification.getProcessId());
                 // Publish Spring event
                 eventPublisher.publishEvent(event);
                 // Emit to reactive stream
-                notificationSink.tryEmitNext(event);
+                Sinks.EmitResult result = notificationSink.tryEmitNext(event);
+                if (result.isFailure()) {
+                    log.error("Failed to emit notification: {}", result);
+                }
             })
             .doOnError(error -> log.error("Error receiving notification", error))
-            .subscribe();
+            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
+            .subscribe(null, error -> log.error("Fatal error in notification subscription", error));

71-78: Ensure proper cleanup of resources.

The cleanup method should:

  1. Unsubscribe from all channels
  2. Complete the sink
  3. Wait for the connection to close
 @PreDestroy
 public void cleanup() {
+    // Complete the sink
+    notificationSink.tryEmitComplete();
+    
+    // Unsubscribe from the channel
+    Mono.from(connection.createStatement("UNLISTEN " + properties.channelName()).execute())
+        .then(connection.close())
-    connection
-            .close()
             .doOnError(error -> log.error("Error closing connection", error))
             .doOnSuccess(unused -> log.info("Notification listener connection closed"))
-            .subscribe();
+            .block(Duration.ofSeconds(5));
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a673f5e and 0357745.

📒 Files selected for processing (7)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/NotifyListenPGApplication.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/config/WebRouterConfig.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/listener/NotificationListener.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/listener/NotificationProperties.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/notifier/Notifier.java (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/resources/application-local.properties (1 hunks)
  • r2dbc/boot-r2dbc-notify-listen/src/main/resources/application.properties (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/NotifyListenPGApplication.java
  • r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/config/WebRouterConfig.java
🧰 Additional context used
📓 Learnings (5)
📓 Common learnings
Learnt from: rajadilipkolli
PR: rajadilipkolli/my-spring-boot-experiments#0
File: :0-0
Timestamp: 2025-01-12T10:45:47.987Z
Learning: The NotificationListener service in Spring Boot R2DBC applications should handle PostgreSQL notifications by:
1. Using PostgresqlConnection for low-level notification handling
2. Publishing notifications as both Spring events and reactive streams
3. Managing connection lifecycle with @PostConstruct and @PreDestroy
4. Providing methods to listen to specific channels and get notifications as Flux
r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/listener/NotificationProperties.java (1)
Learnt from: rajadilipkolli
PR: rajadilipkolli/my-spring-boot-experiments#1639
File: r2dbc/boot-r2dbc-notify-listen/src/main/resources/application.properties:1-19
Timestamp: 2025-01-12T11:22:52.010Z
Learning: The NotificationListener in Spring Boot applications can be configured using properties:
- `app.notification.channel-name`: Specifies the default channel to listen to
- `app.notification.listener.enabled`: Controls whether the listener is enabled
- `app.notification.connection-pool-size`: Controls the R2DBC connection pool size
The configuration is bound using @ConfigurationProperties and @EnableConfigurationProperties annotations.
r2dbc/boot-r2dbc-notify-listen/src/main/resources/application.properties (2)
Learnt from: rajadilipkolli
PR: rajadilipkolli/my-spring-boot-experiments#1639
File: r2dbc/boot-r2dbc-notify-listen/src/main/resources/application.properties:1-19
Timestamp: 2025-01-12T11:22:52.010Z
Learning: The NotificationListener in Spring Boot applications can be configured using properties:
- `app.notification.channel-name`: Specifies the default channel to listen to
- `app.notification.listener.enabled`: Controls whether the listener is enabled
- `app.notification.connection-pool-size`: Controls the R2DBC connection pool size
The configuration is bound using @ConfigurationProperties and @EnableConfigurationProperties annotations.
Learnt from: rajadilipkolli
PR: rajadilipkolli/my-spring-boot-experiments#0
File: :0-0
Timestamp: 2025-01-12T10:45:47.987Z
Learning: The NotificationListener service in Spring Boot R2DBC applications should handle PostgreSQL notifications by:
1. Using PostgresqlConnection for low-level notification handling
2. Publishing notifications as both Spring events and reactive streams
3. Managing connection lifecycle with @PostConstruct and @PreDestroy
4. Providing methods to listen to specific channels and get notifications as Flux
r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/notifier/Notifier.java (1)
Learnt from: rajadilipkolli
PR: rajadilipkolli/my-spring-boot-experiments#1639
File: r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/notifier/Notifier.java:45-45
Timestamp: 2025-01-12T11:55:38.018Z
Learning: In PostgreSQL NOTIFY/LISTEN implementation, always ensure that a channel is being listened to before sending notifications to it. Use `listenTo` method to register the channel if it's not already being listened to.
r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/listener/NotificationListener.java (2)
Learnt from: rajadilipkolli
PR: rajadilipkolli/my-spring-boot-experiments#0
File: :0-0
Timestamp: 2025-01-12T10:45:47.987Z
Learning: The NotificationListener service in Spring Boot R2DBC applications should handle PostgreSQL notifications by:
1. Using PostgresqlConnection for low-level notification handling
2. Publishing notifications as both Spring events and reactive streams
3. Managing connection lifecycle with @PostConstruct and @PreDestroy
4. Providing methods to listen to specific channels and get notifications as Flux
Learnt from: rajadilipkolli
PR: rajadilipkolli/my-spring-boot-experiments#1639
File: r2dbc/boot-r2dbc-notify-listen/src/main/java/com/example/demo/notifier/Notifier.java:45-45
Timestamp: 2025-01-12T11:55:38.018Z
Learning: In PostgreSQL NOTIFY/LISTEN implementation, always ensure that a channel is being listened to before sending notifications to it. Use `listenTo` method to register the channel if it's not already being listened to.
🪛 GitHub Check: SonarCloud
r2dbc/boot-r2dbc-notify-listen/src/main/resources/application.properties

[failure] 5-5: Credentials should not be hard-coded

Revoke and change this password, as it is compromised.

See more on SonarQube Cloud

⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Codacy Static Code Analysis
🔇 Additional comments (1)
r2dbc/boot-r2dbc-notify-listen/src/main/resources/application.properties (1)

3-7: Restrict management endpoint exposure.

Exposing all management endpoints (management.endpoints.web.exposure.include=*) could pose security risks. Consider exposing only necessary endpoints.

-management.endpoints.web.exposure.include=*
+management.endpoints.web.exposure.include=health,info,metrics
🧰 Tools
🪛 GitHub Check: SonarCloud

[failure] 5-5: Credentials should not be hard-coded

Revoke and change this password, as it is compromised.

See more on SonarQube Cloud

Comment on lines +20 to +32
public Mono<ServerResponse> notifyData(ServerRequest req) {
return req.bodyToMono(NotificationRequest.class).flatMap(notificationRequest -> databaseClient
.sql("SELECT pg_notify(:channel, :message)")
.bind("channel", notificationRequest.channel())
.bind("message", notificationRequest.message())
.fetch()
.first()
.then(ServerResponse.ok()
.bodyValue(Map.of(
"status", "success",
"message", "Notification sent",
"channel", notificationRequest.channel()))));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve notification handling with validation, channel registration, and error handling

The current implementation needs several improvements for production readiness:

  1. Channel registration check (from learnings)
  2. Input validation
  3. Error handling
  4. Logging for operational visibility

Update the method implementation:

 public Mono<ServerResponse> notifyData(ServerRequest req) {
+    log.debug("Processing notification request");
     return req.bodyToMono(NotificationRequest.class)
+            .doOnNext(this::validateRequest)
+            .flatMap(notificationRequest -> notificationListener
+                    .listenTo(notificationRequest.channel())
+                    .then(Mono.defer(() -> {
+                        log.debug("Channel {} registered, sending notification", notificationRequest.channel());
                         return databaseClient
                             .sql("SELECT pg_notify(:channel, :message)")
                             .bind("channel", notificationRequest.channel())
                             .bind("message", notificationRequest.message())
                             .fetch()
                             .first();
+                    })))
+            .doOnError(error -> log.error("Failed to send notification", error))
+            .onErrorResume(error -> ServerResponse.badRequest()
+                    .bodyValue(Map.of("error", error.getMessage())))
             .then(ServerResponse.ok()
                     .bodyValue(Map.of(
                             "status", "success",
                             "message", "Notification sent",
                             "channel", notificationRequest.channel())));
 }

+private void validateRequest(NotificationRequest request) {
+    if (request.channel() == null || request.channel().isBlank()) {
+        throw new IllegalArgumentException("Channel name cannot be empty");
+    }
+    if (request.message() == null || request.message().isBlank()) {
+        throw new IllegalArgumentException("Message cannot be empty");
+    }
+}

Consider adding the following lifecycle management method:

@PreDestroy
public void destroy() {
    connection.close()
        .doOnSuccess(v -> log.info("Notifier connection closed"))
        .doOnError(error -> log.error("Error closing notifier connection", error))
        .block();
}

@rajadilipkolli rajadilipkolli merged commit bbd40ea into main Jan 12, 2025
6 of 9 checks passed
@rajadilipkolli rajadilipkolli deleted the notify-listen branch January 12, 2025 12:05
@rajadilipkolli rajadilipkolli removed their assignment Jan 12, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant