Skip to content

Commit

Permalink
Init doc for multi request
Browse files Browse the repository at this point in the history
  • Loading branch information
Malandril committed Jan 21, 2025
1 parent ec5d67a commit ec37d15
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 1 deletion.
15 changes: 15 additions & 0 deletions documentation/src/main/docs/kafka/request-reply.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ Like the core Emitter's `send` methods, `request` method also can receive a `Mes
The ingested reply type of the `KafkaRequestReply` is discovered at runtime,
in order to configure a `MessageConveter` to be applied on the incoming message before returning the `Uni` result.

## Requesting multiple replies

You can use the `requestMulti` method to expect any number of replies represented by the `Multi` return type.

For example this can be used to aggregate multiple replies to a single request.

``` java
{{ insert('kafka/outbound/KafkaRequestReplyMultiEmitter.java') }}
```
Like the other `request` you can also request `Message` types.

!!! note
The channel attribute `reply.timeout` will be applied between each message, if reached the returned `Multi` will
fail.

## Scaling Request/Reply

If multiple requestor instances are configured on the same outgoing topic, and the same reply topic,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.acme;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply;

@ApplicationScoped
public class KafkaRequestReplyMultiEmitter {

@Inject
@Channel("my-request")
KafkaRequestReply<String, Integer> quoteRequest;

public Multi<Integer> requestQuote(String request) {
return quoteRequest.requestMulti(request).select().first(5);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ void testReplyMessageMulti() {
assertThat(replies)
.containsAll(expected);

assertThat(companion.consumeStrings().fromTopics(replyTopic, ReplyServerMultipleReplies.REPLIES * sent).awaitCompletion())
assertThat(
companion.consumeStrings().fromTopics(replyTopic, ReplyServerMultipleReplies.REPLIES * sent).awaitCompletion())
.extracting(ConsumerRecord::value)
.containsAll(expected);

Expand Down

0 comments on commit ec37d15

Please sign in to comment.