diff --git a/boot-rabbitmq-thymeleaf/pom.xml b/boot-rabbitmq-thymeleaf/pom.xml index 579d7495e..42e3e15ae 100644 --- a/boot-rabbitmq-thymeleaf/pom.xml +++ b/boot-rabbitmq-thymeleaf/pom.xml @@ -1,7 +1,8 @@ + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 @@ -13,7 +14,8 @@ com.example boot-rabbitmq-thymeleaf 0.0.1-SNAPSHOT - Spring Boot RabbitMQ POC + boot-rabbitmq-thymeleaf + Spring Boot RabbitMQ POC UTF-8 diff --git a/boot-rabbitmq-thymeleaf/src/main/java/com/poc/boot/rabbitmq/config/RabbitMQConfig.java b/boot-rabbitmq-thymeleaf/src/main/java/com/poc/boot/rabbitmq/config/RabbitMQConfig.java index 1a6238d86..a2ddd4ca6 100644 --- a/boot-rabbitmq-thymeleaf/src/main/java/com/poc/boot/rabbitmq/config/RabbitMQConfig.java +++ b/boot-rabbitmq-thymeleaf/src/main/java/com/poc/boot/rabbitmq/config/RabbitMQConfig.java @@ -1,5 +1,9 @@ package com.poc.boot.rabbitmq.config; +import com.poc.boot.rabbitmq.entities.TrackingState; +import com.poc.boot.rabbitmq.repository.TrackingStateRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; @@ -15,24 +19,24 @@ import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory; +import org.springframework.util.Assert; @Configuration(proxyBeanMethods = false) public class RabbitMQConfig { public static final String DLX_ORDERS_EXCHANGE = "DLX.ORDERS.EXCHANGE"; - public static final String DLQ_ORDERS_QUEUE = "DLQ.ORDERS.QUEUE"; public static final String ORDERS_QUEUE = "ORDERS.QUEUE"; - private static final String ORDERS_EXCHANGE = "ORDERS.EXCHANGE"; - private static final String ROUTING_KEY_ORDERS_QUEUE = "ROUTING_KEY_ORDERS_QUEUE"; - private final RabbitTemplateConfirmCallback rabbitTemplateConfirmCallback; + private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class); + + private final TrackingStateRepository trackingStateRepository; - RabbitMQConfig(RabbitTemplateConfirmCallback rabbitTemplateConfirmCallback) { - this.rabbitTemplateConfirmCallback = rabbitTemplateConfirmCallback; + public RabbitMQConfig(TrackingStateRepository trackingStateRepository) { + this.trackingStateRepository = trackingStateRepository; } @Bean @@ -79,7 +83,35 @@ Binding deadLetterBinding(Queue deadLetterQueue, FanoutExchange deadLetterExchan @Bean RabbitTemplateCustomizer rabbitTemplateCustomizer() { - return rabbitTemplate -> rabbitTemplate.setConfirmCallback(rabbitTemplateConfirmCallback); + return rabbitTemplate -> { + rabbitTemplate.setConfirmCallback( + (correlationData, ack, cause) -> { + Assert.notNull(correlationData, () -> "correlationData can't be null"); + log.info( + "correlation id : {} , acknowledgement : {}, cause : {}", + correlationData.getId(), + ack, + cause); + log.debug( + "persisted correlationId in db : {}", + trackingStateRepository.save( + new TrackingState() + .setCorrelationId(correlationData.getId()) + .setAck(ack) + .setCause(cause) + .setStatus("processed"))); + }); + // This block ensures that returned, un-routable messages are logged. + rabbitTemplate.setReturnsCallback( + returnedMessage -> + log.info( + "Returned: {}\nreplyCode: {}\nreplyText: {}\nexchange/rk: {}/{}", + returnedMessage.getMessage().toString(), + returnedMessage.getReplyCode(), + returnedMessage.getReplyText(), + returnedMessage.getExchange(), + returnedMessage.getRoutingKey())); + }; } @Bean diff --git a/boot-rabbitmq-thymeleaf/src/main/java/com/poc/boot/rabbitmq/config/RabbitTemplateConfirmCallback.java b/boot-rabbitmq-thymeleaf/src/main/java/com/poc/boot/rabbitmq/config/RabbitTemplateConfirmCallback.java deleted file mode 100644 index 8ec414778..000000000 --- a/boot-rabbitmq-thymeleaf/src/main/java/com/poc/boot/rabbitmq/config/RabbitTemplateConfirmCallback.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.poc.boot.rabbitmq.config; - -import com.poc.boot.rabbitmq.entities.TrackingState; -import com.poc.boot.rabbitmq.repository.TrackingStateRepository; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.amqp.rabbit.connection.CorrelationData; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.stereotype.Component; -import org.springframework.util.Assert; - -@Component -public class RabbitTemplateConfirmCallback implements RabbitTemplate.ConfirmCallback { - - private static final Logger log = LoggerFactory.getLogger(RabbitTemplateConfirmCallback.class); - private final TrackingStateRepository trackingStateRepository; - - public RabbitTemplateConfirmCallback(TrackingStateRepository trackingStateRepository) { - this.trackingStateRepository = trackingStateRepository; - } - - @Override - public void confirm(CorrelationData correlationData, boolean ack, String cause) { - Assert.notNull(correlationData, () -> "correlationData can't be null"); - log.info( - "correlation id : {} , acknowledgement : {}, cause : {}", - correlationData.getId(), - ack, - cause); - log.debug( - "persisted correlationId in db : {}", - this.trackingStateRepository.save( - new TrackingState() - .setCorrelationId(correlationData.getId()) - .setStatus("processed"))); - } -} diff --git a/boot-rabbitmq-thymeleaf/src/main/java/com/poc/boot/rabbitmq/entities/TrackingState.java b/boot-rabbitmq-thymeleaf/src/main/java/com/poc/boot/rabbitmq/entities/TrackingState.java index 65d64248d..01db62f5a 100644 --- a/boot-rabbitmq-thymeleaf/src/main/java/com/poc/boot/rabbitmq/entities/TrackingState.java +++ b/boot-rabbitmq-thymeleaf/src/main/java/com/poc/boot/rabbitmq/entities/TrackingState.java @@ -15,9 +15,12 @@ public class TrackingState { @Column(name = "id", nullable = false) private Long id; + @Column(unique = true, nullable = false) private String correlationId; + private boolean ack; private String status = "processed"; + private String cause; public TrackingState() {} @@ -39,6 +42,15 @@ public TrackingState setCorrelationId(String correlationId) { return this; } + public boolean isAck() { + return ack; + } + + public TrackingState setAck(boolean ack) { + this.ack = ack; + return this; + } + public String getStatus() { return status; } @@ -48,12 +60,23 @@ public TrackingState setStatus(String status) { return this; } + public String getCause() { + return cause; + } + + public TrackingState setCause(String cause) { + this.cause = cause; + return this; + } + @Override public String toString() { return new StringJoiner(", ", TrackingState.class.getSimpleName() + "[", "]") .add("id=" + id) .add("correlationId='" + correlationId + "'") + .add("ack=" + ack) .add("status='" + status + "'") + .add("cause='" + cause + "'") .toString(); } } diff --git a/boot-rabbitmq-thymeleaf/src/main/resources/application.properties b/boot-rabbitmq-thymeleaf/src/main/resources/application.properties index dc9030c03..e540605d1 100644 --- a/boot-rabbitmq-thymeleaf/src/main/resources/application.properties +++ b/boot-rabbitmq-thymeleaf/src/main/resources/application.properties @@ -4,17 +4,22 @@ logging.level.com.poc.boot.rabbitmq=debug # Additional RabbitMQ properties spring.rabbitmq.publisher-confirmType=CORRELATED -spring.rabbitmq.listener.simple.retry.enabled=true +spring.rabbitmq.publisher-returns=true + spring.rabbitmq.listener.simple.concurrency=4 spring.rabbitmq.listener.simple.max-concurrency=8 +spring.rabbitmq.listener.simple.retry.enabled=true spring.rabbitmq.listener.simple.retry.initial-interval=1s spring.rabbitmq.listener.simple.retry.max-attempts=3 -spring.rabbitmq.listener.simple.retry.multiplier=2 +spring.rabbitmq.listener.simple.retry.multiplier=1.5 spring.rabbitmq.listener.simple.retry.max-interval=2s spring.rabbitmq.listener.simple.acknowledge-mode=auto spring.rabbitmq.listener.simple.observation-enabled=true + spring.rabbitmq.template.retry.enabled=true -spring.rabbitmq.template.retry.multiplier=2 +spring.rabbitmq.template.retry.multiplier=1.5 +spring.rabbitmq.template.mandatory=true +spring.rabbitmq.template.observation-enabled=true spring.mvc.problemdetails.enabled=true spring.threads.virtual.enabled=true @@ -24,6 +29,8 @@ spring.testcontainers.beans.startup=parallel ################ Actuator ##################### management.endpoints.web.exposure.include=health,info,metrics,prometheus management.endpoint.health.show-details=always +management.metrics.tags.application=${spring.application.name} +management.metrics.enable.spring.rabbitmq=true ################ Database ##################### spring.jpa.show-sql=false diff --git a/boot-rabbitmq-thymeleaf/src/main/resources/db/changelog/01-create-tables.xml b/boot-rabbitmq-thymeleaf/src/main/resources/db/changelog/01-create-tables.xml index 3f48ac73d..14c22e088 100644 --- a/boot-rabbitmq-thymeleaf/src/main/resources/db/changelog/01-create-tables.xml +++ b/boot-rabbitmq-thymeleaf/src/main/resources/db/changelog/01-create-tables.xml @@ -27,18 +27,14 @@ - + + + - - - - - -