Skip to content

Commit

Permalink
support app connection labels (#754)
Browse files Browse the repository at this point in the history
* 支持应用标签
  • Loading branch information
shiyiyue1102 authored May 27, 2024
1 parent 163ee14 commit 7d85b85
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 4 deletions.
4 changes: 3 additions & 1 deletion clients/config_client/config_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/stretchr/testify/assert"
)

var serverConfigWithOptions = constant.NewServerConfig("mse-xxx-p.nacos-ans.mse.aliyuncs.com", 8848)
var serverConfigWithOptions = constant.NewServerConfig("127.0.0.1", 8848)

var clientConfigWithOptions = constant.NewClientConfig(
constant.WithTimeoutMs(10*1000),
Expand Down Expand Up @@ -81,6 +81,7 @@ func createConfigClientTestTls() *ConfigClient {
_ = nc.SetClientConfig(*clientTLsConfigWithOptions)
_ = nc.SetHttpAgent(&http_agent.HttpAgent{})
client, _ := NewConfigClient(&nc)
client.configProxy = &MockConfigProxy{}
return client
}

Expand All @@ -90,6 +91,7 @@ func createConfigClientCommon() *ConfigClient {
_ = nc.SetClientConfig(*clientConfigWithOptions)
_ = nc.SetHttpAgent(&http_agent.HttpAgent{})
client, _ := NewConfigClient(&nc)
client.configProxy = &MockConfigProxy{}
return client
}

Expand Down
2 changes: 1 addition & 1 deletion clients/config_client/config_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (cp *ConfigProxy) createRpcClient(ctx context.Context, taskId string, clien
"taskId": taskId,
}

iRpcClient, _ := rpc.CreateClient(ctx, "config-"+taskId+"-"+client.uid, rpc.GRPC, labels, cp.nacosServer, &cp.clientConfig.TLSCfg)
iRpcClient, _ := rpc.CreateClient(ctx, "config-"+taskId+"-"+client.uid, rpc.GRPC, labels, cp.nacosServer, &cp.clientConfig.TLSCfg, cp.clientConfig.AppConnLabels)
rpcClient := iRpcClient.GetRpcClient()
if rpcClient.IsInitialized() {
rpcClient.RegisterServerRequestHandler(func() rpc_request.IRequest {
Expand Down
2 changes: 1 addition & 1 deletion clients/naming_client/naming_grpc/naming_grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewNamingGrpcProxy(ctx context.Context, clientCfg constant.ClientConfig, na
constant.LABEL_MODULE: constant.LABEL_MODULE_NAMING,
}

iRpcClient, err := rpc.CreateClient(ctx, uid.String(), rpc.GRPC, labels, srvProxy.nacosServer, &clientCfg.TLSCfg)
iRpcClient, err := rpc.CreateClient(ctx, uid.String(), rpc.GRPC, labels, srvProxy.nacosServer, &clientCfg.TLSCfg, clientCfg.AppConnLabels)
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions common/constant/client_config_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,9 @@ func WithTLS(tlsCfg TLSConfig) ClientOption {
config.TLSCfg = tlsCfg
}
}

func WithAppConnLabels(appConnLabels map[string]string) ClientOption {
return func(config *ClientConfig) {
config.AppConnLabels = appConnLabels
}
}
1 change: 1 addition & 0 deletions common/constant/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type ClientConfig struct {
EndpointContextPath string // the address server endpoint contextPath
EndpointQueryParams string // the address server endpoint query params
ClusterName string // the address server clusterName
AppConnLabels map[string]string // app conn labels
}

type ClientLogSamplingConfig struct {
Expand Down
105 changes: 104 additions & 1 deletion common/remote/rpc/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package rpc

import (
"context"
"fmt"
"math"
"os"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -147,24 +150,124 @@ func getClient(clientName string) IRpcClient {
return clientMap[clientName]
}

func CreateClient(ctx context.Context, clientName string, connectionType ConnectionType, labels map[string]string, nacosServer *nacos_server.NacosServer, tlsConfig *constant.TLSConfig) (IRpcClient, error) {
func CreateClient(ctx context.Context, clientName string, connectionType ConnectionType, labels map[string]string, nacosServer *nacos_server.NacosServer, tlsConfig *constant.TLSConfig, appConnLabels map[string]string) (IRpcClient, error) {
cMux.Lock()
defer cMux.Unlock()
if _, ok := clientMap[clientName]; !ok {
logger.Infof("init rpc client for name ", clientName)
var rpcClient IRpcClient
if GRPC == connectionType {
rpcClient = NewGrpcClient(ctx, clientName, nacosServer, tlsConfig)
}
if rpcClient == nil {
return nil, errors.New("unsupported connection type")
}

logger.Infof("get app conn labels from client config %v ", appConnLabels)
appConnLabelsEnv := getAppLabelsFromEnv()
logger.Infof("get app conn labels from env %v ", appConnLabelsEnv)

appConnLabelsFinal := mergerAppLabels(appConnLabels, appConnLabelsEnv)
logger.Infof("final app conn labels : %v ", appConnLabelsFinal)

appConnLabelsFinal = addPrefixForEachKey(appConnLabelsFinal, "app_")
if len(appConnLabelsFinal) != 0 {
rpcClient.putAllLabels(appConnLabelsFinal)
}

rpcClient.putAllLabels(labels)
clientMap[clientName] = rpcClient
return rpcClient, nil
}
return clientMap[clientName], nil
}

func mergerAppLabels(appLabelsAppointed map[string]string, appLabelsEnv map[string]string) map[string]string {
preferred := strings.ToLower(os.Getenv("nacos_app_conn_labels_preferred"))

var preferFirst bool
if preferred != "env" {
preferFirst = true
} else {
preferFirst = false
}
return mergeMaps(appLabelsAppointed, appLabelsEnv, preferFirst)
}

func mergeMaps(map1, map2 map[string]string, preferFirst bool) map[string]string {
result := make(map[string]string, 8)

for k, v := range map1 {
result[k] = v
}

for k, v := range map2 {
_, ok := map1[k]
if preferFirst && ok {
continue
}
result[k] = v
}

return result
}

func getAppLabelsFromEnv() map[string]string {
configMap := make(map[string]string, 8)

// nacos_config_gray_label
grayLabel := os.Getenv("nacos_config_gray_label")
if grayLabel != "" {
configMap["nacos_config_gray_label"] = grayLabel
}

// nacos_app_conn_labels
connLabels := os.Getenv("nacos_app_conn_labels")
if connLabels != "" {
labelsMap := parseLabels(connLabels)
for k, v := range labelsMap {
configMap[k] = v
}
}

return configMap
}

func parseLabels(rawLabels string) map[string]string {
if strings.TrimSpace(rawLabels) == "" {
return make(map[string]string, 2)
}

resultMap := make(map[string]string, 2)
labels := strings.Split(rawLabels, ",")
for _, label := range labels {
if strings.TrimSpace(label) != "" {
kv := strings.Split(label, "=")
if len(kv) == 2 {
resultMap[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
} else {
fmt.Println("unknown label format:", label)
}
}
}
return resultMap
}

func addPrefixForEachKey(m map[string]string, prefix string) map[string]string {
if len(m) == 0 {
return m
}

newMap := make(map[string]string, len(m))
for k, v := range m {
if strings.TrimSpace(k) != "" {
newKey := prefix + k
newMap[newKey] = v
}
}
return newMap
}

func (r *RpcClient) Start() {
if ok := atomic.CompareAndSwapInt32((*int32)(&r.rpcClientStatus), (int32)(INITIALIZED), (int32)(STARTING)); !ok {
return
Expand Down

0 comments on commit 7d85b85

Please sign in to comment.