Skip to content

Commit

Permalink
Replace order of given/willAnswer for spied objects
Browse files Browse the repository at this point in the history
The cause of the test hang was that the test was incorrectly setting up
the spy on the type message builder impl. In previous Pulsar version of
TypedMessageBuilderImpl, the fact that the method sendAsync was being
called at mock setup time was not causing an issue. However, in the
latest impl it did not like that and was throwing things off.

Spied objects should always use the `doReturn|Answer|Throw()` family
as described in https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#important-gotcha-on-spying-real-objects--heading
  • Loading branch information
onobc committed Oct 16, 2024
1 parent fb53ad9 commit 108d7ff
Showing 1 changed file with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -195,11 +196,11 @@ void sendOnePulsarException() throws Exception {
given(producer.newMessage()).willAnswer((__) -> {
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
willAnswer((___) -> {
CompletableFuture<MessageId> failed = new CompletableFuture<>();
failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full"));
return failed;
});
}).given(typedMessageBuilder).sendAsync();
return typedMessageBuilder;
});

Expand Down Expand Up @@ -231,7 +232,7 @@ void sendManyStopOnError() throws Exception {
given(producer.newMessage()).willAnswer((__) -> {
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
willAnswer((___) -> {
if (entryId.get() == 1) {
CompletableFuture<MessageId> failed = new CompletableFuture<>();
failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full"));
Expand All @@ -241,7 +242,7 @@ void sendManyStopOnError() throws Exception {
.newMessageId(1, entryId.incrementAndGet(), 1);
messageIds.add(messageId);
return CompletableFuture.completedFuture(messageId);
});
}).given(typedMessageBuilder).sendAsync();
return typedMessageBuilder;
});

Expand Down Expand Up @@ -279,7 +280,7 @@ void sendMany() throws Exception {
given(producer.newMessage()).willAnswer((__) -> {
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
willAnswer((___) -> {
if (entryId.get() == 2) {
CompletableFuture<MessageId> failed = new CompletableFuture<>();
failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full"));
Expand All @@ -289,7 +290,7 @@ void sendMany() throws Exception {
.newMessageId(1, entryId.incrementAndGet(), 1);
messageIds.add(messageId);
return CompletableFuture.completedFuture(messageId);
});
}).given(typedMessageBuilder).sendAsync();
return typedMessageBuilder;
});

Expand Down Expand Up @@ -498,7 +499,7 @@ void doTestMaxInFlight(BiFunction<ReactiveMessageSender<String>, Flux<Integer>,
given(producer.newMessage()).willAnswer((__) -> {
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
willAnswer((___) -> {
CompletableFuture<MessageId> messageSender = new CompletableFuture<>();
finalExecutorService.execute(() -> {
long current = totalRequests.incrementAndGet();
Expand All @@ -512,7 +513,7 @@ void doTestMaxInFlight(BiFunction<ReactiveMessageSender<String>, Flux<Integer>,
DefaultImplementation.getDefaultImplementation().newMessageId(1, encodedEntryId, 1));
}, 100, TimeUnit.MILLISECONDS);
return messageSender;
});
}).given(typedMessageBuilder).sendAsync();
return typedMessageBuilder;
});

Expand Down

0 comments on commit 108d7ff

Please sign in to comment.