From 8e9087339d0d004231d3f8fc5f823e561840182b Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Tue, 14 May 2024 17:20:31 +0800 Subject: [PATCH] [fix] Return an error when AckCumulative on a Shared/KeyShared subscription (#1217) ### Motivation The consumer should return error when AckCumulative on a Shared/KeyShared subscription --- pulsar/consumer_partition.go | 14 +++++++++++++- pulsar/consumer_test.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index dc01e69285..f752afbc61 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -20,7 +20,6 @@ package pulsar import ( "container/list" "encoding/hex" - "errors" "fmt" "math" "strings" @@ -36,6 +35,7 @@ import ( pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" "github.com/bits-and-blooms/bitset" + "github.com/pkg/errors" uAtomic "go.uber.org/atomic" ) @@ -50,6 +50,10 @@ const ( consumerClosed ) +var ( + ErrInvalidAck = errors.New("invalid ack") +) + func (s consumerState) String() string { switch s { case consumerInit: @@ -686,12 +690,20 @@ func (pc *partitionConsumer) AckIDWithResponseCumulative(msgID MessageID) error return pc.internalAckIDCumulative(msgID, true) } +func (pc *partitionConsumer) isAllowAckCumulative() bool { + return pc.options.subscriptionType != Shared && pc.options.subscriptionType != KeyShared +} + func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withResponse bool) error { if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer") return errors.New("consumer state is closed") } + if !pc.isAllowAckCumulative() { + return errors.Wrap(ErrInvalidAck, "cumulative ack is not allowed for the Shared/KeyShared subscription type") + } + // chunk message id will be converted to tracking message id trackingID := toTrackingMessageID(msgID) if trackingID == nil { diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 4120ba4bac..00f48cae51 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -4392,3 +4392,38 @@ func TestMultiConsumerMemoryLimit(t *testing.T) { return assert.Equal(t, pc2PrevQueueSize/2, pc2.currentQueueSize.Load()) }) } + +func TestConsumerAckCumulativeOnSharedSubShouldFailed(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Shared, + }) + assert.Nil(t, err) + defer consumer.Close() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + assert.Nil(t, err) + defer producer.Close() + + _, err = producer.Send(context.Background(), &ProducerMessage{ + Payload: []byte("hello"), + }) + assert.Nil(t, err) + + msg, err := consumer.Receive(context.Background()) + assert.Nil(t, err) + + err = consumer.AckIDCumulative(msg.ID()) + assert.NotNil(t, err) + assert.ErrorIs(t, err, ErrInvalidAck) +}