Skip to content

Commit

Permalink
Improve the AckIDList performance when there are many topics subscrib…
Browse files Browse the repository at this point in the history
…ed (#1305)

### Motivation

Currently when a consumer subscribes multiple topic-partitions and `AckWithResponse` is true, the `AckIDList` method will iterate over all internal consumers **sequentially**. It harms the performance especially there are many internal consumers. For example, if the connection of an internal consumer was stuck by some reason, message IDs from other consumer would be blocked for the operation timeout.

### Modifications

In `ackIDListFromMultiTopics`, call `consumer.AckIDList` in goroutines and use a channel to receive all errors from these calls.

Add `TestMultiTopicAckIDListTimeout`, which sets a dummy connection instance whose `SendRequest` never completes the callback, to verify the `AckIDList` call will not take much more time than the operation timeout to complete. Without this improvement, it will take more than 5 times of the operation timeout to fail.
  • Loading branch information
BewareMyPower authored Nov 19, 2024
1 parent a144d88 commit ffdc3af
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 15 deletions.
21 changes: 10 additions & 11 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,17 @@ func ackIDListFromMultiTopics(log log.Logger, msgIDs []MessageID, findConsumer f
}
}

ackError := AckError{}
subErrCh := make(chan error, len(consumerToMsgIDs))
for consumer, ids := range consumerToMsgIDs {
if err := consumer.AckIDList(ids); err != nil {
if topicAckError := err.(AckError); topicAckError != nil {
for id, err := range topicAckError {
ackError[id] = err
}
} else {
// It should not reach here
for _, id := range ids {
ackError[id] = err
}
go func() {
subErrCh <- consumer.AckIDList(ids)
}()
}
ackError := AckError{}
for i := 0; i < len(consumerToMsgIDs); i++ {
if topicAckError, ok := (<-subErrCh).(AckError); ok {
for id, err := range topicAckError {
ackError[id] = err
}
}
}
Expand Down
102 changes: 101 additions & 1 deletion pulsar/consumer_multitopic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@
package pulsar

import (
"errors"
"fmt"
"strings"
"testing"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsaradmin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"

"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -317,3 +319,101 @@ func runMultiTopicAckIDList(t *testing.T, regex bool) {
assert.Fail(t, "AckIDList should return AckError")
}
}

type dummyConnection struct {
}

func (dummyConnection) SendRequest(_ uint64, _ *pb.BaseCommand, _ func(*pb.BaseCommand, error)) {
}

func (dummyConnection) SendRequestNoWait(_ *pb.BaseCommand) error {
return nil
}

func (dummyConnection) WriteData(_ internal.Buffer) {
}

func (dummyConnection) RegisterListener(_ uint64, _ internal.ConnectionListener) error {
return nil
}

func (dummyConnection) UnregisterListener(_ uint64) {
}

func (dummyConnection) AddConsumeHandler(_ uint64, _ internal.ConsumerHandler) error {
return nil
}

func (dummyConnection) DeleteConsumeHandler(_ uint64) {
}

func (dummyConnection) ID() string {
return "cnx"
}

func (dummyConnection) GetMaxMessageSize() int32 {
return 0
}

func (dummyConnection) Close() {
}

func (dummyConnection) WaitForClose() <-chan struct{} {
return nil
}

func (dummyConnection) IsProxied() bool {
return false
}

func TestMultiTopicAckIDListTimeout(t *testing.T) {
topic := fmt.Sprintf("multiTopicAckIDListTimeout%v", time.Now().UnixNano())
assert.NoError(t, createPartitionedTopic(topic, 5))

cli, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
OperationTimeout: 3 * time.Second,
})
assert.Nil(t, err)
defer cli.Close()

consumer, err := cli.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "sub",
AckWithResponse: true,
})
assert.Nil(t, err)
defer consumer.Close()

const numMessages = 5
sendMessages(t, cli, topic, 0, numMessages, false)
msgs := receiveMessages(t, consumer, numMessages)
msgIDs := make([]MessageID, len(msgs))

var conn internal.Connection
for i := 0; i < len(msgs); i++ {
msgIDs[i] = msgs[i].ID()
pc, ok := msgIDs[i].(*trackingMessageID).consumer.(*partitionConsumer)
assert.True(t, ok)
conn = pc._getConn()
pc._setConn(dummyConnection{})
}

start := time.Now()
err = consumer.AckIDList(msgIDs)
elapsed := time.Since(start)
t.Logf("AckIDList takes %v ms", elapsed)
assert.True(t, elapsed < 5*time.Second && elapsed >= 3*time.Second)
var ackError AckError
if errors.As(err, &ackError) {
for _, err := range ackError {
assert.Equal(t, "request timed out", err.Error())
}
}

for i := 0; i < len(msgs); i++ {
pc, ok := msgIDs[i].(*trackingMessageID).consumer.(*partitionConsumer)
assert.True(t, ok)
pc._setConn(conn)
}
}
6 changes: 3 additions & 3 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type partitionConsumer struct {
state uAtomic.Int32
options *partitionConsumerOpts

conn uAtomic.Value
conn atomic.Pointer[internal.Connection]

topic string
name string
Expand Down Expand Up @@ -2205,7 +2205,7 @@ func (pc *partitionConsumer) hasMoreMessages() bool {
// _setConn sets the internal connection field of this partition consumer atomically.
// Note: should only be called by this partition consumer when a new connection is available.
func (pc *partitionConsumer) _setConn(conn internal.Connection) {
pc.conn.Store(conn)
pc.conn.Store(&conn)
}

// _getConn returns internal connection field of this partition consumer atomically.
Expand All @@ -2214,7 +2214,7 @@ func (pc *partitionConsumer) _getConn() internal.Connection {
// Invariant: The conn must be non-nill for the lifetime of the partitionConsumer.
// For this reason we leave this cast unchecked and panic() if the
// invariant is broken
return pc.conn.Load().(internal.Connection)
return *pc.conn.Load()
}

func convertToMessageIDData(msgID *trackingMessageID) *pb.MessageIdData {
Expand Down

0 comments on commit ffdc3af

Please sign in to comment.