-
Notifications
You must be signed in to change notification settings - Fork 346
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Issue 1223] Support ZeroQueueConsumer #1225
Conversation
@RobertIndie PTAL, thanks. |
pulsar/consumer_zero_queue.go
Outdated
opts := &partitionConsumerOpts{ | ||
topic: zc.topic, | ||
consumerName: zc.consumerName, | ||
subscription: zc.options.SubscriptionName, | ||
subscriptionType: zc.options.Type, | ||
subscriptionInitPos: zc.options.SubscriptionInitialPosition, | ||
partitionIdx: 0, | ||
receiverQueueSize: zc.options.ReceiverQueueSize, | ||
nackRedeliveryDelay: nackRedeliveryDelay, | ||
nackBackoffPolicy: zc.options.NackBackoffPolicy, | ||
metadata: zc.options.Properties, | ||
subProperties: zc.options.SubscriptionProperties, | ||
replicateSubscriptionState: zc.options.ReplicateSubscriptionState, | ||
startMessageID: zc.options.startMessageID, | ||
startMessageIDInclusive: zc.options.StartMessageIDInclusive, | ||
subscriptionMode: zc.options.SubscriptionMode, | ||
readCompacted: zc.options.ReadCompacted, | ||
interceptors: zc.options.Interceptors, | ||
maxReconnectToBroker: zc.options.MaxReconnectToBroker, | ||
backoffPolicy: zc.options.BackoffPolicy, | ||
keySharedPolicy: zc.options.KeySharedPolicy, | ||
schema: zc.options.Schema, | ||
decryption: zc.options.Decryption, | ||
ackWithResponse: zc.options.AckWithResponse, | ||
maxPendingChunkedMessage: zc.options.MaxPendingChunkedMessage, | ||
expireTimeOfIncompleteChunk: zc.options.ExpireTimeOfIncompleteChunk, | ||
autoAckIncompleteChunk: zc.options.AutoAckIncompleteChunk, | ||
consumerEventListener: zc.options.EventListener, | ||
enableBatchIndexAck: zc.options.EnableBatchIndexAcknowledgment, | ||
ackGroupingOptions: zc.options.AckGroupingOptions, | ||
autoReceiverQueueSize: zc.options.EnableAutoScaledReceiverQueueSize, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we abstract this logic so that we can reuse the code?
pulsar-client-go/pulsar/consumer_impl.go
Lines 390 to 420 in fc15995
opts := &partitionConsumerOpts{ | |
topic: pt, | |
consumerName: c.consumerName, | |
subscription: c.options.SubscriptionName, | |
subscriptionType: c.options.Type, | |
subscriptionInitPos: c.options.SubscriptionInitialPosition, | |
partitionIdx: idx, | |
receiverQueueSize: receiverQueueSize, | |
nackRedeliveryDelay: nackRedeliveryDelay, | |
nackBackoffPolicy: c.options.NackBackoffPolicy, | |
metadata: metadata, | |
subProperties: subProperties, | |
replicateSubscriptionState: c.options.ReplicateSubscriptionState, | |
startMessageID: c.options.startMessageID, | |
startMessageIDInclusive: c.options.StartMessageIDInclusive, | |
subscriptionMode: c.options.SubscriptionMode, | |
readCompacted: c.options.ReadCompacted, | |
interceptors: c.options.Interceptors, | |
maxReconnectToBroker: c.options.MaxReconnectToBroker, | |
backoffPolicy: c.options.BackoffPolicy, | |
keySharedPolicy: c.options.KeySharedPolicy, | |
schema: c.options.Schema, | |
decryption: c.options.Decryption, | |
ackWithResponse: c.options.AckWithResponse, | |
maxPendingChunkedMessage: c.options.MaxPendingChunkedMessage, | |
expireTimeOfIncompleteChunk: c.options.ExpireTimeOfIncompleteChunk, | |
autoAckIncompleteChunk: c.options.AutoAckIncompleteChunk, | |
consumerEventListener: c.options.EventListener, | |
enableBatchIndexAck: c.options.EnableBatchIndexAcknowledgment, | |
ackGroupingOptions: c.options.AckGroupingOptions, | |
autoReceiverQueueSize: c.options.EnableAutoScaledReceiverQueueSize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, done.
Fixes #1223
Motivation
Support ZeroQueueConsumer, refer to Java ZeroQueueConsumerImpl
Modifications
EnableZeroQueueConsumer
zeroQueueConsumer
Verifying this change
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation