Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support global ratelimiter #56

Merged
merged 25 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
9750bde
feat(limiter): add ratelimiter types
xuqingyun Nov 15, 2023
b041e34
feat(limiter): add static crd
xuqingyun Nov 15, 2023
bf25671
feat(limiter): add ratelimter option and feature gate
xuqingyun Nov 29, 2023
a8c6810
feat(limiter): add ratelimiter rest api
xuqingyun Nov 29, 2023
e5985de
feat(limiter): update limiter api types
xuqingyun Dec 9, 2023
7539674
feat(limiter): add ratelimiter server
xuqingyun Dec 9, 2023
e500c23
feat(limiter): add rate limiter clientset
xuqingyun Nov 15, 2023
ceca5cb
feat(limiter): proxy support rate limiter
xuqingyun Dec 9, 2023
f8c480a
fix(limiter): add validation for global schema config
xuqingyun Dec 11, 2023
283bd75
fix(limiter): ensure client when ratelimiter leader change
xuqingyun Dec 14, 2023
a509a87
fix(limiter): fix empty metric tag
xuqingyun Dec 14, 2023
03d9741
feat(limiter): start control plane in ratelimter
xuqingyun Nov 28, 2023
ec44c29
fix(build): fix post-build copy dir
xuqingyun Dec 15, 2023
7af5b92
feat(limiter): fix ratelimter control plane
xuqingyun Dec 16, 2023
a8575bd
feat(limiter): reset global counter when error
xuqingyun Dec 19, 2023
b63e6b0
fix(limiter): add authorizer to fix create-on-update error
xuqingyun Dec 21, 2023
5612384
fix(limiter): correct log format
xuqingyun Jan 3, 2024
d3a34c1
feat(limiter): update allocate algorithm
xuqingyun Jan 12, 2024
d843634
feat: optimize proxy/termination log
xuqingyun Aug 21, 2023
5cb7c81
feat(limiter): wait acquire done when no tokens left
xuqingyun Dec 26, 2023
5c604b3
feat(limiter): do acquire concurrently
xuqingyun Jan 8, 2024
283b45b
feat(limiter/metric): add acquire metric
xuqingyun Jan 8, 2024
ea0032d
feat(limiter): refactor inflight meter
xuqingyun Jan 15, 2024
5046398
fix(limiter): fix store data inconsistency
xuqingyun Jan 22, 2024
d1d2f4c
feat: refactor flowcontrol meter
xuqingyun Jan 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions Makefile.expansion
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,33 @@ codegen: kube-codegen
--client-path=pkg/client \
--apis-path=pkg/apis \
--go-header-file=boilerplate.go.txt \
--generators="+openapi,+client,+lister,+informer,+protobuf"
--generators="+openapi,+client,+lister,+informer,+protobuf,+deepcopy"

.PHONY: verify-license
verify-license:
@addlicense -c "ByteDance and its affiliates" -y $(shell date +"%Y") -ignore **/*.yaml -check **

# find or download controller-gen
# download controller-gen if necessary
controller-gen:
ifeq (, $(shell which controller-gen))
@{ \
set -e ;\
CONTROLLER_GEN_TMP_DIR=$$(mktemp -d) ;\
cd $$CONTROLLER_GEN_TMP_DIR ;\
go mod init tmp ;\
go get sigs.k8s.io/controller-tools/cmd/[email protected] ;\
rm -rf $$CONTROLLER_GEN_TMP_DIR ;\
}
CONTROLLER_GEN=$(GOBIN)/controller-gen
else
CONTROLLER_GEN=$(shell which controller-gen)
endif

# Generate CRD
generate-crd: controller-gen
$(CONTROLLER_GEN) crd:crdVersions=v1 paths="./pkg/apis/..." output:crd:dir=./hack/crd

# Generate code
generate: generate-crd codegen

3 changes: 3 additions & 0 deletions cmd/kube-gateway/app/options/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ProxyOptions struct {
SecureServing *proxyoptions.SecureServingOptions
ProcessInfo *genericoptions.ProcessInfo
Logging *proxyoptions.LoggingOptions
RateLimiter *proxyoptions.RateLimiterOptions
}

func NewProxyOptions() *ProxyOptions {
Expand All @@ -36,6 +37,7 @@ func NewProxyOptions() *ProxyOptions {
SecureServing: proxyoptions.NewSecureServingOptions(),
ProcessInfo: genericoptions.NewProcessInfo("kube-gateway-proxy", "kube-system"),
Logging: proxyoptions.NewLoggingOptions(),
RateLimiter: proxyoptions.NewRateLimiterOptions(),
}
}

Expand All @@ -46,5 +48,6 @@ func (s *ProxyOptions) Flags() (fss cliflag.NamedFlagSets) {
s.Authorization.AddFlags(fs)
s.SecureServing.AddFlags(fs)
s.Logging.AddFlags(fs)
s.RateLimiter.AddFlags(fs)
return
}
5 changes: 4 additions & 1 deletion cmd/kube-gateway/app/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ func CreateProxyConfig(
log.SetOutput(proxyHTTPErrorLogWriter{})

// create upstream controller
clusterController := controllers.NewUpstreamClusterController(controlplaneServerConfig.ExtraConfig.GatewaySharedInformerFactory.Proxy().V1alpha1().UpstreamClusters())
clusterController := controllers.NewUpstreamClusterController(
controlplaneServerConfig.ExtraConfig.GatewaySharedInformerFactory.Proxy().V1alpha1().UpstreamClusters(),
o.RateLimiter,
)
// Dynamic SNI for upstream cluster
recommendedConfig.Config.SecureServing.DynamicClientConfig = clusterController
// Proxy handler
Expand Down
2 changes: 1 addition & 1 deletion cmd/kube-gateway/post-build
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#!/bin/bash
cp ${MAKE_RULES_WORKSPACE}/build/kube-gateway/* "${MAKE_RULES_WORKSPACE}/output"
cp -r ${MAKE_RULES_WORKSPACE}/build/kube-gateway/* "${MAKE_RULES_WORKSPACE}/output"
60 changes: 60 additions & 0 deletions cmd/kube-ratelimiter/app/options/debugging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package options

import (
"github.com/spf13/pflag"

componentbaseconfig "k8s.io/component-base/config"
)

// DebuggingOptions holds the Debugging options.
type DebuggingOptions struct {
*componentbaseconfig.DebuggingConfiguration
}

// RecommendedDebuggingOptions returns the currently recommended debugging options. These are subject to change
// between releases as we add options and decide which features should be exposed or not by default.
func RecommendedDebuggingOptions() *DebuggingOptions {
return &DebuggingOptions{
DebuggingConfiguration: &componentbaseconfig.DebuggingConfiguration{
EnableProfiling: true, // profile debugging is cheap to have exposed and standard on kube binaries
},
}
}

// AddFlags adds flags related to debugging for controller manager to the specified FlagSet.
func (o *DebuggingOptions) AddFlags(fs *pflag.FlagSet) {
if o == nil {
return
}

fs.BoolVar(&o.EnableProfiling, "profiling", o.EnableProfiling,
"Enable profiling via web interface host:port/debug/pprof/")
fs.BoolVar(&o.EnableContentionProfiling, "contention-profiling", o.EnableContentionProfiling,
"Enable lock contention profiling, if profiling is enabled")
}

// Validate checks validation of DebuggingOptions.
func (o *DebuggingOptions) Validate() []error {
if o == nil {
return nil
}

errs := []error{}
return errs
}
252 changes: 252 additions & 0 deletions cmd/kube-ratelimiter/app/options/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
// Copyright 2022 ByteDance and its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package options

import (
"net"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiserveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
cliflag "k8s.io/component-base/cli/flag"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog"

kubewharfoptions "github.com/kubewharf/apiserver-runtime/pkg/server/options"
"github.com/kubewharf/kubegateway/cmd/kube-gateway/app"
options2 "github.com/kubewharf/kubegateway/cmd/kube-gateway/app/options"
gatewayclientset "github.com/kubewharf/kubegateway/pkg/client/kubernetes"
controleplaneoptions "github.com/kubewharf/kubegateway/pkg/gateway/controlplane/options"
limitconfig "github.com/kubewharf/kubegateway/pkg/ratelimiter/config"
"github.com/kubewharf/kubegateway/pkg/ratelimiter/limiter"
"github.com/kubewharf/kubegateway/pkg/ratelimiter/options"
)

type Options struct {
ControlPlane *controleplaneoptions.ControlPlaneOptions

// ClientConnection specifies the kubeconfig file and client connection
// settings for the proxy server to use when communicating with the apiserver.
ClientConnection componentbaseconfig.ClientConnectionConfiguration
// LimitServer defines the configuration of leader election client.
LimitServer options.RateLimitOptions

SecureServing *options.SecureServingOptions
// TODO: remove insecure serving mode
InsecureServing *apiserveroptions.DeprecatedInsecureServingOptions
Authentication *apiserveroptions.DelegatingAuthenticationOptions
Authorization *options.AuthorizationOptions

Debugging *DebuggingOptions

Master string

ShardingIndex int
}

func NewOptions() *Options {
o := Options{
ControlPlane: func() *controleplaneoptions.ControlPlaneOptions {
opt := controleplaneoptions.NewControlPlaneOptions()
recommended := kubewharfoptions.NewRecommendedOptions().
WithEtcd("/registry/KubeGateway", nil).
WithServerRun().
WithProcessInfo(apiserveroptions.NewProcessInfo("kube-gateway", "kube-system"))
opt.RecommendedOptions = recommended
opt.SecureServing.BindPort = 0
return opt
}(),
SecureServing: options.NewSecureServingOptions(),
InsecureServing: &apiserveroptions.DeprecatedInsecureServingOptions{
BindAddress: net.ParseIP("0.0.0.0"),
BindPort: 18080,
BindNetwork: "tcp",
},
Authentication: func() *apiserveroptions.DelegatingAuthenticationOptions {
opt := apiserveroptions.NewDelegatingAuthenticationOptions()
opt.RemoteKubeConfigFileOptional = true
return opt
}(),
Authorization: options.NewAuthorizationOptions(),
Debugging: RecommendedDebuggingOptions(),
LimitServer: options.RateLimitOptions{
ShardingCount: 8,
LimitStore: "local",
//Identity: "", // TODO
K8sStoreSyncPeriod: time.Second * 30,
LeaderElectionConfiguration: componentbaseconfig.LeaderElectionConfiguration{
ResourceName: "kube-gateway-ratelimiter",
ResourceNamespace: "kube-gateway",
ResourceLock: "leases",
LeaderElect: true,
LeaseDuration: metav1.Duration{Duration: time.Millisecond * 3000}, // TODO
RenewDeadline: metav1.Duration{Duration: time.Millisecond * 2800},
RetryPeriod: metav1.Duration{Duration: time.Millisecond * 900},
},
},
}

// Set the PairName but leave certificate directory blank to generate in-memory by default
o.ControlPlane.SecureServing.ServerCert.CertDirectory = ""
o.ControlPlane.SecureServing.ServerCert.PairName = "kube-gateway-ratelimiter"

o.ClientConnection.QPS = 10000
o.ClientConnection.Burst = 10000

return &o
}

func (o *Options) Flags() cliflag.NamedFlagSets {

fss := o.ControlPlane.Flags()

secureServing := fss.FlagSet("ratelimiter secure serving")
o.SecureServing.AddFlags(secureServing)

o.InsecureServing.AddUnqualifiedFlags(fss.FlagSet("insecure serving"))
o.Authentication.AddFlags(fss.FlagSet("authentication"))
o.Authorization.AddFlags(fss.FlagSet("authorization"))

o.LimitServer.AddFlags(fss.FlagSet("rate limiter"))

o.Debugging.AddFlags(fss.FlagSet("debugging"))

genericfs := fss.FlagSet("generic")
genericfs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig).")
genericfs.StringVar(&o.ClientConnection.Kubeconfig, "kubeconfig", o.ClientConnection.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
genericfs.Float32Var(&o.ClientConnection.QPS, "kube-api-qps", o.ClientConnection.QPS, "QPS to use while talking with clientset apiserver.")
genericfs.Int32Var(&o.ClientConnection.Burst, "kube-api-burst", o.ClientConnection.Burst, "Burst to use while talking with clientset apiserver.")

return fss
}

func (o *Options) Config() (*limitconfig.Config, error) {
c := &limitconfig.Config{Debugging: o.Debugging.DebuggingConfiguration}

// ControlPlane config
var loopbackClientConfig *restclient.Config
if o.LimitServer.EnableControlPlane {
if err := o.ControlPlane.RecommendedOptions.Complete(); err != nil {
return nil, err
}

// add authorization mode option
o.ControlPlane.Authorization = kubewharfoptions.NewAuthorizationOptions()
o.ControlPlane.Authorization.Modes = o.Authorization.Modes

controlPlaneServerRunOptions := options2.ControlPlaneServerRunOptions{
ControlPlaneOptions: o.ControlPlane,
}
controlPlaneConfig, err := app.CreateControlPlaneConfig(&controlPlaneServerRunOptions)
if err != nil {
return nil, err
}
c.ControlPlaneConfig = controlPlaneConfig.Complete()

loopbackClientConfig = controlPlaneConfig.RecommendedConfig.LoopbackClientConfig
}

// Prepare kube clients.
gatewayClient, client, err := createClients(loopbackClientConfig, o.ClientConnection, o.Master, o.LimitServer.RenewDeadline.Duration)
if err != nil {
return nil, err
}
c.Client = client
c.GatewayClientset = gatewayClient

informerFactory := informers.NewSharedInformerFactory(client, 0)
c.InformerFactory = informerFactory

c.RateLimiter, err = limiter.NewRateLimiter(gatewayClient, client, o.LimitServer)
if err != nil {
return nil, err
}
if err := o.ApplyTo(c); err != nil {
return nil, err
}

return c, nil
}

func (o *Options) ApplyTo(c *limitconfig.Config) error {
if err := o.InsecureServing.ApplyTo(&c.InsecureServing); err != nil {
return err
}
if err := o.SecureServing.ApplyTo(&c.SecureServing, o.ControlPlane.SecureServing.SecureServingOptions); err != nil {
return err
}
if o.SecureServing.SecurePort != 0 || c.SecureServing.Listener != nil {
if err := o.Authentication.ApplyTo(&c.Authentication, c.SecureServing, nil); err != nil {
return err
}
if err := o.Authorization.ApplyTo(&c.Authorization, c.InformerFactory); err != nil {
return err
}
}
return nil
}

func (o *Options) Validate() []error {
errors := []error{}
errors = append(errors, o.SecureServing.ValidateWith(o.ControlPlane.SecureServing.SecureServingOptions, o.LimitServer.EnableControlPlane)...)
errors = append(errors, o.InsecureServing.Validate()...)
errors = append(errors, o.Debugging.Validate()...)
errors = append(errors, o.Authentication.Validate()...)
errors = append(errors, o.Authorization.Validate()...)

return errors
}

// createClients creates a kube client and an event client from the given limitconfig and masterOverride.
func createClients(loopbackClientConfig *restclient.Config, config componentbaseconfig.ClientConnectionConfiguration, masterOverride string, timeout time.Duration) (gatewayclientset.Interface, clientset.Interface, error) {
if len(config.Kubeconfig) == 0 && len(masterOverride) == 0 {
klog.Warningf("Neither --kubeconfig nor --master was specified. Using default API gatewayClient. This might not work.")
}

kubeConfig := loopbackClientConfig
if kubeConfig == nil {
// This creates a gatewayClient, first loading any specified kubeconfig
// file, and then overriding the Master flag, if non-empty.
var err error
kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: config.Kubeconfig},
&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterOverride}}).ClientConfig()
if err != nil {
return nil, nil, err
}

kubeConfig.DisableCompression = true
kubeConfig.AcceptContentTypes = config.AcceptContentTypes
kubeConfig.ContentType = config.ContentType
kubeConfig.QPS = config.QPS
kubeConfig.Burst = int(config.Burst)
}
gatewayClient, err := gatewayclientset.NewForConfig(restclient.AddUserAgent(kubeConfig, "kube-gateway-rate-limiter"))
if err != nil {
return nil, nil, err
}

client, err := clientset.NewForConfig(restclient.AddUserAgent(kubeConfig, "kube-gateway-rate-limiter"))
if err != nil {
return nil, nil, err
}

return gatewayClient, client, nil
}
Loading