diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 471d45a3a..6d9856370 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -19,6 +19,7 @@ package pulsar import ( "container/list" + "context" "encoding/hex" "fmt" "math" @@ -612,7 +613,6 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) { pc.log.WithField("state", state).Error("Failed to getLastMessageID for the closing or closed consumer") return nil, errors.New("failed to getLastMessageID for the closing or closed consumer") } - remainTime := pc.client.operationTimeout bo := pc.backoffPolicyFunc() request := func() (*trackingMessageID, error) { req := &getLastMsgIDRequest{doneCh: make(chan struct{})} @@ -622,23 +622,23 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) { <-req.doneCh return req.msgID, req.err } - for { - msgID, err := request() - if err == nil { - return msgID, nil - } - if remainTime <= 0 { - pc.log.WithError(err).Error("Failed to getLastMessageID") - return nil, fmt.Errorf("failed to getLastMessageID due to %w", err) - } + + opFn := func() (*trackingMessageID, error) { + return request() + } + ctx, cancel := context.WithTimeout(context.Background(), pc.client.operationTimeout) + defer cancel() + res, err := internal.Retry(ctx, opFn, func(err error) time.Duration { nextDelay := bo.Next() - if nextDelay > remainTime { - nextDelay = remainTime - } - remainTime -= nextDelay pc.log.WithError(err).Errorf("Failed to get last message id from broker, retrying in %v...", nextDelay) - time.Sleep(nextDelay) + return nextDelay + }) + if err != nil { + pc.log.WithError(err).Error("Failed to getLastMessageID") + return nil, fmt.Errorf("failed to getLastMessageID due to %w", err) } + + return res, nil } func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) { @@ -1805,8 +1805,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose pc.log.Debug("seek operation triggers reconnection, and reset isSeeking") } var ( - maxRetry int - delayReconnectTime, totalDelayReconnectTime time.Duration + maxRetry int ) if pc.options.maxReconnectToBroker == nil { @@ -1816,50 +1815,39 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose } bo := pc.backoffPolicyFunc() - for maxRetry != 0 { - if pc.getConsumerState() != consumerReady { - // Consumer is already closing - pc.log.Info("consumer state not ready, exit reconnect") - return - } - - var assignedBrokerURL string + var assignedBrokerURL string + if connectionClosed != nil && connectionClosed.HasURL() { + assignedBrokerURL = connectionClosed.assignedBrokerURL + } - if connectionClosed != nil && connectionClosed.HasURL() { - delayReconnectTime = 0 - assignedBrokerURL = connectionClosed.assignedBrokerURL - connectionClosed = nil // Attempt connecting to the assigned broker just once - } else { - delayReconnectTime = bo.Next() + opFn := func() (struct{}, error) { + if maxRetry == 0 { + return struct{}{}, nil } - totalDelayReconnectTime += delayReconnectTime - pc.log.WithFields(log.Fields{ - "assignedBrokerURL": assignedBrokerURL, - "delayReconnectTime": delayReconnectTime, - }).Info("Reconnecting to broker") - time.Sleep(delayReconnectTime) - - // double check if pc.getConsumerState() != consumerReady { // Consumer is already closing pc.log.Info("consumer state not ready, exit reconnect") - return + return struct{}{}, nil } err := pc.grabConn(assignedBrokerURL) + if assignedBrokerURL != "" { + // Attempt 
connecting to the assigned broker just once + assignedBrokerURL = "" + } if err == nil { // Successfully reconnected pc.log.Info("Reconnected consumer to broker") bo.Reset() - return + return struct{}{}, nil } pc.log.WithError(err).Error("Failed to create consumer at reconnect") errMsg := err.Error() if strings.Contains(errMsg, errMsgTopicNotFound) { // when topic is deleted, we should give up reconnection. pc.log.Warn("Topic Not Found.") - break + return struct{}{}, nil } if maxRetry > 0 { @@ -1869,7 +1857,17 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose if maxRetry == 0 || bo.IsMaxBackoffReached() { pc.metrics.ConsumersReconnectMaxRetry.Inc() } + + return struct{}{}, err } + _, _ = internal.Retry(context.Background(), opFn, func(_ error) time.Duration { + delayReconnectTime := bo.Next() + pc.log.WithFields(log.Fields{ + "assignedBrokerURL": assignedBrokerURL, + "delayReconnectTime": delayReconnectTime, + }).Info("Reconnecting to broker") + return delayReconnectTime + }) } func (pc *partitionConsumer) lookupTopic(brokerServiceURL string) (*internal.LookupResult, error) { diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 2524f6816..83521dd4e 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -129,7 +129,7 @@ func TestConsumerConnectError(t *testing.T) { assert.Nil(t, consumer) assert.NotNil(t, err) - assert.Equal(t, err.Error(), "connection error") + assert.ErrorContains(t, err, "connection error") } func TestBatchMessageReceive(t *testing.T) { diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go index 00c7e03b3..7d908ff60 100644 --- a/pulsar/dlq_router.go +++ b/pulsar/dlq_router.go @@ -22,6 +22,8 @@ import ( "fmt" "time" + "github.com/apache/pulsar-client-go/pulsar/internal" + "github.com/apache/pulsar-client-go/pulsar/backoff" "github.com/apache/pulsar-client-go/pulsar/log" @@ -165,7 +167,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer { // Retry to create producer indefinitely bo := r.backOffPolicyFunc() - for { + opFn := func() (Producer, error) { opt := r.policy.ProducerOptions opt.Topic = r.policy.DeadLetterTopic opt.Schema = schema @@ -179,14 +181,17 @@ func (r *dlqRouter) getProducer(schema Schema) Producer { if r.policy.ProducerOptions.CompressionType == NoCompression { opt.CompressionType = LZ4 } - producer, err := r.client.CreateProducer(opt) + return r.client.CreateProducer(opt) + } - if err != nil { - r.log.WithError(err).Error("Failed to create DLQ producer") - time.Sleep(bo.Next()) - continue - } - r.producer = producer - return producer + res, err := internal.Retry(context.Background(), opFn, func(err error) time.Duration { + r.log.WithError(err).Error("Failed to create DLQ producer") + return bo.Next() + }) + + if err == nil { + r.producer = res } + + return res } diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go index 632d5a4f3..b7dea1fe2 100644 --- a/pulsar/internal/http_client.go +++ b/pulsar/internal/http_client.go @@ -19,6 +19,7 @@ package internal import ( "bytes" + "context" "crypto/tls" "crypto/x509" "encoding/json" @@ -146,25 +147,27 @@ func (c *httpClient) MakeRequest(method, endpoint string) (*http.Response, error } func (c *httpClient) Get(endpoint string, obj interface{}, params map[string]string) error { - _, err := c.GetWithQueryParams(endpoint, obj, params, true) - if _, ok := err.(*url.Error); ok { - // We can retry this kind of requests over a connection error because they're - // not specific to a particular broker. 
- bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * time.Millisecond) - startTime := time.Now() - var retryTime time.Duration - - for time.Since(startTime) < c.requestTimeout { - retryTime = bo.Next() - c.log.Debugf("Retrying httpRequest in {%v} with timeout in {%v}", retryTime, c.requestTimeout) - time.Sleep(retryTime) - _, err = c.GetWithQueryParams(endpoint, obj, params, true) - if _, ok := err.(*url.Error); !ok { - // We either succeeded or encountered a non connection error - break - } + var err error + opFn := func() (struct{}, error) { + _, err = c.GetWithQueryParams(endpoint, obj, params, true) + if _, ok := err.(*url.Error); ok { + // We can retry this kind of requests over a connection error because they're + // not specific to a particular broker. + return struct{}{}, err } + return struct{}{}, nil } + + bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * time.Millisecond) + + ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout) + defer cancel() + + _, _ = Retry(ctx, opFn, func(_ error) time.Duration { + retryTime := bo.Next() + c.log.Debugf("Retrying httpRequest in {%v} with timeout in {%v}", retryTime, c.requestTimeout) + return retryTime + }) return err } diff --git a/pulsar/internal/retry.go b/pulsar/internal/retry.go new file mode 100644 index 000000000..2f7bf658c --- /dev/null +++ b/pulsar/internal/retry.go @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package internal + +import ( + "context" + "errors" + "time" +) + +type OpFn[T any] func() (T, error) + +// Retry the given operation until the returned error is nil or the context is done. +func Retry[T any](ctx context.Context, op OpFn[T], nextDuration func(error) time.Duration) (T, error) { + var ( + timer *time.Timer + res T + err error + ) + + cleanTimer := func() { + if timer != nil { + timer.Stop() + } + } + defer cleanTimer() + + for { + res, err = op() + if err == nil { + return res, nil + } + + duration := nextDuration(err) + if timer == nil { + timer = time.NewTimer(duration) + } else { + timer.Reset(duration) + } + + select { + case <-ctx.Done(): + return res, errors.Join(ctx.Err(), err) + case <-timer.C: + } + } +} diff --git a/pulsar/internal/retry_test.go b/pulsar/internal/retry_test.go new file mode 100644 index 000000000..ab9f4dc66 --- /dev/null +++ b/pulsar/internal/retry_test.go @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package internal + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestRetryWithCtxBackground(t *testing.T) { + ctx := context.Background() + i := 0 + res, err := Retry(ctx, func() (string, error) { + if i == 2 { + return "ok", nil + } + i++ + return "", errors.New("error") + }, func(_ error) time.Duration { + return 1 * time.Second + }) + require.NoError(t, err) + require.Equal(t, "ok", res) +} + +func TestRetryWithCtxTimeout(t *testing.T) { + ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelFn() + retryErr := errors.New("error") + res, err := Retry(ctx, func() (string, error) { + return "", retryErr + }, func(err error) time.Duration { + require.Equal(t, retryErr, err) + return 1 * time.Second + }) + require.ErrorIs(t, err, context.DeadlineExceeded) + require.ErrorContains(t, err, retryErr.Error()) + require.Equal(t, "", res) +} diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go index 0593203a4..9a88a1673 100644 --- a/pulsar/internal/rpc_client.go +++ b/pulsar/internal/rpc_client.go @@ -18,6 +18,7 @@ package internal import ( + "context" "errors" "fmt" "net/url" @@ -117,27 +118,26 @@ func (c *rpcClient) requestToHost(serviceNameResolver *ServiceNameResolver, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) { var err error var host *url.URL - var rpcResult *RPCResult - startTime := time.Now() bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * time.Millisecond) // we can retry these requests because this kind of request is // not specific to any particular broker - for time.Since(startTime) < c.requestTimeout { + opFn := func() (*RPCResult, error) { host, err = (*serviceNameResolver).ResolveHost() if err != nil { c.log.WithError(err).Errorf("rpc client failed to resolve host") return nil, err } - rpcResult, err = c.Request(host, host, requestID, cmdType, message) - // success we got a response - if err == nil { - break - } + return c.Request(host, host, requestID, cmdType, message) + } + + ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout) + defer cancel() + rpcResult, err := Retry(ctx, opFn, func(_ error) time.Duration { retryTime := bo.Next() c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout) - time.Sleep(retryTime) - } + return retryTime + }) return rpcResult, err } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b1fc3f02c..c5ae259a9 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -471,8 +471,7 @@ func (p *partitionProducer) getOrCreateSchema(schemaInfo *SchemaInfo) (schemaVer func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed) { var ( - maxRetry int - delayReconnectTime time.Duration + maxRetry int ) if p.options.MaxReconnectToBroker == nil { maxRetry = -1 @@ -482,49 +481,39 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed bo := p.backOffPolicyFunc() - for maxRetry != 0 { - select { - case <-p.ctx.Done(): - 
return - default: - } + var assignedBrokerURL string + if connectionClosed != nil && connectionClosed.HasURL() { + assignedBrokerURL = connectionClosed.assignedBrokerURL + } - if p.getProducerState() != producerReady { - // Producer is already closing - p.log.Info("producer state not ready, exit reconnect") - return + opFn := func() (struct{}, error) { + if maxRetry == 0 { + return struct{}{}, nil } - var assignedBrokerURL string - - if connectionClosed != nil && connectionClosed.HasURL() { - delayReconnectTime = 0 - assignedBrokerURL = connectionClosed.assignedBrokerURL - connectionClosed = nil // Only attempt once - } else { - delayReconnectTime = bo.Next() + select { + case <-p.ctx.Done(): + return struct{}{}, nil + default: } - p.log.WithFields(log.Fields{ - "assignedBrokerURL": assignedBrokerURL, - "delayReconnectTime": delayReconnectTime, - }).Info("Reconnecting to broker") - time.Sleep(delayReconnectTime) - - // double check if p.getProducerState() != producerReady { // Producer is already closing p.log.Info("producer state not ready, exit reconnect") - return + return struct{}{}, nil } atomic.AddUint64(&p.epoch, 1) err := p.grabCnx(assignedBrokerURL) + if assignedBrokerURL != "" { + // Only attempt once + assignedBrokerURL = "" + } if err == nil { // Successfully reconnected p.log.WithField("cnx", p._getConn().ID()).Info("Reconnected producer to broker") bo.Reset() - return + return struct{}{}, nil } p.log.WithError(err).Error("Failed to create producer at reconnect") errMsg := err.Error() @@ -532,25 +521,25 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed // when topic is deleted, we should give up reconnection. p.log.Warn("Topic not found, stop reconnecting, close the producer") p.doClose(joinErrors(ErrTopicNotfound, err)) - break + return struct{}{}, nil } if strings.Contains(errMsg, errMsgTopicTerminated) { p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer") p.doClose(joinErrors(ErrTopicTerminated, err)) - break + return struct{}{}, nil } if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) { p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting") p.failPendingMessages(joinErrors(ErrProducerBlockedQuotaExceeded, err)) - break + return struct{}{}, nil } if strings.Contains(errMsg, errMsgProducerFenced) { p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting") p.doClose(joinErrors(ErrProducerFenced, err)) - break + return struct{}{}, nil } if maxRetry > 0 { @@ -560,7 +549,17 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed if maxRetry == 0 || bo.IsMaxBackoffReached() { p.metrics.ProducersReconnectMaxRetry.Inc() } + + return struct{}{}, err } + _, _ = internal.Retry(context.Background(), opFn, func(_ error) time.Duration { + delayReconnectTime := bo.Next() + p.log.WithFields(log.Fields{ + "assignedBrokerURL": assignedBrokerURL, + "delayReconnectTime": delayReconnectTime, + }).Info("Reconnecting to broker") + return delayReconnectTime + }) } func (p *partitionProducer) runEventsLoop() { diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index b58c0608a..24939a82b 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -73,7 +73,7 @@ func TestProducerConnectError(t *testing.T) { assert.Nil(t, producer) assert.NotNil(t, err) - assert.Equal(t, err.Error(), "connection error") + assert.ErrorContains(t, err, "connection error") } func 
TestProducerNoTopic(t *testing.T) { diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index d2fa55978..836535704 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -221,7 +221,7 @@ func TestReaderConnectError(t *testing.T) { assert.Nil(t, reader) assert.NotNil(t, err) - assert.Equal(t, err.Error(), "connection error") + assert.ErrorContains(t, err, "connection error") } func TestReaderOnSpecificMessage(t *testing.T) { diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go index a9a67adb4..198371a34 100644 --- a/pulsar/retry_router.go +++ b/pulsar/retry_router.go @@ -21,6 +21,8 @@ import ( "context" "time" + "github.com/apache/pulsar-client-go/pulsar/internal" + "github.com/apache/pulsar-client-go/pulsar/backoff" "github.com/apache/pulsar-client-go/pulsar/log" @@ -135,7 +137,7 @@ func (r *retryRouter) getProducer() Producer { // Retry to create producer indefinitely bo := r.backOffPolicyFunc() - for { + opFn := func() (Producer, error) { opt := r.policy.ProducerOptions opt.Topic = r.policy.RetryLetterTopic // the origin code sets to LZ4 compression with no options @@ -144,14 +146,15 @@ func (r *retryRouter) getProducer() Producer { opt.CompressionType = LZ4 } - producer, err := r.client.CreateProducer(opt) - - if err != nil { - r.log.WithError(err).Error("Failed to create RLQ producer") - time.Sleep(bo.Next()) - continue - } - r.producer = producer - return producer + return r.client.CreateProducer(opt) + } + res, err := internal.Retry(context.Background(), opFn, func(err error) time.Duration { + r.log.WithError(err).Error("Failed to create RLQ producer") + return bo.Next() + }) + if err == nil { + r.producer = res } + + return res } diff --git a/pulsar/transaction_coordinator_client.go b/pulsar/transaction_coordinator_client.go index afde54278..e66b06ef9 100644 --- a/pulsar/transaction_coordinator_client.go +++ b/pulsar/transaction_coordinator_client.go @@ -147,41 +147,37 @@ func (t *transactionHandler) reconnectToBroker() { var delayReconnectTime time.Duration var defaultBackoff = backoff.DefaultBackoff{} - for { + opFn := func() (struct{}, error) { if t.getState() == txnHandlerClosed { // The handler is already closing t.log.Info("transaction handler is closed, exit reconnect") - return - } - - delayReconnectTime = defaultBackoff.Next() - - t.log.WithFields(log.Fields{ - "delayReconnectTime": delayReconnectTime, - }).Info("Transaction handler will reconnect to the transaction coordinator") - time.Sleep(delayReconnectTime) - - // double check - if t.getState() == txnHandlerClosed { - // Txn handler is already closing - t.log.Info("transaction handler is closed, exit reconnect") - return + return struct{}{}, nil } err := t.grabConn() if err == nil { // Successfully reconnected t.log.Info("Reconnected transaction handler to broker") - return + return struct{}{}, nil } + t.log.WithError(err).Error("Failed to create transaction handler at reconnect") errMsg := err.Error() if strings.Contains(errMsg, errMsgTopicNotFound) { // when topic is deleted, we should give up reconnection. t.log.Warn("Topic Not Found") - break + return struct{}{}, nil } + return struct{}{}, err } + + _, _ = internal.Retry(context.Background(), opFn, func(_ error) time.Duration { + delayReconnectTime = defaultBackoff.Next() + t.log.WithFields(log.Fields{ + "delayReconnectTime": delayReconnectTime, + }).Info("Transaction handler will reconnect to the transaction coordinator") + return delayReconnectTime + }) } func (t *transactionHandler) checkRetriableError(err error, op any) bool {
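For reviewers, a minimal usage sketch of the new generic Retry helper introduced in pulsar/internal/retry.go, showing how it composes with a backoff policy and a context deadline the way the refactored call sites above do. This is illustrative only and not part of the patch; exampleRetryUsage, fetchValue, and the 30-second timeout are made-up placeholders.

package internal

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/apache/pulsar-client-go/pulsar/backoff"
)

// exampleRetryUsage is an illustrative sketch, not code from this change.
// It retries a placeholder operation until it succeeds or the context
// deadline expires, mirroring getLastMessageID and requestToHost.
func exampleRetryUsage() (string, error) {
	// Placeholder for a broker RPC, lookup, or producer-creation call.
	fetchValue := func() (string, error) {
		return "", errors.New("transient error")
	}

	// Backoff policy used the same way as in http_client.go / rpc_client.go.
	bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * time.Millisecond)

	// Bound the total retry time with a deadline, as the refactored call
	// sites do with operationTimeout / requestTimeout.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// On cancellation or deadline, Retry returns errors.Join(ctx.Err(), lastErr).
	return Retry(ctx, fetchValue, func(err error) time.Duration {
		next := bo.Next()
		fmt.Printf("operation failed (%v), retrying in %v\n", err, next)
		return next
	})
}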