Skip to content

Commit

Permalink
feat : polish rabbit Settings (#1607)
Browse files Browse the repository at this point in the history
* feat : polish rabbit Settings

* adds more tracking fields

* removes duplicate constraints
  • Loading branch information
rajadilipkolli authored Jan 2, 2025
1 parent 36c6ecb commit 7ee5823
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 56 deletions.
6 changes: 4 additions & 2 deletions boot-rabbitmq-thymeleaf/pom.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand All @@ -13,7 +14,8 @@
<groupId>com.example</groupId>
<artifactId>boot-rabbitmq-thymeleaf</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>Spring Boot RabbitMQ POC</name>
<name>boot-rabbitmq-thymeleaf</name>
<description>Spring Boot RabbitMQ POC</description>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.poc.boot.rabbitmq.config;

import com.poc.boot.rabbitmq.entities.TrackingState;
import com.poc.boot.rabbitmq.repository.TrackingStateRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
Expand All @@ -15,24 +19,24 @@
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.util.Assert;

@Configuration(proxyBeanMethods = false)
public class RabbitMQConfig {

public static final String DLX_ORDERS_EXCHANGE = "DLX.ORDERS.EXCHANGE";

public static final String DLQ_ORDERS_QUEUE = "DLQ.ORDERS.QUEUE";

public static final String ORDERS_QUEUE = "ORDERS.QUEUE";

private static final String ORDERS_EXCHANGE = "ORDERS.EXCHANGE";

private static final String ROUTING_KEY_ORDERS_QUEUE = "ROUTING_KEY_ORDERS_QUEUE";

private final RabbitTemplateConfirmCallback rabbitTemplateConfirmCallback;
private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);

private final TrackingStateRepository trackingStateRepository;

RabbitMQConfig(RabbitTemplateConfirmCallback rabbitTemplateConfirmCallback) {
this.rabbitTemplateConfirmCallback = rabbitTemplateConfirmCallback;
public RabbitMQConfig(TrackingStateRepository trackingStateRepository) {
this.trackingStateRepository = trackingStateRepository;
}

@Bean
Expand Down Expand Up @@ -79,7 +83,35 @@ Binding deadLetterBinding(Queue deadLetterQueue, FanoutExchange deadLetterExchan

@Bean
RabbitTemplateCustomizer rabbitTemplateCustomizer() {
return rabbitTemplate -> rabbitTemplate.setConfirmCallback(rabbitTemplateConfirmCallback);
return rabbitTemplate -> {
rabbitTemplate.setConfirmCallback(
(correlationData, ack, cause) -> {
Assert.notNull(correlationData, () -> "correlationData can't be null");
log.info(
"correlation id : {} , acknowledgement : {}, cause : {}",
correlationData.getId(),
ack,
cause);
log.debug(
"persisted correlationId in db : {}",
trackingStateRepository.save(
new TrackingState()
.setCorrelationId(correlationData.getId())
.setAck(ack)
.setCause(cause)
.setStatus("processed")));
});
// This block ensures that returned, un-routable messages are logged.
rabbitTemplate.setReturnsCallback(
returnedMessage ->
log.info(
"Returned: {}\nreplyCode: {}\nreplyText: {}\nexchange/rk: {}/{}",
returnedMessage.getMessage().toString(),
returnedMessage.getReplyCode(),
returnedMessage.getReplyText(),
returnedMessage.getExchange(),
returnedMessage.getRoutingKey()));
};
}

@Bean
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ public class TrackingState {
@Column(name = "id", nullable = false)
private Long id;

@Column(unique = true, nullable = false)
private String correlationId;

private boolean ack;
private String status = "processed";
private String cause;

public TrackingState() {}

Expand All @@ -39,6 +42,15 @@ public TrackingState setCorrelationId(String correlationId) {
return this;
}

public boolean isAck() {
return ack;
}

public TrackingState setAck(boolean ack) {
this.ack = ack;
return this;
}

public String getStatus() {
return status;
}
Expand All @@ -48,12 +60,23 @@ public TrackingState setStatus(String status) {
return this;
}

public String getCause() {
return cause;
}

public TrackingState setCause(String cause) {
this.cause = cause;
return this;
}

@Override
public String toString() {
return new StringJoiner(", ", TrackingState.class.getSimpleName() + "[", "]")
.add("id=" + id)
.add("correlationId='" + correlationId + "'")
.add("ack=" + ack)
.add("status='" + status + "'")
.add("cause='" + cause + "'")
.toString();
}
}
13 changes: 10 additions & 3 deletions boot-rabbitmq-thymeleaf/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@ logging.level.com.poc.boot.rabbitmq=debug

# Additional RabbitMQ properties
spring.rabbitmq.publisher-confirmType=CORRELATED
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.publisher-returns=true

spring.rabbitmq.listener.simple.concurrency=4
spring.rabbitmq.listener.simple.max-concurrency=8
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=1s
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.multiplier=2
spring.rabbitmq.listener.simple.retry.multiplier=1.5
spring.rabbitmq.listener.simple.retry.max-interval=2s
spring.rabbitmq.listener.simple.acknowledge-mode=auto
spring.rabbitmq.listener.simple.observation-enabled=true

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.multiplier=2
spring.rabbitmq.template.retry.multiplier=1.5
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.template.observation-enabled=true

spring.mvc.problemdetails.enabled=true
spring.threads.virtual.enabled=true
Expand All @@ -24,6 +29,8 @@ spring.testcontainers.beans.startup=parallel
################ Actuator #####################
management.endpoints.web.exposure.include=health,info,metrics,prometheus
management.endpoint.health.show-details=always
management.metrics.tags.application=${spring.application.name}
management.metrics.enable.spring.rabbitmq=true

################ Database #####################
spring.jpa.show-sql=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,14 @@
<constraints primaryKey="true" primaryKeyName="tracking_state_id"/>
</column>
<column name="correlation_id" type="${stringType}">
<constraints nullable="false"/>
<constraints nullable="false" unique="true"/>
</column>
<column name="ack" type="boolean" />
<column name="status" type="${stringType}">
<constraints nullable="false"/>
</column>
<column name="cause" type="${stringType}" />
</createTable>
</changeSet>

<changeSet id="1689580674038-1" author="rajakolli">
<createIndex tableName="tracking_state" indexName="tracking_state_correlationId_uc">
<column name="correlation_id"/>
</createIndex>
</changeSet>

</databaseChangeLog>

0 comments on commit 7ee5823

Please sign in to comment.