Skip to content

Commit

Permalink
feat(limit): support limit config
Browse files Browse the repository at this point in the history
  • Loading branch information
whalecold committed Mar 11, 2024
1 parent cf88df9 commit 6f5c4c1
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 39 deletions.
42 changes: 30 additions & 12 deletions core/manager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"runtime/debug"
"strings"
"sync"
"sync/atomic"

"github.com/cenkalti/backoff/v4"

Expand Down Expand Up @@ -129,24 +130,28 @@ type xdsClient struct {

// channel for stop
closeCh chan struct{}
// indicates ready for listener
inboundInitRequestCh chan struct{}
closedInboundInitCh atomic.Bool

Check failure on line 135 in core/manager/client.go

View workflow job for this annotation

GitHub Actions / unit-benchmark-test (1.17, X64)

undefined: atomic.Bool

Check failure on line 135 in core/manager/client.go

View workflow job for this annotation

GitHub Actions / unit-benchmark-test (1.17, ARM64)

undefined: atomic.Bool

Check failure on line 135 in core/manager/client.go

View workflow job for this annotation

GitHub Actions / unit-benchmark-test (1.18, X64)

undefined: atomic.Bool

Check failure on line 135 in core/manager/client.go

View workflow job for this annotation

GitHub Actions / unit-benchmark-test (1.18, ARM64)

undefined: atomic.Bool

mu sync.RWMutex
}

// newXdsClient constructs a new xdsClient, which is used to get xds resources from the xds server.
func newXdsClient(bCfg *BootstrapConfig, ac ADSClient, updater *xdsResourceManager) (*xdsClient, error) {
cli := &xdsClient{
config: bCfg,
adsClient: ac,
connectBackoff: backoff.NewExponentialBackOff(),
watchedResource: make(map[xdsresource.ResourceType]map[string]bool),
cipResolver: newNdsResolver(),
versionMap: make(map[xdsresource.ResourceType]string),
nonceMap: make(map[xdsresource.ResourceType]string),
resourceUpdater: updater,
closeCh: make(chan struct{}),
streamCh: make(chan ADSStream, 1),
reqCh: make(chan *discoveryv3.DiscoveryRequest, 1024),
config: bCfg,
adsClient: ac,
connectBackoff: backoff.NewExponentialBackOff(),
watchedResource: make(map[xdsresource.ResourceType]map[string]bool),
cipResolver: newNdsResolver(),
versionMap: make(map[xdsresource.ResourceType]string),
nonceMap: make(map[xdsresource.ResourceType]string),
resourceUpdater: updater,
closeCh: make(chan struct{}),
inboundInitRequestCh: make(chan struct{}),
streamCh: make(chan ADSStream, 1),
reqCh: make(chan *discoveryv3.DiscoveryRequest, 1024),
}
cli.run()
return cli, nil
Expand Down Expand Up @@ -292,13 +297,22 @@ func (c *xdsClient) ndsWarmup() {
// watch the NameTable when init the xds client
c.Watch(xdsresource.NameTableType, "", false)
<-c.cipResolver.initRequestCh
klog.Infof("KITEX: [XDS] nds, warmup done")
}

func (c *xdsClient) listenerWarmup() {
// watch the NameTable when init the xds client
c.Watch(xdsresource.ListenerType, reservedLdsResourceName, false)
<-c.inboundInitRequestCh
klog.Infof("KITEX: [XDS] lds, warmup done")
}

// warmup sends the requests (NDS) to the xds server and waits for the response to set the lookup table.
func (c *xdsClient) warmup() {
if c.ndsRequired() {
c.ndsWarmup()
}
c.listenerWarmup()
// TODO: maybe need to watch the listener
klog.Infof("KITEX: [XDS] client, warmup done")
}
Expand Down Expand Up @@ -436,7 +450,7 @@ func (c *xdsClient) handleLDS(resp *discoveryv3.DiscoveryResponse) error {
c.mu.RLock()
filteredRes := make(map[string]xdsresource.Resource)
for n := range c.watchedResource[xdsresource.ListenerType] {
if c.ndsRequired() {
if c.ndsRequired() && n != reservedLdsResourceName {
ln, err := c.getListenerName(n)
if err != nil || ln == "" {
klog.Warnf("KITEX: [XDS] get listener name %s failed, err: %v", n, err)
Expand All @@ -452,6 +466,10 @@ func (c *xdsClient) handleLDS(resp *discoveryv3.DiscoveryResponse) error {
c.mu.RUnlock()
// update to cache
c.resourceUpdater.UpdateResource(xdsresource.ListenerType, filteredRes, resp.GetVersionInfo())

if c.closedInboundInitCh.CompareAndSwap(false, true) {
close(c.inboundInitRequestCh)
}
return nil
}

Expand Down
94 changes: 87 additions & 7 deletions core/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,27 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"os"
"sync"
"sync/atomic"
"time"

"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/limit"

"github.com/kitex-contrib/xds/core/xdsresource"
)

const (
// reservedLdsResourceName the virtualInbound is used for server side configuration, should
// initialize it in the first and reserved for the all lifecycle.
reservedLdsResourceName = "virtualInbound"

// defaultServerPort if not set port for the server, it will use this.
defaultServerPort = 0
)

// xdsResourceManager manages all the xds resources in the cache and export Get function for resource retrieve.
// It uses client to fetch the resources from the control plane.
// It cleans the expired resource periodically.
Expand All @@ -47,6 +58,9 @@ type xdsResourceManager struct {

// options
opts *Options

// one server may has multiple listed port, each port should has individual limiter policy
limiterHandlers map[uint32]xdsresource.UpdateLimiterCallback
}

// notifier is used to notify the resource update along with error
Expand All @@ -65,12 +79,13 @@ func NewXDSResourceManager(bootstrapConfig *BootstrapConfig, opts ...Option) (*x
// load bootstrap config
var err error
m := &xdsResourceManager{
cache: map[xdsresource.ResourceType]map[string]xdsresource.Resource{},
meta: make(map[xdsresource.ResourceType]map[string]*xdsresource.ResourceMeta),
notifierMap: make(map[xdsresource.ResourceType]map[string]*notifier),
mu: sync.RWMutex{},
opts: NewOptions(opts),
closeCh: make(chan struct{}),
cache: map[xdsresource.ResourceType]map[string]xdsresource.Resource{},
meta: make(map[xdsresource.ResourceType]map[string]*xdsresource.ResourceMeta),
notifierMap: make(map[xdsresource.ResourceType]map[string]*notifier),
mu: sync.RWMutex{},
opts: NewOptions(opts),
closeCh: make(chan struct{}),
limiterHandlers: make(map[uint32]xdsresource.UpdateLimiterCallback),
}
// Initial xds client
if bootstrapConfig == nil {
Expand Down Expand Up @@ -117,6 +132,17 @@ func (m *xdsResourceManager) getFromCache(rType xdsresource.ResourceType, rName
return nil, false
}

// RegisterLimiter registers the limiter handler to resourceManager. The config stores in ListenerType xDS resource,
// If the config changed, manager will invoke the handler. Every port has individual limiter policy, it will use
// default limiter pilocy if not set port,
func (m *xdsResourceManager) RegisterLimiter(port uint32, handler xdsresource.UpdateLimiterCallback) {
m.mu.Lock()
defer m.mu.Unlock()
m.limiterHandlers[port] = handler

m.updateLimit(m.cache[xdsresource.ListenerType])
}

// Get gets the specified resource from cache or from the control plane.
// If the resource is not in the cache, it will be fetched from the control plane via client.
// This will be a synchronous call. It uses the notifier to notify the resource update and return the resource.
Expand Down Expand Up @@ -183,7 +209,8 @@ func (m *xdsResourceManager) cleaner() {
if !ok {
continue
}
if time.Since(t) > defaultCacheExpireTime {
// should not delete the reserved resource
if time.Since(t) > defaultCacheExpireTime && !isReservedResource(rt, rName) {
delete(m.meta[rt], rName)
if m.cache[rt] != nil {
delete(m.cache[rt], rName)
Expand Down Expand Up @@ -248,11 +275,60 @@ func (m *xdsResourceManager) updateMeta(rType xdsresource.ResourceType, version
}
}

// the routeConfigName
func setLimitOption(token uint32) *limit.Option {
maxQPS := int(token)
// if the token is zero, set the value to Max to disable the limiter
if 0 == maxQPS {
maxQPS = math.MaxInt
}
return &limit.Option{
MaxQPS: maxQPS,
MaxConnections: math.MaxInt,
}
}

func (m *xdsResourceManager) getLimiterPolicy(up map[string]xdsresource.Resource) map[uint32]uint32 {
val, ok := up[reservedLdsResourceName]
if !ok {
return nil
}
lds, ok := val.(*xdsresource.ListenerResource)
if !ok {
return nil
}
maxTokens := make(map[uint32]uint32)
for _, lis := range lds.NetworkFilters {
if lis.InlineRouteConfig != nil {
maxTokens[lis.RoutePort] = lis.InlineRouteConfig.MaxTokens
}
}
return maxTokens
}

func (m *xdsResourceManager) updateLimit(up map[string]xdsresource.Resource) {
tokens := m.getLimiterPolicy(up)
for port, handler := range m.limiterHandlers {
if mt, ok := tokens[port]; ok {
handler(setLimitOption(mt))
} else if mt, ok := tokens[defaultServerPort]; ok {
// if not find the port, use the default server port
handler(setLimitOption(mt))
} else {
handler(setLimitOption(0))
}
}
}

// UpdateResource is invoked by client to update the cache
func (m *xdsResourceManager) UpdateResource(rt xdsresource.ResourceType, up map[string]xdsresource.Resource, version string) {
m.mu.Lock()
defer m.mu.Unlock()

if rt == xdsresource.ListenerType {
m.updateLimit(up)
}

for name, res := range up {
if _, ok := m.cache[rt]; !ok {
m.cache[rt] = make(map[string]xdsresource.Resource)
Expand All @@ -278,3 +354,7 @@ func (m *xdsResourceManager) UpdateResource(rt xdsresource.ResourceType, up map[
// update meta
m.updateMeta(rt, version)
}

func isReservedResource(resourceType xdsresource.ResourceType, resourceName string) bool {
return resourceType == xdsresource.ListenerType && resourceName == reservedLdsResourceName
}
45 changes: 26 additions & 19 deletions core/xdsresource/lds.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
// Only includes NetworkFilters now
type ListenerResource struct {
NetworkFilters []*NetworkFilter
MaxTokens uint32
}

func (r *ListenerResource) ResourceType() ResourceType {
Expand All @@ -53,6 +52,7 @@ const (
type NetworkFilter struct {
FilterType NetworkFilterType
RouteConfigName string
RoutePort uint32
InlineRouteConfig *RouteConfigResource
}

Expand Down Expand Up @@ -91,7 +91,6 @@ func UnmarshalLDS(rawResources []*any.Any) (map[string]*ListenerResource, error)
}
}

var maxTokens uint32
if fc := lis.DefaultFilterChain; fc != nil {
res, err := unmarshalFilterChain(fc)
if err != nil {
Expand All @@ -115,6 +114,7 @@ func UnmarshalLDS(rawResources []*any.Any) (map[string]*ListenerResource, error)
// unmarshalFilterChain unmarshalls the filter chain.
// Only process HttpConnectionManager and ThriftProxy now.
func unmarshalFilterChain(fc *v3listenerpb.FilterChain) ([]*NetworkFilter, error) {
matchPort := fc.GetFilterChainMatch().GetDestinationPort().GetValue()
var filters []*NetworkFilter
var errSlice []error
for _, f := range fc.Filters {
Expand All @@ -140,6 +140,7 @@ func unmarshalFilterChain(fc *v3listenerpb.FilterChain) ([]*NetworkFilter, error
filters = append(filters, &NetworkFilter{
FilterType: NetworkFilterTypeHTTP,
RouteConfigName: n,
RoutePort: matchPort,
InlineRouteConfig: r,
})
}
Expand Down Expand Up @@ -226,28 +227,34 @@ func unmarshallHTTPConnectionManager(rawResources *any.Any) (string, *RouteConfi
if err != nil {
return "", nil, err
}
maxTokens, err := getLocalRateLimitFromHttpConnectionManager(httpConnMng)
if err != nil {
return "", nil, err
}
inlineRouteConfig.MaxTokens = maxTokens
return httpConnMng.GetRouteConfig().GetName(), inlineRouteConfig, nil
}
if httpConnMng.StatPrefix == "InboundPassthroughClusterIpv4" {
for _, filter := range httpConnMng.HttpFilters {
switch filter.ConfigType.(type) {
case *v3httppb.HttpFilter_TypedConfig:
if filter.GetTypedConfig() == nil {
return "", nil, fmt.Errorf("no TypedConfig in the HttpFilter")
return "", nil, nil
}

func getLocalRateLimitFromHttpConnectionManager(hcm *v3httppb.HttpConnectionManager) (uint32, error) {
for _, filter := range hcm.HttpFilters {
switch filter.ConfigType.(type) {
case *v3httppb.HttpFilter_TypedConfig:
if filter.GetTypedConfig() == nil {
continue
}
if filter.GetTypedConfig().TypeUrl == "type.googleapis.com/envoy.extensions.filters.http.local_ratelimit.v3.LocalRateLimit" ||
filter.Name == "envoy.filters.http.local_ratelimit" {
lrl := &ratelimitv3.LocalRateLimit{}
if err := proto.Unmarshal(filter.GetTypedConfig().GetValue(), lrl); err != nil {
return 0, fmt.Errorf("unmarshal LocalRateLimit failed: %s", err)
}
if filter.GetTypedConfig().TypeUrl == "type.googleapis.com/envoy.extensions.filters.http.local_ratelimit.v3.LocalRateLimit" {
lrl := &ratelimitv3.LocalRateLimit{}
if err := proto.Unmarshal(filter.GetTypedConfig().GetValue(), lrl); err != nil {
return "", nil, fmt.Errorf("unmarshal HttpConnectionManager failed: %s", err)
}
if lrl.TokenBucket != nil {
return lrl.TokenBucket.MaxTokens
}
if lrl.TokenBucket != nil {
return lrl.TokenBucket.MaxTokens, nil
}
}

}
return "InboundPassthroughClusterIpv4", nil, nil
}
return "", nil, nil
return 0, nil
}
29 changes: 29 additions & 0 deletions core/xdsresource/lds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"reflect"
"testing"

v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
ratelimitv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/local_ratelimit/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/stretchr/testify/assert"

"github.com/golang/protobuf/ptypes/any"
Expand Down Expand Up @@ -131,3 +135,28 @@ func TestUnmarshallLDSThriftProxy(t *testing.T) {
f(lis.NetworkFilters[0])
f(lis.NetworkFilters[1])
}

func TestGetLocalRateLimitFromHttpConnectionManager(t *testing.T) {
rateLimit := &ratelimitv3.LocalRateLimit{
TokenBucket: &v3.TokenBucket{
MaxTokens: 10,
},
}
hcm := &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
RouteConfig: &v3routepb.RouteConfiguration{
Name: "InboundPassthroughClusterIpv4",
},
},
HttpFilters: []*v3httppb.HttpFilter{
{
ConfigType: &v3httppb.HttpFilter_TypedConfig{
TypedConfig: MarshalAny(rateLimit),
},
},
},
}
token, err := getLocalRateLimitFromHttpConnectionManager(hcm)
assert.Equal(t, err, nil)
assert.Equal(t, token, uint32(10))
}
1 change: 1 addition & 0 deletions core/xdsresource/rds.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
type RouteConfigResource struct {
HTTPRouteConfig *HTTPRouteConfig
ThriftRouteConfig *ThriftRouteConfig
MaxTokens uint32
}

type HTTPRouteConfig struct {
Expand Down
Loading

0 comments on commit 6f5c4c1

Please sign in to comment.