diff --git a/core/manager/manager.go b/core/manager/manager.go index ed6619d..76d4710 100644 --- a/core/manager/manager.go +++ b/core/manager/manager.go @@ -25,6 +25,7 @@ import ( "sync/atomic" "time" + "github.com/cloudwego/kitex/pkg/circuitbreak" "github.com/cloudwego/kitex/pkg/klog" "github.com/kitex-contrib/xds/core/xdsresource" @@ -47,6 +48,8 @@ type xdsResourceManager struct { // options opts *Options + + cbHandler func(configs map[string]circuitbreak.CBConfig) } // notifier is used to notify the resource update along with error @@ -117,6 +120,18 @@ func (m *xdsResourceManager) getFromCache(rType xdsresource.ResourceType, rName return nil, false } +func (m *xdsResourceManager) RegisterCircuitBreaker(handler func(configs map[string]circuitbreak.CBConfig)) { + m.mu.Lock() + defer m.mu.Unlock() + + m.cbHandler = handler + + res, ok := m.cache[xdsresource.ClusterType] + if ok { + m.updateCircuitPolicy(res) + } +} + // Get gets the specified resource from cache or from the control plane. // If the resource is not in the cache, it will be fetched from the control plane via client. // This will be a synchronous call. It uses the notifier to notify the resource update and return the resource. @@ -248,11 +263,43 @@ func (m *xdsResourceManager) updateMeta(rType xdsresource.ResourceType, version } } +func (m *xdsResourceManager) updateCircuitPolicy(res map[string]xdsresource.Resource) { + // update circuit break policy + if m.cbHandler == nil { + return + } + + policies := make(map[string]circuitbreak.CBConfig) + for key, resource := range res { + cluster, ok := resource.(*xdsresource.ClusterResource) + if !ok { + continue + } + if cluster.OutlierDetection == nil { + continue + } + cbconfig := circuitbreak.CBConfig{} + if cluster.OutlierDetection.FailurePercentageRequestVolume != 0 && cluster.OutlierDetection.FailurePercentageThreshold != 0 { + cbconfig.Enable = true + cbconfig.ErrRate = float64(cluster.OutlierDetection.FailurePercentageThreshold) / 100 + cbconfig.MinSample = int64(cluster.OutlierDetection.FailurePercentageRequestVolume) + } + policies[key] = cbconfig + } + m.cbHandler(policies) +} + // UpdateResource is invoked by client to update the cache func (m *xdsResourceManager) UpdateResource(rt xdsresource.ResourceType, up map[string]xdsresource.Resource, version string) { m.mu.Lock() defer m.mu.Unlock() + // should update circuit policy first, as it may affect the traffic when the + // circuit break policy is updated at the first time. + if rt == xdsresource.ClusterType { + m.updateCircuitPolicy(up) + } + for name, res := range up { if _, ok := m.cache[rt]; !ok { m.cache[rt] = make(map[string]xdsresource.Resource) diff --git a/core/xdsresource/cds.go b/core/xdsresource/cds.go index e63663f..2721867 100644 --- a/core/xdsresource/cds.go +++ b/core/xdsresource/cds.go @@ -60,11 +60,17 @@ func (p ClusterLbPolicy) String() string { return "" } +type OutlierDetection struct { + FailurePercentageThreshold uint32 + FailurePercentageRequestVolume uint32 +} + type ClusterResource struct { - DiscoveryType ClusterDiscoveryType - LbPolicy ClusterLbPolicy - EndpointName string - InlineEndpoints *EndpointsResource + DiscoveryType ClusterDiscoveryType + LbPolicy ClusterLbPolicy + EndpointName string + InlineEndpoints *EndpointsResource + OutlierDetection *OutlierDetection } func (c *ClusterResource) MarshalJSON() ([]byte, error) { @@ -99,6 +105,12 @@ func unmarshalCluster(r *any.Any) (string, *ClusterResource, error) { LbPolicy: convertLbPolicy(c.GetLbPolicy()), EndpointName: c.Name, } + if c.OutlierDetection != nil { + ret.OutlierDetection = &OutlierDetection{ + FailurePercentageRequestVolume: c.OutlierDetection.FailurePercentageRequestVolume.GetValue(), + FailurePercentageThreshold: c.OutlierDetection.FailurePercentageThreshold.GetValue(), + } + } if n := c.GetEdsClusterConfig().GetServiceName(); n != "" { ret.EndpointName = n } diff --git a/xdssuite/circuit.go b/xdssuite/circuit.go new file mode 100644 index 0000000..6de73bd --- /dev/null +++ b/xdssuite/circuit.go @@ -0,0 +1,84 @@ +/* + * Copyright 2024 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xdssuite + +import ( + "sync/atomic" + + "github.com/cloudwego/kitex/client" + "github.com/cloudwego/kitex/pkg/circuitbreak" + "github.com/cloudwego/kitex/pkg/rpcinfo" +) + +type circuitBreaker struct { + lastPolicies atomic.Value + cb *circuitbreak.CBSuite +} + +func (cb *circuitBreaker) updateAllCircuitConfigs(configs map[string]circuitbreak.CBConfig) { + if cb.cb == nil { + return + } + policies := make(map[string]struct{}) + for k, v := range configs { + cb.cb.UpdateServiceCBConfig(k, v) + policies[k] = struct{}{} + } + + defer cb.lastPolicies.Store(policies) + + val := cb.lastPolicies.Load() + if val == nil { + return + } + lastPolicies, ok := val.(map[string]struct{}) + if !ok { + return + } + // disable the old policies that are not in the new configs. + for k := range lastPolicies { + if _, ok := policies[k]; !ok { + cb.cb.UpdateServiceCBConfig(k, circuitbreak.CBConfig{ + Enable: false, + }) + } + } +} + +// NewCircuitBreaker integrate xds config and kitex circuitbreaker +func NewCircuitBreaker() client.Option { + m := xdsResourceManager.getManager() + if m == nil { + panic("xds resource manager has not been initialized") + } + + cb := &circuitBreaker{ + cb: circuitbreak.NewCBSuite(genServiceCBKey), + } + m.RegisterCircuitBreaker(cb.updateAllCircuitConfigs) + return client.WithCircuitBreaker(cb.cb) +} + +// keep consistent when initialising the circuit breaker suit and updating +// the circuit breaker policy. +func genServiceCBKey(ri rpcinfo.RPCInfo) string { + if ri == nil { + return "" + } + key, _ := ri.To().Tag(RouterClusterKey) + return key +} diff --git a/xdssuite/xds.go b/xdssuite/xds.go index 8e7d9db..18255e3 100644 --- a/xdssuite/xds.go +++ b/xdssuite/xds.go @@ -20,6 +20,8 @@ import ( "context" "sync" + "github.com/cloudwego/kitex/pkg/circuitbreak" + "github.com/kitex-contrib/xds/core/xdsresource" ) @@ -41,6 +43,7 @@ func (m *singletonManager) getManager() XDSResourceManager { // Get() returns error when the fetching fails or the resource is not found in the latest update. type XDSResourceManager interface { Get(ctx context.Context, resourceType xdsresource.ResourceType, resourceName string) (interface{}, error) + RegisterCircuitBreaker(func(configs map[string]circuitbreak.CBConfig)) } func XDSInited() bool {