Skip to content

Commit

Permalink
improve: use ctx and timer instead of sleep
Browse files Browse the repository at this point in the history
  • Loading branch information
nodece committed Nov 7, 2024
1 parent 35076ac commit 9904d77
Show file tree
Hide file tree
Showing 12 changed files with 266 additions and 142 deletions.
82 changes: 40 additions & 42 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pulsar

import (
"container/list"
"context"
"encoding/hex"
"fmt"
"math"
Expand Down Expand Up @@ -612,7 +613,6 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
pc.log.WithField("state", state).Error("Failed to getLastMessageID for the closing or closed consumer")
return nil, errors.New("failed to getLastMessageID for the closing or closed consumer")
}
remainTime := pc.client.operationTimeout
bo := pc.backoffPolicyFunc()
request := func() (*trackingMessageID, error) {
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
Expand All @@ -622,23 +622,23 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
<-req.doneCh
return req.msgID, req.err
}
for {
msgID, err := request()
if err == nil {
return msgID, nil
}
if remainTime <= 0 {
pc.log.WithError(err).Error("Failed to getLastMessageID")
return nil, fmt.Errorf("failed to getLastMessageID due to %w", err)
}

opFn := func() (*trackingMessageID, error) {
return request()
}
ctx, cancel := context.WithTimeout(context.Background(), pc.client.operationTimeout)
defer cancel()
res, err := internal.Retry(ctx, opFn, func(err error) time.Duration {
nextDelay := bo.Next()
if nextDelay > remainTime {
nextDelay = remainTime
}
remainTime -= nextDelay
pc.log.WithError(err).Errorf("Failed to get last message id from broker, retrying in %v...", nextDelay)
time.Sleep(nextDelay)
return nextDelay
})
if err != nil {
pc.log.WithError(err).Error("Failed to getLastMessageID")
return nil, fmt.Errorf("failed to getLastMessageID due to %w", err)
}

return res, nil
}

func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) {
Expand Down Expand Up @@ -1805,8 +1805,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
pc.log.Debug("seek operation triggers reconnection, and reset isSeeking")
}
var (
maxRetry int
delayReconnectTime, totalDelayReconnectTime time.Duration
maxRetry int
)

if pc.options.maxReconnectToBroker == nil {
Expand All @@ -1816,50 +1815,39 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
}
bo := pc.backoffPolicyFunc()

for maxRetry != 0 {
if pc.getConsumerState() != consumerReady {
// Consumer is already closing
pc.log.Info("consumer state not ready, exit reconnect")
return
}

var assignedBrokerURL string
var assignedBrokerURL string
if connectionClosed != nil && connectionClosed.HasURL() {
assignedBrokerURL = connectionClosed.assignedBrokerURL
}

if connectionClosed != nil && connectionClosed.HasURL() {
delayReconnectTime = 0
assignedBrokerURL = connectionClosed.assignedBrokerURL
connectionClosed = nil // Attempt connecting to the assigned broker just once
} else {
delayReconnectTime = bo.Next()
opFn := func() (struct{}, error) {
if maxRetry == 0 {
return struct{}{}, nil
}
totalDelayReconnectTime += delayReconnectTime

pc.log.WithFields(log.Fields{
"assignedBrokerURL": assignedBrokerURL,
"delayReconnectTime": delayReconnectTime,
}).Info("Reconnecting to broker")
time.Sleep(delayReconnectTime)

// double check
if pc.getConsumerState() != consumerReady {
// Consumer is already closing
pc.log.Info("consumer state not ready, exit reconnect")
return
return struct{}{}, nil
}

err := pc.grabConn(assignedBrokerURL)
if assignedBrokerURL != "" {
// Attempt connecting to the assigned broker just once
assignedBrokerURL = ""
}
if err == nil {
// Successfully reconnected
pc.log.Info("Reconnected consumer to broker")
bo.Reset()
return
return struct{}{}, nil
}
pc.log.WithError(err).Error("Failed to create consumer at reconnect")
errMsg := err.Error()
if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up reconnection.
pc.log.Warn("Topic Not Found.")
break
return struct{}{}, nil
}

if maxRetry > 0 {
Expand All @@ -1869,7 +1857,17 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
if maxRetry == 0 || bo.IsMaxBackoffReached() {
pc.metrics.ConsumersReconnectMaxRetry.Inc()
}

return struct{}{}, err
}
_, _ = internal.Retry(context.Background(), opFn, func(_ error) time.Duration {
delayReconnectTime := bo.Next()
pc.log.WithFields(log.Fields{
"assignedBrokerURL": assignedBrokerURL,
"delayReconnectTime": delayReconnectTime,
}).Info("Reconnecting to broker")
return delayReconnectTime
})
}

func (pc *partitionConsumer) lookupTopic(brokerServiceURL string) (*internal.LookupResult, error) {
Expand Down
2 changes: 1 addition & 1 deletion pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestConsumerConnectError(t *testing.T) {
assert.Nil(t, consumer)
assert.NotNil(t, err)

assert.Equal(t, err.Error(), "connection error")
assert.ErrorContains(t, err, "connection error")
}

func TestBatchMessageReceive(t *testing.T) {
Expand Down
23 changes: 14 additions & 9 deletions pulsar/dlq_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"fmt"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"

"github.com/apache/pulsar-client-go/pulsar/backoff"

"github.com/apache/pulsar-client-go/pulsar/log"
Expand Down Expand Up @@ -165,7 +167,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {

// Retry to create producer indefinitely
bo := r.backOffPolicyFunc()
for {
opFn := func() (Producer, error) {
opt := r.policy.ProducerOptions
opt.Topic = r.policy.DeadLetterTopic
opt.Schema = schema
Expand All @@ -179,14 +181,17 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
if r.policy.ProducerOptions.CompressionType == NoCompression {
opt.CompressionType = LZ4
}
producer, err := r.client.CreateProducer(opt)
return r.client.CreateProducer(opt)
}

if err != nil {
r.log.WithError(err).Error("Failed to create DLQ producer")
time.Sleep(bo.Next())
continue
}
r.producer = producer
return producer
res, err := internal.Retry(context.Background(), opFn, func(err error) time.Duration {
r.log.WithError(err).Error("Failed to create DLQ producer")
return bo.Next()
})

if err == nil {
r.producer = res
}

return res
}
37 changes: 20 additions & 17 deletions pulsar/internal/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package internal

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
Expand Down Expand Up @@ -146,25 +147,27 @@ func (c *httpClient) MakeRequest(method, endpoint string) (*http.Response, error
}

func (c *httpClient) Get(endpoint string, obj interface{}, params map[string]string) error {
_, err := c.GetWithQueryParams(endpoint, obj, params, true)
if _, ok := err.(*url.Error); ok {
// We can retry this kind of requests over a connection error because they're
// not specific to a particular broker.
bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * time.Millisecond)
startTime := time.Now()
var retryTime time.Duration

for time.Since(startTime) < c.requestTimeout {
retryTime = bo.Next()
c.log.Debugf("Retrying httpRequest in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
time.Sleep(retryTime)
_, err = c.GetWithQueryParams(endpoint, obj, params, true)
if _, ok := err.(*url.Error); !ok {
// We either succeeded or encountered a non connection error
break
}
var err error
opFn := func() (struct{}, error) {
_, err = c.GetWithQueryParams(endpoint, obj, params, true)
if _, ok := err.(*url.Error); ok {
// We can retry this kind of requests over a connection error because they're
// not specific to a particular broker.
return struct{}{}, err
}
return struct{}{}, nil
}

bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * time.Millisecond)

ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
defer cancel()

_, _ = Retry(ctx, opFn, func(_ error) time.Duration {
retryTime := bo.Next()
c.log.Debugf("Retrying httpRequest in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
return retryTime
})
return err
}

Expand Down
62 changes: 62 additions & 0 deletions pulsar/internal/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package internal

import (
"context"
"errors"
"time"
)

// OpFn is a retryable operation. It returns a result of type T, or a non-nil
// error when the attempt failed and should be retried.
type OpFn[T any] func() (T, error)

// Retry calls op until it returns a nil error or ctx is done.
//
// op is always invoked at least once, even when ctx is already done. After
// every failed attempt nextDuration is called with that attempt's error, and
// the duration it returns is how long Retry waits before the next attempt.
//
// On success Retry returns op's result and a nil error. When ctx is done
// before a further attempt can start, Retry returns the result of the last
// attempt together with errors.Join(ctx.Err(), lastErr), so errors.Is matches
// both the context error and the last operation error.
func Retry[T any](ctx context.Context, op OpFn[T], nextDuration func(error) time.Duration) (T, error) {
	// A single timer is reused across attempts; it is created lazily so that a
	// first-try success allocates nothing.
	var timer *time.Timer
	defer func() {
		if timer != nil {
			timer.Stop()
		}
	}()

	for {
		res, err := op()
		if err == nil {
			return res, nil
		}

		delay := nextDuration(err)
		if timer == nil {
			timer = time.NewTimer(delay)
		} else {
			// The previous tick was received from timer.C before looping, so
			// the channel is drained and Reset is safe here.
			timer.Reset(delay)
		}

		select {
		case <-ctx.Done():
			return res, errors.Join(ctx.Err(), err)
		case <-timer.C:
			// select chooses at random among ready cases. If the context
			// finished at the same time as the timer fired (for example with a
			// zero delay), do not start another attempt.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return res, errors.Join(ctxErr, err)
			}
		}
	}
}
58 changes: 58 additions & 0 deletions pulsar/internal/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package internal

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestRetryWithCtxBackground checks that, with a context that never expires,
// Retry keeps invoking the operation until it succeeds: the first two attempts
// fail and the third returns "ok".
func TestRetryWithCtxBackground(t *testing.T) {
	failures := 0
	op := func() (string, error) {
		if failures != 2 {
			failures++
			return "", errors.New("error")
		}
		return "ok", nil
	}
	fixedDelay := func(_ error) time.Duration {
		return time.Second
	}

	got, err := Retry(context.Background(), op, fixedDelay)
	require.NoError(t, err)
	require.Equal(t, "ok", got)
}

func TestRetryWithCtxTimeout(t *testing.T) {
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
retryErr := errors.New("error")
res, err := Retry(ctx, func() (string, error) {
return "", retryErr
}, func(err error) time.Duration {
require.Equal(t, retryErr, err)
return 1 * time.Second
})
require.ErrorIs(t, err, context.DeadlineExceeded)
require.ErrorContains(t, err, retryErr.Error())
require.Equal(t, "", res)
}
Loading

0 comments on commit 9904d77

Please sign in to comment.