Skip to content

Commit

Permalink
feat(circuitbreak): support config circuitbreak policy with xds
Browse files Browse the repository at this point in the history
  • Loading branch information
whalecold committed Mar 7, 2024
1 parent 64bbf75 commit 295581c
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 4 deletions.
41 changes: 41 additions & 0 deletions core/manager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/kitex-contrib/xds/core/xdsresource"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/circuitbreak"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes"
discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
Expand Down Expand Up @@ -131,6 +132,10 @@ type xdsClient struct {
closeCh chan struct{}

mu sync.RWMutex

cblock sync.RWMutex
lastPolicies map[string]struct{}
cb *circuitbreak.CBSuite
}

// newXdsClient constructs a new xdsClient, which is used to get xds resources from the xds server.
Expand All @@ -147,6 +152,8 @@ func newXdsClient(bCfg *BootstrapConfig, ac ADSClient, updater *xdsResourceManag
closeCh: make(chan struct{}),
streamCh: make(chan ADSStream, 1),
reqCh: make(chan *discoveryv3.DiscoveryRequest, 1024),
lastPolicies: make(map[string]struct{}),
cb: circuitbreak.NewCBSuite(genServiceCBKeyWithRPCInfo),
}
cli.run()
return cli, nil
Expand Down Expand Up @@ -324,6 +331,7 @@ func (c *xdsClient) close() {
default:
close(c.closeCh)
}
c.cb.Close()
}

// connect construct a new stream that connects to the xds server
Expand Down Expand Up @@ -493,11 +501,44 @@ func (c *xdsClient) handleCDS(resp *discoveryv3.DiscoveryResponse) error {
}
}
c.mu.RUnlock()
// update circuit break policy
c.updateCircuitPolicy(res)
// update to cache
c.resourceUpdater.UpdateResource(xdsresource.ClusterType, res, resp.GetVersionInfo())
return nil
}

func (c *xdsClient) updateCircuitPolicy(res map[string]xdsresource.Resource) {
// update circuit break policy
c.cblock.Lock()
defer c.cblock.Unlock()
policies := make(map[string]struct{})
for key, resource := range res {
cluster, ok := resource.(*xdsresource.ClusterResource)
if !ok {
continue
}
if cluster.OutlierDetection == nil {
continue
}
cbconfig := circuitbreak.CBConfig{}
if cluster.OutlierDetection.FailurePercentageRequestVolume != 0 && cluster.OutlierDetection.FailurePercentageThreshold != 0 {
cbconfig.Enable = true
cbconfig.ErrRate = float64(cluster.OutlierDetection.FailurePercentageThreshold) / 100
cbconfig.MinSample = int64(cluster.OutlierDetection.FailurePercentageRequestVolume)
}
c.cb.UpdateServiceCBConfig(key, cbconfig)
policies[key] = struct{}{}
}
// disable the policy if the resource is not in the watched list now
for key := range c.lastPolicies {
if _, ok := policies[key]; !ok {
c.cb.UpdateServiceCBConfig(key, circuitbreak.CBConfig{Enable: false})
}
}
c.lastPolicies = policies
}

// handleEDS handles the eds response
func (c *xdsClient) handleEDS(resp *discoveryv3.DiscoveryResponse) error {
res, err := xdsresource.UnmarshalEDS(resp.GetResources())
Expand Down
18 changes: 18 additions & 0 deletions core/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"sync/atomic"
"time"

"github.com/cloudwego/kitex/pkg/circuitbreak"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/rpcinfo"

"github.com/kitex-contrib/xds/core/xdsresource"
)
Expand Down Expand Up @@ -60,6 +62,16 @@ func (n *notifier) notify(err error) {
close(n.ch)
}

// keep consistent when initialising the circuit breaker suit and updating
// the circuit breaker policy.
func genServiceCBKeyWithRPCInfo(ri rpcinfo.RPCInfo) string {
if ri == nil {
return ""
}
key, _ := ri.To().Tag("XDS_Route_Picked_Cluster")
return key
}

// NewXDSResourceManager creates a new xds resource manager
func NewXDSResourceManager(bootstrapConfig *BootstrapConfig, opts ...Option) (*xdsResourceManager, error) {
// load bootstrap config
Expand Down Expand Up @@ -117,6 +129,10 @@ func (m *xdsResourceManager) getFromCache(rType xdsresource.ResourceType, rName
return nil, false
}

func (m *xdsResourceManager) GetCircuitBreaker() *circuitbreak.CBSuite {
return m.client.cb
}

// Get gets the specified resource from cache or from the control plane.
// If the resource is not in the cache, it will be fetched from the control plane via client.
// This will be a synchronous call. It uses the notifier to notify the resource update and return the resource.
Expand Down Expand Up @@ -277,4 +293,6 @@ func (m *xdsResourceManager) UpdateResource(rt xdsresource.ResourceType, up map[
}
// update meta
m.updateMeta(rt, version)

// update circuit policy
}
20 changes: 16 additions & 4 deletions core/xdsresource/cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,17 @@ func (p ClusterLbPolicy) String() string {
return ""
}

type OutlierDetection struct {
FailurePercentageThreshold uint32
FailurePercentageRequestVolume uint32
}

type ClusterResource struct {
DiscoveryType ClusterDiscoveryType
LbPolicy ClusterLbPolicy
EndpointName string
InlineEndpoints *EndpointsResource
DiscoveryType ClusterDiscoveryType
LbPolicy ClusterLbPolicy
EndpointName string
InlineEndpoints *EndpointsResource
OutlierDetection *OutlierDetection
}

func (c *ClusterResource) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -99,6 +105,12 @@ func unmarshalCluster(r *any.Any) (string, *ClusterResource, error) {
LbPolicy: convertLbPolicy(c.GetLbPolicy()),
EndpointName: c.Name,
}
if c.OutlierDetection != nil {
ret.OutlierDetection = &OutlierDetection{
FailurePercentageRequestVolume: c.OutlierDetection.FailurePercentageRequestVolume.GetValue(),
FailurePercentageThreshold: c.OutlierDetection.FailurePercentageThreshold.GetValue(),
}
}
if n := c.GetEdsClusterConfig().GetServiceName(); n != "" {
ret.EndpointName = n
}
Expand Down
28 changes: 28 additions & 0 deletions xdssuite/circuit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package xdssuite

import "github.com/cloudwego/kitex/client"

// NewCircuitBreaker integrate xds config and kitex circuitbreaker
func NewCircuitBreaker() client.Option {
m := xdsResourceManager.getManager()
if m == nil {
panic("xds resource manager has not been initialized")
}
return client.WithCircuitBreaker(m.GetCircuitBreaker())
}
2 changes: 2 additions & 0 deletions xdssuite/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"sync"

"github.com/cloudwego/kitex/pkg/circuitbreak"

Check failure on line 23 in xdssuite/xds.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed with -local github.com/kitex-contrib/xds (goimports)
"github.com/kitex-contrib/xds/core/xdsresource"
)

Expand All @@ -41,6 +42,7 @@ func (m *singletonManager) getManager() XDSResourceManager {
// Get() returns error when the fetching fails or the resource is not found in the latest update.
type XDSResourceManager interface {
Get(ctx context.Context, resourceType xdsresource.ResourceType, resourceName string) (interface{}, error)
GetCircuitBreaker() *circuitbreak.CBSuite
}

func XDSInited() bool {
Expand Down

0 comments on commit 295581c

Please sign in to comment.