Skip to content

Commit

Permalink
feat: support Mixed Retry which integrating Failure Retry and Backup …
Browse files Browse the repository at this point in the history
…Request
  • Loading branch information
YangruiEmma committed Aug 27, 2024
1 parent d967b72 commit 99e0160
Show file tree
Hide file tree
Showing 18 changed files with 2,227 additions and 1,065 deletions.
23 changes: 15 additions & 8 deletions client/callopt/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,21 +182,28 @@ func WithTag(key, val string) Option {
}

// WithRetryPolicy sets the retry policy for a RPC call.
// Build retry.Policy with retry.BuildFailurePolicy or retry.BuildBackupRequest instead of building retry.Policy directly.
// Build retry.Policy with retry.BuildFailurePolicy or retry.BuildBackupRequest or retry.BuildMixedPolicy
// instead of building retry.Policy directly.
//
// Demos are provided below:
//
// demo1. call with failure retry policy, default retry error is Timeout
// `resp, err := cli.Mock(ctx, req, callopt.WithRetryPolicy(retry.BuildFailurePolicy(retry.NewFailurePolicy())))`
// demo2. call with backup request policy
// `bp := retry.NewBackupPolicy(10)
// bp.WithMaxRetryTimes(1)
// resp, err := cli.Mock(ctx, req, callopt.WithRetryPolicy(retry.BuildBackupRequest(bp)))`
// demo1. call with failure retry policy, default retry error is Timeout
// `resp, err := cli.Mock(ctx, req, callopt.WithRetryPolicy(retry.BuildFailurePolicy(retry.NewFailurePolicy())))`
// demo2. call with backup request policy
// `bp := retry.NewBackupPolicy(10)
// `bp.WithMaxRetryTimes(1)`
// `resp, err := cli.Mock(ctx, req, callopt.WithRetryPolicy(retry.BuildBackupRequest(bp)))`
// demo2. call with miexed request policy
// `bp := retry.BuildMixedPolicy(10)
// `resp, err := cli.Mock(ctx, req, callopt.WithRetryPolicy(retry.BuildMixedPolicy(retry.NewMixedPolicy(10))))`
func WithRetryPolicy(p retry.Policy) Option {
return Option{f: func(o *CallOptions, di *strings.Builder) {
if !p.Enable {
return
}
if p.Type == retry.BackupType {
if p.Type == retry.MixedType {
di.WriteString("WithMixedRetry")
} else if p.Type == retry.BackupType {
di.WriteString("WithBackupRequest")
} else {
di.WriteString("WithFailureRetry")
Expand Down
12 changes: 12 additions & 0 deletions client/callopt/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@ func TestApply(t *testing.T) {
test.Assert(t, co.RetryPolicy.Enable)
test.Assert(t, co.RetryPolicy.FailurePolicy != nil)

// WithRetryPolicy
option = WithRetryPolicy(retry.BuildMixedPolicy(retry.NewMixedPolicy(10)))
_, co = Apply([]Option{option}, rpcConfig, remoteInfo, client.NewConfigLocks(), http.NewDefaultResolver())
test.Assert(t, co.RetryPolicy.Enable)
test.Assert(t, co.RetryPolicy.MixedPolicy != nil)

// WithRetryPolicy
option = WithRetryPolicy(retry.BuildBackupRequest(retry.NewBackupPolicy(10)))
_, co = Apply([]Option{option}, rpcConfig, remoteInfo, client.NewConfigLocks(), http.NewDefaultResolver())
test.Assert(t, co.RetryPolicy.Enable)
test.Assert(t, co.RetryPolicy.BackupPolicy != nil)

// WithRetryPolicy pass empty struct
option = WithRetryPolicy(retry.Policy{})
_, co = Apply([]Option{option}, rpcConfig, remoteInfo, client.NewConfigLocks(), http.NewDefaultResolver())
Expand Down
29 changes: 23 additions & 6 deletions client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,13 @@ func WithFailureRetry(p *retry.FailurePolicy) Option {
if p == nil {
return
}
di.Push(fmt.Sprintf("WithFailureRetry(%+v)", *p))
di.Push(fmt.Sprintf("WithFailureRetry(%+v)", p))
if o.RetryMethodPolicies == nil {
o.RetryMethodPolicies = make(map[string]retry.Policy)
}
if o.RetryMethodPolicies[retry.Wildcard].BackupPolicy != nil {
panic("BackupPolicy has been setup, cannot support Failure Retry at same time")
if o.RetryMethodPolicies[retry.Wildcard].MixedPolicy != nil ||
o.RetryMethodPolicies[retry.Wildcard].BackupPolicy != nil {
panic("MixedPolicy or BackupPolicy has been setup, cannot support Failure Retry at same time")
}
o.RetryMethodPolicies[retry.Wildcard] = retry.BuildFailurePolicy(p)
}}
Expand All @@ -342,17 +343,33 @@ func WithBackupRequest(p *retry.BackupPolicy) Option {
if p == nil {
return
}
di.Push(fmt.Sprintf("WithBackupRequest(%+v)", *p))
di.Push(fmt.Sprintf("WithBackupRequest(%+v)", p))
if o.RetryMethodPolicies == nil {
o.RetryMethodPolicies = make(map[string]retry.Policy)
}
if o.RetryMethodPolicies[retry.Wildcard].FailurePolicy != nil {
panic("BackupPolicy has been setup, cannot support Failure Retry at same time")
if o.RetryMethodPolicies[retry.Wildcard].MixedPolicy != nil ||
o.RetryMethodPolicies[retry.Wildcard].FailurePolicy != nil {
panic("MixedPolicy or BackupPolicy has been setup, cannot support Failure Retry at same time")
}
o.RetryMethodPolicies[retry.Wildcard] = retry.BuildBackupRequest(p)
}}
}

// WithMixedRetry sets the mixed retry policy for client, it will take effect for all methods.
func WithMixedRetry(p *retry.MixedPolicy) Option {
return Option{F: func(o *client.Options, di *utils.Slice) {
if p == nil {
return
}
di.Push(fmt.Sprintf("WithMixedRetry(%+v)", p))
if o.RetryMethodPolicies == nil {
o.RetryMethodPolicies = make(map[string]retry.Policy)
}
// no need to check if BackupPolicy or FailurePolicy are been setup, just let mixed retry replace it
o.RetryMethodPolicies[retry.Wildcard] = retry.BuildMixedPolicy(p)
}}
}

// WithRetryMethodPolicies sets the retry policy for method.
// The priority is higher than WithFailureRetry and WithBackupRequest. Only the methods which are not included by
// this config will use the policy that is configured by WithFailureRetry or WithBackupRequest .
Expand Down
121 changes: 82 additions & 39 deletions client/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package client
import (
"context"
"crypto/tls"
"errors"
"fmt"
"reflect"
"testing"
Expand Down Expand Up @@ -53,46 +54,76 @@ import (
)

func TestRetryOptionDebugInfo(t *testing.T) {
fp := retry.NewFailurePolicy()
fp.WithDDLStop()
expectPolicyStr := "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:false DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:none CfgItems:map[]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
policyStr := fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

fp.WithFixedBackOff(10)
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:false DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:fixed CfgItems:map[fix_ms:10]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

fp.WithRandomBackOff(10, 20)
fp.DisableChainRetryStop()
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

fp.WithRetrySameNode()
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:true ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)

fp.WithSpecifiedResultRetry(&retry.ShouldResultRetry{ErrorRetry: func(err error, ri rpcinfo.RPCInfo) bool {
return false
}})
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:true ShouldResultRetry:{ErrorRetry:true, RespRetry:false}})"
policyStr = fmt.Sprintf("WithFailureRetry(%+v)", fp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)
t.Run("FailurePolicy", func(t *testing.T) {
fp := retry.NewFailurePolicy()
fp.WithDDLStop()
expectPolicyStr := "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:false DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:none CfgItems:map[]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
opt := WithFailureRetry(fp)
err := checkOneOptionDebugInfo(t, opt, expectPolicyStr)
test.Assert(t, err == nil, err)

fp.WithFixedBackOff(10)
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:false DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:fixed CfgItems:map[fix_ms:10]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
opt = WithFailureRetry(fp)
err = checkOneOptionDebugInfo(t, opt, expectPolicyStr)
test.Assert(t, err == nil, err)

fp.WithRandomBackOff(10, 20)
fp.DisableChainRetryStop()
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:false ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
opt = WithFailureRetry(fp)
err = checkOneOptionDebugInfo(t, opt, expectPolicyStr)
test.Assert(t, err == nil, err)

fp.WithRetrySameNode()
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:true ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
opt = WithFailureRetry(fp)
err = checkOneOptionDebugInfo(t, opt, expectPolicyStr)
test.Assert(t, err == nil, err)

fp.WithSpecifiedResultRetry(&retry.ShouldResultRetry{ErrorRetryWithCtx: func(ctx context.Context, err error, ri rpcinfo.RPCInfo) bool {
return false
}})
expectPolicyStr = "WithFailureRetry({StopPolicy:{MaxRetryTimes:2 MaxDurationMS:0 DisableChainStop:true DDLStop:true " +
"CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:random CfgItems:map[max_ms:20 min_ms:10]} RetrySameNode:true ShouldResultRetry:{ErrorRetry:true, RespRetry:false}})"
opt = WithFailureRetry(fp)
err = checkOneOptionDebugInfo(t, opt, expectPolicyStr)
test.Assert(t, err == nil, err)
})

bp := retry.NewBackupPolicy(20)
expectPolicyStr = "WithBackupRequest({RetryDelayMS:20 StopPolicy:{MaxRetryTimes:1 MaxDurationMS:0 DisableChainStop:false " +
"DDLStop:false CBPolicy:{ErrorRate:0.1}} RetrySameNode:false})"
policyStr = fmt.Sprintf("WithBackupRequest(%+v)", bp)
test.Assert(t, policyStr == expectPolicyStr, policyStr)
WithBackupRequest(bp)
t.Run("FailurePolicy", func(t *testing.T) {
bp := retry.NewBackupPolicy(20)
expectPolicyStr := "WithBackupRequest({RetryDelayMS:20 StopPolicy:{MaxRetryTimes:1 MaxDurationMS:0 DisableChainStop:false " +
"DDLStop:false CBPolicy:{ErrorRate:0.1}} RetrySameNode:false})"
opt := WithBackupRequest(bp)
err := checkOneOptionDebugInfo(t, opt, expectPolicyStr)
test.Assert(t, err == nil, err)
})

t.Run("MixedPolicy", func(t *testing.T) {
mp := retry.NewMixedPolicy(100)
mp.WithDDLStop()
expectPolicyStr := "WithMixedRetry({RetryDelayMS:100 StopPolicy:{MaxRetryTimes:1 MaxDurationMS:0 DisableChainStop:false " +
"DDLStop:true CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:none CfgItems:map[]} RetrySameNode:false " +
"ShouldResultRetry:{ErrorRetry:false, RespRetry:false}})"
opt := WithMixedRetry(mp)
err := checkOneOptionDebugInfo(t, opt, expectPolicyStr)
test.Assert(t, err == nil, err)

mp.WithSpecifiedResultRetry(&retry.ShouldResultRetry{ErrorRetryWithCtx: func(ctx context.Context, err error, ri rpcinfo.RPCInfo) bool {
return false
}})
expectPolicyStr = "WithMixedRetry({RetryDelayMS:100 StopPolicy:{MaxRetryTimes:1 MaxDurationMS:0 DisableChainStop:false " +
"DDLStop:true CBPolicy:{ErrorRate:0.1}} BackOffPolicy:&{BackOffType:none CfgItems:map[]} RetrySameNode:false " +
"ShouldResultRetry:{ErrorRetry:true, RespRetry:false}})"
opt = WithMixedRetry(mp)
err = checkOneOptionDebugInfo(t, opt, expectPolicyStr)
test.Assert(t, err == nil, err)
})
}

func TestRetryOption(t *testing.T) {
Expand Down Expand Up @@ -708,3 +739,15 @@ func TestWithGRPCTLSConfig(t *testing.T) {
opts := client.NewOptions([]client.Option{WithGRPCTLSConfig(cfg)})
test.Assert(t, opts.GRPCConnectOpts != nil)
}

func checkOneOptionDebugInfo(t *testing.T, opt Option, expectStr string) error {
o := &Options{}
o.Apply([]Option{opt})
if len(o.DebugInfo) != 1 {
return errors.New("length of DebugInfo is unexpected")
}
if o.DebugInfo[0] != expectStr {
return fmt.Errorf("DebugInfo not match with expect str:\n debugInfo=%s", o.DebugInfo[0])
}
return nil
}
39 changes: 37 additions & 2 deletions pkg/retry/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
"fmt"
)

const maxBackupRetryTimes = 2
const (
maxBackupRetryTimes = 2
defaultBackupRetryTimes = 1
)

// NewBackupPolicy init backup request policy
// the param delayMS is suggested to set as TP99
Expand All @@ -31,7 +34,7 @@ func NewBackupPolicy(delayMS uint32) *BackupPolicy {
p := &BackupPolicy{
RetryDelayMS: delayMS,
StopPolicy: StopPolicy{
MaxRetryTimes: 1,
MaxRetryTimes: defaultBackupRetryTimes,
DisableChainStop: false,
CBPolicy: CBPolicy{
ErrorRate: defaultCBErrRate,
Expand Down Expand Up @@ -71,3 +74,35 @@ func (p *BackupPolicy) WithRetrySameNode() {
func (p *BackupPolicy) String() string {
return fmt.Sprintf("{RetryDelayMS:%+v StopPolicy:%+v RetrySameNode:%+v}", p.RetryDelayMS, p.StopPolicy, p.RetrySameNode)
}

// Equals to check if BackupPolicy is equal
func (p *BackupPolicy) Equals(np *BackupPolicy) bool {
if p == nil {
return np == nil
}
if np == nil {
return false
}
if p.RetryDelayMS != np.RetryDelayMS {
return false
}
if p.StopPolicy != np.StopPolicy {
return false
}
if p.RetrySameNode != np.RetrySameNode {
return false
}

return true
}

func (p *BackupPolicy) DeepCopy() *BackupPolicy {
if p == nil {
return nil
}
return &BackupPolicy{
RetryDelayMS: p.RetryDelayMS,
StopPolicy: p.StopPolicy, // not a pointer, will copy the value here
RetrySameNode: p.RetrySameNode,
}
}
Loading

0 comments on commit 99e0160

Please sign in to comment.