Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi request to KafkaRequestReply #2761

Merged
merged 3 commits into from
Jan 28, 2025

Conversation

Malandril
Copy link
Contributor

Purpose

This PR adds a new request type to the KafkaRequestReply so that we can receive multiple replies to a request.
This can be useful when you want to send a request, for which multiple consumer groups can answer, or a some service needs to answer with multiple messages.
That was not supported with the Uni<Rep> request.

Implementation

This is a first idea to add the feature, in which the KafkaRequestReplyImpl is modified to use a Multi as a base for all operations instead of the Uni.
If you think this should be a separate, class or if you have any other idea, do not hesitate.

This adds two new methods requestMulti that returns a Multi<Rep>.
I had no idea, for the method names, and we can discuss a better one.
The reply.timeout is applied between each reply, and the ReplyFailureHandler is called for each reply, so one failure on a reply will fail the whole operation.
I didn't write the doc yet.

If you have any input feel free.

@ozangunalp
Copy link
Collaborator

That's a very interesting idea. I'll look later today!

@ozangunalp
Copy link
Collaborator

@Malandril Thanks for the tests as well!
I've pushed small changes, and exposed a way to complete a pending request.

The upstream consumer request handling is slightly different, but I am unsure of the impact. It was designed to be able to pause the consumer when there aren't any pending requests.

@Malandril Malandril force-pushed the kafka-multi-replier branch from 74caf72 to ec37d15 Compare January 21, 2025 20:39
@Malandril
Copy link
Contributor Author

Hello @ozangunalp is there any reason this wasn't merged ?
If there is any issue i can try to fix them.

@ozangunalp
Copy link
Collaborator

Sorry, I need to get back to this. I don't remember if I pushed the refactoring I did.

I remember there was an issue with the strategy to when to complete the returned Multi. I am thinking of a handler to identify the completion message.

wdyt ?

@Malandril
Copy link
Contributor Author

I don't really understand, you mean somethig like a predicate, to the method, so that the user can configure a completion condition ?
That would be passed to the request ?

public Multi<Message<Rep>> requestMulti(Message<Req> request, Predicate<Multi<Message<Rep>>> completionConditon);

And when the predicate returns true, the multi is completed ?

@ozangunalp
Copy link
Collaborator

What I was thinking can easily be done using Mutiny API,
requestMulti(reqMessage).select().first(reply -> isNotCompletion(reply))

@ozangunalp
Copy link
Collaborator

I added a custom timeout exception and rebased/squashed some commits.

If all tests are green I think it is good to go.

Thank you @Malandril for your contribution and patience!

@Malandril
Copy link
Contributor Author

Thanks for the feedbacks and fixes !

@ozangunalp ozangunalp requested a review from cescoffier January 27, 2025 08:58
@cescoffier cescoffier merged commit 832e68a into smallrye:main Jan 28, 2025
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants