diff --git a/common.go b/common.go index 1f30f1e..533c38d 100644 --- a/common.go +++ b/common.go @@ -16,17 +16,14 @@ package etcd import "fmt" -const ( - etcdPrefixTpl = "kitex/registry-etcd/%v/" -) - -func serviceKeyPrefix(serviceName string) string { - return fmt.Sprintf(etcdPrefixTpl, serviceName) +func serviceKeyPrefix(prefix string, serviceName string) string { + prefix = prefix + "/%v/" + return fmt.Sprintf(prefix, serviceName) } // serviceKey generates the key used to stored in etcd. -func serviceKey(serviceName, addr string) string { - return serviceKeyPrefix(serviceName) + addr +func serviceKey(prefix string, serviceName, addr string) string { + return serviceKeyPrefix(prefix, serviceName) + addr } // instanceInfo used to stored service basic info in etcd. diff --git a/etcd_registry.go b/etcd_registry.go index 5e3fa78..09b0d4c 100644 --- a/etcd_registry.go +++ b/etcd_registry.go @@ -45,6 +45,7 @@ type etcdRegistry struct { retryConfig *retry.Config stop chan struct{} address net.Addr + prefix string } type registerMeta struct { @@ -55,13 +56,16 @@ type registerMeta struct { // NewEtcdRegistry creates an etcd based registry. func NewEtcdRegistry(endpoints []string, opts ...Option) (registry.Registry, error) { - cfg := clientv3.Config{ - Endpoints: endpoints, + cfg := &Config{ + EtcdConfig: &clientv3.Config{ + Endpoints: endpoints, + }, + Prefix: "kitex/registry-etcd", } for _, opt := range opts { - opt(&cfg) + opt(cfg) } - etcdClient, err := clientv3.New(cfg) + etcdClient, err := clientv3.New(*cfg.EtcdConfig) if err != nil { return nil, err } @@ -71,6 +75,7 @@ func NewEtcdRegistry(endpoints []string, opts ...Option) (registry.Registry, err leaseTTL: getTTL(), retryConfig: retryConfig, stop: make(chan struct{}, 1), + prefix: cfg.Prefix, }, nil } @@ -86,13 +91,16 @@ func SetFixedAddress(r registry.Registry, address net.Addr) { // NewEtcdRegistryWithRetry creates an etcd based registry with given custom retry configs func NewEtcdRegistryWithRetry(endpoints []string, retryConfig *retry.Config, opts ...Option) (registry.Registry, error) { - cfg := clientv3.Config{ - Endpoints: endpoints, + cfg := &Config{ + EtcdConfig: &clientv3.Config{ + Endpoints: endpoints, + }, + Prefix: "kitex/registry-etcd", } for _, opt := range opts { - opt(&cfg) + opt(cfg) } - etcdClient, err := clientv3.New(cfg) + etcdClient, err := clientv3.New(*cfg.EtcdConfig) if err != nil { return nil, err } @@ -181,14 +189,14 @@ func (e *etcdRegistry) register(info *registry.Info, leaseID clientv3.LeaseID) e } ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() - _, err = e.etcdClient.Put(ctx, serviceKey(info.ServiceName, addr), string(val), clientv3.WithLease(leaseID)) + _, err = e.etcdClient.Put(ctx, serviceKey(e.prefix, info.ServiceName, addr), string(val), clientv3.WithLease(leaseID)) if err != nil { return err } go func(key, val string) { e.keepRegister(key, val, e.retryConfig) - }(serviceKey(info.ServiceName, addr), string(val)) + }(serviceKey(e.prefix, info.ServiceName, addr), string(val)) return nil } @@ -262,7 +270,7 @@ func (e *etcdRegistry) deregister(info *registry.Info) error { if err != nil { return err } - _, err = e.etcdClient.Delete(ctx, serviceKey(info.ServiceName, addr)) + _, err = e.etcdClient.Delete(ctx, serviceKey(e.prefix, info.ServiceName, addr)) if err != nil { return err } diff --git a/etcd_resolver.go b/etcd_resolver.go index 91e53b4..2fd2a57 100644 --- a/etcd_resolver.go +++ b/etcd_resolver.go @@ -33,22 +33,27 @@ const ( // etcdResolver is a resolver using etcd. type etcdResolver struct { etcdClient *clientv3.Client + prefix string } // NewEtcdResolver creates a etcd based resolver. func NewEtcdResolver(endpoints []string, opts ...Option) (discovery.Resolver, error) { - cfg := clientv3.Config{ - Endpoints: endpoints, + cfg := &Config{ + EtcdConfig: &clientv3.Config{ + Endpoints: endpoints, + }, + Prefix: "kitex/registry-etcd", } for _, opt := range opts { - opt(&cfg) + opt(cfg) } - etcdClient, err := clientv3.New(cfg) + etcdClient, err := clientv3.New(*cfg.EtcdConfig) if err != nil { return nil, err } return &etcdResolver{ etcdClient: etcdClient, + prefix: cfg.Prefix, }, nil } @@ -75,7 +80,7 @@ func (e *etcdResolver) Target(ctx context.Context, target rpcinfo.EndpointInfo) // Resolve implements the Resolver interface. func (e *etcdResolver) Resolve(ctx context.Context, desc string) (discovery.Result, error) { - prefix := serviceKeyPrefix(desc) + prefix := serviceKeyPrefix(e.prefix, desc) resp, err := e.etcdClient.Get(ctx, prefix, clientv3.WithPrefix()) if err != nil { return discovery.Result{}, err @@ -113,3 +118,6 @@ func (e *etcdResolver) Diff(cacheKey string, prev, next discovery.Result) (disco func (e *etcdResolver) Name() string { return "etcd" } +func (e *etcdResolver) GetPrefix() string { + return e.prefix +} diff --git a/etcd_resolver_test.go b/etcd_resolver_test.go index b58468a..35ded9a 100644 --- a/etcd_resolver_test.go +++ b/etcd_resolver_test.go @@ -23,7 +23,7 @@ import ( "crypto/x509/pkix" "encoding/pem" "fmt" - "io/ioutil" + "io/ioutil" //nolint "math/big" "net" "net/url" @@ -511,3 +511,121 @@ func teardownEmbedEtcd(s *embed.Etcd) { s.Close() _ = os.RemoveAll(s.Config().Dir) } +func TestEtcdResolverWithEtcdPrefix(t *testing.T) { + s, endpoint := setupEmbedEtcd(t) + tpl := "etcd/v1" + rg, err := NewEtcdRegistry([]string{endpoint}, WithEtcdConfigAndPrefix(tpl)) + require.Nil(t, err) + rs, err := NewEtcdResolver([]string{endpoint}, WithEtcdConfigAndPrefix(tpl)) + require.Nil(t, err) + + infoList := []registry.Info{ + { + ServiceName: "registry-etcd-test-suffix", + Addr: utils.NewNetAddr("tcp", "127.0.0.1:8888"), + Weight: 66, + Tags: map[string]string{"hello": "world"}, + }, + { + ServiceName: "registry-etcd-test", + Addr: utils.NewNetAddr("tcp", "127.0.0.1:8889"), + Weight: 66, + Tags: map[string]string{"hello": "world"}, + }, + } + + // test register service + { + for _, info := range infoList { + err = rg.Register(&info) + require.Nil(t, err) + + desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil)) + result, err := rs.Resolve(context.TODO(), desc) + require.Nil(t, err) + expected := discovery.Result{ + Cacheable: true, + CacheKey: info.ServiceName, + Instances: []discovery.Instance{ + discovery.NewInstance(info.Addr.Network(), info.Addr.String(), info.Weight, info.Tags), + }, + } + require.Equal(t, expected, result) + prefix := serviceKeyPrefix(rs.(*etcdResolver).GetPrefix(), info.ServiceName) + println(prefix) + require.Equal(t, fmt.Sprintf(tpl+"/%v/", info.ServiceName), prefix) + } + } + + // test deregister service + { + for _, info := range infoList { + err = rg.Deregister(&info) + require.Nil(t, err) + desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil)) + _, err = rs.Resolve(context.TODO(), desc) + require.NotNil(t, err) + } + } + + teardownEmbedEtcd(s) +} + +func TestEtcdResolverWithEtcdPrefix2(t *testing.T) { + s, endpoint := setupEmbedEtcd(t) + rg, err := NewEtcdRegistry([]string{endpoint}) + require.Nil(t, err) + rs, err := NewEtcdResolver([]string{endpoint}) + require.Nil(t, err) + + infoList := []registry.Info{ + { + ServiceName: "registry-etcd-test-suffix", + Addr: utils.NewNetAddr("tcp", "127.0.0.1:8888"), + Weight: 66, + Tags: map[string]string{"hello": "world"}, + }, + { + ServiceName: "registry-etcd-test", + Addr: utils.NewNetAddr("tcp", "127.0.0.1:8889"), + Weight: 66, + Tags: map[string]string{"hello": "world"}, + }, + } + + // test register service + { + for _, info := range infoList { + err = rg.Register(&info) + require.Nil(t, err) + + desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil)) + result, err := rs.Resolve(context.TODO(), desc) + require.Nil(t, err) + expected := discovery.Result{ + Cacheable: true, + CacheKey: info.ServiceName, + Instances: []discovery.Instance{ + discovery.NewInstance(info.Addr.Network(), info.Addr.String(), info.Weight, info.Tags), + }, + } + require.Equal(t, expected, result) + prefix := serviceKeyPrefix(rs.(*etcdResolver).GetPrefix(), info.ServiceName) + println(prefix) + require.Equal(t, fmt.Sprintf("kitex/registry-etcd/%v/", info.ServiceName), prefix) + } + } + + // test deregister service + { + for _, info := range infoList { + err = rg.Deregister(&info) + require.Nil(t, err) + desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil)) + _, err = rs.Resolve(context.TODO(), desc) + require.NotNil(t, err) + } + } + + teardownEmbedEtcd(s) +} diff --git a/option.go b/option.go index e004a34..dcce768 100644 --- a/option.go +++ b/option.go @@ -18,39 +18,43 @@ import ( "crypto/tls" "crypto/x509" "errors" - "io/ioutil" - "time" - "github.com/cloudwego/kitex/pkg/klog" clientv3 "go.etcd.io/etcd/client/v3" + "io/ioutil" //nolint + "time" ) // Option sets options such as username, tls etc. -type Option func(cfg *clientv3.Config) +type Option func(cfg *Config) + +type Config struct { + EtcdConfig *clientv3.Config + Prefix string +} // WithTLSOpt returns a option that authentication by tls/ssl. func WithTLSOpt(certFile, keyFile, caFile string) Option { - return func(cfg *clientv3.Config) { + return func(cfg *Config) { tlsCfg, err := newTLSConfig(certFile, keyFile, caFile, "") if err != nil { klog.Errorf("tls failed with err: %v , skipping tls.", err) } - cfg.TLS = tlsCfg + cfg.EtcdConfig.TLS = tlsCfg } } // WithAuthOpt returns a option that authentication by usernane and password. func WithAuthOpt(username, password string) Option { - return func(cfg *clientv3.Config) { - cfg.Username = username - cfg.Password = password + return func(cfg *Config) { + cfg.EtcdConfig.Username = username + cfg.EtcdConfig.Password = password } } // WithDialTimeoutOpt returns a option set dialTimeout func WithDialTimeoutOpt(dialTimeout time.Duration) Option { - return func(cfg *clientv3.Config) { - cfg.DialTimeout = dialTimeout + return func(cfg *Config) { + cfg.EtcdConfig.DialTimeout = dialTimeout } } @@ -74,3 +78,10 @@ func newTLSConfig(certFile, keyFile, caFile, serverName string) (*tls.Config, er } return cfg, nil } + +// WithEtcdConfigAndPrefix returns an option that sets the Prefix field in the Config struct +func WithEtcdConfigAndPrefix(prefix string) Option { + return func(c *Config) { + c.Prefix = prefix + } +}