[fix] Fix Infinite Loop in Reader's HasNext Function (apache#1182)
Fixes apache#1171

### Motivation

If `getLastMessageID` fails continually, `reader.HasNext` can get stuck in an infinite loop: without any backoff, the reader keeps retrying forever.

### Modifications

- Implemented a backoff policy for `getLastMessageID` (a simplified sketch of the retry pattern follows this list).
- If `HasNext` fails, it now returns `false`.
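
For reference, here is a minimal, self-contained sketch of the bounded retry-with-backoff pattern this patch applies to `getLastMessageID`. The `backoffPolicy` interface and `doubling` type below are simplified stand-ins for the client's internal types, not the library's actual API:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// backoffPolicy is a simplified stand-in for the client's internal
// BackoffPolicy: it only knows how to produce the next retry delay.
type backoffPolicy interface {
	Next() time.Duration
}

// doubling is a toy exponential backoff used for illustration.
type doubling struct{ d time.Duration }

func (b *doubling) Next() time.Duration {
	if b.d == 0 {
		b.d = 100 * time.Millisecond
	} else {
		b.d *= 2
	}
	return b.d
}

// getWithRetry keeps calling request until it succeeds or the accumulated
// backoff exceeds operationTimeout. This mirrors the loop the patch adds
// around the getLastMessageID request.
func getWithRetry(request func() (string, error), operationTimeout time.Duration) (string, error) {
	remainTime := operationTimeout
	var backoff backoffPolicy = &doubling{}
	for {
		v, err := request()
		if err == nil {
			return v, nil
		}
		if remainTime <= 0 {
			return "", fmt.Errorf("giving up after %v: %w", operationTimeout, err)
		}
		nextDelay := backoff.Next()
		if nextDelay > remainTime {
			nextDelay = remainTime
		}
		remainTime -= nextDelay
		time.Sleep(nextDelay)
	}
}

func main() {
	_, err := getWithRetry(func() (string, error) {
		return "", errors.New("broker unavailable")
	}, 2*time.Second)
	fmt.Println(err) // gives up after roughly the operation timeout instead of spinning forever
}
```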

#### Should `reader.HasNext` return `false` in case of failure?

Currently, the `HasNext` method doesn't report errors, but failure is still possible: for instance, `getLastMessageID` may fail repeatedly and exhaust its retries. One option is to keep retrying forever, but that would stall all user code. That isn't user-friendly, so I rejected this solution.
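
To make the behavior change concrete, here is a caller-side sketch (the service URL and topic name are illustrative). With this patch, a `HasNext` loop like the one below terminates when fetching the last message ID ultimately fails instead of hanging forever; the trade-off is that the caller cannot distinguish "no more messages" from "lookup failed":

```go
package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650", // illustrative service URL
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	reader, err := client.CreateReader(pulsar.ReaderOptions{
		Topic:          "my-topic", // illustrative topic
		StartMessageID: pulsar.EarliestMessageID(),
	})
	if err != nil {
		log.Fatal(err)
	}
	defer reader.Close()

	// After this fix, HasNext returns false both when the reader has caught up
	// and when fetching the last message ID ultimately fails, so this loop can
	// no longer block forever on a broken broker connection.
	for reader.HasNext() {
		msg, err := reader.Next(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("read message: %s", string(msg.Payload()))
	}
}
```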

#### Why the `BackoffPolicy` from the reader options can't be reused

The `HasNext` retry mechanism needs `IsMaxBackoffReached` to know when to stop backing off, but that method isn't exposed on the `BackoffPolicy` interface. Adding a new method to `BackoffPolicy` would be a breaking change for users' backoff implementations, so I chose not to do that here. Before we can, we need to refine `BackoffPolicy`.
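
A minimal sketch of that gap, assuming the interface shape described above rather than quoting the library's internal definitions:

```go
package sketch

import "time"

// BackoffPolicy shows the method set user code can currently implement
// (simplified; the real interface lives in the client's internal package).
type BackoffPolicy interface {
	Next() time.Duration
}

// boundedBackoffPolicy is what HasNext's retry loop would ideally consume:
// it also needs to know when the maximum backoff has been reached. Adding
// IsMaxBackoffReached to BackoffPolicy would break existing user
// implementations, so this patch bounds retries with the client's
// operation timeout instead.
type boundedBackoffPolicy interface {
	Next() time.Duration
	IsMaxBackoffReached() bool
}
```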
RobertIndie authored Feb 28, 2024
1 parent a881240 commit 88a8d85
Showing 4 changed files with 115 additions and 27 deletions.
24 changes: 13 additions & 11 deletions pulsar/client_impl.go
@@ -40,14 +40,15 @@ const (
)

type client struct {
cnxPool internal.ConnectionPool
rpcClient internal.RPCClient
handlers internal.ClientHandlers
lookupService internal.LookupService
metrics *internal.Metrics
tcClient *transactionCoordinatorClient
memLimit internal.MemoryLimitController
closeOnce sync.Once
cnxPool internal.ConnectionPool
rpcClient internal.RPCClient
handlers internal.ClientHandlers
lookupService internal.LookupService
metrics *internal.Metrics
tcClient *transactionCoordinatorClient
memLimit internal.MemoryLimitController
closeOnce sync.Once
operationTimeout time.Duration

log log.Logger
}
@@ -161,9 +162,10 @@ func newClient(options ClientOptions) (Client, error) {
c := &client{
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval,
maxConnectionsPerHost, logger, metrics, connectionMaxIdleTime),
log: logger,
metrics: metrics,
memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold),
log: logger,
metrics: metrics,
memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold),
operationTimeout: operationTimeout,
}
serviceNameResolver := internal.NewPulsarServiceNameResolver(url)

53 changes: 37 additions & 16 deletions pulsar/consumer_partition.go
@@ -570,15 +570,41 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {

func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer")
return nil, errors.New("failed to redeliver closing or closed consumer")
pc.log.WithField("state", state).Error("Failed to getLastMessageID for the closing or closed consumer")
return nil, errors.New("failed to getLastMessageID for the closing or closed consumer")
}
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req
remainTime := pc.client.operationTimeout
var backoff internal.BackoffPolicy
if pc.options.backoffPolicy != nil {
backoff = pc.options.backoffPolicy
} else {
backoff = &internal.DefaultBackoff{}
}
request := func() (*trackingMessageID, error) {
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req

// wait for the request to complete
<-req.doneCh
return req.msgID, req.err
// wait for the request to complete
<-req.doneCh
return req.msgID, req.err
}
for {
msgID, err := request()
if err == nil {
return msgID, nil
}
if remainTime <= 0 {
pc.log.WithError(err).Error("Failed to getLastMessageID")
return nil, fmt.Errorf("failed to getLastMessageID due to %w", err)
}
nextDelay := backoff.Next()
if nextDelay > remainTime {
nextDelay = remainTime
}
remainTime -= nextDelay
pc.log.WithError(err).Errorf("Failed to get last message id from broker, retrying in %v...", nextDelay)
time.Sleep(nextDelay)
}
}

func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) {
@@ -1987,16 +2013,11 @@ func (pc *partitionConsumer) hasNext() bool {
return true
}

for {
lastMsgID, err := pc.getLastMessageID()
if err != nil {
pc.log.WithError(err).Error("Failed to get last message id from broker")
continue
} else {
pc.lastMessageInBroker = lastMsgID
break
}
lastMsgID, err := pc.getLastMessageID()
if err != nil {
return false
}
pc.lastMessageInBroker = lastMsgID

return pc.hasMoreMessages()
}
1 change: 1 addition & 0 deletions pulsar/reader.go
@@ -113,6 +113,7 @@ type Reader interface {
Next(context.Context) (Message, error)

// HasNext checks if there is any message available to read from the current position
	// If there are any errors, it will return false
HasNext() bool

// Close the reader and stop the broker to push more messages
64 changes: 64 additions & 0 deletions pulsar/reader_test.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

@@ -1023,3 +1024,66 @@ func createPartitionedTopic(topic string, n int) error {
}
return nil
}

func TestReaderHasNextFailed(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
topic := newTopicName()
r, err := client.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: EarliestMessageID(),
})
assert.Nil(t, err)
r.(*reader).c.consumers[0].state.Store(consumerClosing)
assert.False(t, r.HasNext())
}

func TestReaderHasNextRetryFailed(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
OperationTimeout: 2 * time.Second,
})
assert.Nil(t, err)
topic := newTopicName()
r, err := client.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: EarliestMessageID(),
})
assert.Nil(t, err)

c := make(chan interface{})
defer close(c)

// Close the consumer events loop and assign a mock eventsCh
pc := r.(*reader).c.consumers[0]
pc.Close()
pc.state.Store(consumerReady)
pc.eventsCh = c

go func() {
for e := range c {
req, ok := e.(*getLastMsgIDRequest)
assert.True(t, ok, "unexpected event type")
req.err = errors.New("expected error")
close(req.doneCh)
}
}()
minTimer := time.NewTimer(1 * time.Second) // Timer to check if r.HasNext() blocked for at least 1s
maxTimer := time.NewTimer(3 * time.Second) // Timer to ensure r.HasNext() doesn't block for more than 3s
done := make(chan bool)
go func() {
assert.False(t, r.HasNext())
done <- true
}()

select {
case <-maxTimer.C:
t.Fatal("r.HasNext() blocked for more than 3s")
case <-done:
assert.False(t, minTimer.Stop(), "r.HasNext() did not block for at least 1s")
assert.True(t, maxTimer.Stop())
}

}
