Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: exporters adds tag-filters-groups configuration #8552

Merged
merged 2 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 34 additions & 9 deletions server/ingester/exporters/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,18 @@ type TagFilter struct {
RegexpComplied *regexp.Regexp
}

type TagFiltersGroup struct {
TagFilterCondition TagFilterCondition `yaml:"tag-filter-condition"`
TagFilters []TagFilter `yaml:"tag-filters"`
}

func (t *TagFiltersGroup) Validate() {
t.TagFilterCondition.Validate()
for i := range t.TagFilters {
t.TagFilters[i].Validate()
}
}

func (t *TagFilter) Validate() {
t.OperatorId = operatorStringToID(t.Operator)
if t.OperatorId == EQ || t.OperatorId == NEQ || t.OperatorId == IN || t.OperatorId == NOT_IN {
Expand Down Expand Up @@ -420,7 +432,8 @@ type StructTags struct {
TagDataSourceBits uint32 // gen from 'TagDatasourceStr'

// the field has tagFilter, if it is not nil, should caculate filter
TagFilters []TagFilter // gen from 'ExporterCfg.TagFilters'
TagFilters []TagFilter // gen from 'ExporterCfg.TagFilters'
TagFiltersGroups []TagFiltersGroup // gen from 'ExporterCfg.TagFiltersGroup'

IsExportedField bool // gen from 'ExporterCfg.ExportFields'
}
Expand All @@ -445,15 +458,20 @@ type ExporterCfg struct {
EnumTranslateToNameDisabled bool `yaml:"enum-translate-to-name-disabled"`
UniversalTagTranslateToNameDisabled bool `yaml:"universal-tag-translate-to-name-disabled"`

TagFilterCondition TagFilterCondition `yaml:"tag-filter-condition"`
TagFilters []TagFilter `yaml:"tag-filters"`
ExportFields []string `yaml:"export-fields"`
ExportFieldCategoryBits uint64 // gen by `ExportFields`
ExportFieldNames []string // gen by `ExportFields`
ExportFieldK8s []string // gen by `ExportFields`
// Deprecated, use 'tag-filters-groups'
TagFilterCondition TagFilterCondition `yaml:"tag-filter-condition"`
// Deprecated, use 'tag-filters-groups'
TagFilters []TagFilter `yaml:"tag-filters"`

ExportFieldStructTags [MAX_DATASOURCE_ID][]StructTags // gen by `ExportFields` and init when exporting item first time
TagFieltertStructTags [MAX_DATASOURCE_ID][]StructTags // gen by `TagFilters` and init when exporting item first time
TagFiltersGroups []TagFiltersGroup `yaml:"tag-filters-groups"`
ExportFields []string `yaml:"export-fields"`
ExportFieldCategoryBits uint64 // gen by `ExportFields`
ExportFieldNames []string // gen by `ExportFields`
ExportFieldK8s []string // gen by `ExportFields`

ExportFieldStructTags [MAX_DATASOURCE_ID][]StructTags // gen by `ExportFields` and init when exporting item first time
TagFiltersStructTags [MAX_DATASOURCE_ID][]StructTags // gen by `TagFilters` and init when exporting item first time
TagFiltersGroupsStructTags [MAX_DATASOURCE_ID][][]StructTags // gen by `TagFiltersGroups` and init when exporting item first time

// private configuration
ExtraHeaders map[string]string `yaml:"extra-headers"`
Expand Down Expand Up @@ -547,10 +565,17 @@ func (cfg *ExporterCfg) Validate() error {
for i := range cfg.TagFilters {
cfg.TagFilters[i].Validate()
}
if len(cfg.TagFilters) > 0 {
log.Warning("The 'tag-filters' setting will be deprecated, use 'tag-filters-groups' instead.")
}

cfg.TagFilterCondition.Validate()
cfg.Sasl.Validate()

for i := range cfg.TagFiltersGroups {
cfg.TagFiltersGroups[i].Validate()
}

return nil
}

Expand Down
86 changes: 74 additions & 12 deletions server/ingester/exporters/exporters.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,25 @@ func GetTagFilters(field string, tagFilters []config.TagFilter) []config.TagFilt
return tagFilter
}

func GetTagFiltersGroups(field string, tagFiltersGroups []config.TagFiltersGroup) []config.TagFiltersGroup {
hitFilterGroups := false
groups := make([]config.TagFiltersGroup, len(tagFiltersGroups))
for i, filterGroup := range tagFiltersGroups {
groups[i].TagFilterCondition = filterGroup.TagFilterCondition
for _, filter := range filterGroup.TagFilters {
if filter.FieldName == field {
groups[i].TagFilters = append(groups[i].TagFilters, filter)
hitFilterGroups = true
}
}
}

if hitFilterGroups {
return groups
}
return nil
}

func IsExportField(tag *config.StructTags, exportFieldCategoryBits uint64, exportFieldNames []string) bool {
if tag.Name == "" {
return false
Expand All @@ -184,7 +203,7 @@ func IsExportField(tag *config.StructTags, exportFieldCategoryBits uint64, expor
}

func (es *Exporters) initStructTags(item interface{}, dataSourceId uint32, exporterCfg *config.ExporterCfg) {
if exporterCfg.TagFieltertStructTags[dataSourceId] == nil {
if exporterCfg.TagFiltersStructTags[dataSourceId] == nil {
t := reflect.TypeOf(item)
if t.Kind() == reflect.Pointer {
t = t.Elem()
Expand Down Expand Up @@ -274,6 +293,7 @@ func (es *Exporters) initStructTags(item interface{}, dataSourceId uint32, expor
ToStringFunc: toStringFunc,
UniversalTagMapID: universal_tag.StringToUniversalTagID(name),
TagFilters: GetTagFilters(name, exporterCfg.TagFilters),
TagFiltersGroups: GetTagFiltersGroups(name, exporterCfg.TagFiltersGroups),
TagDataSourceBits: dataSourceBits,
}
if enumFile != "" {
Expand All @@ -283,39 +303,81 @@ func (es *Exporters) initStructTags(item interface{}, dataSourceId uint32, expor
all = append(all, structTag)
}

tagFieltertStructTags := []config.StructTags{}
tagFiltersStructTags := []config.StructTags{}
tagFiltersGroupsStructTags := make([][]config.StructTags, len(exporterCfg.TagFiltersGroups))
exportFieldStructTags := []config.StructTags{}
for _, structTag := range all {
if len(structTag.TagFilters) > 0 {
tagFieltertStructTags = append(tagFieltertStructTags, structTag)
tagFiltersStructTags = append(tagFiltersStructTags, structTag)
}
for i := range structTag.TagFiltersGroups {
if len(structTag.TagFiltersGroups[i].TagFilters) > 0 {
tagFiltersGroupsStructTags[i] = append(tagFiltersGroupsStructTags[i], structTag)
}
}
if structTag.IsExportedField {
exportFieldStructTags = append(exportFieldStructTags, structTag)
}
}
exporterCfg.TagFieltertStructTags[dataSourceId] = tagFieltertStructTags
exporterCfg.TagFiltersStructTags[dataSourceId] = tagFiltersStructTags
exporterCfg.TagFiltersGroupsStructTags[dataSourceId] = tagFiltersGroupsStructTags
exporterCfg.ExportFieldStructTags[dataSourceId] = exportFieldStructTags

dsid := config.DataSourceID(dataSourceId)
log.Infof("export protocl %s datasource %s, get all structTags: %+v", exporterCfg.Protocol, dsid.String(), all)
log.Infof("export protocl %s datasource %s, get tagfilter structTags: %+v", exporterCfg.Protocol, dsid.String(), tagFieltertStructTags)
log.Infof("export protocl %s datasource %s, get exportfield structTags: %+v", exporterCfg.Protocol, dsid.String(), exportFieldStructTags)
log.Infof("export protocol %s datasource %s, get all structTags: %+v", exporterCfg.Protocol, dsid.String(), all)
log.Infof("export protocol %s datasource %s, get tagfilter structTags: %+v", exporterCfg.Protocol, dsid.String(), tagFiltersStructTags)
log.Infof("export protocol %s datasource %s, get tagfilters group structTags: %+v", exporterCfg.Protocol, dsid.String(), tagFiltersGroupsStructTags)
log.Infof("export protocol %s datasource %s, get exportfield structTags: %+v", exporterCfg.Protocol, dsid.String(), exportFieldStructTags)
}
}

func (es *Exporters) IsExportItem(item common.ExportItem, dataSourceId uint32, exporterCfg *config.ExporterCfg) bool {
es.initStructTags(item, dataSourceId, exporterCfg)

ret, shouldExit := true, false
conditionHandler := exporterCfg.TagFilterCondition.NewConditionHandler()
for _, structTag := range exporterCfg.TagFieltertStructTags[dataSourceId] {
for _, structTag := range exporterCfg.TagFiltersStructTags[dataSourceId] {
value := item.GetFieldValueByOffsetAndKind(structTag.Offset, structTag.DataKind, structTag.DataType)
for _, tagFilter := range structTag.TagFilters {
if canExit, ret := conditionHandler.Decision(tagFilter.MatchValue(value)); canExit {
return ret
if canExit, r := conditionHandler.Decision(tagFilter.MatchValue(value)); canExit {
ret = r
shouldExit = true
break
}
}
if shouldExit {
break
}
}

// if 'tag-filters' is configured, the configuration of `tag-filter-groups` is ignored
if len(exporterCfg.TagFiltersStructTags[dataSourceId]) > 0 {
return ret
}

for i, groupStructTags := range exporterCfg.TagFiltersGroupsStructTags[dataSourceId] {
ret, shouldExit = true, false
conditionHandler := exporterCfg.TagFiltersGroups[i].TagFilterCondition.NewConditionHandler()
for _, structTag := range groupStructTags {
value := item.GetFieldValueByOffsetAndKind(structTag.Offset, structTag.DataKind, structTag.DataType)
for _, tagFilter := range structTag.TagFiltersGroups[i].TagFilters {
if canExit, r := conditionHandler.Decision(tagFilter.MatchValue(value)); canExit {
ret = r
shouldExit = true
break
}
}

if shouldExit {
break
}
}

// there is an OR relationship between 'tag-filter-groups'. If one of them is true, it can be returned.
if ret && len(groupStructTags) > 0 {
return ret
}
}
return true
return ret
}

func (es *Exporters) getPutCache(dataSourceId, decoderId, exporterId int) *ExportersCache {
Expand Down
33 changes: 23 additions & 10 deletions server/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -757,14 +757,27 @@ ingester:
# enum-translate-to-name-disabled: false
# # whether the id value of universal-tag is not converted into a resource name for output
# universal-tag-translate-to-name-disabled: false
# tag-filter-condition:
# # Condition type, default is "and"
# # and: Logical AND between tag-filters / or: Logical OR between tag-filters
# type: "and"
# tag-filters:
# #- field-name: signal_source # database column name
# # operator: "=" # can be '=', '!=', 'in', 'not in', ':', '!:', "~", "!~"
# # field-values: [3] # vlaues
# tag-filters-groups: # 'OR' relationship between all 'tag-filters-groups'
# #- tag-filter-condition: # relationship between all 'tag-filters'
# # Condition type, default is "and", "and": Logical AND between 'tag-filters' , "or": Logical OR between 'tag-filters'
# # type: and # Condition type, default is "and"
# # tag-filters:
# # - field-name: region_id_0
# # operator: "=" # can be '=', '!=', 'in', 'not in', ':', '!:', "~", "!~"
# # field-values: [3, 4] # vlaues
# # - field-name: az_id_0
# # operator: "=" # can be '=', '!=', 'in', 'not in', ':', '!:', "~", "!~"
# # field-values: [3] # vlaues
# #- tag-filter-condition: # relationship between all 'tag-filters'
# # Condition type, default is "and", "and": Logical AND between 'tag-filters' , "or": Logical OR between 'tag-filters'
# # type: and
# # tag-filters:
# # - field-name: region_id_1
# # operator: "=" # can be '=', '!=', 'in', 'not in', ':', '!:', "~", "!~"
# # field-values: [3] # vlaues
# # - field-name: az_id_1
# # operator: "=" # can be '=', '!=', 'in', 'not in', ':', '!:', "~", "!~"
# # field-values: [3] # vlaues
# export-fields: # field_name or $category
# - $tag
# - $metrics
Expand Down Expand Up @@ -792,7 +805,7 @@ ingester:
# queue-size: 100000
# batch-size: 1024
# flush-timeout: 10
# tag-filters:
# tag-filters-groups: # 'OR' relationship between all 'tag-filters-groups'
# export-fields:
# - $tag
# - $metrics
Expand All @@ -813,7 +826,7 @@ ingester:
# queue-size: 100000
# batch-size: 32
# flush-timeout: 10
# tag-filters:
# tag-filters-groups: # 'OR' relationship between all 'tag-filters-groups'
# export-fields:
# - $tag
# - $metrics
Expand Down
Loading