From 13f240eb126de1914a90e46dfc789e4625bf91bd Mon Sep 17 00:00:00 2001 From: Raja Kolli Date: Fri, 19 Jul 2024 12:01:29 +0530 Subject: [PATCH] feat : adds junits for dead letter queue (#526) --- .../kafka-sample-spring-modulith.yml | 8 +++- .gitpod.yml | 4 +- .../.github/workflows/maven.yml | 4 +- .../docker/docker-compose.yml | 2 +- .../order/internal/OrderMapper.java | 14 +++++- .../internal/domain/request/OrderRequest.java | 3 +- .../order/internal/entities/Order.java | 3 +- .../common/listener/OrderListener.java | 30 +++++++++++++ .../order/internal/OrderControllerIT.java | 43 ++++++++++++++++--- .../order/internal/OrderModuleIntTests.java | 2 +- 10 files changed, 94 insertions(+), 19 deletions(-) diff --git a/.github/workflows/kafka-sample-spring-modulith.yml b/.github/workflows/kafka-sample-spring-modulith.yml index 0da36131..7b355aef 100644 --- a/.github/workflows/kafka-sample-spring-modulith.yml +++ b/.github/workflows/kafka-sample-spring-modulith.yml @@ -40,6 +40,10 @@ jobs: native-image --version - name: Build with Maven run: ./mvnw -B verify --file pom.xml + - if: ${{ github.ref == 'refs/heads/main' }} - name: Build native Image - run: ./mvnw -Pnative spring-boot:build-image -DskipTests + name: Build and Publish Docker Image + run: | + docker login -u ${{ secrets.DOCKER_USERNAME }} -p ${{ secrets.DOCKER_PASSWORD }} + ./mvnw -Pnative spring-boot:build-image -Dspring-boot.build-image.imageName=${{ secrets.DOCKER_USERNAME }}/spring-modulith-outbox-pattern + docker push ${{ secrets.DOCKER_USERNAME }}/spring-modulith-outbox-pattern diff --git a/.gitpod.yml b/.gitpod.yml index fddd9356..51fa3eab 100644 --- a/.gitpod.yml +++ b/.gitpod.yml @@ -31,7 +31,7 @@ ports: onOpen: open-browser # Learn more from ready-to-use templates: https://www.gitpod.io/docs/introduction/getting-started/quickstart - vscode: extensions: - - Pivotal.vscode-boot-dev-pack \ No newline at end of file + - Pivotal.vscode-boot-dev-pack + - vscjava.vscode-java-pack diff --git a/spring-modulith-outbox-pattern/.github/workflows/maven.yml b/spring-modulith-outbox-pattern/.github/workflows/maven.yml index c76c42d1..87817fb2 100644 --- a/spring-modulith-outbox-pattern/.github/workflows/maven.yml +++ b/spring-modulith-outbox-pattern/.github/workflows/maven.yml @@ -12,11 +12,11 @@ jobs: strategy: matrix: distribution: [ 'temurin' ] - java: [ '17' ] + java: [ '21' ] steps: - uses: actions/checkout@v4 - - name: Setup Java 17 + - name: Setup Java 21 uses: actions/setup-java@v4 with: java-version: ${{ matrix.java }} diff --git a/spring-modulith-outbox-pattern/docker/docker-compose.yml b/spring-modulith-outbox-pattern/docker/docker-compose.yml index 0608322d..9323257a 100644 --- a/spring-modulith-outbox-pattern/docker/docker-compose.yml +++ b/spring-modulith-outbox-pattern/docker/docker-compose.yml @@ -42,7 +42,7 @@ services: - "9411:9411" broker: - image: confluentinc/cp-kafka:7.6.2 + image: apache/kafka:3.7.1 hostname: broker ports: - "9092:9092" diff --git a/spring-modulith-outbox-pattern/src/main/java/com/example/outboxpattern/order/internal/OrderMapper.java b/spring-modulith-outbox-pattern/src/main/java/com/example/outboxpattern/order/internal/OrderMapper.java index baa4c0a8..e3960fb9 100644 --- a/spring-modulith-outbox-pattern/src/main/java/com/example/outboxpattern/order/internal/OrderMapper.java +++ b/spring-modulith-outbox-pattern/src/main/java/com/example/outboxpattern/order/internal/OrderMapper.java @@ -11,13 +11,19 @@ import java.util.List; import java.util.Objects; import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; @Service @Loggable class OrderMapper { Order toEntity(OrderRequest orderRequest) { - Order order = new Order().setOrderedDate(LocalDateTime.now()).setStatus(Order.OrderStatus.CREATED); + Order order = new Order().setOrderedDate(LocalDateTime.now()); + if (StringUtils.hasText(orderRequest.status())) { + order.setStatus(Order.OrderStatus.valueOf(orderRequest.status())); + } else { + order.setStatus(Order.OrderStatus.CREATED); + } convertToOrderItemEntityList(orderRequest.itemsList()).forEach(order::addOrderItem); return order; } @@ -34,7 +40,11 @@ private OrderItem convertToOrderItemEntity(OrderItemRequest orderItemRequest) { } void mapOrderWithRequest(Order order, OrderRequest orderRequest) { - order.setStatus(Order.OrderStatus.COMPLETED); + if (StringUtils.hasText(orderRequest.status())) { + order.setStatus(Order.OrderStatus.valueOf(orderRequest.status())); + } else { + order.setStatus(Order.OrderStatus.COMPLETED); + } // Convert request to OrderItems List detachedOrderItems = convertToOrderItemEntityList(orderRequest.itemsList()); diff --git a/spring-modulith-outbox-pattern/src/main/java/com/example/outboxpattern/order/internal/domain/request/OrderRequest.java b/spring-modulith-outbox-pattern/src/main/java/com/example/outboxpattern/order/internal/domain/request/OrderRequest.java index 11c210b1..6d46e336 100644 --- a/spring-modulith-outbox-pattern/src/main/java/com/example/outboxpattern/order/internal/domain/request/OrderRequest.java +++ b/spring-modulith-outbox-pattern/src/main/java/com/example/outboxpattern/order/internal/domain/request/OrderRequest.java @@ -3,4 +3,5 @@ import jakarta.validation.constraints.NotEmpty; import java.util.List; -public record OrderRequest(@NotEmpty(message = "ItemsList must not be empty") List itemsList) {} +public record OrderRequest( + String status, @NotEmpty(message = "ItemsList must not be empty") List itemsList) {} diff --git a/spring-modulith-outbox-pattern/src/main/java/com/example/outboxpattern/order/internal/entities/Order.java b/spring-modulith-outbox-pattern/src/main/java/com/example/outboxpattern/order/internal/entities/Order.java index 2b24a287..9a82c617 100644 --- a/spring-modulith-outbox-pattern/src/main/java/com/example/outboxpattern/order/internal/entities/Order.java +++ b/spring-modulith-outbox-pattern/src/main/java/com/example/outboxpattern/order/internal/entities/Order.java @@ -82,7 +82,8 @@ public void removeOrderItem(OrderItem orderItem) { public enum OrderStatus { CREATED, - COMPLETED + COMPLETED, + FAILED } @Override diff --git a/spring-modulith-outbox-pattern/src/test/java/com/example/outboxpattern/common/listener/OrderListener.java b/spring-modulith-outbox-pattern/src/test/java/com/example/outboxpattern/common/listener/OrderListener.java index cc9288e8..d3c86b04 100644 --- a/spring-modulith-outbox-pattern/src/test/java/com/example/outboxpattern/common/listener/OrderListener.java +++ b/spring-modulith-outbox-pattern/src/test/java/com/example/outboxpattern/common/listener/OrderListener.java @@ -1,17 +1,34 @@ package com.example.outboxpattern.common.listener; import com.example.outboxpattern.order.OrderRecord; +import com.example.outboxpattern.order.internal.entities.Order; import java.util.concurrent.CountDownLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.listener.CommonErrorHandler; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.util.backoff.FixedBackOff; @TestConfiguration public class OrderListener { private static final Logger log = LoggerFactory.getLogger(OrderListener.class); + private final CountDownLatch latch = new CountDownLatch(1); + private final CountDownLatch dlqLatch = new CountDownLatch(1); + + /* + * Boot will autowire this into the container factory. + */ + @Bean + CommonErrorHandler errorHandler(KafkaOperations template) { + return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template), new FixedBackOff(1000L, 2)); + } @KafkaListener(topics = "order-created", groupId = "notification") public void notify(OrderRecord event) { @@ -19,10 +36,23 @@ public void notify(OrderRecord event) { "Notifying user for created order {} and productCode {}", event.id(), event.orderItems().getFirst().productCode()); + if (event.status().equals(Order.OrderStatus.FAILED.name())) { + throw new RuntimeException("failed"); + } latch.countDown(); } + @KafkaListener(id = "dltGroup", topics = "order-created.DLT") + public void dltListen(byte[] in) { + log.info("Received from DLT: {}", new String(in)); + dlqLatch.countDown(); + } + public CountDownLatch getLatch() { return latch; } + + public CountDownLatch getDlqLatch() { + return dlqLatch; + } } diff --git a/spring-modulith-outbox-pattern/src/test/java/com/example/outboxpattern/order/internal/OrderControllerIT.java b/spring-modulith-outbox-pattern/src/test/java/com/example/outboxpattern/order/internal/OrderControllerIT.java index f2adca3c..974fc01b 100644 --- a/spring-modulith-outbox-pattern/src/test/java/com/example/outboxpattern/order/internal/OrderControllerIT.java +++ b/spring-modulith-outbox-pattern/src/test/java/com/example/outboxpattern/order/internal/OrderControllerIT.java @@ -135,7 +135,7 @@ class Save { @Test void shouldCreateNewOrder() throws Exception { OrderRequest orderRequest = - new OrderRequest(List.of(new OrderItemRequest("New Order", BigDecimal.TEN, 100))); + new OrderRequest(null, List.of(new OrderItemRequest("New Order", BigDecimal.TEN, 100))); mockMvc.perform(post("/api/orders") .contentType(MediaType.APPLICATION_JSON) .content(objectMapper.writeValueAsString(orderRequest))) @@ -147,15 +147,42 @@ void shouldCreateNewOrder() throws Exception { "$.orderItems[0].productCode", is(orderRequest.itemsList().getFirst().productCode()))); + long count = orderListener.getDlqLatch().getCount(); await().pollInterval(Duration.ofSeconds(1)) .atMost(Duration.ofSeconds(15)) - .untilAsserted(() -> - assertThat(orderListener.getLatch().getCount()).isZero()); + .untilAsserted(() -> { + assertThat(orderListener.getLatch().getCount()).isZero(); + assertThat(orderListener.getDlqLatch().getCount()).isEqualTo(count); + }); + } + + @Test + void shouldCreateNewOrderWithFailedStatus() throws Exception { + long count = orderListener.getLatch().getCount(); + OrderRequest orderRequest = new OrderRequest( + Order.OrderStatus.FAILED.name(), List.of(new OrderItemRequest("New Order", BigDecimal.TEN, 100))); + mockMvc.perform(post("/api/orders") + .contentType(MediaType.APPLICATION_JSON) + .content(objectMapper.writeValueAsString(orderRequest))) + .andExpect(status().isCreated()) + .andExpect(header().exists(HttpHeaders.LOCATION)) + .andExpect(header().string(HttpHeaders.CONTENT_TYPE, is(MediaType.APPLICATION_JSON_VALUE))) + .andExpect(jsonPath("$.id", notNullValue())) + .andExpect(jsonPath( + "$.orderItems[0].productCode", + is(orderRequest.itemsList().getFirst().productCode()))); + + await().pollInterval(Duration.ofSeconds(1)) + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> { + assertThat(orderListener.getLatch().getCount()).isEqualTo(count); + assertThat(orderListener.getDlqLatch().getCount()).isZero(); + }); } @Test void shouldReturn400WhenCreateNewOrderWithoutItems() throws Exception { - OrderRequest orderRequest = new OrderRequest(null); + OrderRequest orderRequest = new OrderRequest(null, null); mockMvc.perform(post("/api/orders") .contentType(MediaType.APPLICATION_JSON) @@ -180,8 +207,10 @@ class Update { @Test void shouldUpdateOrder() throws Exception { Long orderId = orderList.getFirst().getId(); - OrderRequest orderRequest = new OrderRequest(List.of(new OrderItemRequest( - orderList.getFirst().getItems().getFirst().getProductCode(), BigDecimal.TEN, 100))); + OrderRequest orderRequest = new OrderRequest( + null, + List.of(new OrderItemRequest( + orderList.getFirst().getItems().getFirst().getProductCode(), BigDecimal.TEN, 100))); mockMvc.perform(put("/api/orders/{id}", orderId) .contentType(MediaType.APPLICATION_JSON) @@ -198,7 +227,7 @@ void shouldUpdateOrder() throws Exception { @Test void shouldReturn404WhenUpdatingNonExistingOrder() throws Exception { Long orderId = 10_000L; - OrderRequest order = new OrderRequest(List.of(new OrderItemRequest("Product1", BigDecimal.TEN, 10))); + OrderRequest order = new OrderRequest(null, List.of(new OrderItemRequest("Product1", BigDecimal.TEN, 10))); mockMvc.perform(put("/api/orders/{id}", orderId) .contentType(MediaType.APPLICATION_JSON) diff --git a/spring-modulith-outbox-pattern/src/test/java/com/example/outboxpattern/order/internal/OrderModuleIntTests.java b/spring-modulith-outbox-pattern/src/test/java/com/example/outboxpattern/order/internal/OrderModuleIntTests.java index 5b5f3eee..331ac832 100644 --- a/spring-modulith-outbox-pattern/src/test/java/com/example/outboxpattern/order/internal/OrderModuleIntTests.java +++ b/spring-modulith-outbox-pattern/src/test/java/com/example/outboxpattern/order/internal/OrderModuleIntTests.java @@ -47,7 +47,7 @@ void shouldTriggerOrderCreatedEvent(Scenario scenario) { }); scenario.stimulate(() -> orders.saveOrder( - new OrderRequest(List.of(new OrderItemRequest("Coffee", BigDecimal.TEN, 100))))) + new OrderRequest(null, List.of(new OrderItemRequest("Coffee", BigDecimal.TEN, 100))))) .andWaitForEventOfType(OrderRecord.class) .toArriveAndVerify(event -> assertThat(event.orderItems().getFirst().productCode()).isEqualTo("Coffee"));