Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support setting default weight #40

Merged
merged 1 commit into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,47 @@ func main() {
}
```

## Default Weight

The weighted load balancing algorithm can only handle positive weights, and will be filtered when the weight is if 0 or negative. Setting default weights can avoid filtering.

### Default Config

| Config Name | Default Value | Description |
|:------------------|:--------------|:-------------------------------------------------------------------------------------------------------------------------------------|
| WithDefaultWeight | 10 | Used to set the default wight of instances, if 0 or negative, it means instances with 0 or negative weight will be filtered |

### Example

```go
package main

import (
...
"github.com/cloudwego/kitex/client"
etcd "github.com/kitex-contrib/registry-etcd"
)

func main() {
// creates a etcd based resolver with default weight
r, err := etcd.NewEtcdResolver([]string{"127.0.0.1:2379"}, etcd.WithDefaultWeight(10))
if err != nil {
log.Fatal(err)
}
client := hello.MustNewClient("Hello", client.WithResolver(r))
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
resp, err := client.Echo(ctx, &api.Request{Message: "Hello"})
cancel()
if err != nil {
log.Fatal(err)
}
log.Println(resp)
time.Sleep(time.Second)
}
}
```

## How to Dynamically specify ip and port
To dynamically specify an IP and port, one should first set the environment variables KITEX_IP_TO_REGISTRY and KITEX_PORT_TO_REGISTRY. If these variables are not set, the system defaults to using the service's listening IP and port. Notably, if the service's listening IP is either not set or set to "::", the system will automatically retrieve and use the machine's IPV4 address.

Expand Down
20 changes: 14 additions & 6 deletions etcd_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ const (

// etcdResolver is a resolver using etcd.
type etcdResolver struct {
etcdClient *clientv3.Client
prefix string
etcdClient *clientv3.Client
prefix string
defaultWeight int
}

// NewEtcdResolver creates a etcd based resolver.
Expand All @@ -42,7 +43,8 @@ func NewEtcdResolver(endpoints []string, opts ...Option) (discovery.Resolver, er
EtcdConfig: &clientv3.Config{
Endpoints: endpoints,
},
Prefix: "kitex/registry-etcd",
Prefix: "kitex/registry-etcd",
DefaultWeight: defaultWeight,
}
for _, opt := range opts {
opt(cfg)
Expand All @@ -52,8 +54,9 @@ func NewEtcdResolver(endpoints []string, opts ...Option) (discovery.Resolver, er
return nil, err
}
return &etcdResolver{
etcdClient: etcdClient,
prefix: cfg.Prefix,
etcdClient: etcdClient,
prefix: cfg.Prefix,
defaultWeight: cfg.DefaultWeight,
}, nil
}

Expand Down Expand Up @@ -95,7 +98,7 @@ func (e *etcdResolver) Resolve(ctx context.Context, desc string) (discovery.Resu
}
weight := info.Weight
if weight <= 0 {
weight = defaultWeight
weight = e.defaultWeight
}
eps = append(eps, discovery.NewInstance(info.Network, info.Address, weight, info.Tags))
}
Expand All @@ -118,6 +121,11 @@ func (e *etcdResolver) Diff(cacheKey string, prev, next discovery.Result) (disco
func (e *etcdResolver) Name() string {
return "etcd"
}

func (e *etcdResolver) GetPrefix() string {
return e.prefix
}

func (e *etcdResolver) GetDefaultWeight() int {
return e.defaultWeight
}
91 changes: 91 additions & 0 deletions etcd_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,3 +629,94 @@ func TestEtcdResolverWithEtcdPrefix2(t *testing.T) {

teardownEmbedEtcd(s)
}

func TestEtcdResolverWithDefaultWeight(t *testing.T) {
s, endpoint := setupEmbedEtcd(t)

rg, err := NewEtcdRegistry([]string{endpoint})
require.Nil(t, err)

infoList := []registry.Info{
{
ServiceName: serviceName,
Addr: utils.NewNetAddr("tcp", "127.0.0.1:8888"),
Weight: 66,
Tags: map[string]string{
"hello": "world",
},
},
{
ServiceName: serviceName,
Addr: utils.NewNetAddr("tcp", "127.0.0.1:8889"),
Weight: 0,
Tags: map[string]string{
"hello": "world",
},
},
}

f := func(rs discovery.Resolver, defaultWeight int) {
// test register service
{
for _, info := range infoList {
err = rg.Register(&info)
require.Nil(t, err)
}
desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(serviceName, "", nil, nil))
result, err := rs.Resolve(context.TODO(), desc)
require.Nil(t, err)
expected := discovery.Result{
Cacheable: true,
CacheKey: serviceName,
Instances: []discovery.Instance{},
}
for _, info := range infoList {
if info.Weight > 0 {
expected.Instances = append(expected.Instances, discovery.NewInstance(info.Addr.Network(), info.Addr.String(), info.Weight, info.Tags))
} else {
expected.Instances = append(expected.Instances, discovery.NewInstance(info.Addr.Network(), info.Addr.String(), defaultWeight, info.Tags))
}
}
require.Equal(t, expected, result)
}

// test deregister service
{
require.Nil(t, err)
for _, info := range infoList {
err = rg.Deregister(&info)
require.Nil(t, err)
}
desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(serviceName, "", nil, nil))
_, err = rs.Resolve(context.TODO(), desc)
require.NotNil(t, err)
}
}

var rsList []struct {
rs discovery.Resolver
w int
}

rs, err := NewEtcdResolver([]string{endpoint})
require.Nil(t, err)
rsList = append(rsList, struct {
rs discovery.Resolver
w int
}{rs, defaultWeight})

for _, w := range []int{-1, 0, 10, 100} {
rs, err := NewEtcdResolver([]string{endpoint}, WithDefaultWeight(w))
require.Nil(t, err)
rsList = append(rsList, struct {
rs discovery.Resolver
w int
}{rs, w})
}

for _, rs := range rsList {
f(rs.rs, rs.w)
}

teardownEmbedEtcd(s)
}
17 changes: 13 additions & 4 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@ import (
"crypto/tls"
"crypto/x509"
"errors"
"github.com/cloudwego/kitex/pkg/klog"
clientv3 "go.etcd.io/etcd/client/v3"
"io/ioutil" //nolint
"time"

"github.com/cloudwego/kitex/pkg/klog"
clientv3 "go.etcd.io/etcd/client/v3"
)

// Option sets options such as username, tls etc.
type Option func(cfg *Config)

type Config struct {
EtcdConfig *clientv3.Config
Prefix string
EtcdConfig *clientv3.Config
Prefix string
DefaultWeight int
}

// WithTLSOpt returns a option that authentication by tls/ssl.
Expand Down Expand Up @@ -85,3 +87,10 @@ func WithEtcdServicePrefix(prefix string) Option {
c.Prefix = prefix
}
}

// WithDefaultWeight returns an option that sets the DefaultWeight field in the Config struct
func WithDefaultWeight(defaultWeight int) Option {
return func(cfg *Config) {
cfg.DefaultWeight = defaultWeight
}
}
Loading