diff --git a/README.md b/README.md index f05efff..61f8f9a 100644 --- a/README.md +++ b/README.md @@ -188,6 +188,47 @@ func main() { } ``` +## Default Weight + +The weighted load balancing algorithm can only handle positive weights, and will be filtered when the weight is if 0 or negative. Setting default weights can avoid filtering. + +### Default Config + +| Config Name | Default Value | Description | +|:------------------|:--------------|:-------------------------------------------------------------------------------------------------------------------------------------| +| WithDefaultWeight | 10 | Used to set the default wight of instances, if 0 or negative, it means instances with 0 or negative weight will be filtered | + +### Example + +```go +package main + +import ( + ... + "github.com/cloudwego/kitex/client" + etcd "github.com/kitex-contrib/registry-etcd" +) + +func main() { + // creates a etcd based resolver with default weight + r, err := etcd.NewEtcdResolver([]string{"127.0.0.1:2379"}, "username", "password", etcd.WithDefaultWeight(10)) + if err != nil { + log.Fatal(err) + } + client := hello.MustNewClient("Hello", client.WithResolver(r)) + for { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + resp, err := client.Echo(ctx, &api.Request{Message: "Hello"}) + cancel() + if err != nil { + log.Fatal(err) + } + log.Println(resp) + time.Sleep(time.Second) + } +} +``` + ## How to Dynamically specify ip and port To dynamically specify an IP and port, one should first set the environment variables KITEX_IP_TO_REGISTRY and KITEX_PORT_TO_REGISTRY. If these variables are not set, the system defaults to using the service's listening IP and port. Notably, if the service's listening IP is either not set or set to "::", the system will automatically retrieve and use the machine's IPV4 address. diff --git a/etcd_resolver.go b/etcd_resolver.go index 2fd2a57..5d9a4b5 100644 --- a/etcd_resolver.go +++ b/etcd_resolver.go @@ -32,8 +32,9 @@ const ( // etcdResolver is a resolver using etcd. type etcdResolver struct { - etcdClient *clientv3.Client - prefix string + etcdClient *clientv3.Client + prefix string + defaultWeight int } // NewEtcdResolver creates a etcd based resolver. @@ -42,7 +43,8 @@ func NewEtcdResolver(endpoints []string, opts ...Option) (discovery.Resolver, er EtcdConfig: &clientv3.Config{ Endpoints: endpoints, }, - Prefix: "kitex/registry-etcd", + Prefix: "kitex/registry-etcd", + DefaultWeight: defaultWeight, } for _, opt := range opts { opt(cfg) @@ -52,8 +54,9 @@ func NewEtcdResolver(endpoints []string, opts ...Option) (discovery.Resolver, er return nil, err } return &etcdResolver{ - etcdClient: etcdClient, - prefix: cfg.Prefix, + etcdClient: etcdClient, + prefix: cfg.Prefix, + defaultWeight: cfg.DefaultWeight, }, nil } @@ -95,7 +98,7 @@ func (e *etcdResolver) Resolve(ctx context.Context, desc string) (discovery.Resu } weight := info.Weight if weight <= 0 { - weight = defaultWeight + weight = e.defaultWeight } eps = append(eps, discovery.NewInstance(info.Network, info.Address, weight, info.Tags)) } @@ -118,6 +121,11 @@ func (e *etcdResolver) Diff(cacheKey string, prev, next discovery.Result) (disco func (e *etcdResolver) Name() string { return "etcd" } + func (e *etcdResolver) GetPrefix() string { return e.prefix } + +func (e *etcdResolver) GetDefaultWeight() int { + return e.defaultWeight +} diff --git a/etcd_resolver_test.go b/etcd_resolver_test.go index b0d82fd..6b88846 100644 --- a/etcd_resolver_test.go +++ b/etcd_resolver_test.go @@ -629,3 +629,94 @@ func TestEtcdResolverWithEtcdPrefix2(t *testing.T) { teardownEmbedEtcd(s) } + +func TestEtcdResolverWithDefaultWeight(t *testing.T) { + s, endpoint := setupEmbedEtcd(t) + + rg, err := NewEtcdRegistry([]string{endpoint}) + require.Nil(t, err) + + infoList := []registry.Info{ + { + ServiceName: serviceName, + Addr: utils.NewNetAddr("tcp", "127.0.0.1:8888"), + Weight: 66, + Tags: map[string]string{ + "hello": "world", + }, + }, + { + ServiceName: serviceName, + Addr: utils.NewNetAddr("tcp", "127.0.0.1:8889"), + Weight: 0, + Tags: map[string]string{ + "hello": "world", + }, + }, + } + + f := func(rs discovery.Resolver, defaultWeight int) { + // test register service + { + for _, info := range infoList { + err = rg.Register(&info) + require.Nil(t, err) + } + desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(serviceName, "", nil, nil)) + result, err := rs.Resolve(context.TODO(), desc) + require.Nil(t, err) + expected := discovery.Result{ + Cacheable: true, + CacheKey: serviceName, + Instances: []discovery.Instance{}, + } + for _, info := range infoList { + if info.Weight > 0 { + expected.Instances = append(expected.Instances, discovery.NewInstance(info.Addr.Network(), info.Addr.String(), info.Weight, info.Tags)) + } else { + expected.Instances = append(expected.Instances, discovery.NewInstance(info.Addr.Network(), info.Addr.String(), defaultWeight, info.Tags)) + } + } + require.Equal(t, expected, result) + } + + // test deregister service + { + require.Nil(t, err) + for _, info := range infoList { + err = rg.Deregister(&info) + require.Nil(t, err) + } + desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(serviceName, "", nil, nil)) + _, err = rs.Resolve(context.TODO(), desc) + require.NotNil(t, err) + } + } + + var rsList []struct { + rs discovery.Resolver + w int + } + + rs, err := NewEtcdResolver([]string{endpoint}) + require.Nil(t, err) + rsList = append(rsList, struct { + rs discovery.Resolver + w int + }{rs, defaultWeight}) + + for _, w := range []int{-1, 0, 10, 100} { + rs, err := NewEtcdResolver([]string{endpoint}, WithDefaultWeight(w)) + require.Nil(t, err) + rsList = append(rsList, struct { + rs discovery.Resolver + w int + }{rs, w}) + } + + for _, rs := range rsList { + f(rs.rs, rs.w) + } + + teardownEmbedEtcd(s) +} diff --git a/option.go b/option.go index c8a90ab..84b3634 100644 --- a/option.go +++ b/option.go @@ -18,18 +18,20 @@ import ( "crypto/tls" "crypto/x509" "errors" - "github.com/cloudwego/kitex/pkg/klog" - clientv3 "go.etcd.io/etcd/client/v3" "io/ioutil" //nolint "time" + + "github.com/cloudwego/kitex/pkg/klog" + clientv3 "go.etcd.io/etcd/client/v3" ) // Option sets options such as username, tls etc. type Option func(cfg *Config) type Config struct { - EtcdConfig *clientv3.Config - Prefix string + EtcdConfig *clientv3.Config + Prefix string + DefaultWeight int } // WithTLSOpt returns a option that authentication by tls/ssl. @@ -85,3 +87,10 @@ func WithEtcdServicePrefix(prefix string) Option { c.Prefix = prefix } } + +// WithDefaultWeight returns an option that sets the DefaultWeight field in the Config struct +func WithDefaultWeight(defaultWeight int) Option { + return func(cfg *Config) { + cfg.DefaultWeight = defaultWeight + } +}