Skip to content

Commit

Permalink
Merge pull request #40 from jizhuozhi/main
Browse files Browse the repository at this point in the history
feat: support setting default weight
  • Loading branch information
HeyJavaBean authored Jul 8, 2024
2 parents 5c2a4de + 531ad96 commit afcb798
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 10 deletions.
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,47 @@ func main() {
}
```

## Default Weight

The weighted load balancing algorithm can only handle positive weights, and will be filtered when the weight is if 0 or negative. Setting default weights can avoid filtering.

### Default Config

| Config Name | Default Value | Description |
|:------------------|:--------------|:-------------------------------------------------------------------------------------------------------------------------------------|
| WithDefaultWeight | 10 | Used to set the default wight of instances, if 0 or negative, it means instances with 0 or negative weight will be filtered |

### Example

```go
package main

import (
...
"github.com/cloudwego/kitex/client"
etcd "github.com/kitex-contrib/registry-etcd"
)

func main() {
// creates a etcd based resolver with default weight
r, err := etcd.NewEtcdResolver([]string{"127.0.0.1:2379"}, etcd.WithDefaultWeight(10))
if err != nil {
log.Fatal(err)
}
client := hello.MustNewClient("Hello", client.WithResolver(r))
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
resp, err := client.Echo(ctx, &api.Request{Message: "Hello"})
cancel()
if err != nil {
log.Fatal(err)
}
log.Println(resp)
time.Sleep(time.Second)
}
}
```

## How to Dynamically specify ip and port
To dynamically specify an IP and port, one should first set the environment variables KITEX_IP_TO_REGISTRY and KITEX_PORT_TO_REGISTRY. If these variables are not set, the system defaults to using the service's listening IP and port. Notably, if the service's listening IP is either not set or set to "::", the system will automatically retrieve and use the machine's IPV4 address.

Expand Down
20 changes: 14 additions & 6 deletions etcd_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ const (

// etcdResolver is a resolver using etcd.
type etcdResolver struct {
etcdClient *clientv3.Client
prefix string
etcdClient *clientv3.Client
prefix string
defaultWeight int
}

// NewEtcdResolver creates a etcd based resolver.
Expand All @@ -42,7 +43,8 @@ func NewEtcdResolver(endpoints []string, opts ...Option) (discovery.Resolver, er
EtcdConfig: &clientv3.Config{
Endpoints: endpoints,
},
Prefix: "kitex/registry-etcd",
Prefix: "kitex/registry-etcd",
DefaultWeight: defaultWeight,
}
for _, opt := range opts {
opt(cfg)
Expand All @@ -52,8 +54,9 @@ func NewEtcdResolver(endpoints []string, opts ...Option) (discovery.Resolver, er
return nil, err
}
return &etcdResolver{
etcdClient: etcdClient,
prefix: cfg.Prefix,
etcdClient: etcdClient,
prefix: cfg.Prefix,
defaultWeight: cfg.DefaultWeight,
}, nil
}

Expand Down Expand Up @@ -95,7 +98,7 @@ func (e *etcdResolver) Resolve(ctx context.Context, desc string) (discovery.Resu
}
weight := info.Weight
if weight <= 0 {
weight = defaultWeight
weight = e.defaultWeight
}
eps = append(eps, discovery.NewInstance(info.Network, info.Address, weight, info.Tags))
}
Expand All @@ -118,6 +121,11 @@ func (e *etcdResolver) Diff(cacheKey string, prev, next discovery.Result) (disco
func (e *etcdResolver) Name() string {
return "etcd"
}

func (e *etcdResolver) GetPrefix() string {
return e.prefix
}

func (e *etcdResolver) GetDefaultWeight() int {
return e.defaultWeight
}
91 changes: 91 additions & 0 deletions etcd_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,3 +629,94 @@ func TestEtcdResolverWithEtcdPrefix2(t *testing.T) {

teardownEmbedEtcd(s)
}

func TestEtcdResolverWithDefaultWeight(t *testing.T) {
s, endpoint := setupEmbedEtcd(t)

rg, err := NewEtcdRegistry([]string{endpoint})
require.Nil(t, err)

infoList := []registry.Info{
{
ServiceName: serviceName,
Addr: utils.NewNetAddr("tcp", "127.0.0.1:8888"),
Weight: 66,
Tags: map[string]string{
"hello": "world",
},
},
{
ServiceName: serviceName,
Addr: utils.NewNetAddr("tcp", "127.0.0.1:8889"),
Weight: 0,
Tags: map[string]string{
"hello": "world",
},
},
}

f := func(rs discovery.Resolver, defaultWeight int) {
// test register service
{
for _, info := range infoList {
err = rg.Register(&info)
require.Nil(t, err)
}
desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(serviceName, "", nil, nil))
result, err := rs.Resolve(context.TODO(), desc)
require.Nil(t, err)
expected := discovery.Result{
Cacheable: true,
CacheKey: serviceName,
Instances: []discovery.Instance{},
}
for _, info := range infoList {
if info.Weight > 0 {
expected.Instances = append(expected.Instances, discovery.NewInstance(info.Addr.Network(), info.Addr.String(), info.Weight, info.Tags))
} else {
expected.Instances = append(expected.Instances, discovery.NewInstance(info.Addr.Network(), info.Addr.String(), defaultWeight, info.Tags))
}
}
require.Equal(t, expected, result)
}

// test deregister service
{
require.Nil(t, err)
for _, info := range infoList {
err = rg.Deregister(&info)
require.Nil(t, err)
}
desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(serviceName, "", nil, nil))
_, err = rs.Resolve(context.TODO(), desc)
require.NotNil(t, err)
}
}

var rsList []struct {
rs discovery.Resolver
w int
}

rs, err := NewEtcdResolver([]string{endpoint})
require.Nil(t, err)
rsList = append(rsList, struct {
rs discovery.Resolver
w int
}{rs, defaultWeight})

for _, w := range []int{-1, 0, 10, 100} {
rs, err := NewEtcdResolver([]string{endpoint}, WithDefaultWeight(w))
require.Nil(t, err)
rsList = append(rsList, struct {
rs discovery.Resolver
w int
}{rs, w})
}

for _, rs := range rsList {
f(rs.rs, rs.w)
}

teardownEmbedEtcd(s)
}
17 changes: 13 additions & 4 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@ import (
"crypto/tls"
"crypto/x509"
"errors"
"github.com/cloudwego/kitex/pkg/klog"
clientv3 "go.etcd.io/etcd/client/v3"
"io/ioutil" //nolint
"time"

"github.com/cloudwego/kitex/pkg/klog"
clientv3 "go.etcd.io/etcd/client/v3"
)

// Option sets options such as username, tls etc.
type Option func(cfg *Config)

type Config struct {
EtcdConfig *clientv3.Config
Prefix string
EtcdConfig *clientv3.Config
Prefix string
DefaultWeight int
}

// WithTLSOpt returns a option that authentication by tls/ssl.
Expand Down Expand Up @@ -85,3 +87,10 @@ func WithEtcdServicePrefix(prefix string) Option {
c.Prefix = prefix
}
}

// WithDefaultWeight returns an option that sets the DefaultWeight field in the Config struct
func WithDefaultWeight(defaultWeight int) Option {
return func(cfg *Config) {
cfg.DefaultWeight = defaultWeight
}
}

0 comments on commit afcb798

Please sign in to comment.