Skip to content

Commit

Permalink
Merge branch 'main' into feature-prevent-agent-print-err-log
Browse files Browse the repository at this point in the history
  • Loading branch information
sharang authored Aug 17, 2023
2 parents 2be4139 + 12c4067 commit d9157e5
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 70 deletions.
10 changes: 8 additions & 2 deletions message/trident.proto
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,13 @@ message SkipInterface {
}

message DeepFlowServerInstanceInfo {
optional string pod_name = 1;
optional string node_name = 2;
optional string pod_name = 1;
optional string node_name = 2;
}

message AnalyzerConfig {
optional uint32 analyzer_id = 1; // for Ingester assign a globally unique flow log ID
optional uint32 region_id = 2; // for Ingester get self region, and drop metrics not from the region.
}

message SyncResponse {
Expand All @@ -508,6 +513,7 @@ message SyncResponse {
repeated VtapIp vtap_ips = 18; // vtap_id到vpc + ip的映射关系, 仅下发给数据节点
repeated SkipInterface skip_interface = 19;
repeated DeepFlowServerInstanceInfo deepflow_server_instances = 20; // Only return the normal deepflow-servers of current Region for Ingester
optional AnalyzerConfig analyzer_config = 21; // Only for Analyzer
}

message UpgradeRequest {
Expand Down
10 changes: 10 additions & 0 deletions server/ingester/flow_log/exporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func putUniversalTags(attrs pcommon.Map, tags0, tags1 *UniversalTags, dataTypeBi
putStrWithoutEmpty(attrs, "df.universal_tag.az_0", tags0.AZ)
putStrWithoutEmpty(attrs, "df.universal_tag.host_0", tags0.Host)
putStrWithoutEmpty(attrs, "df.universal_tag.vpc_0", tags0.L3Epc)
putStrWithoutEmpty(attrs, "df.universal_tag.subnet_0", tags0.Subnet)
putStrWithoutEmpty(attrs, "df.universal_tag.pod_cluster_0", tags0.PodCluster)
putStrWithoutEmpty(attrs, "df.universal_tag.pod_ns_0", tags0.PodNS)
putStrWithoutEmpty(attrs, "df.universal_tag.pod_node_0", tags0.PodNode)
Expand All @@ -68,12 +69,17 @@ func putUniversalTags(attrs pcommon.Map, tags0, tags1 *UniversalTags, dataTypeBi
putStrWithoutEmpty(attrs, "df.universal_tag.rds_0", tags0.RDS)
putStrWithoutEmpty(attrs, "df.universal_tag.lb_0", tags0.LB)
putStrWithoutEmpty(attrs, "df.universal_tag.natgw_0", tags0.NatGW)
putStrWithoutEmpty(attrs, "df.universal_tag.auto_instance_type_0", tags0.AutoInstanceType)
putStrWithoutEmpty(attrs, "df.universal_tag.auto_instance_0", tags0.AutoInstance)
putStrWithoutEmpty(attrs, "df.universal_tag.auto_service_type_0", tags0.AutoServiceType)
putStrWithoutEmpty(attrs, "df.universal_tag.auto_service_0", tags0.AutoService)
}
if dataTypeBits&SERVER_UNIVERSAL_TAG != 0 {
putStrWithoutEmpty(attrs, "df.universal_tag.region_1", tags1.Region)
putStrWithoutEmpty(attrs, "df.universal_tag.az_1", tags1.AZ)
putStrWithoutEmpty(attrs, "df.universal_tag.host_1", tags1.Host)
putStrWithoutEmpty(attrs, "df.universal_tag.vpc_1", tags1.L3Epc)
putStrWithoutEmpty(attrs, "df.universal_tag.subnet_1", tags1.Subnet)
putStrWithoutEmpty(attrs, "df.universal_tag.pod_cluster_1", tags1.PodCluster)
putStrWithoutEmpty(attrs, "df.universal_tag.pod_ns_1", tags1.PodNS)
putStrWithoutEmpty(attrs, "df.universal_tag.pod_node_1", tags1.PodNode)
Expand All @@ -88,6 +94,10 @@ func putUniversalTags(attrs pcommon.Map, tags0, tags1 *UniversalTags, dataTypeBi
putStrWithoutEmpty(attrs, "df.universal_tag.rds_1", tags1.RDS)
putStrWithoutEmpty(attrs, "df.universal_tag.lb_1", tags1.LB)
putStrWithoutEmpty(attrs, "df.universal_tag.natgw_1", tags1.NatGW)
putStrWithoutEmpty(attrs, "df.universal_tag.auto_instance_type_1", tags1.AutoInstanceType)
putStrWithoutEmpty(attrs, "df.universal_tag.auto_instance_1", tags1.AutoInstance)
putStrWithoutEmpty(attrs, "df.universal_tag.auto_service_type_1", tags1.AutoServiceType)
putStrWithoutEmpty(attrs, "df.universal_tag.auto_service_1", tags1.AutoService)
}
}

Expand Down
176 changes: 110 additions & 66 deletions server/ingester/flow_log/exporter/universal_tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/deepflowio/deepflow/server/ingester/ingesterctl"
"github.com/deepflowio/deepflow/server/libs/debug"
"github.com/deepflowio/deepflow/server/libs/grpc"
"github.com/deepflowio/deepflow/server/libs/utils"
)

type UniversalTags struct {
Expand Down Expand Up @@ -60,34 +61,41 @@ type UniversalTags struct {
NatGW string

TapPortName string

AutoInstanceType string
AutoInstance string
AutoServiceType string
AutoService string
}

type DeviceType uint8

const (
TYPE_INTERNET = 0
TYPE_VM = 1
TYPE_VROUTER = 5
TYPE_HOST = 6
TYPE_DHCP_PORT = 9
TYPE_POD = 10
TYPE_POD_SERVICE = 11
TYPE_REDIS_INSTANCE = 12
TYPE_RDS_INSTANCE = 13
TYPE_POD_NODE = 14
TYPE_LB = 15
TYPE_NAT_GATEWAY = 16
TYPE_POD_GROUP = 101
TYPE_SERVICE = 102
TYPE_GPROCESS = 120
TYPE_IP = 255
TYPE_INTERNET DeviceType = 0
TYPE_VM DeviceType = 1
TYPE_VROUTER DeviceType = 5
TYPE_HOST DeviceType = 6
TYPE_DHCP_GW DeviceType = 9
TYPE_POD DeviceType = 10
TYPE_POD_SERVICE DeviceType = 11
TYPE_REDIS_INSTANCE DeviceType = 12
TYPE_RDS_INSTANCE DeviceType = 13
TYPE_POD_NODE DeviceType = 14
TYPE_LB DeviceType = 15
TYPE_NAT_GATEWAY DeviceType = 16
TYPE_POD_GROUP DeviceType = 101
TYPE_SERVICE DeviceType = 102
TYPE_GPROCESS DeviceType = 120
TYPE_IP DeviceType = 255
)

// from clickhouse flow_tag.node_type_map
var deviceTypeMap = map[uint32]string{
var deviceTypeStrings = []string{
TYPE_INTERNET: "internet_ip",
TYPE_VM: "chost",
TYPE_VROUTER: "router",
TYPE_HOST: "host",
TYPE_DHCP_PORT: "dhcpgw",
TYPE_DHCP_GW: "dhcpgw",
TYPE_POD: "pod",
TYPE_POD_SERVICE: "pod_service",
TYPE_REDIS_INSTANCE: "redis",
Expand All @@ -101,6 +109,10 @@ var deviceTypeMap = map[uint32]string{
TYPE_IP: "ip",
}

func (t DeviceType) String() string {
return deviceTypeStrings[t]
}

type Labels map[string]string

type UniversalTagMaps struct {
Expand All @@ -121,55 +133,87 @@ type UniversalTagMaps struct {

func (u *UniversalTagsManager) QueryUniversalTags(l7FlowLog *log_data.L7FlowLog) (*UniversalTags, *UniversalTags) {
tagMaps := u.universalTagMaps
return &UniversalTags{
Region: tagMaps.regionMap[l7FlowLog.RegionID0],
AZ: tagMaps.azMap[l7FlowLog.AZID0],
Host: tagMaps.deviceMap[uint64(TYPE_HOST)<<32|uint64(l7FlowLog.HostID0)],
L3DeviceType: deviceTypeMap[uint32(l7FlowLog.L3DeviceType0)],
L3Device: tagMaps.deviceMap[uint64(l7FlowLog.L3DeviceType0)<<32|uint64(l7FlowLog.L3DeviceID0)],
PodNode: tagMaps.podNodeMap[l7FlowLog.PodNodeID0],
PodNS: tagMaps.podNsMap[l7FlowLog.PodNSID0],
PodGroup: tagMaps.podGroupMap[l7FlowLog.PodGroupID0],
Pod: tagMaps.podMap[l7FlowLog.PodID0],
PodCluster: tagMaps.podClusterMap[l7FlowLog.PodClusterID0],
L3Epc: tagMaps.l3EpcMap[uint32(l7FlowLog.L3EpcID0)],
Subnet: tagMaps.subnetMap[l7FlowLog.SubnetID0],
Service: tagMaps.deviceMap[uint64(TYPE_SERVICE)<<32|uint64(l7FlowLog.ServiceID0)],
GProcess: tagMaps.gprocessMap[l7FlowLog.GPID0],
Vtap: tagMaps.vtapMap[l7FlowLog.VtapID],

CHost: tagMaps.deviceMap[uint64(TYPE_VM)<<32|uint64(l7FlowLog.L3DeviceID0)],
Router: tagMaps.deviceMap[uint64(TYPE_VROUTER)<<32|uint64(l7FlowLog.L3DeviceID0)],
DhcpGW: tagMaps.deviceMap[uint64(TYPE_DHCP_PORT)<<32|uint64(l7FlowLog.L3DeviceID0)],
PodService: tagMaps.deviceMap[uint64(TYPE_POD_SERVICE)<<32|uint64(l7FlowLog.L3DeviceID0)],
Redis: tagMaps.deviceMap[uint64(TYPE_REDIS_INSTANCE)<<32|uint64(l7FlowLog.L3DeviceID0)],
RDS: tagMaps.deviceMap[uint64(TYPE_RDS_INSTANCE)<<32|uint64(l7FlowLog.L3DeviceID0)],
LB: tagMaps.deviceMap[uint64(TYPE_LB)<<32|uint64(l7FlowLog.L3DeviceID0)],
}, &UniversalTags{
Region: tagMaps.regionMap[l7FlowLog.RegionID1],
AZ: tagMaps.azMap[l7FlowLog.AZID1],
Host: tagMaps.deviceMap[uint64(TYPE_HOST)<<32|uint64(l7FlowLog.HostID1)],
L3DeviceType: deviceTypeMap[uint32(l7FlowLog.L3DeviceType1)],
L3Device: tagMaps.deviceMap[uint64(l7FlowLog.L3DeviceType1)<<32|uint64(l7FlowLog.L3DeviceID1)],
PodNode: tagMaps.podNodeMap[l7FlowLog.PodNodeID1],
PodNS: tagMaps.podNsMap[l7FlowLog.PodNSID1],
PodGroup: tagMaps.podGroupMap[l7FlowLog.PodGroupID1],
Pod: tagMaps.podMap[l7FlowLog.PodID1],
PodCluster: tagMaps.podClusterMap[l7FlowLog.PodClusterID1],
L3Epc: tagMaps.l3EpcMap[uint32(l7FlowLog.L3EpcID1)],
Subnet: tagMaps.subnetMap[l7FlowLog.SubnetID1],
Service: tagMaps.deviceMap[uint64(TYPE_SERVICE)<<32|uint64(l7FlowLog.ServiceID1)],
GProcess: tagMaps.gprocessMap[l7FlowLog.GPID1],
Vtap: tagMaps.vtapMap[l7FlowLog.VtapID],

CHost: tagMaps.deviceMap[uint64(TYPE_VM)<<32|uint64(l7FlowLog.L3DeviceID1)],
Router: tagMaps.deviceMap[uint64(TYPE_VROUTER)<<32|uint64(l7FlowLog.L3DeviceID1)],
DhcpGW: tagMaps.deviceMap[uint64(TYPE_DHCP_PORT)<<32|uint64(l7FlowLog.L3DeviceID1)],
PodService: tagMaps.deviceMap[uint64(TYPE_POD_SERVICE)<<32|uint64(l7FlowLog.L3DeviceID1)],
Redis: tagMaps.deviceMap[uint64(TYPE_REDIS_INSTANCE)<<32|uint64(l7FlowLog.L3DeviceID1)],
RDS: tagMaps.deviceMap[uint64(TYPE_RDS_INSTANCE)<<32|uint64(l7FlowLog.L3DeviceID1)],
LB: tagMaps.deviceMap[uint64(TYPE_LB)<<32|uint64(l7FlowLog.L3DeviceID1)],
tags0, tags1 := &UniversalTags{
Region: tagMaps.regionMap[l7FlowLog.RegionID0],
AZ: tagMaps.azMap[l7FlowLog.AZID0],
Host: tagMaps.deviceMap[uint64(TYPE_HOST)<<32|uint64(l7FlowLog.HostID0)],
L3DeviceType: DeviceType(l7FlowLog.L3DeviceType0).String(),
L3Device: tagMaps.deviceMap[uint64(l7FlowLog.L3DeviceType0)<<32|uint64(l7FlowLog.L3DeviceID0)],
PodNode: tagMaps.podNodeMap[l7FlowLog.PodNodeID0],
PodNS: tagMaps.podNsMap[l7FlowLog.PodNSID0],
PodGroup: tagMaps.podGroupMap[l7FlowLog.PodGroupID0],
Pod: tagMaps.podMap[l7FlowLog.PodID0],
PodCluster: tagMaps.podClusterMap[l7FlowLog.PodClusterID0],
L3Epc: tagMaps.l3EpcMap[uint32(l7FlowLog.L3EpcID0)],
Subnet: tagMaps.subnetMap[l7FlowLog.SubnetID0],
Service: tagMaps.deviceMap[uint64(TYPE_SERVICE)<<32|uint64(l7FlowLog.ServiceID0)],
GProcess: tagMaps.gprocessMap[l7FlowLog.GPID0],
Vtap: tagMaps.vtapMap[l7FlowLog.VtapID],
}, &UniversalTags{
Region: tagMaps.regionMap[l7FlowLog.RegionID1],
AZ: tagMaps.azMap[l7FlowLog.AZID1],
Host: tagMaps.deviceMap[uint64(TYPE_HOST)<<32|uint64(l7FlowLog.HostID1)],
L3DeviceType: DeviceType(l7FlowLog.L3DeviceType1).String(),
L3Device: tagMaps.deviceMap[uint64(l7FlowLog.L3DeviceType1)<<32|uint64(l7FlowLog.L3DeviceID1)],
PodNode: tagMaps.podNodeMap[l7FlowLog.PodNodeID1],
PodNS: tagMaps.podNsMap[l7FlowLog.PodNSID1],
PodGroup: tagMaps.podGroupMap[l7FlowLog.PodGroupID1],
Pod: tagMaps.podMap[l7FlowLog.PodID1],
PodCluster: tagMaps.podClusterMap[l7FlowLog.PodClusterID1],
L3Epc: tagMaps.l3EpcMap[uint32(l7FlowLog.L3EpcID1)],
Subnet: tagMaps.subnetMap[l7FlowLog.SubnetID1],
Service: tagMaps.deviceMap[uint64(TYPE_SERVICE)<<32|uint64(l7FlowLog.ServiceID1)],
GProcess: tagMaps.gprocessMap[l7FlowLog.GPID1],
Vtap: tagMaps.vtapMap[l7FlowLog.VtapID],
}

l3Device0 := tagMaps.deviceMap[uint64(l7FlowLog.L3DeviceType0)<<32|uint64(l7FlowLog.L3DeviceID0)]
fillDevice(tags0, DeviceType(l7FlowLog.L3DeviceType0), l3Device0)

l3Device1 := tagMaps.deviceMap[uint64(l7FlowLog.L3DeviceType1)<<32|uint64(l7FlowLog.L3DeviceID1)]
fillDevice(tags1, DeviceType(l7FlowLog.L3DeviceType1), l3Device1)

tags0.AutoServiceType = DeviceType(l7FlowLog.AutoServiceType0).String()
tags0.AutoService = u.getAuto(DeviceType(l7FlowLog.AutoServiceType0), l7FlowLog.AutoServiceID0, l7FlowLog.IsIPv4, l7FlowLog.IP40, l7FlowLog.IP60)
tags0.AutoInstanceType = DeviceType(l7FlowLog.AutoInstanceType0).String()
tags0.AutoInstance = u.getAuto(DeviceType(l7FlowLog.AutoInstanceType0), l7FlowLog.AutoInstanceID0, l7FlowLog.IsIPv4, l7FlowLog.IP40, l7FlowLog.IP60)

tags1.AutoServiceType = DeviceType(l7FlowLog.AutoServiceType1).String()
tags1.AutoService = u.getAuto(DeviceType(l7FlowLog.AutoServiceType1), l7FlowLog.AutoServiceID1, l7FlowLog.IsIPv4, l7FlowLog.IP41, l7FlowLog.IP61)
tags1.AutoInstanceType = DeviceType(l7FlowLog.AutoInstanceType1).String()
tags1.AutoInstance = u.getAuto(DeviceType(l7FlowLog.AutoInstanceType1), l7FlowLog.AutoInstanceID1, l7FlowLog.IsIPv4, l7FlowLog.IP41, l7FlowLog.IP61)

return tags0, tags1
}

func fillDevice(tags *UniversalTags, deviceType DeviceType, device string) {
switch deviceType {
case TYPE_VM:
tags.CHost = device
case TYPE_VROUTER:
tags.Router = device
case TYPE_DHCP_GW:
tags.DhcpGW = device
case TYPE_POD_SERVICE:
tags.PodService = device
case TYPE_REDIS_INSTANCE:
tags.Redis = device
case TYPE_RDS_INSTANCE:
tags.RDS = device
case TYPE_LB:
tags.LB = device
}
}

func (u *UniversalTagsManager) getAuto(autoType DeviceType, autoID uint32, isIPv4 bool, ip4 uint32, ip6 net.IP) string {
if autoType == TYPE_IP || autoType == TYPE_INTERNET {
if isIPv4 {
return utils.IpFromUint32(ip4).String()
} else {
return ip6.String()
}
}
return u.universalTagMaps.deviceMap[uint64(autoType)<<32|uint64(autoID)]
}

func (u *UniversalTagsManager) QueryCustomK8sLabels(podID uint32) Labels {
Expand Down
4 changes: 2 additions & 2 deletions server/ingester/prometheus/decoder/grpc_label_ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,12 @@ func (t *PrometheusLabelTable) RequestAllTargetIDs() {
return err
})
if err != nil {
log.Warning("request all prometheus target ids failed: %s", err)
log.Warningf("request all prometheus target ids failed: %s", err)
return
}
newVersion := response.GetVersion()
if t.targetVersion != newVersion {
log.Warning("prometheus target version update from %d to %d", t.targetVersion, newVersion)
log.Infof("prometheus target version update from %d to %d", t.targetVersion, newVersion)
t.targetVersion = newVersion
t.updatePrometheusTargets(response.GetResponseTargetIds())
}
Expand Down

0 comments on commit d9157e5

Please sign in to comment.