Skip to content

Commit

Permalink
feat: exporters adds tag-filters-groups configuration
Browse files Browse the repository at this point in the history
there is always an OR relationship between `tag-filters-groups`
  • Loading branch information
lzf575 committed Nov 28, 2024
1 parent edd8f13 commit 315f628
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 31 deletions.
43 changes: 34 additions & 9 deletions server/ingester/exporters/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,18 @@ type TagFilter struct {
RegexpComplied *regexp.Regexp
}

type TagFiltersGroup struct {
TagFilterCondition TagFilterCondition `yaml:"tag-filter-condition"`
TagFilters []TagFilter `yaml:"tag-filters"`
}

func (t *TagFiltersGroup) Validate() {
t.TagFilterCondition.Validate()
for i := range t.TagFilters {
t.TagFilters[i].Validate()
}
}

func (t *TagFilter) Validate() {
t.OperatorId = operatorStringToID(t.Operator)
if t.OperatorId == EQ || t.OperatorId == NEQ || t.OperatorId == IN || t.OperatorId == NOT_IN {
Expand Down Expand Up @@ -420,7 +432,8 @@ type StructTags struct {
TagDataSourceBits uint32 // gen from 'TagDatasourceStr'

// the field has tagFilter, if it is not nil, should caculate filter
TagFilters []TagFilter // gen from 'ExporterCfg.TagFilters'
TagFilters []TagFilter // gen from 'ExporterCfg.TagFilters'
TagFiltersGroups []TagFiltersGroup // gen from 'ExporterCfg.TagFiltersGroup'

IsExportedField bool // gen from 'ExporterCfg.ExportFields'
}
Expand All @@ -445,15 +458,20 @@ type ExporterCfg struct {
EnumTranslateToNameDisabled bool `yaml:"enum-translate-to-name-disabled"`
UniversalTagTranslateToNameDisabled bool `yaml:"universal-tag-translate-to-name-disabled"`

TagFilterCondition TagFilterCondition `yaml:"tag-filter-condition"`
TagFilters []TagFilter `yaml:"tag-filters"`
ExportFields []string `yaml:"export-fields"`
ExportFieldCategoryBits uint64 // gen by `ExportFields`
ExportFieldNames []string // gen by `ExportFields`
ExportFieldK8s []string // gen by `ExportFields`
// Deprecated, use 'tag-filters-groups'
TagFilterCondition TagFilterCondition `yaml:"tag-filter-condition"`
// Deprecated, use 'tag-filters-groups'
TagFilters []TagFilter `yaml:"tag-filters"`

ExportFieldStructTags [MAX_DATASOURCE_ID][]StructTags // gen by `ExportFields` and init when exporting item first time
TagFieltertStructTags [MAX_DATASOURCE_ID][]StructTags // gen by `TagFilters` and init when exporting item first time
TagFiltersGroups []TagFiltersGroup `yaml:"tag-filters-groups"`
ExportFields []string `yaml:"export-fields"`
ExportFieldCategoryBits uint64 // gen by `ExportFields`
ExportFieldNames []string // gen by `ExportFields`
ExportFieldK8s []string // gen by `ExportFields`

ExportFieldStructTags [MAX_DATASOURCE_ID][]StructTags // gen by `ExportFields` and init when exporting item first time
TagFiltersStructTags [MAX_DATASOURCE_ID][]StructTags // gen by `TagFilters` and init when exporting item first time
TagFiltersGroupsStructTags [MAX_DATASOURCE_ID][][]StructTags // gen by `TagFiltersGroups` and init when exporting item first time

// private configuration
ExtraHeaders map[string]string `yaml:"extra-headers"`
Expand Down Expand Up @@ -547,10 +565,17 @@ func (cfg *ExporterCfg) Validate() error {
for i := range cfg.TagFilters {
cfg.TagFilters[i].Validate()
}
if len(cfg.TagFilters) > 0 {
log.Warning("the 'tag-filters' setting will be deprecated, use 'tag-filters-groups' instead")
}

cfg.TagFilterCondition.Validate()
cfg.Sasl.Validate()

for i := range cfg.TagFiltersGroups {
cfg.TagFiltersGroups[i].Validate()
}

return nil
}

Expand Down
86 changes: 74 additions & 12 deletions server/ingester/exporters/exporters.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,25 @@ func GetTagFilters(field string, tagFilters []config.TagFilter) []config.TagFilt
return tagFilter
}

func GetTagFiltersGroups(field string, tagFiltersGroups []config.TagFiltersGroup) []config.TagFiltersGroup {
hitFilterGroups := false
groups := make([]config.TagFiltersGroup, len(tagFiltersGroups))
for i, filterGroup := range tagFiltersGroups {
groups[i].TagFilterCondition = filterGroup.TagFilterCondition
for _, filter := range filterGroup.TagFilters {
if filter.FieldName == field {
groups[i].TagFilters = append(groups[i].TagFilters, filter)
hitFilterGroups = true
}
}
}

if hitFilterGroups {
return groups
}
return nil
}

func IsExportField(tag *config.StructTags, exportFieldCategoryBits uint64, exportFieldNames []string) bool {
if tag.Name == "" {
return false
Expand All @@ -184,7 +203,7 @@ func IsExportField(tag *config.StructTags, exportFieldCategoryBits uint64, expor
}

func (es *Exporters) initStructTags(item interface{}, dataSourceId uint32, exporterCfg *config.ExporterCfg) {
if exporterCfg.TagFieltertStructTags[dataSourceId] == nil {
if exporterCfg.TagFiltersStructTags[dataSourceId] == nil {
t := reflect.TypeOf(item)
if t.Kind() == reflect.Pointer {
t = t.Elem()
Expand Down Expand Up @@ -274,6 +293,7 @@ func (es *Exporters) initStructTags(item interface{}, dataSourceId uint32, expor
ToStringFunc: toStringFunc,
UniversalTagMapID: universal_tag.StringToUniversalTagID(name),
TagFilters: GetTagFilters(name, exporterCfg.TagFilters),
TagFiltersGroups: GetTagFiltersGroups(name, exporterCfg.TagFiltersGroups),
TagDataSourceBits: dataSourceBits,
}
if enumFile != "" {
Expand All @@ -283,39 +303,81 @@ func (es *Exporters) initStructTags(item interface{}, dataSourceId uint32, expor
all = append(all, structTag)
}

tagFieltertStructTags := []config.StructTags{}
tagFiltersStructTags := []config.StructTags{}
tagFiltersGroupsStructTags := make([][]config.StructTags, len(exporterCfg.TagFiltersGroups))
exportFieldStructTags := []config.StructTags{}
for _, structTag := range all {
if len(structTag.TagFilters) > 0 {
tagFieltertStructTags = append(tagFieltertStructTags, structTag)
tagFiltersStructTags = append(tagFiltersStructTags, structTag)
}
for i := range structTag.TagFiltersGroups {
if len(structTag.TagFiltersGroups[i].TagFilters) > 0 {
tagFiltersGroupsStructTags[i] = append(tagFiltersGroupsStructTags[i], structTag)
}
}
if structTag.IsExportedField {
exportFieldStructTags = append(exportFieldStructTags, structTag)
}
}
exporterCfg.TagFieltertStructTags[dataSourceId] = tagFieltertStructTags
exporterCfg.TagFiltersStructTags[dataSourceId] = tagFiltersStructTags
exporterCfg.TagFiltersGroupsStructTags[dataSourceId] = tagFiltersGroupsStructTags
exporterCfg.ExportFieldStructTags[dataSourceId] = exportFieldStructTags

dsid := config.DataSourceID(dataSourceId)
log.Infof("export protocl %s datasource %s, get all structTags: %+v", exporterCfg.Protocol, dsid.String(), all)
log.Infof("export protocl %s datasource %s, get tagfilter structTags: %+v", exporterCfg.Protocol, dsid.String(), tagFieltertStructTags)
log.Infof("export protocl %s datasource %s, get exportfield structTags: %+v", exporterCfg.Protocol, dsid.String(), exportFieldStructTags)
log.Infof("export protocol %s datasource %s, get all structTags: %+v", exporterCfg.Protocol, dsid.String(), all)
log.Infof("export protocol %s datasource %s, get tagfilter structTags: %+v", exporterCfg.Protocol, dsid.String(), tagFiltersStructTags)
log.Infof("export protocol %s datasource %s, get tagfilters group structTags: %+v", exporterCfg.Protocol, dsid.String(), tagFiltersGroupsStructTags)
log.Infof("export protocol %s datasource %s, get exportfield structTags: %+v", exporterCfg.Protocol, dsid.String(), exportFieldStructTags)
}
}

func (es *Exporters) IsExportItem(item common.ExportItem, dataSourceId uint32, exporterCfg *config.ExporterCfg) bool {
es.initStructTags(item, dataSourceId, exporterCfg)

ret, shouldExit := true, false
conditionHandler := exporterCfg.TagFilterCondition.NewConditionHandler()
for _, structTag := range exporterCfg.TagFieltertStructTags[dataSourceId] {
for _, structTag := range exporterCfg.TagFiltersStructTags[dataSourceId] {
value := item.GetFieldValueByOffsetAndKind(structTag.Offset, structTag.DataKind, structTag.DataType)
for _, tagFilter := range structTag.TagFilters {
if canExit, ret := conditionHandler.Decision(tagFilter.MatchValue(value)); canExit {
return ret
if canExit, r := conditionHandler.Decision(tagFilter.MatchValue(value)); canExit {
ret = r
shouldExit = true
break
}
}
if shouldExit {
break
}
}

// if 'tag-filters' is configured, the configuration of `tag-filter-groups` is ignored
if len(exporterCfg.TagFiltersStructTags[dataSourceId]) > 0 {
return ret
}

for i, groupStructTags := range exporterCfg.TagFiltersGroupsStructTags[dataSourceId] {
ret, shouldExit = true, false
conditionHandler := exporterCfg.TagFiltersGroups[i].TagFilterCondition.NewConditionHandler()
for _, structTag := range groupStructTags {
value := item.GetFieldValueByOffsetAndKind(structTag.Offset, structTag.DataKind, structTag.DataType)
for _, tagFilter := range structTag.TagFiltersGroups[i].TagFilters {
if canExit, r := conditionHandler.Decision(tagFilter.MatchValue(value)); canExit {
ret = r
shouldExit = true
break
}
}

if shouldExit {
break
}
}

// there is an OR relationship between 'tag-filter-groups'. If one of them is true, it can be returned.
if ret && len(groupStructTags) > 0 {
return ret
}
}
return true
return ret
}

func (es *Exporters) getPutCache(dataSourceId, decoderId, exporterId int) *ExportersCache {
Expand Down
Binary file added server/libs/ckdb/.table.go.swp
Binary file not shown.
Binary file added server/libs/stats/.stats.go.swp
Binary file not shown.
33 changes: 23 additions & 10 deletions server/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -757,14 +757,27 @@ ingester:
# enum-translate-to-name-disabled: false
# # whether the id value of universal-tag is not converted into a resource name for output
# universal-tag-translate-to-name-disabled: false
# tag-filter-condition:
# # Condition type, default is "and"
# # and: Logical AND between tag-filters / or: Logical OR between tag-filters
# type: "and"
# tag-filters:
# #- field-name: signal_source # database column name
# # operator: "=" # can be '=', '!=', 'in', 'not in', ':', '!:', "~", "!~"
# # field-values: [3] # vlaues
# tag-filters-groups: # 'OR' relationship between all 'tag-filters-groups'
# #- tag-filter-condition: # relationship between all 'tag-filters'
# # Condition type, default is "and", "and": Logical AND between 'tag-filters' , "or": Logical OR between 'tag-filters'
# # type: and # Condition type, default is "and"
# # tag-filters:
# # - field-name: region_id_0
# # operator: "=" # can be '=', '!=', 'in', 'not in', ':', '!:', "~", "!~"
# # field-values: [3, 4] # vlaues
# # - field-name: az_id_0
# # operator: "=" # can be '=', '!=', 'in', 'not in', ':', '!:', "~", "!~"
# # field-values: [3] # vlaues
# #- tag-filter-condition: # relationship between all 'tag-filters'
# # Condition type, default is "and", "and": Logical AND between 'tag-filters' , "or": Logical OR between 'tag-filters'
# # type: and
# # tag-filters:
# # - field-name: region_id_1
# # operator: "=" # can be '=', '!=', 'in', 'not in', ':', '!:', "~", "!~"
# # field-values: [3] # vlaues
# # - field-name: az_id_1
# # operator: "=" # can be '=', '!=', 'in', 'not in', ':', '!:', "~", "!~"
# # field-values: [3] # vlaues
# export-fields: # field_name or $category
# - $tag
# - $metrics
Expand Down Expand Up @@ -792,7 +805,7 @@ ingester:
# queue-size: 100000
# batch-size: 1024
# flush-timeout: 10
# tag-filters:
# tag-filters-groups: # 'OR' relationship between all 'tag-filters-groups'
# export-fields:
# - $tag
# - $metrics
Expand All @@ -813,7 +826,7 @@ ingester:
# queue-size: 100000
# batch-size: 32
# flush-timeout: 10
# tag-filters:
# tag-filters-groups: # 'OR' relationship between all 'tag-filters-groups'
# export-fields:
# - $tag
# - $metrics
Expand Down

0 comments on commit 315f628

Please sign in to comment.