Skip to content

Commit

Permalink
feat: exporters adds tag-filters-groups configuration
Browse files Browse the repository at this point in the history
It is only valid when the configuration of 'tag-filters' is empty
There is always an OR relationship between `tag-filters-groups`
  • Loading branch information
lzf575 committed Nov 26, 2024
1 parent 5092d7b commit f9b72c2
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 15 deletions.
25 changes: 22 additions & 3 deletions server/ingester/exporters/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,18 @@ type TagFilter struct {
RegexpComplied *regexp.Regexp
}

type TagFiltersGroup struct {
TagFilterCondition TagFilterCondition `yaml:"tag-filter-condition"`
TagFilters []TagFilter `yaml:"tag-filters"`
}

func (t *TagFiltersGroup) Validate() {
t.TagFilterCondition.Validate()
for i := range t.TagFilters {
t.TagFilters[i].Validate()
}
}

func (t *TagFilter) Validate() {
t.OperatorId = operatorStringToID(t.Operator)
if t.OperatorId == EQ || t.OperatorId == NEQ || t.OperatorId == IN || t.OperatorId == NOT_IN {
Expand Down Expand Up @@ -420,7 +432,8 @@ type StructTags struct {
TagDataSourceBits uint32 // gen from 'TagDatasourceStr'

// the field has tagFilter, if it is not nil, should caculate filter
TagFilters []TagFilter // gen from 'ExporterCfg.TagFilters'
TagFilters []TagFilter // gen from 'ExporterCfg.TagFilters'
TagFiltersGroups []TagFiltersGroup // gen from 'ExporterCfg.TagFiltersGroup'

IsExportedField bool // gen from 'ExporterCfg.ExportFields'
}
Expand All @@ -447,13 +460,15 @@ type ExporterCfg struct {

TagFilterCondition TagFilterCondition `yaml:"tag-filter-condition"`
TagFilters []TagFilter `yaml:"tag-filters"`
TagFiltersGroups []TagFiltersGroup `yaml:"tag-filters-groups"`
ExportFields []string `yaml:"export-fields"`
ExportFieldCategoryBits uint64 // gen by `ExportFields`
ExportFieldNames []string // gen by `ExportFields`
ExportFieldK8s []string // gen by `ExportFields`

ExportFieldStructTags [MAX_DATASOURCE_ID][]StructTags // gen by `ExportFields` and init when exporting item first time
TagFieltertStructTags [MAX_DATASOURCE_ID][]StructTags // gen by `TagFilters` and init when exporting item first time
ExportFieldStructTags [MAX_DATASOURCE_ID][]StructTags // gen by `ExportFields` and init when exporting item first time
TagFiltersStructTags [MAX_DATASOURCE_ID][]StructTags // gen by `TagFilters` and init when exporting item first time
TagFiltersGroupsStructTags [MAX_DATASOURCE_ID][][]StructTags // gen by `TagFiltersGroups` and init when exporting item first time

// private configuration
ExtraHeaders map[string]string `yaml:"extra-headers"`
Expand Down Expand Up @@ -551,6 +566,10 @@ func (cfg *ExporterCfg) Validate() error {
cfg.TagFilterCondition.Validate()
cfg.Sasl.Validate()

for i := range cfg.TagFiltersGroups {
cfg.TagFiltersGroups[i].Validate()
}

return nil
}

Expand Down
86 changes: 74 additions & 12 deletions server/ingester/exporters/exporters.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,25 @@ func GetTagFilters(field string, tagFilters []config.TagFilter) []config.TagFilt
return tagFilter
}

func GetTagFiltersGroups(field string, tagFiltersGroups []config.TagFiltersGroup) []config.TagFiltersGroup {
hitFilterGroups := false
groups := make([]config.TagFiltersGroup, len(tagFiltersGroups))
for i, filterGroup := range tagFiltersGroups {
groups[i].TagFilterCondition = filterGroup.TagFilterCondition
for _, filter := range filterGroup.TagFilters {
if filter.FieldName == field {
groups[i].TagFilters = append(groups[i].TagFilters, filter)
hitFilterGroups = true
}
}
}

if hitFilterGroups {
return groups
}
return nil
}

func IsExportField(tag *config.StructTags, exportFieldCategoryBits uint64, exportFieldNames []string) bool {
if tag.Name == "" {
return false
Expand All @@ -184,7 +203,7 @@ func IsExportField(tag *config.StructTags, exportFieldCategoryBits uint64, expor
}

func (es *Exporters) initStructTags(item interface{}, dataSourceId uint32, exporterCfg *config.ExporterCfg) {
if exporterCfg.TagFieltertStructTags[dataSourceId] == nil {
if exporterCfg.TagFiltersStructTags[dataSourceId] == nil {
t := reflect.TypeOf(item)
if t.Kind() == reflect.Pointer {
t = t.Elem()
Expand Down Expand Up @@ -274,6 +293,7 @@ func (es *Exporters) initStructTags(item interface{}, dataSourceId uint32, expor
ToStringFunc: toStringFunc,
UniversalTagMapID: universal_tag.StringToUniversalTagID(name),
TagFilters: GetTagFilters(name, exporterCfg.TagFilters),
TagFiltersGroups: GetTagFiltersGroups(name, exporterCfg.TagFiltersGroups),
TagDataSourceBits: dataSourceBits,
}
if enumFile != "" {
Expand All @@ -283,39 +303,81 @@ func (es *Exporters) initStructTags(item interface{}, dataSourceId uint32, expor
all = append(all, structTag)
}

tagFieltertStructTags := []config.StructTags{}
tagFiltersStructTags := []config.StructTags{}
tagFiltersGroupsStructTags := make([][]config.StructTags, len(exporterCfg.TagFiltersGroups))
exportFieldStructTags := []config.StructTags{}
for _, structTag := range all {
if len(structTag.TagFilters) > 0 {
tagFieltertStructTags = append(tagFieltertStructTags, structTag)
tagFiltersStructTags = append(tagFiltersStructTags, structTag)
}
for i := range structTag.TagFiltersGroups {
if len(structTag.TagFiltersGroups[i].TagFilters) > 0 {
tagFiltersGroupsStructTags[i] = append(tagFiltersGroupsStructTags[i], structTag)
}
}
if structTag.IsExportedField {
exportFieldStructTags = append(exportFieldStructTags, structTag)
}
}
exporterCfg.TagFieltertStructTags[dataSourceId] = tagFieltertStructTags
exporterCfg.TagFiltersStructTags[dataSourceId] = tagFiltersStructTags
exporterCfg.TagFiltersGroupsStructTags[dataSourceId] = tagFiltersGroupsStructTags
exporterCfg.ExportFieldStructTags[dataSourceId] = exportFieldStructTags

dsid := config.DataSourceID(dataSourceId)
log.Infof("export protocl %s datasource %s, get all structTags: %+v", exporterCfg.Protocol, dsid.String(), all)
log.Infof("export protocl %s datasource %s, get tagfilter structTags: %+v", exporterCfg.Protocol, dsid.String(), tagFieltertStructTags)
log.Infof("export protocl %s datasource %s, get exportfield structTags: %+v", exporterCfg.Protocol, dsid.String(), exportFieldStructTags)
log.Infof("export protocol %s datasource %s, get all structTags: %+v", exporterCfg.Protocol, dsid.String(), all)
log.Infof("export protocol %s datasource %s, get tagfilter structTags: %+v", exporterCfg.Protocol, dsid.String(), tagFiltersStructTags)
log.Infof("export protocol %s datasource %s, get tagfilters group structTags: %+v", exporterCfg.Protocol, dsid.String(), tagFiltersGroupsStructTags)
log.Infof("export protocol %s datasource %s, get exportfield structTags: %+v", exporterCfg.Protocol, dsid.String(), exportFieldStructTags)
}
}

func (es *Exporters) IsExportItem(item common.ExportItem, dataSourceId uint32, exporterCfg *config.ExporterCfg) bool {
es.initStructTags(item, dataSourceId, exporterCfg)

ret, shouldExit := true, false
conditionHandler := exporterCfg.TagFilterCondition.NewConditionHandler()
for _, structTag := range exporterCfg.TagFieltertStructTags[dataSourceId] {
for _, structTag := range exporterCfg.TagFiltersStructTags[dataSourceId] {
value := item.GetFieldValueByOffsetAndKind(structTag.Offset, structTag.DataKind, structTag.DataType)
for _, tagFilter := range structTag.TagFilters {
if canExit, ret := conditionHandler.Decision(tagFilter.MatchValue(value)); canExit {
return ret
if canExit, r := conditionHandler.Decision(tagFilter.MatchValue(value)); canExit {
ret = r
shouldExit = true
break
}
}
if shouldExit {
break
}
}

// if 'tag-filters' is configured, the configuration of `tag-filter-groups` is ignored
if len(exporterCfg.TagFiltersStructTags[dataSourceId]) > 0 {
return ret
}

for i, groupStructTags := range exporterCfg.TagFiltersGroupsStructTags[dataSourceId] {
ret, shouldExit = true, false
conditionHandler := exporterCfg.TagFiltersGroups[i].TagFilterCondition.NewConditionHandler()
for _, structTag := range groupStructTags {
value := item.GetFieldValueByOffsetAndKind(structTag.Offset, structTag.DataKind, structTag.DataType)
for _, tagFilter := range structTag.TagFiltersGroups[i].TagFilters {
if canExit, r := conditionHandler.Decision(tagFilter.MatchValue(value)); canExit {
ret = r
shouldExit = true
break
}
}

if shouldExit {
break
}
}

// there is an OR relationship between 'tag-filter-groups'. If one of them is true, it can be returned.
if ret && len(groupStructTags) > 0 {
return ret
}
}
return true
return ret
}

func (es *Exporters) getPutCache(dataSourceId, decoderId, exporterId int) *ExportersCache {
Expand Down
20 changes: 20 additions & 0 deletions server/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,26 @@ ingester:
# #- field-name: signal_source # database column name
# # operator: "=" # can be '=', '!=', 'in', 'not in', ':', '!:', "~", "!~"
# # field-values: [3] # vlaues
# # only when the configuration of `tag-filters` above is empty, the configuration of `tag-filters-groups` is valid.
# tag-filters-groups: # 'OR' relationship between all tag-filter-groups
# #- tag-filter-condition: # default 'AND' relationship between all tag-filters
# # type: and
# # tag-filters:
# # - field-name: region_id_0
# # operator: "=" # can be '=', '!=', 'in', 'not in', ':', '!:', "~", "!~"
# # field-values: [3] # vlaues
# # - field-name: az_id_0
# # operator: "=" # can be '=', '!=', 'in', 'not in', ':', '!:', "~", "!~"
# # field-values: [3] # vlaues
# #- tag-filter-condition: # default 'AND' relationship between all tag-filters
# # type: and
# # tag-filters:
# # - field-name: region_id_1
# # operator: "=" # can be '=', '!=', 'in', 'not in', ':', '!:', "~", "!~"
# # field-values: [3] # vlaues
# # - field-name: az_id_1
# # operator: "=" # can be '=', '!=', 'in', 'not in', ':', '!:', "~", "!~"
# # field-values: [3] # vlaues
# export-fields: # field_name or $category
# - $tag
# - $metrics
Expand Down

0 comments on commit f9b72c2

Please sign in to comment.