From 34f92aade743a77a7ff6970d2a87431e0600d92b Mon Sep 17 00:00:00 2001 From: onobc Date: Tue, 15 Oct 2024 22:33:49 -0500 Subject: [PATCH] Replace order of `given/willAnswer` for spied objects The cause of the test hang was that the test was incorrectly setting up the spy on the type message builder impl. In previous Pulsar version of TypedMessageBuilderImpl, the fact that the method sendAsync was being called at mock setup time was not causing an issue. However, in the latest impl it did not like that and was throwing things off. Spied objects should always use the `doReturn|Answer|Throw()` family as described in https://javadoc.io/doc/org .mockito/mockito-core/latest/org/mockito/Mockito.html#important-gotcha-on-spying-real-objects--heading --- .../AdaptedReactiveMessageSenderTests.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTests.java b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTests.java index 7ccda26..22a4e22 100644 --- a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTests.java +++ b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTests.java @@ -77,6 +77,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -195,11 +196,11 @@ void sendOnePulsarException() throws Exception { given(producer.newMessage()).willAnswer((__) -> { TypedMessageBuilderImpl typedMessageBuilder = spy( new TypedMessageBuilderImpl<>(producer, Schema.STRING)); - given(typedMessageBuilder.sendAsync()).willAnswer((___) -> { + willAnswer((___) -> { CompletableFuture failed = new CompletableFuture<>(); failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full")); return failed; - }); + }).given(typedMessageBuilder).sendAsync(); return typedMessageBuilder; }); @@ -231,7 +232,7 @@ void sendManyStopOnError() throws Exception { given(producer.newMessage()).willAnswer((__) -> { TypedMessageBuilderImpl typedMessageBuilder = spy( new TypedMessageBuilderImpl<>(producer, Schema.STRING)); - given(typedMessageBuilder.sendAsync()).willAnswer((___) -> { + willAnswer((___) -> { if (entryId.get() == 1) { CompletableFuture failed = new CompletableFuture<>(); failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full")); @@ -241,7 +242,7 @@ void sendManyStopOnError() throws Exception { .newMessageId(1, entryId.incrementAndGet(), 1); messageIds.add(messageId); return CompletableFuture.completedFuture(messageId); - }); + }).given(typedMessageBuilder).sendAsync(); return typedMessageBuilder; }); @@ -279,7 +280,7 @@ void sendMany() throws Exception { given(producer.newMessage()).willAnswer((__) -> { TypedMessageBuilderImpl typedMessageBuilder = spy( new TypedMessageBuilderImpl<>(producer, Schema.STRING)); - given(typedMessageBuilder.sendAsync()).willAnswer((___) -> { + willAnswer((___) -> { if (entryId.get() == 2) { CompletableFuture failed = new CompletableFuture<>(); failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full")); @@ -289,7 +290,7 @@ void sendMany() throws Exception { .newMessageId(1, entryId.incrementAndGet(), 1); messageIds.add(messageId); return CompletableFuture.completedFuture(messageId); - }); + }).given(typedMessageBuilder).sendAsync(); return typedMessageBuilder; }); @@ -498,7 +499,7 @@ void doTestMaxInFlight(BiFunction, Flux, given(producer.newMessage()).willAnswer((__) -> { TypedMessageBuilderImpl typedMessageBuilder = spy( new TypedMessageBuilderImpl<>(producer, Schema.STRING)); - given(typedMessageBuilder.sendAsync()).willAnswer((___) -> { + willAnswer((___) -> { CompletableFuture messageSender = new CompletableFuture<>(); finalExecutorService.execute(() -> { long current = totalRequests.incrementAndGet(); @@ -512,7 +513,7 @@ void doTestMaxInFlight(BiFunction, Flux, DefaultImplementation.getDefaultImplementation().newMessageId(1, encodedEntryId, 1)); }, 100, TimeUnit.MILLISECONDS); return messageSender; - }); + }).given(typedMessageBuilder).sendAsync(); return typedMessageBuilder; });