Skip to content

Commit

Permalink
[fix] Return an error when AckCumulative on a Shared/KeyShared subscr…
Browse files Browse the repository at this point in the history
…iption (#1217)

### Motivation

The consumer should return error when AckCumulative on a Shared/KeyShared subscription
  • Loading branch information
RobertIndie authored May 14, 2024
1 parent 49fce72 commit 8e90873
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
14 changes: 13 additions & 1 deletion pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package pulsar
import (
"container/list"
"encoding/hex"
"errors"
"fmt"
"math"
"strings"
Expand All @@ -36,6 +35,7 @@ import (
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
"github.com/bits-and-blooms/bitset"
"github.com/pkg/errors"

uAtomic "go.uber.org/atomic"
)
Expand All @@ -50,6 +50,10 @@ const (
consumerClosed
)

var (
ErrInvalidAck = errors.New("invalid ack")
)

func (s consumerState) String() string {
switch s {
case consumerInit:
Expand Down Expand Up @@ -686,12 +690,20 @@ func (pc *partitionConsumer) AckIDWithResponseCumulative(msgID MessageID) error
return pc.internalAckIDCumulative(msgID, true)
}

func (pc *partitionConsumer) isAllowAckCumulative() bool {
return pc.options.subscriptionType != Shared && pc.options.subscriptionType != KeyShared
}

func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withResponse bool) error {
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
return errors.New("consumer state is closed")
}

if !pc.isAllowAckCumulative() {
return errors.Wrap(ErrInvalidAck, "cumulative ack is not allowed for the Shared/KeyShared subscription type")
}

// chunk message id will be converted to tracking message id
trackingID := toTrackingMessageID(msgID)
if trackingID == nil {
Expand Down
35 changes: 35 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4392,3 +4392,38 @@ func TestMultiConsumerMemoryLimit(t *testing.T) {
return assert.Equal(t, pc2PrevQueueSize/2, pc2.currentQueueSize.Load())
})
}

func TestConsumerAckCumulativeOnSharedSubShouldFailed(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()

topic := newTopicName()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Shared,
})
assert.Nil(t, err)
defer consumer.Close()

producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})
assert.Nil(t, err)
defer producer.Close()

_, err = producer.Send(context.Background(), &ProducerMessage{
Payload: []byte("hello"),
})
assert.Nil(t, err)

msg, err := consumer.Receive(context.Background())
assert.Nil(t, err)

err = consumer.AckIDCumulative(msg.ID())
assert.NotNil(t, err)
assert.ErrorIs(t, err, ErrInvalidAck)
}

0 comments on commit 8e90873

Please sign in to comment.