From 6f5c4c1381def84a170294b63f1d78c557dc1907 Mon Sep 17 00:00:00 2001 From: whalecold Date: Sun, 10 Mar 2024 10:15:30 +0800 Subject: [PATCH] feat(limit): support limit config --- core/manager/client.go | 42 ++++++++++----- core/manager/manager.go | 94 ++++++++++++++++++++++++++++++--- core/xdsresource/lds.go | 45 +++++++++------- core/xdsresource/lds_test.go | 29 ++++++++++ core/xdsresource/rds.go | 1 + core/xdsresource/xdsresource.go | 5 ++ go.mod | 2 +- xdssuite/limiter.go | 58 ++++++++++++++++++++ xdssuite/option.go | 1 + xdssuite/xds.go | 2 + 10 files changed, 240 insertions(+), 39 deletions(-) create mode 100644 xdssuite/limiter.go diff --git a/core/manager/client.go b/core/manager/client.go index d6d0bc1..00097ca 100644 --- a/core/manager/client.go +++ b/core/manager/client.go @@ -22,6 +22,7 @@ import ( "runtime/debug" "strings" "sync" + "sync/atomic" "github.com/cenkalti/backoff/v4" @@ -129,6 +130,9 @@ type xdsClient struct { // channel for stop closeCh chan struct{} + // indicates ready for listener + inboundInitRequestCh chan struct{} + closedInboundInitCh atomic.Bool mu sync.RWMutex } @@ -136,17 +140,18 @@ type xdsClient struct { // newXdsClient constructs a new xdsClient, which is used to get xds resources from the xds server. func newXdsClient(bCfg *BootstrapConfig, ac ADSClient, updater *xdsResourceManager) (*xdsClient, error) { cli := &xdsClient{ - config: bCfg, - adsClient: ac, - connectBackoff: backoff.NewExponentialBackOff(), - watchedResource: make(map[xdsresource.ResourceType]map[string]bool), - cipResolver: newNdsResolver(), - versionMap: make(map[xdsresource.ResourceType]string), - nonceMap: make(map[xdsresource.ResourceType]string), - resourceUpdater: updater, - closeCh: make(chan struct{}), - streamCh: make(chan ADSStream, 1), - reqCh: make(chan *discoveryv3.DiscoveryRequest, 1024), + config: bCfg, + adsClient: ac, + connectBackoff: backoff.NewExponentialBackOff(), + watchedResource: make(map[xdsresource.ResourceType]map[string]bool), + cipResolver: newNdsResolver(), + versionMap: make(map[xdsresource.ResourceType]string), + nonceMap: make(map[xdsresource.ResourceType]string), + resourceUpdater: updater, + closeCh: make(chan struct{}), + inboundInitRequestCh: make(chan struct{}), + streamCh: make(chan ADSStream, 1), + reqCh: make(chan *discoveryv3.DiscoveryRequest, 1024), } cli.run() return cli, nil @@ -292,6 +297,14 @@ func (c *xdsClient) ndsWarmup() { // watch the NameTable when init the xds client c.Watch(xdsresource.NameTableType, "", false) <-c.cipResolver.initRequestCh + klog.Infof("KITEX: [XDS] nds, warmup done") +} + +func (c *xdsClient) listenerWarmup() { + // watch the NameTable when init the xds client + c.Watch(xdsresource.ListenerType, reservedLdsResourceName, false) + <-c.inboundInitRequestCh + klog.Infof("KITEX: [XDS] lds, warmup done") } // warmup sends the requests (NDS) to the xds server and waits for the response to set the lookup table. @@ -299,6 +312,7 @@ func (c *xdsClient) warmup() { if c.ndsRequired() { c.ndsWarmup() } + c.listenerWarmup() // TODO: maybe need to watch the listener klog.Infof("KITEX: [XDS] client, warmup done") } @@ -436,7 +450,7 @@ func (c *xdsClient) handleLDS(resp *discoveryv3.DiscoveryResponse) error { c.mu.RLock() filteredRes := make(map[string]xdsresource.Resource) for n := range c.watchedResource[xdsresource.ListenerType] { - if c.ndsRequired() { + if c.ndsRequired() && n != reservedLdsResourceName { ln, err := c.getListenerName(n) if err != nil || ln == "" { klog.Warnf("KITEX: [XDS] get listener name %s failed, err: %v", n, err) @@ -452,6 +466,10 @@ func (c *xdsClient) handleLDS(resp *discoveryv3.DiscoveryResponse) error { c.mu.RUnlock() // update to cache c.resourceUpdater.UpdateResource(xdsresource.ListenerType, filteredRes, resp.GetVersionInfo()) + + if c.closedInboundInitCh.CompareAndSwap(false, true) { + close(c.inboundInitRequestCh) + } return nil } diff --git a/core/manager/manager.go b/core/manager/manager.go index ed6619d..26fa057 100644 --- a/core/manager/manager.go +++ b/core/manager/manager.go @@ -20,16 +20,27 @@ import ( "context" "encoding/json" "fmt" + "math" "os" "sync" "sync/atomic" "time" "github.com/cloudwego/kitex/pkg/klog" + "github.com/cloudwego/kitex/pkg/limit" "github.com/kitex-contrib/xds/core/xdsresource" ) +const ( + // reservedLdsResourceName the virtualInbound is used for server side configuration, should + // initialize it in the first and reserved for the all lifecycle. + reservedLdsResourceName = "virtualInbound" + + // defaultServerPort if not set port for the server, it will use this. + defaultServerPort = 0 +) + // xdsResourceManager manages all the xds resources in the cache and export Get function for resource retrieve. // It uses client to fetch the resources from the control plane. // It cleans the expired resource periodically. @@ -47,6 +58,9 @@ type xdsResourceManager struct { // options opts *Options + + // one server may has multiple listed port, each port should has individual limiter policy + limiterHandlers map[uint32]xdsresource.UpdateLimiterCallback } // notifier is used to notify the resource update along with error @@ -65,12 +79,13 @@ func NewXDSResourceManager(bootstrapConfig *BootstrapConfig, opts ...Option) (*x // load bootstrap config var err error m := &xdsResourceManager{ - cache: map[xdsresource.ResourceType]map[string]xdsresource.Resource{}, - meta: make(map[xdsresource.ResourceType]map[string]*xdsresource.ResourceMeta), - notifierMap: make(map[xdsresource.ResourceType]map[string]*notifier), - mu: sync.RWMutex{}, - opts: NewOptions(opts), - closeCh: make(chan struct{}), + cache: map[xdsresource.ResourceType]map[string]xdsresource.Resource{}, + meta: make(map[xdsresource.ResourceType]map[string]*xdsresource.ResourceMeta), + notifierMap: make(map[xdsresource.ResourceType]map[string]*notifier), + mu: sync.RWMutex{}, + opts: NewOptions(opts), + closeCh: make(chan struct{}), + limiterHandlers: make(map[uint32]xdsresource.UpdateLimiterCallback), } // Initial xds client if bootstrapConfig == nil { @@ -117,6 +132,17 @@ func (m *xdsResourceManager) getFromCache(rType xdsresource.ResourceType, rName return nil, false } +// RegisterLimiter registers the limiter handler to resourceManager. The config stores in ListenerType xDS resource, +// If the config changed, manager will invoke the handler. Every port has individual limiter policy, it will use +// default limiter pilocy if not set port, +func (m *xdsResourceManager) RegisterLimiter(port uint32, handler xdsresource.UpdateLimiterCallback) { + m.mu.Lock() + defer m.mu.Unlock() + m.limiterHandlers[port] = handler + + m.updateLimit(m.cache[xdsresource.ListenerType]) +} + // Get gets the specified resource from cache or from the control plane. // If the resource is not in the cache, it will be fetched from the control plane via client. // This will be a synchronous call. It uses the notifier to notify the resource update and return the resource. @@ -183,7 +209,8 @@ func (m *xdsResourceManager) cleaner() { if !ok { continue } - if time.Since(t) > defaultCacheExpireTime { + // should not delete the reserved resource + if time.Since(t) > defaultCacheExpireTime && !isReservedResource(rt, rName) { delete(m.meta[rt], rName) if m.cache[rt] != nil { delete(m.cache[rt], rName) @@ -248,11 +275,60 @@ func (m *xdsResourceManager) updateMeta(rType xdsresource.ResourceType, version } } +// the routeConfigName +func setLimitOption(token uint32) *limit.Option { + maxQPS := int(token) + // if the token is zero, set the value to Max to disable the limiter + if 0 == maxQPS { + maxQPS = math.MaxInt + } + return &limit.Option{ + MaxQPS: maxQPS, + MaxConnections: math.MaxInt, + } +} + +func (m *xdsResourceManager) getLimiterPolicy(up map[string]xdsresource.Resource) map[uint32]uint32 { + val, ok := up[reservedLdsResourceName] + if !ok { + return nil + } + lds, ok := val.(*xdsresource.ListenerResource) + if !ok { + return nil + } + maxTokens := make(map[uint32]uint32) + for _, lis := range lds.NetworkFilters { + if lis.InlineRouteConfig != nil { + maxTokens[lis.RoutePort] = lis.InlineRouteConfig.MaxTokens + } + } + return maxTokens +} + +func (m *xdsResourceManager) updateLimit(up map[string]xdsresource.Resource) { + tokens := m.getLimiterPolicy(up) + for port, handler := range m.limiterHandlers { + if mt, ok := tokens[port]; ok { + handler(setLimitOption(mt)) + } else if mt, ok := tokens[defaultServerPort]; ok { + // if not find the port, use the default server port + handler(setLimitOption(mt)) + } else { + handler(setLimitOption(0)) + } + } +} + // UpdateResource is invoked by client to update the cache func (m *xdsResourceManager) UpdateResource(rt xdsresource.ResourceType, up map[string]xdsresource.Resource, version string) { m.mu.Lock() defer m.mu.Unlock() + if rt == xdsresource.ListenerType { + m.updateLimit(up) + } + for name, res := range up { if _, ok := m.cache[rt]; !ok { m.cache[rt] = make(map[string]xdsresource.Resource) @@ -278,3 +354,7 @@ func (m *xdsResourceManager) UpdateResource(rt xdsresource.ResourceType, up map[ // update meta m.updateMeta(rt, version) } + +func isReservedResource(resourceType xdsresource.ResourceType, resourceName string) bool { + return resourceType == xdsresource.ListenerType && resourceName == reservedLdsResourceName +} diff --git a/core/xdsresource/lds.go b/core/xdsresource/lds.go index 246e8fa..1e43bb3 100644 --- a/core/xdsresource/lds.go +++ b/core/xdsresource/lds.go @@ -32,7 +32,6 @@ import ( // Only includes NetworkFilters now type ListenerResource struct { NetworkFilters []*NetworkFilter - MaxTokens uint32 } func (r *ListenerResource) ResourceType() ResourceType { @@ -53,6 +52,7 @@ const ( type NetworkFilter struct { FilterType NetworkFilterType RouteConfigName string + RoutePort uint32 InlineRouteConfig *RouteConfigResource } @@ -91,7 +91,6 @@ func UnmarshalLDS(rawResources []*any.Any) (map[string]*ListenerResource, error) } } - var maxTokens uint32 if fc := lis.DefaultFilterChain; fc != nil { res, err := unmarshalFilterChain(fc) if err != nil { @@ -115,6 +114,7 @@ func UnmarshalLDS(rawResources []*any.Any) (map[string]*ListenerResource, error) // unmarshalFilterChain unmarshalls the filter chain. // Only process HttpConnectionManager and ThriftProxy now. func unmarshalFilterChain(fc *v3listenerpb.FilterChain) ([]*NetworkFilter, error) { + matchPort := fc.GetFilterChainMatch().GetDestinationPort().GetValue() var filters []*NetworkFilter var errSlice []error for _, f := range fc.Filters { @@ -140,6 +140,7 @@ func unmarshalFilterChain(fc *v3listenerpb.FilterChain) ([]*NetworkFilter, error filters = append(filters, &NetworkFilter{ FilterType: NetworkFilterTypeHTTP, RouteConfigName: n, + RoutePort: matchPort, InlineRouteConfig: r, }) } @@ -226,28 +227,34 @@ func unmarshallHTTPConnectionManager(rawResources *any.Any) (string, *RouteConfi if err != nil { return "", nil, err } + maxTokens, err := getLocalRateLimitFromHttpConnectionManager(httpConnMng) + if err != nil { + return "", nil, err + } + inlineRouteConfig.MaxTokens = maxTokens return httpConnMng.GetRouteConfig().GetName(), inlineRouteConfig, nil } - if httpConnMng.StatPrefix == "InboundPassthroughClusterIpv4" { - for _, filter := range httpConnMng.HttpFilters { - switch filter.ConfigType.(type) { - case *v3httppb.HttpFilter_TypedConfig: - if filter.GetTypedConfig() == nil { - return "", nil, fmt.Errorf("no TypedConfig in the HttpFilter") + return "", nil, nil +} + +func getLocalRateLimitFromHttpConnectionManager(hcm *v3httppb.HttpConnectionManager) (uint32, error) { + for _, filter := range hcm.HttpFilters { + switch filter.ConfigType.(type) { + case *v3httppb.HttpFilter_TypedConfig: + if filter.GetTypedConfig() == nil { + continue + } + if filter.GetTypedConfig().TypeUrl == "type.googleapis.com/envoy.extensions.filters.http.local_ratelimit.v3.LocalRateLimit" || + filter.Name == "envoy.filters.http.local_ratelimit" { + lrl := &ratelimitv3.LocalRateLimit{} + if err := proto.Unmarshal(filter.GetTypedConfig().GetValue(), lrl); err != nil { + return 0, fmt.Errorf("unmarshal LocalRateLimit failed: %s", err) } - if filter.GetTypedConfig().TypeUrl == "type.googleapis.com/envoy.extensions.filters.http.local_ratelimit.v3.LocalRateLimit" { - lrl := &ratelimitv3.LocalRateLimit{} - if err := proto.Unmarshal(filter.GetTypedConfig().GetValue(), lrl); err != nil { - return "", nil, fmt.Errorf("unmarshal HttpConnectionManager failed: %s", err) - } - if lrl.TokenBucket != nil { - return lrl.TokenBucket.MaxTokens - } + if lrl.TokenBucket != nil { + return lrl.TokenBucket.MaxTokens, nil } } - } - return "InboundPassthroughClusterIpv4", nil, nil } - return "", nil, nil + return 0, nil } diff --git a/core/xdsresource/lds_test.go b/core/xdsresource/lds_test.go index cdcc1d9..93e59cf 100644 --- a/core/xdsresource/lds_test.go +++ b/core/xdsresource/lds_test.go @@ -20,6 +20,10 @@ import ( "reflect" "testing" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + ratelimitv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/local_ratelimit/v3" + v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" + v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3" "github.com/stretchr/testify/assert" "github.com/golang/protobuf/ptypes/any" @@ -131,3 +135,28 @@ func TestUnmarshallLDSThriftProxy(t *testing.T) { f(lis.NetworkFilters[0]) f(lis.NetworkFilters[1]) } + +func TestGetLocalRateLimitFromHttpConnectionManager(t *testing.T) { + rateLimit := &ratelimitv3.LocalRateLimit{ + TokenBucket: &v3.TokenBucket{ + MaxTokens: 10, + }, + } + hcm := &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ + RouteConfig: &v3routepb.RouteConfiguration{ + Name: "InboundPassthroughClusterIpv4", + }, + }, + HttpFilters: []*v3httppb.HttpFilter{ + { + ConfigType: &v3httppb.HttpFilter_TypedConfig{ + TypedConfig: MarshalAny(rateLimit), + }, + }, + }, + } + token, err := getLocalRateLimitFromHttpConnectionManager(hcm) + assert.Equal(t, err, nil) + assert.Equal(t, token, uint32(10)) +} diff --git a/core/xdsresource/rds.go b/core/xdsresource/rds.go index 64c18f2..376b5ac 100644 --- a/core/xdsresource/rds.go +++ b/core/xdsresource/rds.go @@ -33,6 +33,7 @@ import ( type RouteConfigResource struct { HTTPRouteConfig *HTTPRouteConfig ThriftRouteConfig *ThriftRouteConfig + MaxTokens uint32 } type HTTPRouteConfig struct { diff --git a/core/xdsresource/xdsresource.go b/core/xdsresource/xdsresource.go index 7f30159..bedcf8d 100644 --- a/core/xdsresource/xdsresource.go +++ b/core/xdsresource/xdsresource.go @@ -19,6 +19,8 @@ package xdsresource import ( "sync/atomic" "time" + + "github.com/cloudwego/kitex/pkg/limit" ) type Resource interface{} @@ -89,3 +91,6 @@ var ResourceTypeToName = map[ResourceType]string{ EndpointsType: "ClusterLoadAssignment", NameTableType: "NameTable", } + +// UpdateLimiterCallback is the callback function for limiter policy when a resource is updated. +type UpdateLimiterCallback func(*limit.Option) diff --git a/go.mod b/go.mod index 7b747e4..7875616 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/bytedance/gopkg v0.0.0-20230728082804-614d0af6619b github.com/cenkalti/backoff/v4 v4.1.0 github.com/cloudwego/kitex v0.7.3 + github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 github.com/envoyproxy/go-control-plane v0.11.1 github.com/golang/protobuf v1.5.3 github.com/stretchr/testify v1.8.3 @@ -26,7 +27,6 @@ require ( github.com/cloudwego/localsession v0.0.2 // indirect github.com/cloudwego/netpoll v0.5.1 // indirect github.com/cloudwego/thriftgo v0.3.2-0.20230828085742-edaddf2c17af // indirect - github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/envoyproxy/protoc-gen-validate v1.0.1 // indirect github.com/fatih/structtag v1.2.0 // indirect diff --git a/xdssuite/limiter.go b/xdssuite/limiter.go new file mode 100644 index 0000000..73a3bbb --- /dev/null +++ b/xdssuite/limiter.go @@ -0,0 +1,58 @@ +package xdssuite + +import ( + "sync/atomic" + + "github.com/cloudwego/kitex/pkg/klog" + "github.com/cloudwego/kitex/pkg/limit" + "github.com/cloudwego/kitex/server" +) + +type Limiter struct { + updater atomic.Value +} + +func (l *Limiter) updateLimiter(opt *limit.Option) { + u := l.updater.Load() + if u == nil { + klog.Warnf("[xds] server limiter config failed as the updater is empty") + return + } + if !u.(limit.Updater).UpdateLimit(opt) { + klog.Warnf("[xds] server limiter config: data %v may do not take affect", opt) + } +} + +func NewLimiter(opts ...Option) server.Option { + serverOpt := NewOptions(opts) + var updater atomic.Value + opt := &limit.Option{} + opt.UpdateControl = func(u limit.Updater) { + u.UpdateLimit(opt) + updater.Store(u) + } + m := xdsResourceManager.getManager() + if m == nil { + panic("xds resource manager has not been initialized") + } + m.RegisterLimiter(serverOpt.servicePort, func(chanegd *limit.Option) { + if chanegd == nil { + return + } + opt.MaxConnections = chanegd.MaxConnections + opt.MaxQPS = chanegd.MaxQPS + u := updater.Load() + if u == nil { + klog.Warnf("[xds] server limiter config failed as the updater is empty") + return + } + up, ok := u.(limit.Updater) + if !ok { + return + } + if !up.UpdateLimit(opt) { + klog.Warnf("[xds] server limiter config: data %s may do not take affect", chanegd) + } + }) + return server.WithLimit(opt) +} diff --git a/xdssuite/option.go b/xdssuite/option.go index ee40e08..14a9cd3 100644 --- a/xdssuite/option.go +++ b/xdssuite/option.go @@ -27,6 +27,7 @@ type routerMetaExtractor func(context.Context) map[string]string // Options for xds suite type Options struct { routerMetaExtractor routerMetaExtractor // use metainfo.GetAllValues by default. + servicePort uint32 } func (o *Options) Apply(opts []Option) { diff --git a/xdssuite/xds.go b/xdssuite/xds.go index 8e7d9db..42f6980 100644 --- a/xdssuite/xds.go +++ b/xdssuite/xds.go @@ -41,6 +41,8 @@ func (m *singletonManager) getManager() XDSResourceManager { // Get() returns error when the fetching fails or the resource is not found in the latest update. type XDSResourceManager interface { Get(ctx context.Context, resourceType xdsresource.ResourceType, resourceName string) (interface{}, error) + // RegisterLimiter registers the callback function for limiter + RegisterLimiter(port uint32, handler xdsresource.UpdateLimiterCallback) } func XDSInited() bool {