Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: load yaml key case sensitive #9206

Merged
merged 1 commit into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@
package agentsynchronize

import (
"bytes"
"fmt"
"strings"
"time"

"github.com/golang/protobuf/proto"
"github.com/spf13/viper"
kyaml "github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/v2"
context "golang.org/x/net/context"
yaml "gopkg.in/yaml.v3"

api "github.com/deepflowio/deepflow/message/agent"
. "github.com/deepflowio/deepflow/server/controller/common"
Expand Down Expand Up @@ -59,42 +58,37 @@ func NewAgentEvent() *AgentEvent {
return &AgentEvent{}
}

func (e *AgentEvent) generateUserConfig(c *vtap.VTapCache, clusterID string, gAgentInfo *vtap.VTapInfo, orgID int) *viper.Viper {
func (e *AgentEvent) generateUserConfig(c *vtap.VTapCache, clusterID string, gAgentInfo *vtap.VTapInfo, orgID int) *koanf.Koanf {
userConfig := c.GetUserConfig()
viperConfig := viper.New()
viperConfig.SetConfigType("yaml")
if err := viperConfig.ReadConfig(bytes.NewBufferString(userConfig)); err != nil {
log.Errorf("viper read agent(%d) config yaml error: %v", c.GetVTapID(), err)
}

configTSDBIP := gAgentInfo.GetConfigTSDBIP()
if configTSDBIP != "" {
viperConfig.Set(CONFIG_KEY_INGESTER_IP, configTSDBIP)
userConfig.Set(CONFIG_KEY_INGESTER_IP, configTSDBIP)
}

natIPEnabled := viperConfig.GetBool("global.communication.request_via_nat_ip")
natIPEnabled := userConfig.Bool("global.communication.request_via_nat_ip")
if trisolaris.GetAllAgentConnectToNatIP() || natIPEnabled == true {
viperConfig.Set(CONFIG_KEY_PROXY_CONTROLLER_IP, trisolaris.GetORGNodeInfo(orgID).GetControllerNatIP(c.GetControllerIP()))
viperConfig.Set(CONFIG_KEY_INGESTER_IP, trisolaris.GetORGNodeInfo(orgID).GetTSDBNatIP(c.GetTSDBIP()))
userConfig.Set(CONFIG_KEY_PROXY_CONTROLLER_IP, trisolaris.GetORGNodeInfo(orgID).GetControllerNatIP(c.GetControllerIP()))
userConfig.Set(CONFIG_KEY_INGESTER_IP, trisolaris.GetORGNodeInfo(orgID).GetTSDBNatIP(c.GetTSDBIP()))
}

if isPodVTap(c.GetVTapType()) && gAgentInfo.IsTheSameCluster(clusterID) {
viperConfig.Set(CONFIG_KEY_PROXY_CONTROLLER_IP, trisolaris.GetORGNodeInfo(orgID).GetControllerPodIP(c.GetControllerIP()))
viperConfig.Set(CONFIG_KEY_PROXY_CONTROLLER_PORT, trisolaris.GetGrpcPort())
userConfig.Set(CONFIG_KEY_PROXY_CONTROLLER_IP, trisolaris.GetORGNodeInfo(orgID).GetControllerPodIP(c.GetControllerIP()))
userConfig.Set(CONFIG_KEY_PROXY_CONTROLLER_PORT, trisolaris.GetGrpcPort())

viperConfig.Set(CONFIG_KEY_INGESTER_IP, trisolaris.GetORGNodeInfo(orgID).GetTSDBPodIP(c.GetTSDBIP()))
viperConfig.Set(CONFIG_KEY_INGESTER_PORT, trisolaris.GetIngesterPort())
userConfig.Set(CONFIG_KEY_INGESTER_IP, trisolaris.GetORGNodeInfo(orgID).GetTSDBPodIP(c.GetTSDBIP()))
userConfig.Set(CONFIG_KEY_INGESTER_PORT, trisolaris.GetIngesterPort())
}
if viperConfig.GetString(CONFIG_KEY_PROXY_CONTROLLER_IP) == "" {
if userConfig.String(CONFIG_KEY_PROXY_CONTROLLER_IP) == "" {
log.Errorf("agent(%s) has no proxy_controller_ip, "+
"Please check whether the agent allocs controller IP or If nat-ip is enabled, whether the controller is configured with nat-ip", c.GetCtrlIP())
}

if c.GetVTapEnabled() == 0 {
viperConfig.Set(CONFIG_KEY_HYPERVISOR_RESOURCE_ENABLED, false)
userConfig.Set(CONFIG_KEY_HYPERVISOR_RESOURCE_ENABLED, false)
}

return viperConfig
return userConfig
}

func (e *AgentEvent) generateDynamicConfig(clusterID string, c *vtap.VTapCache) *api.DynamicConfig {
Expand Down Expand Up @@ -319,7 +313,7 @@ func (e *AgentEvent) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncRe
}
}
userConfig := e.generateUserConfig(vtapCache, clusterID, gAgentInfo, orgID)
if userConfig.GetString(CONFIG_KEY_INGESTER_IP) == "" {
if userConfig.String(CONFIG_KEY_INGESTER_IP) == "" {
dynamicConfig.Enabled = proto.Bool(false)
log.Errorf("agent(%s) has no ingester_ip, "+
"Please check whether the agent allocs tsdb IP or If nat-ip is enabled, whether the tsdb is configured with nat-ip", vtapCache.GetCtrlIP())
Expand All @@ -329,7 +323,7 @@ func (e *AgentEvent) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncRe
if vtapCache.GetVTapEnabled() == 0 {
return &api.SyncResponse{
Status: &STATUS_SUCCESS,
UserConfig: proto.String(e.formateViperConfigToString(userConfig)),
UserConfig: proto.String(e.marshalUserConfig(userConfig)),
DynamicConfig: dynamicConfig,
}, nil
}
Expand All @@ -343,7 +337,7 @@ func (e *AgentEvent) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncRe
Status: &STATUS_SUCCESS,
LocalSegments: localSegments,
RemoteSegments: remoteSegments,
UserConfig: proto.String(e.formateViperConfigToString(userConfig)),
UserConfig: proto.String(e.marshalUserConfig(userConfig)),
DynamicConfig: dynamicConfig,
PlatformData: platformData,
Groups: groups,
Expand All @@ -366,21 +360,17 @@ func (e *AgentEvent) generateNoAgentCacheDynamicConfig() *api.DynamicConfig {
}
}

func (e *AgentEvent) generateNoAgentCacheUserViperConfig(groupID string, orgID int) *viper.Viper {
func (e *AgentEvent) generateNoAgentCacheUserConfig(groupID string, orgID int) *koanf.Koanf {
vtapConfig := trisolaris.GetORGVTapInfo(orgID).GetVTapConfigFromShortID(groupID)
v := viper.New()
v.SetConfigType("yaml")
if vtapConfig == nil {
return v
}
if err := v.ReadConfig(bytes.NewBufferString(vtapConfig.GetUserConfig())); err != nil {
log.Error(err)
log.Warningf("not found vtap group short id (%s) config", groupID, logger.NewORGPrefix(orgID))
return koanf.New(".")
}
return v
return vtapConfig.GetUserConfig()
}

func (e *AgentEvent) formateViperConfigToString(userConfig *viper.Viper) string {
b, err := yaml.Marshal(userConfig.AllSettings())
func (e *AgentEvent) marshalUserConfig(userConfig *koanf.Koanf) string {
b, err := userConfig.Marshal(kyaml.Parser())
if err != nil {
log.Error(err)
return ""
Expand All @@ -397,7 +387,7 @@ func (e *AgentEvent) noAgentResponse(in *api.SyncRequest, orgID int) *api.SyncRe
k8sForceWatch := in.GetKubernetesForceWatch()
k8sWatchPoilcy := in.GetKubernetesWatchPolicy()
dynamicConfigInfo := e.generateNoAgentCacheDynamicConfig()
viperConfig := e.generateNoAgentCacheUserViperConfig(in.GetAgentGroupIdRequest(), orgID)
userConfig := e.generateNoAgentCacheUserConfig(in.GetAgentGroupIdRequest(), orgID)
gAgentInfo := trisolaris.GetORGVTapInfo(orgID)
if clusterID != "" {
dynamicConfigInfo.AgentType = utils.Int2AgentTypePtr(VTAP_TYPE_POD_VM)
Expand All @@ -413,27 +403,27 @@ func (e *AgentEvent) noAgentResponse(in *api.SyncRequest, orgID int) *api.SyncRe

return &api.SyncResponse{
Status: &STATUS_SUCCESS,
UserConfig: proto.String(e.formateViperConfigToString(viperConfig)),
UserConfig: proto.String(e.marshalUserConfig(userConfig)),
DynamicConfig: dynamicConfigInfo,
}
}

agentTypeForUnknowAgent := gAgentInfo.GetTridentTypeForUnknowVTap()
if agentTypeForUnknowAgent != 0 {
dynamicConfigInfo.AgentType = utils.Int2AgentTypePtr(agentTypeForUnknowAgent)
viperConfig.Set(CONFIG_KEY_HYPERVISOR_RESOURCE_ENABLED, true)
userConfig.Set(CONFIG_KEY_HYPERVISOR_RESOURCE_ENABLED, true)

return &api.SyncResponse{
Status: &STATUS_SUCCESS,
DynamicConfig: dynamicConfigInfo,
UserConfig: proto.String(e.formateViperConfigToString(viperConfig)),
UserConfig: proto.String(e.marshalUserConfig(userConfig)),
}
}

return &api.SyncResponse{
Status: &STATUS_SUCCESS,
DynamicConfig: dynamicConfigInfo,
UserConfig: proto.String(e.formateViperConfigToString(viperConfig)),
UserConfig: proto.String(e.marshalUserConfig(userConfig)),
}
}

Expand Down Expand Up @@ -556,7 +546,7 @@ func (e *AgentEvent) pushResponse(in *api.SyncRequest, all bool) (*api.SyncRespo
}

userConfig := e.generateUserConfig(vtapCache, clusterID, gAgentInfo, orgID)
if userConfig.GetString(CONFIG_KEY_INGESTER_IP) == "" {
if userConfig.String(CONFIG_KEY_INGESTER_IP) == "" {
dynamicConfig.Enabled = proto.Bool(false)
log.Errorf("agent(%s) has no ingester_ip, "+
"Please check whether the agent allocs tsdb IP or If nat-ip is enabled, whether the tsdb is configured with nat-ip", vtapCache.GetCtrlIP())
Expand All @@ -566,7 +556,7 @@ func (e *AgentEvent) pushResponse(in *api.SyncRequest, all bool) (*api.SyncRespo
if vtapCache.GetVTapEnabled() == 0 {
return &api.SyncResponse{
Status: &STATUS_SUCCESS,
UserConfig: proto.String(e.formateViperConfigToString(userConfig)),
UserConfig: proto.String(e.marshalUserConfig(userConfig)),
DynamicConfig: dynamicConfig,
}, nil
}
Expand All @@ -580,7 +570,7 @@ func (e *AgentEvent) pushResponse(in *api.SyncRequest, all bool) (*api.SyncRespo
LocalSegments: localSegments,
RemoteSegments: remoteSegments,
DynamicConfig: dynamicConfig,
UserConfig: proto.String(e.formateViperConfigToString(userConfig)),
UserConfig: proto.String(e.marshalUserConfig(userConfig)),
PlatformData: platformData,
SkipInterface: skipInterface,
VersionPlatformData: proto.Uint64(versionPlatformData),
Expand Down
66 changes: 23 additions & 43 deletions server/controller/trisolaris/vtap/vtap_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package vtap

import (
"bytes"
"fmt"
"regexp"
"sort"
Expand All @@ -29,8 +28,10 @@ import (

mapset "github.com/deckarep/golang-set"
"github.com/golang/protobuf/proto"
kyaml "github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/knadh/koanf/v2"
"github.com/mohae/deepcopy"
"github.com/spf13/viper"
"gopkg.in/yaml.v2"

"github.com/deepflowio/deepflow/message/agent"
Expand All @@ -56,16 +57,11 @@ type VTapConfig struct {
ConvertedWasmPlugins []string
ConvertedSoPlugins []string
PluginNewUpdateTime uint32
UserConfig *viper.Viper
UserConfig *koanf.Koanf
}

func (f *VTapConfig) GetUserConfig() string {
b, err := yaml.Marshal(f.UserConfig.AllSettings())
if err != nil {
log.Error(err)
return ""
}
return string(b)
func (f *VTapConfig) GetUserConfig() *koanf.Koanf {
return f.UserConfig.Copy()
}

func (f *VTapConfig) convertData() {
Expand Down Expand Up @@ -137,32 +133,32 @@ func (f *VTapConfig) modifyUserConfig(c *VTapCache) {
log.Error("vtap configure is nil")
return
}
if !f.UserConfig.IsSet(CONFIG_KEY_PROXY_CONTROLLER_IP) {
if !f.UserConfig.Exists(CONFIG_KEY_PROXY_CONTROLLER_IP) {
f.UserConfig.Set(CONFIG_KEY_PROXY_CONTROLLER_IP, c.GetControllerIP())
}
if !f.UserConfig.IsSet(CONFIG_KEY_INGESTER_IP) {
if !f.UserConfig.Exists(CONFIG_KEY_INGESTER_IP) {
f.UserConfig.Set(CONFIG_KEY_INGESTER_IP, c.GetTSDBIP())
}
domainFilters := f.UserConfig.GetIntSlice(CONFIG_KEY_DOMAIN_FILTER)
domainFilters := f.UserConfig.Strings(CONFIG_KEY_DOMAIN_FILTER)
if len(domainFilters) > 0 {
sort.Ints(domainFilters)
sort.Strings(domainFilters)
f.UserConfig.Set(CONFIG_KEY_DOMAIN_FILTER, domainFilters)
}
}

func (f *VTapConfig) getDomainFilters() []int {
func (f *VTapConfig) getDomainFilters() []string {
if f.UserConfig == nil {
return nil
}
return f.UserConfig.GetIntSlice(CONFIG_KEY_DOMAIN_FILTER)
return f.UserConfig.Strings(CONFIG_KEY_DOMAIN_FILTER)
}

func (f *VTapConfig) getPodClusterInternalIP() bool {
if f.UserConfig == nil {
return true
}

return f.UserConfig.GetBool("inputs.resources.pull_resource_from_controller.only_kubernetes_pod_ip_in_local_cluster")
return f.UserConfig.Bool("inputs.resources.pull_resource_from_controller.only_kubernetes_pod_ip_in_local_cluster")
}

func (f *VTapConfig) modifyConfig(v *VTapInfo) {
Expand All @@ -186,12 +182,11 @@ func (f *VTapConfig) modifyConfig(v *VTapInfo) {
func NewVTapConfig(config *agent_config.AgentGroupConfigModel, agentConfigYaml string) *VTapConfig {
vTapConfig := &VTapConfig{}
vTapConfig.AgentGroupConfigModel = *config
v := viper.New()
v.SetConfigType("yaml")
if err := v.ReadConfig(bytes.NewBufferString(agentConfigYaml)); err != nil {
k := koanf.New(".")
if err := k.Load(rawbytes.Provider([]byte(agentConfigYaml)), kyaml.Parser()); err != nil {
log.Error(err)
}
vTapConfig.UserConfig = v
vTapConfig.UserConfig = k
vTapConfig.convertData()
return vTapConfig
}
Expand Down Expand Up @@ -739,12 +734,11 @@ func (c *VTapCache) GetConfigSyncInterval() int {
return *config.SyncInterval
}

func (c *VTapCache) GetUserConfig() string {
func (c *VTapCache) GetUserConfig() *koanf.Koanf {
config := c.GetVTapConfig()
if config == nil {
return ""
return koanf.New(".")
}

return config.GetUserConfig()
}

Expand Down Expand Up @@ -1194,23 +1188,16 @@ func (c *VTapCache) initVTapConfig() {
realConfig := VTapConfig{}
vtapGroupLcuuid := c.GetVTapGroupLcuuid()

var agentConfigYaml string
if config, ok := v.vtapGroupLcuuidToConfiguration[vtapGroupLcuuid]; ok {
agentConfigYaml = config.GetUserConfig()
realConfig = deepcopy.Copy(*config).(VTapConfig)
realConfig.UserConfig = config.GetUserConfig()
} else {
if v.realDefaultConfig != nil {
realConfig = deepcopy.Copy(*v.realDefaultConfig).(VTapConfig)
realConfig.UserConfig = koanf.New(".")
}
}

// viper object include map, not support deepcopy
viperConfig := viper.New()
viperConfig.SetConfigType("yaml")
if err := viperConfig.ReadConfig(bytes.NewBufferString(agentConfigYaml)); err != nil {
log.Errorf(v.Logf("viper read agent group (%s) config yaml error: %v", vtapGroupLcuuid, err))
}
realConfig.UserConfig = viperConfig

c.modifyVTapConfigByLicense(&realConfig)
realConfig.modifyConfig(v)
Expand All @@ -1222,16 +1209,17 @@ func (c *VTapCache) updateVTapConfigFromDB() {
v := c.vTapInfo
newConfig := VTapConfig{}

var agentConfigYaml string
config, ok := v.vtapGroupLcuuidToConfiguration[c.GetVTapGroupLcuuid()]
if ok {
agentConfigYaml = config.GetUserConfig()
newConfig = deepcopy.Copy(*config).(VTapConfig)
newConfig.UserConfig = config.GetUserConfig()
} else {
if v.realDefaultConfig != nil {
newConfig = deepcopy.Copy(*v.realDefaultConfig).(VTapConfig)
newConfig.UserConfig = koanf.New(".")
}
}

oldConfig := c.GetVTapConfig()
if oldConfig != nil {
// 采集器配置发生变化 重新生成平台数据
Expand All @@ -1240,14 +1228,6 @@ func (c *VTapCache) updateVTapConfigFromDB() {
}
}

// viper object include map, not support deepcopy
viperConfig := viper.New()
viperConfig.SetConfigType("yaml")
if err := viperConfig.ReadConfig(bytes.NewBufferString(agentConfigYaml)); err != nil {
log.Errorf(v.Logf("viper read agent group (%s) config yaml error: %v", c.GetVTapGroupLcuuid(), err))
}
newConfig.UserConfig = viperConfig

c.modifyVTapConfigByLicense(&newConfig)
newConfig.modifyConfig(v)
newConfig.modifyUserConfig(c)
Expand Down
Loading
Loading