From 911cb56f4c4177fa44e9b15314a3d758d5b586a0 Mon Sep 17 00:00:00 2001 From: artem_danilov Date: Mon, 25 Nov 2024 22:06:10 -0800 Subject: [PATCH] add default circuit breaker Signed-off-by: artem_danilov --- client/circuit_breaker/circuit_breaker.go | 8 ++++++++ client/circuit_breaker/circuit_breaker_test.go | 8 +++----- client/http/client.go | 2 +- client/opt/option.go | 9 +++++---- 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/client/circuit_breaker/circuit_breaker.go b/client/circuit_breaker/circuit_breaker.go index 38a88a7cb36..c24b894071b 100644 --- a/client/circuit_breaker/circuit_breaker.go +++ b/client/circuit_breaker/circuit_breaker.go @@ -53,6 +53,14 @@ type Settings struct { HalfOpenSuccessCount uint32 } +var AlwaysOpenSettings = Settings{ + ErrorRateThresholdPct: 0, // never trips + ErrorRateWindow: 10 * time.Second, // effectively results in testing for new settings every 10 seconds + MinQPSForOpen: 10, + CoolDownInterval: 10 * time.Second, + HalfOpenSuccessCount: 1, +} + // CircuitBreaker is a state machine to prevent sending requests that are likely to fail. type CircuitBreaker[T any] struct { config *Settings diff --git a/client/circuit_breaker/circuit_breaker_test.go b/client/circuit_breaker/circuit_breaker_test.go index 84b60c30f89..1c47b6c1327 100644 --- a/client/circuit_breaker/circuit_breaker_test.go +++ b/client/circuit_breaker/circuit_breaker_test.go @@ -156,12 +156,10 @@ func TestCircuitBreaker_Half_Open_Fail_Over_Pending_Count(t *testing.T) { func TestCircuitBreaker_ChangeSettings(t *testing.T) { re := require.New(t) - disabledSettings := settings - disabledSettings.ErrorRateThresholdPct = 0 - cb := NewCircuitBreaker[int]("test_cb", disabledSettings) - driveQPS(cb, minCountToOpen, Yes, re) - cb.advance(settings.ErrorRateWindow) + cb := NewCircuitBreaker[int]("test_cb", AlwaysOpenSettings) + driveQPS(cb, int(AlwaysOpenSettings.MinQPSForOpen*uint32(AlwaysOpenSettings.ErrorRateWindow.Seconds())), Yes, re) + cb.advance(AlwaysOpenSettings.ErrorRateWindow) assertSucceeds(cb, re) re.Equal(StateClosed, cb.state.stateType) diff --git a/client/http/client.go b/client/http/client.go index 5195005d2d9..31836c1f0c5 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -230,7 +230,7 @@ func (ci *clientInner) doRequest( req.Header.Set(xCallerIDKey, callerID) start := time.Now() var resp *http.Response - if _, exists := regionRequestNames[reqInfo.name]; exists { + if _, exists := regionRequestNames[reqInfo.name]; exists && ci.regionMetaCircuitBreaker != nil { resp, err = ci.regionMetaCircuitBreaker.Execute(func() (*http.Response, error, cb.Overloading) { resp, err := ci.cli.Do(req) return resp, err, isOverloaded(resp) diff --git a/client/opt/option.go b/client/opt/option.go index 8be77e4660f..73f7a3a440d 100644 --- a/client/opt/option.go +++ b/client/opt/option.go @@ -75,10 +75,11 @@ type Option struct { // NewOption creates a new PD client option with the default values set. func NewOption() *Option { co := &Option{ - Timeout: defaultPDTimeout, - MaxRetryTimes: maxInitClusterRetries, - EnableTSOFollowerProxyCh: make(chan struct{}, 1), - InitMetrics: true, + Timeout: defaultPDTimeout, + MaxRetryTimes: maxInitClusterRetries, + EnableTSOFollowerProxyCh: make(chan struct{}, 1), + InitMetrics: true, + RegionMetaCircuitBreakerSettings: cb.AlwaysOpenSettings, } co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval)