Document message acknowledgement in ReactiveMessageConsumer #8

Open
lhotari opened this issue Sep 12, 2022 · 5 comments

@lhotari
Member

lhotari commented Sep 12, 2022

The Reactive Java client takes a different approach for message acknowledgements.
The acknowledgement is modeled as a value (part of org.apache.pulsar.reactive.client.api.MessageResult) instead of a "side-effect".
This needs to be documented and explained.
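
To illustrate what "acknowledgement as a value" means, here is a minimal, self-contained sketch. It does not use the library: `AckResult`, `handle`, and `run` are hypothetical names invented for this example, and `AckResult` is only a simplified stand-in for the idea behind `MessageResult`. The handler never calls an acknowledge method; it returns a value that carries the decision, and a single place in the consuming code turns those values into ack/nack effects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AckAsValueSketch {

    // Hypothetical stand-in for the idea behind MessageResult (not the library class):
    // the handler's decision travels as data, next to the processed value.
    static final class AckResult<T> {
        final String messageId;
        final boolean acknowledge;
        final T value;

        private AckResult(String messageId, boolean acknowledge, T value) {
            this.messageId = messageId;
            this.acknowledge = acknowledge;
            this.value = value;
        }

        static <T> AckResult<T> acknowledge(String messageId, T value) {
            return new AckResult<>(messageId, true, value);
        }

        static <T> AckResult<T> negativeAcknowledge(String messageId) {
            return new AckResult<>(messageId, false, null);
        }
    }

    // Pure handler: it decides between ack and nack by returning a value.
    static AckResult<Integer> handle(String messageId, String payload) {
        try {
            return AckResult.acknowledge(messageId, Integer.parseInt(payload));
        } catch (NumberFormatException e) {
            return AckResult.negativeAcknowledge(messageId);
        }
    }

    // The only place where results are turned into ack/nack effects.
    // Each message is modeled as a {messageId, payload} pair.
    static List<String> run(List<String[]> messages) {
        List<String> effects = new ArrayList<>();
        for (String[] message : messages) {
            AckResult<Integer> result = handle(message[0], message[1]);
            effects.add((result.acknowledge ? "ack:" : "nack:") + result.messageId);
        }
        return effects;
    }

    public static void main(String[] args) {
        List<String[]> messages = Arrays.asList(
                new String[] {"m1", "42"},
                new String[] {"m2", "oops"});
        System.out.println(run(messages)); // prints [ack:m1, nack:m2]
    }
}
```

Because the handler is a plain function from message to result, it can be tested without a consumer or a broker, which is the main benefit of this style.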

@cbornet
Contributor

cbornet commented Oct 27, 2022

Beware that batched messages may not be acked if all the messages from the batch are not acked before closing the consumer (the consumer is closed when consumeOne/consumeMany completes).
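
The caveat above can be modeled with a small sketch. It assumes that, without batch index acknowledgement, a batch only counts as acknowledged once every message in it has been acknowledged. `Tracker` is a hypothetical class written for this illustration, not the Pulsar client's implementation.

```java
import java.util.BitSet;

final class BatchAckSketch {

    // Hypothetical model: tracks which indexes of one batch are still unacknowledged.
    static final class Tracker {
        private final BitSet pending;

        Tracker(int batchSize) {
            pending = new BitSet(batchSize);
            pending.set(0, batchSize); // every index starts unacknowledged
        }

        // Marks one index as acknowledged. Returns true only when the whole
        // batch is acknowledged, i.e. when the batch itself can be acked.
        boolean ack(int batchIndex) {
            pending.clear(batchIndex);
            return pending.isEmpty();
        }

        int pendingCount() {
            return pending.cardinality();
        }
    }

    public static void main(String[] args) {
        Tracker tracker = new Tracker(3);
        System.out.println(tracker.ack(0)); // false: 2 messages still pending
        System.out.println(tracker.ack(1)); // false: 1 message still pending
        // Closing the consumer at this point would leave the batch unacknowledged.
        System.out.println(tracker.ack(2)); // true: the whole batch is acknowledged
    }
}
```

In this model, a consumer that closes after acknowledging only part of a batch leaves the whole batch eligible for redelivery.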

@lhotari
Member Author

lhotari commented Nov 11, 2022

> Beware that batched messages may not be acked if all the messages from the batch are not acked before closing the consumer (the consumer is closed when consumeOne/consumeMany completes).

This is most likely caused by the lack of configuring .batchIndexAckEnabled(true) on the ReactiveMessageConsumerBuilder and enabling acknowledgmentAtBatchIndexLevelEnabled=true in broker.conf (for a Docker container, set the environment variable PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled=true; #13 contains the corresponding changes for the Pulsar Testcontainer). If that doesn't fix the issue, we'd have to investigate further.

@lhotari
Member Author

lhotari commented Nov 11, 2022

The purpose of MessageResult is to provide a functional, data (value) oriented way to do acknowledgements without side effects. In functional programming, everything is a value. While googling, I found a related presentation, "Railway Oriented Programming: Functional error handling". It's in a different context, but the presentation explains this type of design. Wikipedia's "Result type" article contains a short explanation.

@cbornet
Contributor

cbornet commented Nov 11, 2022

> This is most likely caused by the lack of configuring .batchIndexAckEnabled(true) on the ReactiveMessageConsumerBuilder and enabling acknowledgmentAtBatchIndexLevelEnabled=true in broker.conf

I think that if the consumer gets closed the batch will be fully redelivered to another consumer. So it's not really enough for the consumeOne case.

@lhotari
Member Author

lhotari commented Nov 11, 2022

> > This is most likely caused by the lack of configuring .batchIndexAckEnabled(true) on the ReactiveMessageConsumerBuilder and enabling acknowledgmentAtBatchIndexLevelEnabled=true in broker.conf
>
> I think that if the consumer gets closed the batch will be fully redelivered to another consumer. So it's not really enough for the consumeOne case.

.batchIndexAckEnabled(true) and acknowledgmentAtBatchIndexLevelEnabled=true should handle that. I guess the gap is with the asynchronous acknowledgement mode which might not wait for all pending acknowledgements to "drain" when closing.
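
As a sketch of what "draining" pending acknowledgements before closing could look like, the following self-contained example waits for all in-flight asynchronous acknowledgements before marking the consumer as closed. `DrainingConsumerSketch` and its methods are hypothetical names for illustration only. This is not the library's code, and whether the real consumer has this gap would still need to be verified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical consumer wrapper; it only models the ordering of acks and close.
final class DrainingConsumerSketch {
    private final List<CompletableFuture<Void>> pendingAcks = new ArrayList<>();
    private final Queue<String> acked = new ConcurrentLinkedQueue<>();
    private volatile boolean closed;

    // Starts an asynchronous acknowledgement and remembers it until close.
    CompletableFuture<Void> acknowledgeAsync(String messageId) {
        CompletableFuture<Void> ack = CompletableFuture.runAsync(() -> acked.add(messageId));
        pendingAcks.add(ack);
        return ack;
    }

    // Completes the close only after every pending acknowledgement has finished.
    CompletableFuture<Void> closeAsync() {
        CompletableFuture<?>[] all = pendingAcks.toArray(new CompletableFuture<?>[0]);
        return CompletableFuture.allOf(all).thenRun(() -> closed = true);
    }

    int ackedCount() {
        return acked.size();
    }

    boolean isClosed() {
        return closed;
    }

    public static void main(String[] args) {
        DrainingConsumerSketch consumer = new DrainingConsumerSketch();
        consumer.acknowledgeAsync("m1");
        consumer.acknowledgeAsync("m2");
        consumer.acknowledgeAsync("m3");
        consumer.closeAsync().join(); // blocks until all acks are done, then closes
        System.out.println(consumer.ackedCount() + " acked, closed=" + consumer.isClosed());
    }
}
```

The design point is only the ordering: the close is chained behind `CompletableFuture.allOf(...)` over the pending acknowledgements, so no acknowledgement is dropped by an early close.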
