Skip to content

Commit

Permalink
feat(circuitbreak): support config circuitbreak policy with xds
Browse files Browse the repository at this point in the history
  • Loading branch information
whalecold committed Mar 7, 2024
1 parent 64bbf75 commit af11ff8
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 4 deletions.
47 changes: 47 additions & 0 deletions core/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync/atomic"
"time"

"github.com/cloudwego/kitex/pkg/circuitbreak"
"github.com/cloudwego/kitex/pkg/klog"

"github.com/kitex-contrib/xds/core/xdsresource"
Expand All @@ -47,6 +48,8 @@ type xdsResourceManager struct {

// options
opts *Options

cbHandler func(configs map[string]circuitbreak.CBConfig)
}

// notifier is used to notify the resource update along with error
Expand Down Expand Up @@ -117,6 +120,18 @@ func (m *xdsResourceManager) getFromCache(rType xdsresource.ResourceType, rName
return nil, false
}

func (m *xdsResourceManager) RegisterCircuitBreaker(handler func(configs map[string]circuitbreak.CBConfig)) {
m.mu.Lock()
defer m.mu.Unlock()

m.cbHandler = handler

res, ok := m.cache[xdsresource.ClusterType]
if ok {
m.updateCircuitPolicy(res)
}
}

// Get gets the specified resource from cache or from the control plane.
// If the resource is not in the cache, it will be fetched from the control plane via client.
// This will be a synchronous call. It uses the notifier to notify the resource update and return the resource.
Expand Down Expand Up @@ -248,11 +263,43 @@ func (m *xdsResourceManager) updateMeta(rType xdsresource.ResourceType, version
}
}

func (m *xdsResourceManager) updateCircuitPolicy(res map[string]xdsresource.Resource) {
// update circuit break policy
if m.cbHandler == nil {
return
}

policies := make(map[string]circuitbreak.CBConfig)
for key, resource := range res {
cluster, ok := resource.(*xdsresource.ClusterResource)
if !ok {
continue
}
if cluster.OutlierDetection == nil {
continue
}
cbconfig := circuitbreak.CBConfig{}
if cluster.OutlierDetection.FailurePercentageRequestVolume != 0 && cluster.OutlierDetection.FailurePercentageThreshold != 0 {
cbconfig.Enable = true
cbconfig.ErrRate = float64(cluster.OutlierDetection.FailurePercentageThreshold) / 100
cbconfig.MinSample = int64(cluster.OutlierDetection.FailurePercentageRequestVolume)
}
policies[key] = cbconfig
}
m.cbHandler(policies)
}

// UpdateResource is invoked by client to update the cache
func (m *xdsResourceManager) UpdateResource(rt xdsresource.ResourceType, up map[string]xdsresource.Resource, version string) {
m.mu.Lock()
defer m.mu.Unlock()

// should update circuit policy first, as it may affect the traffic when the
// circuit break policy is updated at the first time.
if rt == xdsresource.ClusterType {
m.updateCircuitPolicy(up)
}

for name, res := range up {
if _, ok := m.cache[rt]; !ok {
m.cache[rt] = make(map[string]xdsresource.Resource)
Expand Down
20 changes: 16 additions & 4 deletions core/xdsresource/cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,17 @@ func (p ClusterLbPolicy) String() string {
return ""
}

type OutlierDetection struct {
FailurePercentageThreshold uint32
FailurePercentageRequestVolume uint32
}

type ClusterResource struct {
DiscoveryType ClusterDiscoveryType
LbPolicy ClusterLbPolicy
EndpointName string
InlineEndpoints *EndpointsResource
DiscoveryType ClusterDiscoveryType
LbPolicy ClusterLbPolicy
EndpointName string
InlineEndpoints *EndpointsResource
OutlierDetection *OutlierDetection
}

func (c *ClusterResource) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -99,6 +105,12 @@ func unmarshalCluster(r *any.Any) (string, *ClusterResource, error) {
LbPolicy: convertLbPolicy(c.GetLbPolicy()),
EndpointName: c.Name,
}
if c.OutlierDetection != nil {
ret.OutlierDetection = &OutlierDetection{
FailurePercentageRequestVolume: c.OutlierDetection.FailurePercentageRequestVolume.GetValue(),
FailurePercentageThreshold: c.OutlierDetection.FailurePercentageThreshold.GetValue(),
}
}
if n := c.GetEdsClusterConfig().GetServiceName(); n != "" {
ret.EndpointName = n
}
Expand Down
84 changes: 84 additions & 0 deletions xdssuite/circuit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package xdssuite

import (
"sync/atomic"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/circuitbreak"
"github.com/cloudwego/kitex/pkg/rpcinfo"
)

type circuitBreaker struct {
lastPolicies atomic.Value
cb *circuitbreak.CBSuite
}

func (cb *circuitBreaker) updateAllCircuitConfigs(configs map[string]circuitbreak.CBConfig) {
if cb.cb == nil {
return
}
policies := make(map[string]struct{})
for k, v := range configs {
cb.cb.UpdateServiceCBConfig(k, v)
policies[k] = struct{}{}
}

defer cb.lastPolicies.Store(policies)

val := cb.lastPolicies.Load()
if val == nil {
return
}
lastPolicies, ok := val.(map[string]struct{})
if !ok {
return
}
// disable the old policies that are not in the new configs.
for k := range lastPolicies {
if _, ok := policies[k]; !ok {
cb.cb.UpdateServiceCBConfig(k, circuitbreak.CBConfig{
Enable: false,
})
}
}
}

// NewCircuitBreaker integrate xds config and kitex circuitbreaker
func NewCircuitBreaker() client.Option {
m := xdsResourceManager.getManager()
if m == nil {
panic("xds resource manager has not been initialized")
}

cb := &circuitBreaker{
cb: circuitbreak.NewCBSuite(genServiceCBKey),
}
m.RegisterCircuitBreaker(cb.updateAllCircuitConfigs)
return client.WithCircuitBreaker(cb.cb)
}

// keep consistent when initialising the circuit breaker suit and updating
// the circuit breaker policy.
func genServiceCBKey(ri rpcinfo.RPCInfo) string {
if ri == nil {
return ""
}
key, _ := ri.To().Tag(RouterClusterKey)
return key
}
3 changes: 3 additions & 0 deletions xdssuite/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"sync"

"github.com/cloudwego/kitex/pkg/circuitbreak"

"github.com/kitex-contrib/xds/core/xdsresource"
)

Expand All @@ -41,6 +43,7 @@ func (m *singletonManager) getManager() XDSResourceManager {
// Get() returns error when the fetching fails or the resource is not found in the latest update.
type XDSResourceManager interface {
Get(ctx context.Context, resourceType xdsresource.ResourceType, resourceName string) (interface{}, error)
RegisterCircuitBreaker(func(configs map[string]circuitbreak.CBConfig))
}

func XDSInited() bool {
Expand Down

0 comments on commit af11ff8

Please sign in to comment.