Skip to content

Commit

Permalink
add default circuit breaker
Browse files Browse the repository at this point in the history
Signed-off-by: artem_danilov <[email protected]>
  • Loading branch information
artem_danilov committed Nov 26, 2024
1 parent 98312a9 commit 911cb56
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 10 deletions.
8 changes: 8 additions & 0 deletions client/circuit_breaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ type Settings struct {
HalfOpenSuccessCount uint32
}

var AlwaysOpenSettings = Settings{
ErrorRateThresholdPct: 0, // never trips
ErrorRateWindow: 10 * time.Second, // effectively results in testing for new settings every 10 seconds
MinQPSForOpen: 10,
CoolDownInterval: 10 * time.Second,
HalfOpenSuccessCount: 1,
}

// CircuitBreaker is a state machine to prevent sending requests that are likely to fail.
type CircuitBreaker[T any] struct {
config *Settings
Expand Down
8 changes: 3 additions & 5 deletions client/circuit_breaker/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,10 @@ func TestCircuitBreaker_Half_Open_Fail_Over_Pending_Count(t *testing.T) {

func TestCircuitBreaker_ChangeSettings(t *testing.T) {
re := require.New(t)
disabledSettings := settings
disabledSettings.ErrorRateThresholdPct = 0

cb := NewCircuitBreaker[int]("test_cb", disabledSettings)
driveQPS(cb, minCountToOpen, Yes, re)
cb.advance(settings.ErrorRateWindow)
cb := NewCircuitBreaker[int]("test_cb", AlwaysOpenSettings)
driveQPS(cb, int(AlwaysOpenSettings.MinQPSForOpen*uint32(AlwaysOpenSettings.ErrorRateWindow.Seconds())), Yes, re)
cb.advance(AlwaysOpenSettings.ErrorRateWindow)
assertSucceeds(cb, re)
re.Equal(StateClosed, cb.state.stateType)

Expand Down
2 changes: 1 addition & 1 deletion client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (ci *clientInner) doRequest(
req.Header.Set(xCallerIDKey, callerID)
start := time.Now()
var resp *http.Response
if _, exists := regionRequestNames[reqInfo.name]; exists {
if _, exists := regionRequestNames[reqInfo.name]; exists && ci.regionMetaCircuitBreaker != nil {
resp, err = ci.regionMetaCircuitBreaker.Execute(func() (*http.Response, error, cb.Overloading) {
resp, err := ci.cli.Do(req)
return resp, err, isOverloaded(resp)
Expand Down
9 changes: 5 additions & 4 deletions client/opt/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ type Option struct {
// NewOption creates a new PD client option with the default values set.
func NewOption() *Option {
co := &Option{
Timeout: defaultPDTimeout,
MaxRetryTimes: maxInitClusterRetries,
EnableTSOFollowerProxyCh: make(chan struct{}, 1),
InitMetrics: true,
Timeout: defaultPDTimeout,
MaxRetryTimes: maxInitClusterRetries,
EnableTSOFollowerProxyCh: make(chan struct{}, 1),
InitMetrics: true,
RegionMetaCircuitBreakerSettings: cb.AlwaysOpenSettings,
}

co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval)
Expand Down

0 comments on commit 911cb56

Please sign in to comment.