Skip to content

Commit

Permalink
Fix bug with naocs
Browse files Browse the repository at this point in the history
  • Loading branch information
zhufuyi committed Dec 9, 2023
1 parent 2393f09 commit e2c4e45
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 74 deletions.
2 changes: 1 addition & 1 deletion internal/config/serverNameExample.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func Show(hiddenFields ...string) string {

func Get() *Config {
if config == nil {
panic("config is nil")
panic("config is nil, please call config.Init() first")
}
return config
}
Expand Down
6 changes: 3 additions & 3 deletions internal/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (s *grpcServer) secureServerOption() grpc.ServerOption {
if err != nil {
panic(err)
}
logger.Info("rpc security type: sever-side certification")
logger.Info("grpc security type: sever-side certification")
return grpc.Creds(credentials)

case "two-way": // both client and server side certification
Expand All @@ -136,11 +136,11 @@ func (s *grpcServer) secureServerOption() grpc.ServerOption {
if err != nil {
panic(err)
}
logger.Info("rpc security type: both client-side and server-side certification")
logger.Info("grpc security type: both client-side and server-side certification")
return grpc.Creds(credentials)
}

logger.Info("rpc security type: insecure")
logger.Info("grpc security type: insecure")
return nil
}

Expand Down
110 changes: 52 additions & 58 deletions internal/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -30,7 +31,7 @@ func TestRegisterAllService(t *testing.T) {
})
}

// The default is to connect to the local grpc service, if you want to connect to a remote grpc service,
// The default is to connect to the local grpc server, if you want to connect to a remote grpc server,
// pass in the parameter grpcClient.
func getRPCClientConnForTest(grpcClient ...config.GrpcClient) *grpc.ClientConn {
err := config.Init(configs.Path("serverNameExample.yml"))
Expand All @@ -40,21 +41,22 @@ func getRPCClientConnForTest(grpcClient ...config.GrpcClient) *grpc.ClientConn {
var grpcClientCfg config.GrpcClient

if len(grpcClient) == 0 {
// default config from configuration file serverNameExample.yml
grpcClientCfg = config.GrpcClient{
Host: "127.0.0.1",
Host: config.Get().App.Host,
Port: config.Get().Grpc.Port,

Name: "",
EnableLoadBalance: false,
RegistryDiscoveryType: "",
ClientSecure: config.ClientSecure{},
ClientToken: config.ClientToken{},
// If RegistryDiscoveryType is not empty, service discovery is used, and Host and Port values are invalid
RegistryDiscoveryType: config.Get().App.RegistryDiscoveryType, // supports consul, etcd and nacos
Name: config.Get().App.Name,
}
if grpcClientCfg.RegistryDiscoveryType != "" {
grpcClientCfg.EnableLoadBalance = true
}
} else {
// custom config
grpcClientCfg = grpcClient[0]
}

endpoint := grpcClientCfg.Host + ":" + utils.IntToStr(grpcClientCfg.Port)
var cliOptions []grpccli.Option

// load balance
Expand Down Expand Up @@ -83,66 +85,58 @@ func getRPCClientConnForTest(grpcClient ...config.GrpcClient) *grpc.ClientConn {
grpccli.WithEnableLog(logger.Get()),
)

isUseDiscover := false
if config.Get().App.RegistryDiscoveryType != "" {
var iDiscovery registry.Discovery
endpoint = "discovery:///" + config.Get().App.Name // Connecting to grpc services by service name

// Use consul service discovery, note that the host field in the configuration file serverNameExample.yml
// needs to be filled with the local ip, not 127.0.0.1, to do the health check
if config.Get().App.RegistryDiscoveryType == "consul" {
cli, err := consulcli.Init(config.Get().Consul.Addr, consulcli.WithWaitTime(time.Second*2))
if err != nil {
panic(err)
}
iDiscovery = consul.New(cli)
isUseDiscover = true
}
var (
endpoint string
isUseDiscover bool
iDiscovery registry.Discovery
)

// Use etcd service discovery, use the command etcdctl get / --prefix to see if the service is registered before testing,
// note: the IDE using a proxy may cause the connection to the etcd service to fail
if config.Get().App.RegistryDiscoveryType == "etcd" {
cli, err := etcdcli.Init(config.Get().Etcd.Addrs, etcdcli.WithDialTimeout(time.Second*2))
if err != nil {
panic(err)
}
iDiscovery = etcd.New(cli)
isUseDiscover = true
switch grpcClientCfg.RegistryDiscoveryType {
case "consul":
endpoint = "discovery:///" + grpcClientCfg.Name // Connecting to grpc services by service name
cli, err := consulcli.Init(config.Get().Consul.Addr, consulcli.WithWaitTime(time.Second*2))
if err != nil {
panic(err)
}

// Use nacos service discovery
if config.Get().App.RegistryDiscoveryType == "nacos" {
// example: endpoint = "discovery:///serverName.scheme"
endpoint = "discovery:///" + config.Get().App.Name + ".grpc"
cli, err := nacoscli.NewNamingClient(
config.Get().NacosRd.IPAddr,
config.Get().NacosRd.Port,
config.Get().NacosRd.NamespaceID)
if err != nil {
panic(err)
}
iDiscovery = nacos.New(cli)
isUseDiscover = true
iDiscovery = consul.New(cli)
isUseDiscover = true

case "etcd":
endpoint = "discovery:///" + grpcClientCfg.Name // Connecting to grpc services by service name
cli, err := etcdcli.Init(config.Get().Etcd.Addrs, etcdcli.WithDialTimeout(time.Second*2))
if err != nil {
panic(err)
}
iDiscovery = etcd.New(cli)
isUseDiscover = true
case "nacos":
// example: endpoint = "discovery:///serverName.scheme"
endpoint = "discovery:///" + grpcClientCfg.Name + ".grpc"
cli, err := nacoscli.NewNamingClient(
config.Get().NacosRd.IPAddr,
config.Get().NacosRd.Port,
config.Get().NacosRd.NamespaceID)
if err != nil {
panic(err)
}
iDiscovery = nacos.New(cli)
isUseDiscover = true

cliOptions = append(cliOptions, grpccli.WithDiscovery(iDiscovery))
default:
endpoint = grpcClientCfg.Host + ":" + strconv.Itoa(grpcClientCfg.Port)
iDiscovery = nil
isUseDiscover = false
}

if config.Get().App.EnableTrace {
cliOptions = append(cliOptions, grpccli.WithEnableTrace())
}
if config.Get().App.EnableCircuitBreaker {
cliOptions = append(cliOptions, grpccli.WithEnableCircuitBreaker())
}
if config.Get().App.EnableMetrics {
cliOptions = append(cliOptions, grpccli.WithEnableMetrics())
if iDiscovery != nil {
cliOptions = append(cliOptions, grpccli.WithDiscovery(iDiscovery))
}

msg := "dialing grpc server"
if isUseDiscover {
msg += " with discovery from " + config.Get().App.RegistryDiscoveryType
msg += " with discovery from " + grpcClientCfg.RegistryDiscoveryType
}
logger.Info(msg, logger.String("name", config.Get().App.Name), logger.String("endpoint", endpoint))
logger.Info(msg, logger.String("name", grpcClientCfg.Name), logger.String("endpoint", endpoint))

conn, err := grpccli.Dial(context.Background(), endpoint, cliOptions...)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/consulcli/consulcli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func TestInit(t *testing.T) {
WithScheme("http"),
WithWaitTime(time.Second*2),
WithDatacenter(""),
WithToken("your-token"),
)
t.Log(err, cli)

Expand Down
8 changes: 8 additions & 0 deletions pkg/consulcli/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type options struct {
scheme string
waitTime time.Duration
datacenter string
token string

// if you set this parameter, all fields above are invalid
config *api.Config
Expand Down Expand Up @@ -52,6 +53,13 @@ func WithDatacenter(datacenter string) Option {
}
}

// WithToken set token
func WithToken(token string) Option {
return func(o *options) {
o.token = token
}
}

// WithConfig set consul config
func WithConfig(c *api.Config) Option {
return func(o *options) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/grpc/grpccli/dail.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
)

// Dial to rpc server
// Dial to grpc server
func Dial(ctx context.Context, endpoint string, opts ...Option) (*grpc.ClientConn, error) {
o := defaultOptions()
o.apply(opts...)
Expand Down
6 changes: 4 additions & 2 deletions pkg/nacoscli/nacos.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
type Params struct {
IPAddr string // server address
Port uint64 // port
Scheme string // http or https
Scheme string // http or grpc
ContextPath string // path
// if you set this parameter, the above fields(IPAddr, Port, Scheme, ContextPath) are invalid
serverConfigs []constant.ServerConfig
Expand Down Expand Up @@ -70,6 +70,8 @@ func setParams(params *Params, opts ...Option) {
NotLoadCacheAtStart: true,
LogDir: os.TempDir() + "/nacos/log",
CacheDir: os.TempDir() + "/nacos/cache",
Username: o.username,
Password: o.password,
}
}

Expand All @@ -86,7 +88,7 @@ func setParams(params *Params, opts ...Option) {
}
}

// Init get configuration from nacos and parse to struct
// Init get configuration from nacos and parse to struct, use for configuration centre
func Init(obj interface{}, params *Params, opts ...Option) error {
err := params.valid()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/nacoscli/nacos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func TestParse(t *testing.T) {
err := Init(conf, params,
WithClientConfig(clientConfig),
WithServerConfigs(serverConfigs),
WithAuth("foo", "bar"),
)
t.Log(err, conf)
})
Expand Down
12 changes: 12 additions & 0 deletions pkg/nacoscli/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
)

type options struct {
username string
password string

// if set the clientConfig, the above fields(username, password) are invalid
clientConfig *constant.ClientConfig
serverConfigs []constant.ServerConfig
}
Expand All @@ -25,6 +29,14 @@ func (o *options) apply(opts ...Option) {
}
}

// WithAuth set authentication
func WithAuth(username string, password string) Option {
return func(o *options) {
o.username = username
o.password = password
}
}

// WithClientConfig set nacos client config
func WithClientConfig(clientConfig *constant.ClientConfig) Option {
return func(o *options) {
Expand Down
17 changes: 13 additions & 4 deletions pkg/servicerd/registry/nacos/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func (r *Registry) Register(_ context.Context, si *registry.ServiceInstance) err
var rmd map[string]string
if si.Metadata == nil {
rmd = map[string]string{
"id": si.ID,
"kind": u.Scheme,
"version": si.Version,
}
Expand All @@ -124,6 +125,7 @@ func (r *Registry) Register(_ context.Context, si *registry.ServiceInstance) err
for k, v := range si.Metadata {
rmd[k] = v
}
rmd["id"] = si.ID
rmd["kind"] = u.Scheme
rmd["version"] = si.Version
}
Expand All @@ -140,7 +142,7 @@ func (r *Registry) Register(_ context.Context, si *registry.ServiceInstance) err
GroupName: r.opts.group,
})
if e != nil {
return fmt.Errorf("RegisterInstance err %v,%v", e, endpoint)
return fmt.Errorf("RegisterInstance err %v, id = %s", e, si.ID)
}
}
return nil
Expand Down Expand Up @@ -193,11 +195,18 @@ func (r *Registry) GetService(_ context.Context, serviceName string) ([]*registr
items := make([]*registry.ServiceInstance, 0, len(res))
for _, in := range res {
kind := r.opts.kind
if k, ok := in.Metadata["kind"]; ok {
kind = k
id := in.InstanceId
if in.Metadata != nil {
if k, ok := in.Metadata["kind"]; ok {
kind = k
}
if v, ok := in.Metadata["id"]; ok {
id = v
delete(in.Metadata, "id")
}
}
items = append(items, &registry.ServiceInstance{
ID: in.InstanceId,
ID: id,
Name: in.ServiceName,
Version: in.Metadata["version"],
Metadata: in.Metadata,
Expand Down
17 changes: 12 additions & 5 deletions pkg/servicerd/registry/nacos/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func newWatcher(ctx context.Context, cli naming_client.INamingClient, serviceNam

e := w.cli.Subscribe(&vo.SubscribeParam{
ServiceName: serviceName,
Clusters: clusters,
GroupName: groupName,
//Clusters: clusters, // if set the clusters, subscription messages cannot be received
SubscribeCallback: func(services []model.Instance, err error) {
w.watchChan <- struct{}{}
},
Expand All @@ -55,19 +55,26 @@ func (w *watcher) Next() ([]*registry.ServiceInstance, error) {
res, err := w.cli.GetService(vo.GetServiceParam{
ServiceName: w.serviceName,
GroupName: w.groupName,
Clusters: w.clusters,
//Clusters: w.clusters, // if cluster is set, the latest service is not obtained.
})
if err != nil {
return nil, err
}
items := make([]*registry.ServiceInstance, 0, len(res.Hosts))
for _, in := range res.Hosts {
kind := w.kind
if k, ok := in.Metadata["kind"]; ok {
kind = k
id := in.InstanceId
if in.Metadata != nil {
if k, ok := in.Metadata["kind"]; ok {
kind = k
}
if v, ok := in.Metadata["id"]; ok {
id = v
delete(in.Metadata, "id")
}
}
items = append(items, &registry.ServiceInstance{
ID: in.InstanceId,
ID: id,
Name: res.Name,
Version: in.Metadata["version"],
Metadata: in.Metadata,
Expand Down

0 comments on commit e2c4e45

Please sign in to comment.