-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat : adds junits for dead letter queue (#526)
- Loading branch information
1 parent
d713892
commit 13f240e
Showing
10 changed files
with
94 additions
and
19 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
30 changes: 30 additions & 0 deletions
30
...outbox-pattern/src/test/java/com/example/outboxpattern/common/listener/OrderListener.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,28 +1,58 @@ | ||
package com.example.outboxpattern.common.listener; | ||
|
||
import com.example.outboxpattern.order.OrderRecord; | ||
import com.example.outboxpattern.order.internal.entities.Order; | ||
import java.util.concurrent.CountDownLatch; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.springframework.boot.test.context.TestConfiguration; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.kafka.annotation.KafkaListener; | ||
import org.springframework.kafka.core.KafkaOperations; | ||
import org.springframework.kafka.listener.CommonErrorHandler; | ||
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; | ||
import org.springframework.kafka.listener.DefaultErrorHandler; | ||
import org.springframework.util.backoff.FixedBackOff; | ||
|
||
@TestConfiguration | ||
public class OrderListener { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(OrderListener.class); | ||
|
||
private final CountDownLatch latch = new CountDownLatch(1); | ||
private final CountDownLatch dlqLatch = new CountDownLatch(1); | ||
|
||
/* | ||
* Boot will autowire this into the container factory. | ||
*/ | ||
@Bean | ||
CommonErrorHandler errorHandler(KafkaOperations<Object, Object> template) { | ||
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template), new FixedBackOff(1000L, 2)); | ||
} | ||
|
||
@KafkaListener(topics = "order-created", groupId = "notification") | ||
public void notify(OrderRecord event) { | ||
log.info( | ||
"Notifying user for created order {} and productCode {}", | ||
event.id(), | ||
event.orderItems().getFirst().productCode()); | ||
if (event.status().equals(Order.OrderStatus.FAILED.name())) { | ||
throw new RuntimeException("failed"); | ||
} | ||
latch.countDown(); | ||
} | ||
|
||
@KafkaListener(id = "dltGroup", topics = "order-created.DLT") | ||
public void dltListen(byte[] in) { | ||
log.info("Received from DLT: {}", new String(in)); | ||
dlqLatch.countDown(); | ||
} | ||
|
||
public CountDownLatch getLatch() { | ||
return latch; | ||
} | ||
|
||
public CountDownLatch getDlqLatch() { | ||
return dlqLatch; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters