diff --git a/clients/config_client/config_client_test.go b/clients/config_client/config_client_test.go index c73c983d..dc86c068 100644 --- a/clients/config_client/config_client_test.go +++ b/clients/config_client/config_client_test.go @@ -34,7 +34,7 @@ import ( "github.com/stretchr/testify/assert" ) -var serverConfigWithOptions = constant.NewServerConfig("mse-xxx-p.nacos-ans.mse.aliyuncs.com", 8848) +var serverConfigWithOptions = constant.NewServerConfig("127.0.0.1", 8848) var clientConfigWithOptions = constant.NewClientConfig( constant.WithTimeoutMs(10*1000), @@ -81,6 +81,7 @@ func createConfigClientTestTls() *ConfigClient { _ = nc.SetClientConfig(*clientTLsConfigWithOptions) _ = nc.SetHttpAgent(&http_agent.HttpAgent{}) client, _ := NewConfigClient(&nc) + client.configProxy = &MockConfigProxy{} return client } @@ -90,6 +91,7 @@ func createConfigClientCommon() *ConfigClient { _ = nc.SetClientConfig(*clientConfigWithOptions) _ = nc.SetHttpAgent(&http_agent.HttpAgent{}) client, _ := NewConfigClient(&nc) + client.configProxy = &MockConfigProxy{} return client } diff --git a/clients/config_client/config_proxy.go b/clients/config_client/config_proxy.go index b3038840..f21a1394 100644 --- a/clients/config_client/config_proxy.go +++ b/clients/config_client/config_proxy.go @@ -166,7 +166,7 @@ func (cp *ConfigProxy) createRpcClient(ctx context.Context, taskId string, clien "taskId": taskId, } - iRpcClient, _ := rpc.CreateClient(ctx, "config-"+taskId+"-"+client.uid, rpc.GRPC, labels, cp.nacosServer, &cp.clientConfig.TLSCfg) + iRpcClient, _ := rpc.CreateClient(ctx, "config-"+taskId+"-"+client.uid, rpc.GRPC, labels, cp.nacosServer, &cp.clientConfig.TLSCfg, cp.clientConfig.AppConnLabels) rpcClient := iRpcClient.GetRpcClient() if rpcClient.IsInitialized() { rpcClient.RegisterServerRequestHandler(func() rpc_request.IRequest { diff --git a/clients/naming_client/naming_grpc/naming_grpc_proxy.go b/clients/naming_client/naming_grpc/naming_grpc_proxy.go index 2bf017a7..39fd85f5 100644 --- a/clients/naming_client/naming_grpc/naming_grpc_proxy.go +++ b/clients/naming_client/naming_grpc/naming_grpc_proxy.go @@ -61,7 +61,7 @@ func NewNamingGrpcProxy(ctx context.Context, clientCfg constant.ClientConfig, na constant.LABEL_MODULE: constant.LABEL_MODULE_NAMING, } - iRpcClient, err := rpc.CreateClient(ctx, uid.String(), rpc.GRPC, labels, srvProxy.nacosServer, &clientCfg.TLSCfg) + iRpcClient, err := rpc.CreateClient(ctx, uid.String(), rpc.GRPC, labels, srvProxy.nacosServer, &clientCfg.TLSCfg, clientCfg.AppConnLabels) if err != nil { return nil, err } diff --git a/common/constant/client_config_options.go b/common/constant/client_config_options.go index 50e8cd60..7befcff3 100644 --- a/common/constant/client_config_options.go +++ b/common/constant/client_config_options.go @@ -226,3 +226,9 @@ func WithTLS(tlsCfg TLSConfig) ClientOption { config.TLSCfg = tlsCfg } } + +func WithAppConnLabels(appConnLabels map[string]string) ClientOption { + return func(config *ClientConfig) { + config.AppConnLabels = appConnLabels + } +} diff --git a/common/constant/config.go b/common/constant/config.go index 202234b7..0d5998ed 100644 --- a/common/constant/config.go +++ b/common/constant/config.go @@ -58,6 +58,7 @@ type ClientConfig struct { EndpointContextPath string // the address server endpoint contextPath EndpointQueryParams string // the address server endpoint query params ClusterName string // the address server clusterName + AppConnLabels map[string]string // app conn labels } type ClientLogSamplingConfig struct { diff --git a/common/remote/rpc/rpc_client.go b/common/remote/rpc/rpc_client.go index a1502fc4..65b17c12 100644 --- a/common/remote/rpc/rpc_client.go +++ b/common/remote/rpc/rpc_client.go @@ -18,8 +18,11 @@ package rpc import ( "context" + "fmt" "math" + "os" "reflect" + "strings" "sync" "sync/atomic" "time" @@ -147,10 +150,11 @@ func getClient(clientName string) IRpcClient { return clientMap[clientName] } -func CreateClient(ctx context.Context, clientName string, connectionType ConnectionType, labels map[string]string, nacosServer *nacos_server.NacosServer, tlsConfig *constant.TLSConfig) (IRpcClient, error) { +func CreateClient(ctx context.Context, clientName string, connectionType ConnectionType, labels map[string]string, nacosServer *nacos_server.NacosServer, tlsConfig *constant.TLSConfig, appConnLabels map[string]string) (IRpcClient, error) { cMux.Lock() defer cMux.Unlock() if _, ok := clientMap[clientName]; !ok { + logger.Infof("init rpc client for name ", clientName) var rpcClient IRpcClient if GRPC == connectionType { rpcClient = NewGrpcClient(ctx, clientName, nacosServer, tlsConfig) @@ -158,6 +162,19 @@ func CreateClient(ctx context.Context, clientName string, connectionType Connect if rpcClient == nil { return nil, errors.New("unsupported connection type") } + + logger.Infof("get app conn labels from client config %v ", appConnLabels) + appConnLabelsEnv := getAppLabelsFromEnv() + logger.Infof("get app conn labels from env %v ", appConnLabelsEnv) + + appConnLabelsFinal := mergerAppLabels(appConnLabels, appConnLabelsEnv) + logger.Infof("final app conn labels : %v ", appConnLabelsFinal) + + appConnLabelsFinal = addPrefixForEachKey(appConnLabelsFinal, "app_") + if len(appConnLabelsFinal) != 0 { + rpcClient.putAllLabels(appConnLabelsFinal) + } + rpcClient.putAllLabels(labels) clientMap[clientName] = rpcClient return rpcClient, nil @@ -165,6 +182,92 @@ func CreateClient(ctx context.Context, clientName string, connectionType Connect return clientMap[clientName], nil } +func mergerAppLabels(appLabelsAppointed map[string]string, appLabelsEnv map[string]string) map[string]string { + preferred := strings.ToLower(os.Getenv("nacos_app_conn_labels_preferred")) + + var preferFirst bool + if preferred != "env" { + preferFirst = true + } else { + preferFirst = false + } + return mergeMaps(appLabelsAppointed, appLabelsEnv, preferFirst) +} + +func mergeMaps(map1, map2 map[string]string, preferFirst bool) map[string]string { + result := make(map[string]string, 8) + + for k, v := range map1 { + result[k] = v + } + + for k, v := range map2 { + _, ok := map1[k] + if preferFirst && ok { + continue + } + result[k] = v + } + + return result +} + +func getAppLabelsFromEnv() map[string]string { + configMap := make(map[string]string, 8) + + // nacos_config_gray_label + grayLabel := os.Getenv("nacos_config_gray_label") + if grayLabel != "" { + configMap["nacos_config_gray_label"] = grayLabel + } + + // nacos_app_conn_labels + connLabels := os.Getenv("nacos_app_conn_labels") + if connLabels != "" { + labelsMap := parseLabels(connLabels) + for k, v := range labelsMap { + configMap[k] = v + } + } + + return configMap +} + +func parseLabels(rawLabels string) map[string]string { + if strings.TrimSpace(rawLabels) == "" { + return make(map[string]string, 2) + } + + resultMap := make(map[string]string, 2) + labels := strings.Split(rawLabels, ",") + for _, label := range labels { + if strings.TrimSpace(label) != "" { + kv := strings.Split(label, "=") + if len(kv) == 2 { + resultMap[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1]) + } else { + fmt.Println("unknown label format:", label) + } + } + } + return resultMap +} + +func addPrefixForEachKey(m map[string]string, prefix string) map[string]string { + if len(m) == 0 { + return m + } + + newMap := make(map[string]string, len(m)) + for k, v := range m { + if strings.TrimSpace(k) != "" { + newKey := prefix + k + newMap[newKey] = v + } + } + return newMap +} + func (r *RpcClient) Start() { if ok := atomic.CompareAndSwapInt32((*int32)(&r.rpcClientStatus), (int32)(INITIALIZED), (int32)(STARTING)); !ok { return