diff --git a/documentation/src/main/docs/kafka/request-reply.md b/documentation/src/main/docs/kafka/request-reply.md index 2ef09781d..ddcaad4ac 100644 --- a/documentation/src/main/docs/kafka/request-reply.md +++ b/documentation/src/main/docs/kafka/request-reply.md @@ -65,6 +65,21 @@ Like the core Emitter's `send` methods, `request` method also can receive a `Mes The ingested reply type of the `KafkaRequestReply` is discovered at runtime, in order to configure a `MessageConveter` to be applied on the incoming message before returning the `Uni` result. +## Requesting multiple replies + +You can use the `requestMulti` method to expect any number of replies represented by the `Multi` return type. + +For example this can be used to aggregate multiple replies to a single request. + +``` java +{{ insert('kafka/outbound/KafkaRequestReplyMultiEmitter.java') }} +``` +Like the other `request` you can also request `Message` types. + +!!! note + The channel attribute `reply.timeout` will be applied between each message, if reached the returned `Multi` will + fail. + ## Scaling Request/Reply If multiple requestor instances are configured on the same outgoing topic, and the same reply topic, diff --git a/documentation/src/main/java/kafka/outbound/KafkaRequestReplyMultiEmitter.java b/documentation/src/main/java/kafka/outbound/KafkaRequestReplyMultiEmitter.java new file mode 100644 index 000000000..8d8897ba1 --- /dev/null +++ b/documentation/src/main/java/kafka/outbound/KafkaRequestReplyMultiEmitter.java @@ -0,0 +1,21 @@ +package org.acme; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Channel; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply; + +@ApplicationScoped +public class KafkaRequestReplyMultiEmitter { + + @Inject + @Channel("my-request") + KafkaRequestReply quoteRequest; + + public Multi requestQuote(String request) { + return quoteRequest.requestMulti(request).select().first(5); + } +} diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java index 0bc4ec68c..60d7fdd3b 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java @@ -9,6 +9,7 @@ import org.eclipse.microprofile.reactive.messaging.Message; import io.smallrye.common.annotation.Experimental; +import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.EmitterType; import io.smallrye.reactive.messaging.kafka.KafkaConsumer; @@ -127,6 +128,22 @@ public interface KafkaRequestReply extends EmitterType { */ Uni> request(Message request); + /** + * Sends a request and receives responses. + * + * @param request the request object to be sent + * @return a Multi object representing the results of the send and receive operation + */ + Multi requestMulti(Req request); + + /** + * Sends a request and receives responses. + * + * @param request the request object to be sent + * @return a Multi object representing the results of the send and receive operation + */ + Multi> requestMulti(Message request); + /** * Blocks until the consumer has been assigned all partitions for consumption. * If a {@code reply.partition} is provided, waits only for the assignment of that particular partition. diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java index 238096eef..978f27d84 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java @@ -31,8 +31,8 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.Subscriptions; +import io.smallrye.mutiny.subscription.MultiEmitter; import io.smallrye.mutiny.subscription.MultiSubscriber; -import io.smallrye.mutiny.subscription.UniEmitter; import io.smallrye.reactive.messaging.ClientCustomizer; import io.smallrye.reactive.messaging.EmitterConfiguration; import io.smallrye.reactive.messaging.OutgoingMessageMetadata; @@ -181,6 +181,12 @@ public void complete() { pendingReplies.size(), pendingReplies.keySet()); } } + for (CorrelationId correlationId : pendingReplies.keySet()) { + PendingReplyImpl reply = pendingReplies.remove(correlationId); + if (reply != null) { + reply.complete(); + } + } replySource.closeQuietly(); } @@ -194,12 +200,22 @@ private void grace(Duration duration) { @Override public Uni request(Req request) { - return request(ContextAwareMessage.of(request)) - .map(Message::getPayload); + return requestMulti(request).toUni(); } @Override public Uni> request(Message request) { + return requestMulti(request).toUni(); + } + + @Override + public Multi requestMulti(Req request) { + return requestMulti(ContextAwareMessage.of(request)) + .map(Message::getPayload); + } + + @Override + public Multi> requestMulti(Message request) { var builder = request.getMetadata(OutgoingKafkaRecordMetadata.class) .map(metadata -> OutgoingKafkaRecordMetadata.from(metadata)) .orElseGet(OutgoingKafkaRecordMetadata::builder); @@ -213,16 +229,26 @@ public Uni> request(Message request) { OutgoingMessageMetadata outMetadata = new OutgoingMessageMetadata<>(); return sendMessage(request.addMetadata(builder.build()).addMetadata(outMetadata)) .invoke(() -> subscription.get().request(1)) - .chain(unused -> Uni.createFrom().> emitter(emitter -> pendingReplies.put(correlationId, - new PendingReplyImpl<>(outMetadata.getResult(), replyTopic, replyPartition, - (UniEmitter>) emitter))) - .ifNoItem().after(replyTimeout).fail()) - .onItemOrFailure().invoke(() -> pendingReplies.remove(correlationId)) - .plug(uni -> replyFailureHandler != null ? uni.onItem().transformToUni(f -> { - Throwable failure = replyFailureHandler.handleReply((KafkaRecord) f); - return failure != null ? Uni.createFrom().failure(failure) : Uni.createFrom().item(f); - }) : uni) - .plug(uni -> replyConverter != null ? uni.map(f -> replyConverter.apply(f)) : uni); + .onItem() + .transformToMulti(unused -> Multi.createFrom().> emitter(emitter -> { + pendingReplies.put(correlationId, + new PendingReplyImpl<>(outMetadata.getResult(), + replyTopic, + replyPartition, + (MultiEmitter>) emitter)); + })) + .ifNoItem().after(replyTimeout).failWith(() -> new KafkaRequestReplyTimeoutException(correlationId)) + .onItem().transformToUniAndConcatenate(m -> { + if (replyFailureHandler != null) { + Throwable failure = replyFailureHandler.handleReply((KafkaRecord) m); + if (failure != null) { + return Uni.createFrom().failure(failure); + } + } + return Uni.createFrom().item(m); + }) + .onTermination().invoke(() -> pendingReplies.remove(correlationId)) + .plug(multi -> replyConverter != null ? multi.map(f -> replyConverter.apply(f)) : multi); } @Override @@ -271,10 +297,9 @@ public void onItem(KafkaRecord record) { // If reply topic header is NOT null, it is considered a request not a reply if (header != null && record.getHeaders().lastHeader(replyTopicHeader) == null) { CorrelationId correlationId = correlationIdHandler.parse(header.value()); - PendingReplyImpl reply = pendingReplies.remove(correlationId); + PendingReplyImpl reply = pendingReplies.get(correlationId); if (reply != null) { - reply.getEmitter().complete(record); - return; + reply.getEmitter().emit(record); } else { log.requestReplyRecordIgnored(channel, record.getTopic(), correlationId.toString()); } @@ -298,10 +323,10 @@ public static class PendingReplyImpl implements PendingReply { private final RecordMetadata metadata; private final String replyTopic; private final int replyPartition; - private final UniEmitter> emitter; + private final MultiEmitter> emitter; public PendingReplyImpl(RecordMetadata metadata, String replyTopic, int replyPartition, - UniEmitter> emitter) { + MultiEmitter> emitter) { this.replyTopic = replyTopic; this.replyPartition = replyPartition; this.metadata = metadata; @@ -323,7 +348,17 @@ public RecordMetadata recordMetadata() { return metadata; } - public UniEmitter> getEmitter() { + @Override + public void complete() { + emitter.complete(); + } + + @Override + public boolean isCancelled() { + return emitter.isCancelled(); + } + + public MultiEmitter> getEmitter() { return emitter; } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTimeoutException.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTimeoutException.java new file mode 100644 index 000000000..338c1e58a --- /dev/null +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTimeoutException.java @@ -0,0 +1,12 @@ +package io.smallrye.reactive.messaging.kafka.reply; + +/** + * Exception thrown when a reply is not received within the configured timeout. + */ +public class KafkaRequestReplyTimeoutException extends RuntimeException { + + public KafkaRequestReplyTimeoutException(CorrelationId correlationId) { + super("Timeout waiting for a reply for request with correlation ID: " + correlationId); + } + +} diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/PendingReply.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/PendingReply.java index ec5adbc31..f00a7ac0c 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/PendingReply.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/PendingReply.java @@ -21,4 +21,14 @@ public interface PendingReply { * @return the recordMetadata of the request */ RecordMetadata recordMetadata(); + + /** + * Complete the pending reply. + */ + void complete(); + + /** + * @return whether the pending reply was terminated (with a completion or failure). + */ + boolean isCancelled(); } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java index b3145ba8a..b4a501ef7 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java @@ -5,6 +5,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import java.util.ArrayList; import java.util.Base64; import java.util.List; import java.util.Map; @@ -32,6 +33,7 @@ import org.junit.jupiter.api.Test; import io.smallrye.common.annotation.Identifier; +import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.TimeoutException; import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; import io.smallrye.reactive.messaging.annotations.Blocking; @@ -88,6 +90,7 @@ void testReply() { assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion()) .extracting(ConsumerRecord::value).containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -111,6 +114,7 @@ void testReplyWithConverter() { assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion()) .extracting(ConsumerRecord::value) .containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -137,6 +141,70 @@ void testReplyMessage() { assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion()) .extracting(ConsumerRecord::value) .containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); + } + + @Test + void testReplyMessageMulti() { + addBeans(ReplyServerMultipleReplies.class); + topic = companion.topics().createAndWait(topic, 3); + String replyTopic = topic + "-replies"; + companion.topics().createAndWait(replyTopic, 3); + + List replies = new CopyOnWriteArrayList<>(); + + RequestReplyProducer app = runApplication(config(), RequestReplyProducer.class); + List expected = new ArrayList<>(); + int sent = 5; + for (int i = 0; i < sent; i++) { + app.requestReply().requestMulti(i) + .subscribe() + .with(replies::add); + for (int j = 0; j < ReplyServerMultipleReplies.REPLIES; j++) { + expected.add(i + ": " + j); + } + } + await().untilAsserted(() -> assertThat(replies).hasSize(ReplyServerMultipleReplies.REPLIES * sent)); + assertThat(replies) + .containsAll(expected); + + assertThat(companion.consumeStrings().fromTopics(replyTopic, ReplyServerMultipleReplies.REPLIES * sent).awaitCompletion()) + .extracting(ConsumerRecord::value) + .containsAll(expected); + + Map pendingReplies = app.requestReply().getPendingReplies(); + assertThat(pendingReplies).allSatisfy((k, v) -> assertThat(v.isCancelled()).isFalse()); + for (PendingReply pending : pendingReplies.values()) { + pending.complete(); + } + assertThat(pendingReplies).allSatisfy((k, v) -> assertThat(v.isCancelled()).isTrue()); + assertThat(app.requestReply().getPendingReplies()) + .allSatisfy((k, v) -> assertThat(v.isCancelled()).isTrue()); + await().untilAsserted(() -> assertThat(app.requestReply().getPendingReplies()).isEmpty()); + } + + @Test + void testReplyMessageMultiLimit() { + addBeans(ReplyServerMultipleReplies.class); + topic = companion.topics().createAndWait(topic, 3); + String replyTopic = topic + "-replies"; + companion.topics().createAndWait(replyTopic, 3); + + List replies = new CopyOnWriteArrayList<>(); + + RequestReplyProducer app = runApplication(config(), RequestReplyProducer.class); + app.requestReply().requestMulti(0) + .select().first(5) + .subscribe() + .with(replies::add); + await().untilAsserted(() -> assertThat(replies).hasSize(5)); + assertThat(replies) + .containsExactlyInAnyOrder("0: 0", "0: 1", "0: 2", "0: 3", "0: 4"); + + assertThat(companion.consumeStrings().fromTopics(replyTopic, 5).awaitCompletion()) + .extracting(ConsumerRecord::value) + .containsExactlyInAnyOrder("0: 0", "0: 1", "0: 2", "0: 3", "0: 4"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -186,6 +254,7 @@ void testReplyWithSameTopic() { await().untilAsserted(() -> assertThat(replies).hasSize(10)); assertThat(replies).extracting(ConsumerRecord::value) .containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -213,6 +282,7 @@ void testReplyWithReplyPartition() { assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion()) .allSatisfy(record -> assertThat(record.partition()).isEqualTo(2)) .extracting(ConsumerRecord::value).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -245,6 +315,7 @@ void testReplyWithConsumerConfig() { assertThat(companion.consumerGroups().list()).extracting(ConsumerGroupListing::groupId) .contains(replyTopicConsumer); await().untilAsserted(() -> assertThat(companion.consumerGroups().offsets(replyTopicConsumer)).isNotEmpty()); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -277,6 +348,7 @@ void testReplyWithCustomHeadersReplyServerMessage() { assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion()) .allSatisfy(record -> assertThat(record.partition()).isEqualTo(2)) .extracting(ConsumerRecord::value).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -307,6 +379,7 @@ void testReplyWithCustomHeaders() { assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion()) .allSatisfy(record -> assertThat(record.partition()).isEqualTo(2)) .extracting(ConsumerRecord::value).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -421,6 +494,7 @@ void testReplyMessageBytesCorrelationId() { assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion()) .extracting(ConsumerRecord::value).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -453,6 +527,7 @@ void testReplyFailureHandler() { .extracting(Throwable::getMessage) .allSatisfy(message -> assertThat(message).containsAnyOf("0", "3", "6", "9") .contains("Cannot reply to")); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -477,6 +552,7 @@ void testReplyOffsetResetEarliest() { assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion()) .extracting(ConsumerRecord::value).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -536,6 +612,7 @@ void testReplyAssignAndSeekOffset() { assertThat(companion.consumeStrings().fromOffsets(Map.of(tp(replyTopic, 2), 10L), 10).awaitCompletion()) .extracting(ConsumerRecord::value).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -551,7 +628,7 @@ void testReplyTimeout() { app.requestReply().request(1) .subscribe().withSubscriber(UniAssertSubscriber.create()) - .awaitFailure().assertFailedWith(TimeoutException.class); + .awaitFailure().assertFailedWith(KafkaRequestReplyTimeoutException.class); } @Test @@ -634,6 +711,26 @@ String process(Integer payload) { } } + @ApplicationScoped + public static class ReplyServerMultipleReplies { + + public static final int REPLIES = 10; + + @Incoming("req") + @Outgoing("rep") + Multi process(Integer payload) { + if (payload == null) { + return null; + } + return Multi.createFrom().emitter(multiEmitter -> { + for (int i = 0; i < REPLIES; i++) { + multiEmitter.emit(payload + ": " + i); + } + multiEmitter.complete(); + }); + } + } + @ApplicationScoped public static class ReplyServerSlow {