Skip to content

Commit

Permalink
feat: support mixed retry
Browse files Browse the repository at this point in the history
  • Loading branch information
YangruiEmma committed Aug 26, 2024
1 parent e735af6 commit 19c8f57
Show file tree
Hide file tree
Showing 10 changed files with 870 additions and 734 deletions.
67 changes: 67 additions & 0 deletions pkg/retry/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@
package retry

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/stats"
"github.com/cloudwego/thriftgo/pkg/test"
)

Expand All @@ -31,3 +36,65 @@ func TestBackupPolicy_String(t *testing.T) {
"DisableChainStop:false DDLStop:false CBPolicy:{ErrorRate:0.3}} RetrySameNode:true}"
test.Assert(t, r.String() == msg)
}

// test BackupPolicy call while rpcTime > delayTime
func TestBackupPolicyCall(t *testing.T) {
ctx := context.Background()
rc := NewRetryContainer()
err := rc.Init(map[string]Policy{Wildcard: {
Enable: true,
Type: 1,
BackupPolicy: &BackupPolicy{
RetryDelayMS: 30,
StopPolicy: StopPolicy{
MaxRetryTimes: 2,
DisableChainStop: false,
CBPolicy: CBPolicy{
ErrorRate: defaultCBErrRate,
},
},
},
}}, nil)
test.Assert(t, err == nil, err)

callTimes := int32(0)
firstRI := genRPCInfo()
secondRI := genRPCInfoWithRemoteTag(remoteTags)
ctx = rpcinfo.NewCtxWithRPCInfo(ctx, firstRI)
ri, ok, err := rc.WithRetryIfNeeded(ctx, &Policy{}, func(ctx context.Context, r Retryer) (rpcinfo.RPCInfo, interface{}, error) {
atomic.AddInt32(&callTimes, 1)
if atomic.LoadInt32(&callTimes) == 1 {
// mock timeout for the first request and get the response of the backup request.
time.Sleep(time.Millisecond * 50)
return firstRI, nil, nil
}
return secondRI, nil, nil
}, firstRI, nil)
test.Assert(t, err == nil, err)
test.Assert(t, atomic.LoadInt32(&callTimes) == 2)
test.Assert(t, !ok)
v, ok := ri.To().Tag(remoteTagKey)
test.Assert(t, ok)
test.Assert(t, v == remoteTagValue)
}

func TestBackupRetryWithRPCInfo(t *testing.T) {
// backup retry
ctx := context.Background()
rc := NewRetryContainer()

ri := genRPCInfo()
ctx = rpcinfo.NewCtxWithRPCInfo(ctx, ri)
rpcinfo.Record(ctx, ri, stats.RPCStart, nil)

// call with retry policy
var callTimes int32
policy := BuildBackupRequest(NewBackupPolicy(10))
ri, ok, err := rc.WithRetryIfNeeded(ctx, &policy, retryCall(&callTimes, ri, true), ri, nil)
test.Assert(t, err == nil, err)
test.Assert(t, !ok)
test.Assert(t, ri.Stats().GetEvent(stats.RPCStart).Status() == stats.StatusInfo)
test.Assert(t, ri.Stats().GetEvent(stats.RPCFinish).Status() == stats.StatusInfo)
test.Assert(t, ri.To().Address().String() == "10.20.30.40:8888")
test.Assert(t, atomic.LoadInt32(&callTimes) == 2)
}
40 changes: 26 additions & 14 deletions pkg/retry/failure_retryer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

func newFailureRetryer(policy Policy, r *ShouldResultRetry, cbC *cbContainer) (Retryer, error) {
fr := &failureRetryer{failureCommon: &failureCommon{specifiedResultRetry: r}, cbContainer: cbC}
fr := &failureRetryer{failureCommon: &failureCommon{specifiedResultRetry: r, cbContainer: cbC}}
if err := fr.UpdatePolicy(policy); err != nil {
return nil, fmt.Errorf("newfailureRetryer failed, err=%w", err)
}
Expand All @@ -43,27 +43,20 @@ func newFailureRetryer(policy Policy, r *ShouldResultRetry, cbC *cbContainer) (R
type failureRetryer struct {
enable bool
*failureCommon
policy *FailurePolicy
cbContainer *cbContainer
policy *FailurePolicy
sync.RWMutex
errMsg string
}

// ShouldRetry implements the Retryer interface.
// ShouldRetry to check if retry request can be called, it is checked in retryer.Do.
// If not satisfy will return the reason message
func (r *failureRetryer) ShouldRetry(ctx context.Context, err error, callTimes int, req interface{}, cbKey string) (string, bool) {
r.RLock()
defer r.RUnlock()
if !r.enable {
return "", false
}
if stop, msg := circuitBreakerStop(ctx, r.policy.StopPolicy, r.cbContainer, req, cbKey); stop {
return msg, false
}
if stop, msg := ddlStop(ctx, r.policy.StopPolicy); stop {
return msg, false
}
r.backOff.Wait(callTimes)
return "", true
return r.shouldRetry(ctx, callTimes, req, cbKey, r.policy)
}

// AllowRetry implements the Retryer interface.
Expand Down Expand Up @@ -108,8 +101,8 @@ func (r *failureRetryer) Do(ctx context.Context, rpcCall RPCCallFunc, firstRI rp
if i == 0 {
callStart = startTime
} else if i > 0 {
if maxDuration > 0 && time.Since(startTime) > maxDuration {
err = makeRetryErr(ctx, "exceed max duration", callTimes)
if ret, e := isExceedMaxDuration(ctx, startTime, maxDuration, callTimes); ret {
err = e
break
}
if msg, ok := r.ShouldRetry(ctx, err, i, req, cbKey); !ok {
Expand Down Expand Up @@ -216,6 +209,7 @@ func (r *failureRetryer) Dump() map[string]interface{} {
type failureCommon struct {
backOff BackOff
specifiedResultRetry *ShouldResultRetry
cbContainer *cbContainer
}

func (f *failureCommon) setSpecifiedResultRetryIfNeeded(rr *ShouldResultRetry, fp *FailurePolicy) {
Expand Down Expand Up @@ -255,6 +249,17 @@ func (r *failureCommon) isRetryErr(ctx context.Context, err error, ri rpcinfo.RP
return false
}

func (r *failureCommon) shouldRetry(ctx context.Context, callTimes int, req interface{}, cbKey string, fp *FailurePolicy) (string, bool) {
if stop, msg := circuitBreakerStop(ctx, fp.StopPolicy, r.cbContainer, req, cbKey); stop {
return msg, false
}
if stop, msg := ddlStop(ctx, fp.StopPolicy); stop {
return msg, false
}
r.backOff.Wait(callTimes)
return "", true
}

// isRetryResult to check if the result need to do retry
// Version Change Note:
// < v0.11.0 if the last result still failed, then wrap the error as RetryErr
Expand Down Expand Up @@ -316,3 +321,10 @@ func initBackOff(policy *BackOffPolicy) (bo BackOff, err error) {
}
return
}

func isExceedMaxDuration(ctx context.Context, start time.Time, maxDuration time.Duration, callTimes int32) (bool, error) {
if maxDuration > 0 && time.Since(start) > maxDuration {
return true, makeRetryErr(ctx, fmt.Sprintf("exceed max duration[%v]", maxDuration), callTimes)
}
return false, nil
}
Loading

0 comments on commit 19c8f57

Please sign in to comment.