Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support modification of prefix #34

Merged
merged 7 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,14 @@ package etcd

import "fmt"

const (
etcdPrefixTpl = "kitex/registry-etcd/%v/"
)

func serviceKeyPrefix(serviceName string) string {
return fmt.Sprintf(etcdPrefixTpl, serviceName)
func serviceKeyPrefix(prefix string, serviceName string) string {
prefix = prefix + "/%v/"
li-jin-gou marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Sprintf(prefix, serviceName)
}

// serviceKey generates the key used to stored in etcd.
func serviceKey(serviceName, addr string) string {
return serviceKeyPrefix(serviceName) + addr
func serviceKey(prefix string, serviceName, addr string) string {
return serviceKeyPrefix(prefix, serviceName) + addr
}

// instanceInfo used to stored service basic info in etcd.
Expand Down
30 changes: 19 additions & 11 deletions etcd_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type etcdRegistry struct {
retryConfig *retry.Config
stop chan struct{}
address net.Addr
prefix string
}

type registerMeta struct {
Expand All @@ -55,13 +56,16 @@ type registerMeta struct {

// NewEtcdRegistry creates an etcd based registry.
func NewEtcdRegistry(endpoints []string, opts ...Option) (registry.Registry, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
cfg := &Config{
EtcdConfig: &clientv3.Config{
Endpoints: endpoints,
},
Prefix: "kitex/registry-etcd",
}
for _, opt := range opts {
opt(&cfg)
opt(cfg)
}
etcdClient, err := clientv3.New(cfg)
etcdClient, err := clientv3.New(*cfg.EtcdConfig)
if err != nil {
return nil, err
}
Expand All @@ -71,6 +75,7 @@ func NewEtcdRegistry(endpoints []string, opts ...Option) (registry.Registry, err
leaseTTL: getTTL(),
retryConfig: retryConfig,
stop: make(chan struct{}, 1),
prefix: cfg.Prefix,
}, nil
}

Expand All @@ -86,13 +91,16 @@ func SetFixedAddress(r registry.Registry, address net.Addr) {

// NewEtcdRegistryWithRetry creates an etcd based registry with given custom retry configs
func NewEtcdRegistryWithRetry(endpoints []string, retryConfig *retry.Config, opts ...Option) (registry.Registry, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
cfg := &Config{
EtcdConfig: &clientv3.Config{
Endpoints: endpoints,
},
Prefix: "kitex/registry-etcd",
}
for _, opt := range opts {
opt(&cfg)
opt(cfg)
}
etcdClient, err := clientv3.New(cfg)
etcdClient, err := clientv3.New(*cfg.EtcdConfig)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -181,14 +189,14 @@ func (e *etcdRegistry) register(info *registry.Info, leaseID clientv3.LeaseID) e
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
_, err = e.etcdClient.Put(ctx, serviceKey(info.ServiceName, addr), string(val), clientv3.WithLease(leaseID))
_, err = e.etcdClient.Put(ctx, serviceKey(e.prefix, info.ServiceName, addr), string(val), clientv3.WithLease(leaseID))
if err != nil {
return err
}

go func(key, val string) {
e.keepRegister(key, val, e.retryConfig)
}(serviceKey(info.ServiceName, addr), string(val))
}(serviceKey(e.prefix, info.ServiceName, addr), string(val))

return nil
}
Expand Down Expand Up @@ -262,7 +270,7 @@ func (e *etcdRegistry) deregister(info *registry.Info) error {
if err != nil {
return err
}
_, err = e.etcdClient.Delete(ctx, serviceKey(info.ServiceName, addr))
_, err = e.etcdClient.Delete(ctx, serviceKey(e.prefix, info.ServiceName, addr))
if err != nil {
return err
}
Expand Down
18 changes: 13 additions & 5 deletions etcd_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,27 @@ const (
// etcdResolver is a resolver using etcd.
type etcdResolver struct {
etcdClient *clientv3.Client
prefix string
}

// NewEtcdResolver creates a etcd based resolver.
func NewEtcdResolver(endpoints []string, opts ...Option) (discovery.Resolver, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
cfg := &Config{
EtcdConfig: &clientv3.Config{
Endpoints: endpoints,
},
Prefix: "kitex/registry-etcd",
}
for _, opt := range opts {
opt(&cfg)
opt(cfg)
}
etcdClient, err := clientv3.New(cfg)
etcdClient, err := clientv3.New(*cfg.EtcdConfig)
if err != nil {
return nil, err
}
return &etcdResolver{
etcdClient: etcdClient,
prefix: cfg.Prefix,
}, nil
}

Expand All @@ -75,7 +80,7 @@ func (e *etcdResolver) Target(ctx context.Context, target rpcinfo.EndpointInfo)

// Resolve implements the Resolver interface.
func (e *etcdResolver) Resolve(ctx context.Context, desc string) (discovery.Result, error) {
prefix := serviceKeyPrefix(desc)
prefix := serviceKeyPrefix(e.prefix, desc)
resp, err := e.etcdClient.Get(ctx, prefix, clientv3.WithPrefix())
if err != nil {
return discovery.Result{}, err
Expand Down Expand Up @@ -113,3 +118,6 @@ func (e *etcdResolver) Diff(cacheKey string, prev, next discovery.Result) (disco
func (e *etcdResolver) Name() string {
return "etcd"
}
func (e *etcdResolver) GetPrefix() string {
return e.prefix
}
120 changes: 119 additions & 1 deletion etcd_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"io/ioutil"
"io/ioutil" //nolint
"math/big"
"net"
"net/url"
Expand Down Expand Up @@ -511,3 +511,121 @@ func teardownEmbedEtcd(s *embed.Etcd) {
s.Close()
_ = os.RemoveAll(s.Config().Dir)
}
func TestEtcdResolverWithEtcdPrefix(t *testing.T) {
s, endpoint := setupEmbedEtcd(t)
tpl := "etcd/v1"
rg, err := NewEtcdRegistry([]string{endpoint}, WithEtcdConfigAndPrefix(tpl))
require.Nil(t, err)
rs, err := NewEtcdResolver([]string{endpoint}, WithEtcdConfigAndPrefix(tpl))
require.Nil(t, err)

infoList := []registry.Info{
{
ServiceName: "registry-etcd-test-suffix",
Addr: utils.NewNetAddr("tcp", "127.0.0.1:8888"),
Weight: 66,
Tags: map[string]string{"hello": "world"},
},
{
ServiceName: "registry-etcd-test",
Addr: utils.NewNetAddr("tcp", "127.0.0.1:8889"),
Weight: 66,
Tags: map[string]string{"hello": "world"},
},
}

// test register service
{
for _, info := range infoList {
err = rg.Register(&info)
require.Nil(t, err)

desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil))
result, err := rs.Resolve(context.TODO(), desc)
require.Nil(t, err)
expected := discovery.Result{
Cacheable: true,
CacheKey: info.ServiceName,
Instances: []discovery.Instance{
discovery.NewInstance(info.Addr.Network(), info.Addr.String(), info.Weight, info.Tags),
},
}
require.Equal(t, expected, result)
prefix := serviceKeyPrefix(rs.(*etcdResolver).GetPrefix(), info.ServiceName)
println(prefix)
require.Equal(t, fmt.Sprintf(tpl+"/%v/", info.ServiceName), prefix)
}
}

// test deregister service
{
for _, info := range infoList {
err = rg.Deregister(&info)
require.Nil(t, err)
desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil))
_, err = rs.Resolve(context.TODO(), desc)
require.NotNil(t, err)
}
}

teardownEmbedEtcd(s)
}

func TestEtcdResolverWithEtcdPrefix2(t *testing.T) {
s, endpoint := setupEmbedEtcd(t)
rg, err := NewEtcdRegistry([]string{endpoint})
require.Nil(t, err)
rs, err := NewEtcdResolver([]string{endpoint})
require.Nil(t, err)

infoList := []registry.Info{
{
ServiceName: "registry-etcd-test-suffix",
Addr: utils.NewNetAddr("tcp", "127.0.0.1:8888"),
Weight: 66,
Tags: map[string]string{"hello": "world"},
},
{
ServiceName: "registry-etcd-test",
Addr: utils.NewNetAddr("tcp", "127.0.0.1:8889"),
Weight: 66,
Tags: map[string]string{"hello": "world"},
},
}

// test register service
{
for _, info := range infoList {
err = rg.Register(&info)
require.Nil(t, err)

desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil))
result, err := rs.Resolve(context.TODO(), desc)
require.Nil(t, err)
expected := discovery.Result{
Cacheable: true,
CacheKey: info.ServiceName,
Instances: []discovery.Instance{
discovery.NewInstance(info.Addr.Network(), info.Addr.String(), info.Weight, info.Tags),
},
}
require.Equal(t, expected, result)
prefix := serviceKeyPrefix(rs.(*etcdResolver).GetPrefix(), info.ServiceName)
println(prefix)
require.Equal(t, fmt.Sprintf("kitex/registry-etcd/%v/", info.ServiceName), prefix)
}
}

// test deregister service
{
for _, info := range infoList {
err = rg.Deregister(&info)
require.Nil(t, err)
desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil))
_, err = rs.Resolve(context.TODO(), desc)
require.NotNil(t, err)
}
}

teardownEmbedEtcd(s)
}
33 changes: 22 additions & 11 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,43 @@ import (
"crypto/tls"
"crypto/x509"
"errors"
"io/ioutil"
"time"

"github.com/cloudwego/kitex/pkg/klog"
clientv3 "go.etcd.io/etcd/client/v3"
"io/ioutil" //nolint
"time"
)

// Option sets options such as username, tls etc.
type Option func(cfg *clientv3.Config)
type Option func(cfg *Config)

type Config struct {
EtcdConfig *clientv3.Config
Prefix string
}

// WithTLSOpt returns a option that authentication by tls/ssl.
func WithTLSOpt(certFile, keyFile, caFile string) Option {
return func(cfg *clientv3.Config) {
return func(cfg *Config) {
tlsCfg, err := newTLSConfig(certFile, keyFile, caFile, "")
if err != nil {
klog.Errorf("tls failed with err: %v , skipping tls.", err)
}
cfg.TLS = tlsCfg
cfg.EtcdConfig.TLS = tlsCfg
}
}

// WithAuthOpt returns a option that authentication by usernane and password.
func WithAuthOpt(username, password string) Option {
return func(cfg *clientv3.Config) {
cfg.Username = username
cfg.Password = password
return func(cfg *Config) {
cfg.EtcdConfig.Username = username
cfg.EtcdConfig.Password = password
}
}

// WithDialTimeoutOpt returns a option set dialTimeout
func WithDialTimeoutOpt(dialTimeout time.Duration) Option {
return func(cfg *clientv3.Config) {
cfg.DialTimeout = dialTimeout
return func(cfg *Config) {
cfg.EtcdConfig.DialTimeout = dialTimeout
}
}

Expand All @@ -74,3 +78,10 @@ func newTLSConfig(certFile, keyFile, caFile, serverName string) (*tls.Config, er
}
return cfg, nil
}

// WithEtcdConfigAndPrefix returns an option that sets the Prefix field in the Config struct
func WithEtcdConfigAndPrefix(prefix string) Option {
li-jin-gou marked this conversation as resolved.
Show resolved Hide resolved
return func(c *Config) {
c.Prefix = prefix
}
}
Loading