Skip to content

Commit

Permalink
optimize(retry): optimize UpdatePolicy and add test cases to check in…
Browse files Browse the repository at this point in the history
…valid retry policy
  • Loading branch information
YangruiEmma committed Nov 18, 2024
1 parent 8e9df9a commit e3f6048
Show file tree
Hide file tree
Showing 8 changed files with 603 additions and 176 deletions.
20 changes: 9 additions & 11 deletions pkg/retry/backup_retryer.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,29 +173,27 @@ func (r *backupRetryer) Prepare(ctx context.Context, prevRI, retryRI rpcinfo.RPC

// UpdatePolicy implements the Retryer interface.
func (r *backupRetryer) UpdatePolicy(rp Policy) (err error) {
r.Lock()
defer r.Unlock()
if !rp.Enable {
r.Lock()
r.enable = rp.Enable
r.Unlock()
return nil
}
if rp.BackupPolicy == nil || rp.Type != BackupType {
err = errors.New("BackupPolicy is nil or retry type not match, cannot do update in backupRetryer")
r.errMsg = err.Error()
return err
}
if err == nil && rp.BackupPolicy.RetryDelayMS == 0 {
if rp.BackupPolicy.RetryDelayMS == 0 {
err = errors.New("invalid retry delay duration in backupRetryer")
r.errMsg = err.Error()
return err
}
if err == nil {
err = checkStopPolicy(&rp.BackupPolicy.StopPolicy, maxBackupRetryTimes, r)
}

r.Lock()
defer r.Unlock()
r.enable = rp.Enable
if err != nil {
if err = checkStopPolicy(&rp.BackupPolicy.StopPolicy, maxBackupRetryTimes, r); err != nil {
r.errMsg = err.Error()
return err
}
r.enable = rp.Enable
r.policy = rp.BackupPolicy
r.retryDelay = time.Duration(rp.BackupPolicy.RetryDelayMS) * time.Millisecond
return nil
Expand Down
76 changes: 76 additions & 0 deletions pkg/retry/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/bytedance/sonic"
"github.com/cloudwego/thriftgo/pkg/test"

"github.com/cloudwego/kitex/pkg/rpcinfo"
Expand Down Expand Up @@ -99,3 +100,78 @@ func TestBackupRetryWithRPCInfo(t *testing.T) {
test.Assert(t, ri.To().Address().String() == "10.20.30.40:8888")
test.Assert(t, atomic.LoadInt32(&callTimes) == 2)
}

func TestNewRetryer4BackupRequest(t *testing.T) {
t.Run("normal backup request", func(t *testing.T) {
jsonRet := `{"enable":true,"type":1,
"backup_policy":{"retry_delay_ms":10,"stop_policy":{"max_retry_times":2,"max_duration_ms":0,"disable_chain_stop":false,"ddl_stop":false,"cb_policy":{"error_rate":0.1,"min_sample":200}},"backoff_policy":{"backoff_type":"none"},"retry_same_node":false,"extra":"{\"not_retry_for_timeout\":false,\"rpc_retry_code\":{\"all_error_code\":false,\"error_codes\":[103,1204]},\"biz_retry_code\":{\"all_error_code\":false,\"error_codes\":[]}}","extra_struct":{"not_retry_for_timeout":false,"rpc_retry_code":{"all_error_code":false,"error_codes":[103,1204]},"biz_retry_code":{"all_error_code":false,"error_codes":[]}}}}`
var p Policy
err := sonic.UnmarshalString(jsonRet, &p)
test.Assert(t, err == nil, err)

r, err := NewRetryer(p, nil, nil)
test.Assert(t, err == nil, err)
test.Assert(t, r.(*backupRetryer).enable)
})

t.Run("type is mixedRetry but policy is empty", func(t *testing.T) {
jsonRet := `{"enable":true,"type":2, "backup_policy":{}}`
var p Policy
err := sonic.UnmarshalString(jsonRet, &p)
test.Assert(t, err == nil, err)

r, err := NewRetryer(p, nil, nil)
test.Assert(t, err != nil, err)
test.Assert(t, r == nil)
})

t.Run("type is backup but no policy", func(t *testing.T) {
jsonRet := `{"enable":true,"type":1}`
var p Policy
err := sonic.UnmarshalString(jsonRet, &p)
test.Assert(t, err == nil, err)

r, err := NewRetryer(p, nil, nil)
test.Assert(t, err != nil, err)
test.Assert(t, r == nil)
})

t.Run("type is failure retry but policy is backupRequest", func(t *testing.T) {
jsonRet := `{"enable":true,"type":0,
"backup_policy":{"retry_delay_ms":10,"stop_policy":{"max_retry_times":2,"max_duration_ms":0,"disable_chain_stop":false,"ddl_stop":false,"cb_policy":{"error_rate":0.1,"min_sample":200}},"backoff_policy":{"backoff_type":"none"},"retry_same_node":false,"extra":"{\"not_retry_for_timeout\":false,\"rpc_retry_code\":{\"all_error_code\":false,\"error_codes\":[103,1204]},\"biz_retry_code\":{\"all_error_code\":false,\"error_codes\":[]}}","extra_struct":{"not_retry_for_timeout":false,"rpc_retry_code":{"all_error_code":false,"error_codes":[103,1204]},"biz_retry_code":{"all_error_code":false,"error_codes":[]}}}}`
var p Policy
err := sonic.UnmarshalString(jsonRet, &p)
test.Assert(t, err == nil, err)

r, err := NewRetryer(p, nil, nil)
test.Assert(t, err != nil, err)
test.Assert(t, r == nil)
})

t.Run("type is backup but has multi-policies", func(t *testing.T) {
jsonRet := `{"enable":true,"type":1,
"backup_policy":{"retry_delay_ms":10,"stop_policy":{"max_retry_times":2,"max_duration_ms":0,"disable_chain_stop":false,"ddl_stop":false,"cb_policy":{"error_rate":0.1,"min_sample":200}},"backoff_policy":{"backoff_type":"none"},"retry_same_node":false,"extra":"{\"not_retry_for_timeout\":false,\"rpc_retry_code\":{\"all_error_code\":false,\"error_codes\":[103,1204]},\"biz_retry_code\":{\"all_error_code\":false,\"error_codes\":[]}}","extra_struct":{"not_retry_for_timeout":false,"rpc_retry_code":{"all_error_code":false,"error_codes":[103,1204]},"biz_retry_code":{"all_error_code":false,"error_codes":[]}}},
"failure_policy":{"stop_policy":{"max_retry_times":2,"max_duration_ms":0,"disable_chain_stop":false,"ddl_stop":false,"cb_policy":{"error_rate":0.1,"min_sample":200}},"backoff_policy":{"backoff_type":"none"},"retry_same_node":false,"extra":"{\"not_retry_for_timeout\":false,\"rpc_retry_code\":{\"all_error_code\":false,\"error_codes\":[103,1204]},\"biz_retry_code\":{\"all_error_code\":false,\"error_codes\":[]}}","extra_struct":{"not_retry_for_timeout":false,"rpc_retry_code":{"all_error_code":false,"error_codes":[103,1204]},"biz_retry_code":{"all_error_code":false,"error_codes":[]}}}
}`
var p Policy
err := sonic.UnmarshalString(jsonRet, &p)
test.Assert(t, err == nil, err)

r, err := NewRetryer(p, nil, nil)
test.Assert(t, err == nil, err)
test.Assert(t, r.(*backupRetryer).enable)
})

t.Run("type is backup but policy json has extra configuration", func(t *testing.T) {
jsonRet := `{"enable":true,"type":1,
"backup_policy":{"retry_delay_ms":10,"stop_policy":{"max_retry_times":2,"max_duration_ms":0,"disable_chain_stop":false,"ddl_stop":false,"cb_policy":{"error_rate":0.1,"min_sample":200}},"backoff_policy":{"backoff_type":"none"},"retry_same_node":false,"extra":"{\"not_retry_for_timeout\":false,\"rpc_retry_code\":{\"all_error_code\":false,\"error_codes\":[103,1204]},\"biz_retry_code\":{\"all_error_code\":false,\"error_codes\":[]}}","extra_struct":{"not_retry_for_timeout":false,"rpc_retry_code":{"all_error_code":false,"error_codes":[103,1204]},"biz_retry_code":{"all_error_code":false,"error_codes":[]}}}
}`
var p Policy
err := sonic.UnmarshalString(jsonRet, &p)
test.Assert(t, err == nil, err)

r, err := NewRetryer(p, nil, nil)
test.Assert(t, err == nil, err)
test.Assert(t, r.(*backupRetryer).enable)
})
}
17 changes: 7 additions & 10 deletions pkg/retry/failure_retryer.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,30 +142,27 @@ func (r *failureRetryer) Do(ctx context.Context, rpcCall RPCCallFunc, firstRI rp

// UpdatePolicy implements the Retryer interface.
func (r *failureRetryer) UpdatePolicy(rp Policy) (err error) {
r.Lock()
defer r.Unlock()
if !rp.Enable {
r.Lock()
r.enable = rp.Enable
r.Unlock()
return nil
}
if rp.FailurePolicy == nil || rp.Type != FailureType {
err = errors.New("FailurePolicy is nil or retry type not match, cannot do update in failureRetryer")
r.errMsg = err.Error()
return err
}
if err == nil {
err = checkStopPolicy(&rp.FailurePolicy.StopPolicy, maxFailureRetryTimes, r)
}
r.Lock()
defer r.Unlock()
r.enable = rp.Enable
if err != nil {
if err = checkStopPolicy(&rp.FailurePolicy.StopPolicy, maxFailureRetryTimes, r); err != nil {
r.errMsg = err.Error()
return err
}
r.enable = rp.Enable
r.policy = rp.FailurePolicy
r.setSpecifiedResultRetryIfNeeded(r.specifiedResultRetry, r.policy)
if bo, e := initBackOff(rp.FailurePolicy.BackOffPolicy); e != nil {
r.errMsg = fmt.Sprintf("failureRetryer update BackOffPolicy failed, err=%s", e.Error())
klog.Warnf(r.errMsg)
klog.Warnf("KITEX: %s", r.errMsg)
} else {
r.backOff = bo
}
Expand Down
108 changes: 108 additions & 0 deletions pkg/retry/failure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"github.com/bytedance/sonic"

"github.com/cloudwego/kitex/internal/test"
"github.com/cloudwego/kitex/pkg/kerrors"
"github.com/cloudwego/kitex/pkg/remote"
Expand Down Expand Up @@ -272,6 +274,25 @@ func TestSpecifiedErrorRetry(t *testing.T) {
test.Assert(t, ok)
test.Assert(t, v == remoteTagValue)
})

// case5: all error retry and trigger circuit breaker
t.Run("case5", func(t *testing.T) {
rc := NewRetryContainerWithPercentageLimit()
p := BuildFailurePolicy(NewFailurePolicyWithResultRetry(AllErrorRetry()))
ri = genRPCInfo()

for i := 0; i < 10; i++ {
// failure rate is 50%
_, _, err := rc.WithRetryIfNeeded(ctx, &p, retryWithTransError(0, transErrCode), ri, nil)
if i < 5 {
// i < 5, total request < 10
test.Assert(t, err == nil, err, i)
} else {
// the minSimple is 10, total request > 10 then trigger cb
test.Assert(t, err != nil)
}
}
})
}

// test specified resp to retry
Expand Down Expand Up @@ -691,3 +712,90 @@ var retryWithTransError = func(callTimes, transErrCode int32) RPCCallFunc {
}
}
}

func TestNewRetryer4FailureRetry(t *testing.T) {
t.Run("normal failure retry", func(t *testing.T) {
jsonRet := `{"enable":true,"type":0,
"failure_policy":{"stop_policy":{"max_retry_times":2,"max_duration_ms":0,"disable_chain_stop":false,"ddl_stop":false,"cb_policy":{"error_rate":0.1,"min_sample":200}},"backoff_policy":{"backoff_type":"none"},"retry_same_node":false,"extra":"{\"not_retry_for_timeout\":false,\"rpc_retry_code\":{\"all_error_code\":false,\"error_codes\":[103,1204]},\"biz_retry_code\":{\"all_error_code\":false,\"error_codes\":[]}}","extra_struct":{"not_retry_for_timeout":false,"rpc_retry_code":{"all_error_code":false,"error_codes":[103,1204]},"biz_retry_code":{"all_error_code":false,"error_codes":[]}}}}`
var p Policy
err := sonic.UnmarshalString(jsonRet, &p)
test.Assert(t, err == nil, err)

r, err := NewRetryer(p, nil, nil)
test.Assert(t, err == nil, err)
test.Assert(t, r.(*failureRetryer).enable)
})

t.Run("type is failure retry but policy is empty", func(t *testing.T) {
jsonRet := `{"enable":true,"type":0, "failure_policy":{}}`
var p Policy
err := sonic.UnmarshalString(jsonRet, &p)
test.Assert(t, err == nil, err)

r, err := NewRetryer(p, nil, nil)
test.Assert(t, err == nil, err)
test.Assert(t, r.(*failureRetryer).enable)
})

t.Run("type is failure retry but policy is mixed", func(t *testing.T) {
jsonRet := `{"enable":true,"type":0,
"mixed_policy":{"retry_delay_ms":10,"stop_policy":{"max_retry_times":2,"max_duration_ms":0,"disable_chain_stop":false,"ddl_stop":false,"cb_policy":{"error_rate":0.1,"min_sample":200}},"backoff_policy":{"backoff_type":"none"},"retry_same_node":false,"extra":"{\"not_retry_for_timeout\":false,\"rpc_retry_code\":{\"all_error_code\":false,\"error_codes\":[103,1204]},\"biz_retry_code\":{\"all_error_code\":false,\"error_codes\":[]}}","extra_struct":{"not_retry_for_timeout":false,"rpc_retry_code":{"all_error_code":false,"error_codes":[103,1204]},"biz_retry_code":{"all_error_code":false,"error_codes":[]}}}}`
var p Policy
err := sonic.UnmarshalString(jsonRet, &p)
test.Assert(t, err == nil, err)

r, err := NewRetryer(p, nil, nil)
test.Assert(t, err != nil, err)
test.Assert(t, r == nil)
})

t.Run("type is failure retry but no policy", func(t *testing.T) {
jsonRet := `{"enable":true,"type":0}`
var p Policy
err := sonic.UnmarshalString(jsonRet, &p)
test.Assert(t, err == nil, err)

r, err := NewRetryer(p, nil, nil)
test.Assert(t, err != nil, err)
test.Assert(t, r == nil)
})

t.Run("type is backup but policy is failure retry", func(t *testing.T) {
jsonRet := `{"enable":true,"type":1,
"failure_policy":{"stop_policy":{"max_retry_times":2,"max_duration_ms":0,"disable_chain_stop":false,"ddl_stop":false,"cb_policy":{"error_rate":0.1,"min_sample":200}},"backoff_policy":{"backoff_type":"none"},"retry_same_node":false,"extra":"{\"not_retry_for_timeout\":false,\"rpc_retry_code\":{\"all_error_code\":false,\"error_codes\":[103,1204]},\"biz_retry_code\":{\"all_error_code\":false,\"error_codes\":[]}}","extra_struct":{"not_retry_for_timeout":false,"rpc_retry_code":{"all_error_code":false,"error_codes":[103,1204]},"biz_retry_code":{"all_error_code":false,"error_codes":[]}}}}`
var p Policy
err := sonic.UnmarshalString(jsonRet, &p)
test.Assert(t, err == nil, err)

r, err := NewRetryer(p, nil, nil)
test.Assert(t, err != nil, err)
test.Assert(t, r == nil)
})

t.Run("type is failure retry but has multi-policies", func(t *testing.T) {
jsonRet := `{"enable":true,"type":0,
"backup_policy":{"retry_delay_ms":10,"stop_policy":{"max_retry_times":2,"max_duration_ms":0,"disable_chain_stop":false,"ddl_stop":false,"cb_policy":{"error_rate":0.1,"min_sample":200}},"backoff_policy":{"backoff_type":"none"},"retry_same_node":false,"extra":"{\"not_retry_for_timeout\":false,\"rpc_retry_code\":{\"all_error_code\":false,\"error_codes\":[103,1204]},\"biz_retry_code\":{\"all_error_code\":false,\"error_codes\":[]}}","extra_struct":{"not_retry_for_timeout":false,"rpc_retry_code":{"all_error_code":false,"error_codes":[103,1204]},"biz_retry_code":{"all_error_code":false,"error_codes":[]}}},
"failure_policy":{"stop_policy":{"max_retry_times":2,"max_duration_ms":0,"disable_chain_stop":false,"ddl_stop":false,"cb_policy":{"error_rate":0.1,"min_sample":200}},"backoff_policy":{"backoff_type":"none"},"retry_same_node":false,"extra":"{\"not_retry_for_timeout\":false,\"rpc_retry_code\":{\"all_error_code\":false,\"error_codes\":[103,1204]},\"biz_retry_code\":{\"all_error_code\":false,\"error_codes\":[]}}","extra_struct":{"not_retry_for_timeout":false,"rpc_retry_code":{"all_error_code":false,"error_codes":[103,1204]},"biz_retry_code":{"all_error_code":false,"error_codes":[]}}}
}`
var p Policy
err := sonic.UnmarshalString(jsonRet, &p)
test.Assert(t, err == nil, err)

r, err := NewRetryer(p, nil, nil)
test.Assert(t, err == nil, err)
test.Assert(t, r.(*failureRetryer).enable)
})

t.Run("type is failure retry but policy json has retry_delay_ms", func(t *testing.T) {
jsonRet := `{"enable":true,"type":0,
"failure_policy":{"stop_policy":{"max_retry_times":2,"max_duration_ms":0,"disable_chain_stop":false,"ddl_stop":false,"cb_policy":{"error_rate":0.1,"min_sample":200}},"backoff_policy":{"backoff_type":"none"},"retry_same_node":false,"extra":"{\"not_retry_for_timeout\":false,\"rpc_retry_code\":{\"all_error_code\":false,\"error_codes\":[103,1204]},\"biz_retry_code\":{\"all_error_code\":false,\"error_codes\":[]}}","extra_struct":{"not_retry_for_timeout":false,"rpc_retry_code":{"all_error_code":false,"error_codes":[103,1204]},"biz_retry_code":{"all_error_code":false,"error_codes":[]}}}
}`
var p Policy
err := sonic.UnmarshalString(jsonRet, &p)
test.Assert(t, err == nil, err)

r, err := NewRetryer(p, nil, nil)
test.Assert(t, err == nil, err)
test.Assert(t, r.(*failureRetryer).enable)
})
}
19 changes: 9 additions & 10 deletions pkg/retry/mixed_retryer.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,28 +201,27 @@ func (r *mixedRetryer) Do(ctx context.Context, rpcCall RPCCallFunc, firstRI rpci

// UpdatePolicy implements the Retryer interface.
func (r *mixedRetryer) UpdatePolicy(rp Policy) (err error) {
r.Lock()
defer r.Unlock()
if !rp.Enable {
r.Lock()
r.enable = rp.Enable
r.Unlock()
return nil
}
if rp.MixedPolicy == nil || rp.Type != MixedType {
err = errors.New("MixedPolicy is nil or retry type not match, cannot do update in mixedRetryer")
r.errMsg = err.Error()
return err
}
if err == nil && rp.MixedPolicy.RetryDelayMS == 0 {
if rp.MixedPolicy.RetryDelayMS == 0 {
err = errors.New("invalid retry delay duration in mixedRetryer")
r.errMsg = err.Error()
return err
}
if err == nil {
err = checkStopPolicy(&rp.MixedPolicy.StopPolicy, maxMixRetryTimes, r)
}
r.Lock()
defer r.Unlock()
r.enable = rp.Enable
if err != nil {
if err = checkStopPolicy(&rp.MixedPolicy.StopPolicy, maxMixRetryTimes, r); err != nil {
r.errMsg = err.Error()
return err
}
r.enable = rp.Enable
r.policy = rp.MixedPolicy
r.retryDelay = time.Duration(rp.MixedPolicy.RetryDelayMS) * time.Millisecond
r.setSpecifiedResultRetryIfNeeded(r.specifiedResultRetry, &r.policy.FailurePolicy)
Expand Down
Loading

0 comments on commit e3f6048

Please sign in to comment.