Skip to content

Commit

Permalink
tmp - potential revert point
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr committed Jan 29, 2025
1 parent 220f9ca commit 0c0c928
Show file tree
Hide file tree
Showing 10 changed files with 547 additions and 88 deletions.
111 changes: 111 additions & 0 deletions internal/component/common/relabel/process.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package relabel

// TODO(thampiotr): write comment
// TODO(thampiotr): port the test

import (
"crypto/md5"
"encoding/binary"
"fmt"
"strings"

"github.com/prometheus/common/model"
)

type LabelBuilder interface {
Get(label string) string
Range(f func(label string, value string))
Set(label string, val string)
Del(ns ...string)
}

// ProcessBuilder is like Process, but the caller passes a labels.Builder
// containing the initial set of labels, which is mutated by the rules.
func ProcessBuilder(lb LabelBuilder, cfgs ...*Config) (keep bool) {
for _, cfg := range cfgs {
keep = doRelabel(cfg, lb)
if !keep {
return false
}
}
return true
}

func doRelabel(cfg *Config, lb LabelBuilder) (keep bool) {
var va [16]string
values := va[:0]
if len(cfg.SourceLabels) > cap(values) {
values = make([]string, 0, len(cfg.SourceLabels))
}
for _, ln := range cfg.SourceLabels {
values = append(values, lb.Get(string(ln)))
}
val := strings.Join(values, cfg.Separator)

switch cfg.Action {
case Drop:
if cfg.Regex.MatchString(val) {
return false
}
case Keep:
if !cfg.Regex.MatchString(val) {
return false
}
case DropEqual:
if lb.Get(cfg.TargetLabel) == val {
return false
}
case KeepEqual:
if lb.Get(cfg.TargetLabel) != val {
return false
}
case Replace:
indexes := cfg.Regex.FindStringSubmatchIndex(val)
// If there is no match no replacement must take place.
if indexes == nil {
break
}
target := model.LabelName(cfg.Regex.ExpandString([]byte{}, cfg.TargetLabel, val, indexes))
if !target.IsValid() {
break
}
res := cfg.Regex.ExpandString([]byte{}, cfg.Replacement, val, indexes)
if len(res) == 0 {
lb.Del(string(target))
break
}
lb.Set(string(target), string(res))
case Lowercase:
lb.Set(cfg.TargetLabel, strings.ToLower(val))
case Uppercase:
lb.Set(cfg.TargetLabel, strings.ToUpper(val))
case HashMod:
hash := md5.Sum([]byte(val))
// Use only the last 8 bytes of the hash to give the same result as earlier versions of this code.
mod := binary.BigEndian.Uint64(hash[8:]) % cfg.Modulus
lb.Set(cfg.TargetLabel, fmt.Sprintf("%d", mod))
case LabelMap:
lb.Range(func(name, value string) {
if cfg.Regex.MatchString(name) {
res := cfg.Regex.ReplaceAllString(name, cfg.Replacement)
lb.Set(res, value)
}
})
case LabelDrop:
lb.Range(func(name, value string) {
if cfg.Regex.MatchString(name) {
lb.Del(name)
}
})
case LabelKeep:
lb.Range(func(name, value string) {
if !cfg.Regex.MatchString(name) {
lb.Del(value)
}
})
default:
panic(fmt.Errorf("relabel: unknown relabel action type %q", cfg.Action))
}

return true
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ goos: darwin
goarch: arm64
pkg: github.com/grafana/alloy/internal/component/discovery
cpu: Apple M2
Benchmark_Targets_TypicalPipeline-8 72 15039526 ns/op 20086634 B/op 60441 allocs/op
Benchmark_Targets_TypicalPipeline-8 75 15234195 ns/op 20085425 B/op 60437 allocs/op
Benchmark_Targets_TypicalPipeline-8 74 15218076 ns/op 20085406 B/op 60437 allocs/op
Benchmark_Targets_TypicalPipeline-8 76 14936634 ns/op 20084764 B/op 60435 allocs/op
Benchmark_Targets_TypicalPipeline-8 74 16101057 ns/op 20084489 B/op 60434 allocs/op
Benchmark_Targets_TypicalPipeline-8 75 14948820 ns/op 20084826 B/op 60435 allocs/op
Benchmark_Targets_TypicalPipeline-8 48 22740573 ns/op 21467362 B/op 100933 allocs/op
Benchmark_Targets_TypicalPipeline-8 45 23141183 ns/op 21466235 B/op 100932 allocs/op
Benchmark_Targets_TypicalPipeline-8 48 24125997 ns/op 21466766 B/op 100936 allocs/op
Benchmark_Targets_TypicalPipeline-8 46 22152172 ns/op 21469613 B/op 100944 allocs/op
Benchmark_Targets_TypicalPipeline-8 46 26163378 ns/op 21466312 B/op 100928 allocs/op
Benchmark_Targets_TypicalPipeline-8 38 41765575 ns/op 21539059 B/op 100944 allocs/op
PASS
ok github.com/grafana/alloy/internal/component/discovery 13.209s
ok github.com/grafana/alloy/internal/component/discovery 8.729s
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
goos: darwin
goarch: arm64
pkg: github.com/grafana/alloy/internal/component/discovery
cpu: Apple M2
Benchmark_Targets_TypicalPipeline-8 33 34000016 ns/op 20898478 B/op 160937 allocs/op
Benchmark_Targets_TypicalPipeline-8 32 36584542 ns/op 20826227 B/op 160935 allocs/op
Benchmark_Targets_TypicalPipeline-8 30 35797154 ns/op 20827766 B/op 160935 allocs/op
Benchmark_Targets_TypicalPipeline-8 33 34531398 ns/op 20899688 B/op 160941 allocs/op
Benchmark_Targets_TypicalPipeline-8 32 35422260 ns/op 20827490 B/op 160933 allocs/op
Benchmark_Targets_TypicalPipeline-8 33 34420867 ns/op 20897838 B/op 160939 allocs/op
PASS
ok github.com/grafana/alloy/internal/component/discovery 8.385s
1 change: 1 addition & 0 deletions internal/component/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ func toAlloyTargets(cache map[string]*targetgroup.Group) []Target {

for _, group := range cache {
for _, target := range group.Targets {
// TODO(thampiotr): how many have group.Labels empty?
allTargets = append(allTargets, NewTargetFromSpecificAndBaseLabelSet(target, group.Labels))
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/component/discovery/distributed_targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ func (dt *DistributedTargets) MovedToRemoteInstance(prev *DistributedTargets) []
}

func keyFor(tgt Target) shard.Key {
return shard.Key(tgt.NonMetaLabels().Hash())
return shard.Key(tgt.NonMetaLabelsHash())
}

func keyForLabels(tgt Target, lbls []string) shard.Key {
return shard.Key(tgt.SpecificLabels(lbls).Hash())
return shard.Key(tgt.SpecificLabelsHash(lbls))
}

type disabledCluster struct{}
Expand Down
22 changes: 10 additions & 12 deletions internal/component/discovery/relabel/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"sync"

"github.com/prometheus/prometheus/model/relabel"

"github.com/grafana/alloy/internal/component"
alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
"github.com/grafana/alloy/internal/component/discovery"
Expand Down Expand Up @@ -47,7 +45,6 @@ type Component struct {
opts component.Options

mut sync.RWMutex
rcs []*relabel.Config

debugDataPublisher livedebugging.DebugDataPublisher
}
Expand Down Expand Up @@ -82,24 +79,25 @@ func (c *Component) Run(ctx context.Context) error {

// Update implements component.Component.
func (c *Component) Update(args component.Arguments) error {
c.mut.Lock()
defer c.mut.Unlock()

newArgs := args.(Arguments)

targets := make([]discovery.Target, 0, len(newArgs.Targets))
relabelConfigs := alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelConfigs)
c.rcs = relabelConfigs

// TODO(thampiotr): can do even better perhaps by relabeling all the group ones separately from all the
// individual ones
for _, t := range newArgs.Targets {
lset := t.Labels()
relabelled, keep := relabel.Process(lset, relabelConfigs...)
var (
relabelled discovery.Target
builder = discovery.NewLabelsBuilderFromTarget(t)
keep = alloy_relabel.ProcessBuilder(builder, newArgs.RelabelConfigs...)
)
if keep {
targets = append(targets, discovery.NewTargetFromModelLabels(relabelled))
relabelled = discovery.NewTargetFromLabelsBuilder(builder)
targets = append(targets, relabelled)
}
componentID := livedebugging.ComponentID(c.opts.ID)
if c.debugDataPublisher.IsActive(componentID) {
c.debugDataPublisher.Publish(componentID, fmt.Sprintf("%s => %s", lset.String(), relabelled.String()))
c.debugDataPublisher.Publish(componentID, fmt.Sprintf("%s => %s", t, relabelled))
}
}

Expand Down
Loading

0 comments on commit 0c0c928

Please sign in to comment.