
Commit

Fix duplicate collection by KubernetesPrometheus in special environments; optimize collection performance
liguozhuang authored and 谭彪 committed Aug 28, 2024
1 parent a194b7e commit 724b901
Showing 15 changed files with 490 additions and 283 deletions.
10 changes: 10 additions & 0 deletions internal/export/doc/en/inputs/kubernetesprometheus.md
@@ -114,6 +114,16 @@ A brief description of how this collector operates helps in better understanding
cert_key = "/opt/nginx/peer.key"
```

In addition, there is a class of global configuration. It sits at the top level and is mainly responsible for turning certain features on or off, such as `node_local` here:

```yaml
[inputs.kubernetesprometheus]
node_local = true # Whether to enable NodeLocal mode, distributing the collection across nodes

[[inputs.kubernetesprometheus.instances]]
# ..other
```

```markdown
<!-- markdownlint-disable MD046 -->
???+ attention
10 changes: 10 additions & 0 deletions internal/export/doc/zh/inputs/kubernetesprometheus.md
@@ -115,6 +115,16 @@ KubernetesPrometheus 是一个只能应用在 Kubernetes 的采集器,它根
cert_key = "/opt/nginx/peer.key"
```

此外还有一类全局配置,它是最顶层的配置,主要负责一些功能开启或关闭,例如此处的 `node_local`

```yaml
[inputs.kubernetesprometheus]
node_local = true # 是否开启 NodeLocal 模式,将采集分散到各个节点

[[inputs.kubernetesprometheus.instances]]
# ..other
```

<!-- markdownlint-disable MD046 -->
???+ attention

9 changes: 3 additions & 6 deletions internal/plugins/inputs/kubernetesprometheus/const.go
@@ -6,26 +6,23 @@
package kubernetesprometheus

import (
"sync/atomic"

"github.com/GuanceCloud/cliutils/logger"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/datakit"
)

var (
inputName = "kubernetesprometheus"
klog = logger.DefaultSLogger("inputName")
klog = logger.DefaultSLogger(inputName)

managerGo = datakit.G("kubernetesprometheus_manager")
workerGo = datakit.G("kubernetesprometheus_worker")

workerCounter = atomic.Int64{}
maxWorkerNum = int64(1000)
)

const (
example = `
[inputs.kubernetesprometheus]
node_local = true
[[inputs.kubernetesprometheus.instances]]
role = "node"
namespaces = []
56 changes: 30 additions & 26 deletions internal/plugins/inputs/kubernetesprometheus/endpoints.go
@@ -26,8 +26,7 @@ type Endpoints struct {
store cache.Store

instances []*Instance
scraper *scraper
keys map[string]string
scrape *scrapeWorker
feeder dkio.Feeder
}

@@ -42,15 +41,16 @@ func NewEndpoints(informerFactory informers.SharedInformerFactory, instances []*
store: informer.Informer().GetStore(),

instances: instances,
scraper: newScraper(),
keys: make(map[string]string),
scrape: newScrapeWorker(RoleEndpoints),
feeder: feeder,
}, nil
}

func (e *Endpoints) Run(ctx context.Context) {
defer e.queue.ShutDown()

e.scrape.startWorker(ctx, maxConcurrent(nodeLocalFrom(ctx)))

e.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
e.enqueue(obj)
@@ -63,10 +63,11 @@ func (e *Endpoints) Run(ctx context.Context) {
},
})

go func() {
managerGo.Go(func(_ context.Context) error {
for e.process(ctx) {
}
}()
return nil
})

<-ctx.Done()
}
@@ -95,7 +96,7 @@ func (e *Endpoints) process(ctx context.Context) bool {

if !exists {
klog.Infof("deleted Endpoints %s", key)
e.terminateProms(key)
e.terminateScrape(key)
return true
}

@@ -105,23 +106,24 @@ func (e *Endpoints) process(ctx context.Context) bool {
return true
}

info, exists := e.keys[key]
if exists && info == joinEndpointsInfo(ep) {
if e.scrape.matchesKey(key, endpointsFeature(ep)) {
return true
}

klog.Infof("found endpoints %s", key)

e.terminateProms(key)
e.runProm(ctx, key, ep)
e.terminateScrape(key)
e.startScrape(ctx, key, ep)
return true
}

func (e *Endpoints) runProm(ctx context.Context, key string, item *corev1.Endpoints) {
func (e *Endpoints) startScrape(ctx context.Context, key string, item *corev1.Endpoints) {
if shouldSkipEndpoints(item) {
return
}

nodeName, nodeNameExists := nodeNameFrom(ctx)

for _, ins := range e.instances {
if !ins.validator.Matches(item.Namespace, item.Labels) {
continue
@@ -134,7 +136,7 @@ func (e *Endpoints) runProm(ctx context.Context, key string, item *corev1.Endpoi

// record key
klog.Infof("added Endpoints %s", key)
e.keys[key] = joinEndpointsInfo(item)
e.scrape.registerKey(key, endpointsFeature(item))

cfgs, err := pr.parsePromConfig(ins)
if err != nil {
@@ -144,6 +146,10 @@ func (e *Endpoints) runProm(ctx context.Context, key string, item *corev1.Endpoi
interval := ins.Interval

for _, cfg := range cfgs {
if nodeNameExists && cfg.nodeName != "" && cfg.nodeName != nodeName {
continue
}

opts := buildPromOptions(
RoleEndpoints, key, e.feeder,
iprom.WithMeasurementName(cfg.measurement),
@@ -156,26 +162,24 @@ func (e *Endpoints) runProm(ctx context.Context, key string, item *corev1.Endpoi
}

urlstr := cfg.urlstr
election := cfg.nodeName == ""

workerInc(RoleEndpoints, key)
workerGo.Go(func(_ context.Context) error {
defer func() {
workerInc(RoleEndpoints, key)
}()
prom, err := newPromTarget(ctx, urlstr, interval, election, opts)
if err != nil {
klog.Warnf("fail new prom %s for %s", urlstr, err)
continue
}

e.scraper.runProm(ctx, key, urlstr, interval, opts)
return nil
})
e.scrape.registerTarget(key, prom)
}
}
}

func (e *Endpoints) terminateProms(key string) {
e.scraper.terminateProms(key)
delete(e.keys, key)
func (e *Endpoints) terminateScrape(key string) {
e.scrape.terminate(key)
}

func joinEndpointsInfo(item *corev1.Endpoints) string {
func endpointsFeature(item *corev1.Endpoints) string {
var ips []string
for _, sub := range item.Subsets {
for _, address := range sub.Addresses {
Expand All @@ -186,5 +190,5 @@ func joinEndpointsInfo(item *corev1.Endpoints) string {
}

func shouldSkipEndpoints(item *corev1.Endpoints) bool {
return maxedOutClients() || len(item.Subsets) == 0 || len(item.Subsets[0].Addresses) == 0
return len(item.Subsets) == 0 || len(item.Subsets[0].Addresses) == 0
}
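
The diff above replaces the per-Endpoints `keys` map and ad-hoc `workerGo` goroutines with a shared `scrapeWorker`. The file that defines it (`scrape.go`) is not part of the hunks shown here, so the following is only a rough sketch of how a worker with this call surface (`startWorker`, `matchesKey`, `registerKey`, `registerTarget`, `terminate`) might be put together; the `Scraper` interface, the field layout, and the scheduling details are assumptions, not the actual implementation. The key/feature check is what `process` now uses to short-circuit unchanged Endpoints objects, which is where the duplicate-collection fix comes from.

```go
package kubernetesprometheus

import (
	"context"
	"log"
	"sync"
)

// Scraper stands in for the target returned by newPromTarget (assumed interface).
type Scraper interface {
	ScrapeOnce(ctx context.Context) error
}

type scrapeWorker struct {
	role string

	mu       sync.Mutex
	features map[string]string    // object key -> feature string (e.g. joined endpoint IPs)
	targets  map[string][]Scraper // object key -> registered scrape targets

	jobs chan Scraper
}

func newScrapeWorker(role string) *scrapeWorker {
	return &scrapeWorker{
		role:     role,
		features: make(map[string]string),
		targets:  make(map[string][]Scraper),
		jobs:     make(chan Scraper),
	}
}

// startWorker launches a fixed-size goroutine pool, so scrape concurrency is
// bounded per role instead of growing with the number of Kubernetes objects.
func (s *scrapeWorker) startWorker(ctx context.Context, concurrency int) {
	for i := 0; i < concurrency; i++ {
		go func() {
			for {
				select {
				case <-ctx.Done():
					return
				case target := <-s.jobs:
					if err := target.ScrapeOnce(ctx); err != nil {
						log.Printf("scrape failed: %s", err)
					}
				}
			}
		}()
	}
}

// matchesKey reports whether key is already registered with an identical
// feature, i.e. the object has not changed and re-adding it would only
// produce duplicate collection.
func (s *scrapeWorker) matchesKey(key, feature string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	f, ok := s.features[key]
	return ok && f == feature
}

func (s *scrapeWorker) registerKey(key, feature string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.features[key] = feature
}

func (s *scrapeWorker) registerTarget(key string, target Scraper) {
	s.mu.Lock()
	s.targets[key] = append(s.targets[key], target)
	s.mu.Unlock()

	// Hand the target to the pool; the real worker would keep rescheduling it
	// on its interval rather than scraping it once.
	go func() { s.jobs <- target }()
}

func (s *scrapeWorker) terminate(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.features, key)
	delete(s.targets, key) // the real worker would also stop in-flight targets
}
```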
@@ -168,7 +168,7 @@ func (p *endpointsParser) parsePromConfig(ins *Instance) ([]*basePromConfig, err
var configs []*basePromConfig

for _, set := range p.item.Subsets {
for addressIdx := range set.Addresses {
for addressIdx, address := range set.Addresses {
// length 5
oldElems := []string{ins.Scheme, ins.Address, ins.Port, ins.Path, ins.Measurement}
newElems := deepCopySlice(oldElems)
@@ -224,10 +224,16 @@ func (p *endpointsParser) parsePromConfig(ins *Instance) ([]*basePromConfig, err
}
}

nodeName := ""
if address.NodeName != nil {
nodeName = *address.NodeName
}

configs = append(configs, &basePromConfig{
urlstr: u.String(),
measurement: measurement,
tags: tags,
nodeName: nodeName,
})
}
}
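
The hunk above starts carrying the endpoint address's `NodeName` into each scrape config. The `basePromConfig` definition itself is outside the shown hunks; based on the fields assigned here, its shape is presumably close to the following sketch (field types are inferred from the call site, not confirmed):

```go
// Assumed shape of basePromConfig after this change.
type basePromConfig struct {
	urlstr      string            // fully resolved scrape URL
	measurement string            // measurement name after placeholder substitution
	tags        map[string]string // tags resolved from the instance configuration
	nodeName    string            // node hosting the endpoint address; empty when unknown
}
```

Carrying `nodeName` lets `startScrape` skip addresses that live on other nodes when NodeLocal mode is enabled, so collection is distributed across DataKit instances instead of being repeated by every one of them.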
60 changes: 46 additions & 14 deletions internal/plugins/inputs/kubernetesprometheus/input.go
@@ -9,6 +9,8 @@ package kubernetesprometheus
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/GuanceCloud/cliutils/logger"
@@ -19,13 +21,15 @@ import (
)

type Input struct {
NodeLocal bool `toml:"node_local"`
InstanceManager

chPause chan bool
pause bool
pause *atomic.Bool
feeder dkio.Feeder
cancel context.CancelFunc

cancel context.CancelFunc
runOnce sync.Once
}

func (*Input) SampleConfig() string { return example }
@@ -39,12 +43,17 @@ func (ipt *Input) Run() {
klog = logger.SLogger("kubernetesprometheus")

for {
if !ipt.pause {
if err := ipt.start(); err != nil {
klog.Warn(err)
}
} else {
ipt.stop()
// NodeLocal mode is enabled, or this instance won the election
if ipt.NodeLocal || !ipt.pause.Load() {
ipt.runOnce.Do(
func() {
managerGo.Go(func(_ context.Context) error {
if err := ipt.start(); err != nil {
klog.Warn(err)
}
return nil
})
})
}

select {
@@ -53,8 +62,16 @@
klog.Info("exit")
return

case ipt.pause = <-ipt.chPause:
// nil
case pause := <-ipt.chPause:
ipt.pause.Store(pause)

// NodeLocal mode is disabled and this instance lost the election
if !ipt.NodeLocal && pause {
ipt.stop()
ipt.runOnce = sync.Once{} // reset runOnce
}
default:
// next
}
}
}
@@ -67,21 +84,34 @@ func (ipt *Input) start() error {
return err
}

nodeName, err := getLocalNodeName()
if err != nil {
return err
}

ctx, cancel := context.WithCancel(context.Background())
ipt.cancel = cancel

if ipt.NodeLocal {
ctx = withNodeName(ctx, nodeName)
ctx = withNodeLocal(ctx, ipt.NodeLocal)
}
ctx = withPause(ctx, ipt.pause)

ipt.InstanceManager.Run(ctx, apiClient.Clientset, apiClient.InformerFactory, ipt.feeder)

apiClient.InformerFactory.Start(ctx.Done())
apiClient.InformerFactory.WaitForCacheSync(ctx.Done())

<-ctx.Done()
klog.Info("end")
return nil
}

func (ipt *Input) stop() {
ipt.cancel()
klog.Info("stop")
if ipt.cancel != nil {
ipt.cancel()
}
}

func (ipt *Input) Pause() error {
@@ -108,8 +138,10 @@ func init() { //nolint:gochecknoinits
setupMetrics()
inputs.Add(inputName, func() inputs.Input {
return &Input{
chPause: make(chan bool, inputs.ElectionPauseChannelLength),
feeder: dkio.DefaultFeeder(),
NodeLocal: true,
chPause: make(chan bool, inputs.ElectionPauseChannelLength),
pause: &atomic.Bool{},
feeder: dkio.DefaultFeeder(),
}
})
}
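
`input.go` now threads the node name and the NodeLocal switch through the context (`withNodeName`, `withNodeLocal`, and the `nodeNameFrom` / `nodeLocalFrom` readers used in `endpoints.go`), but those helpers are not part of this diff. A plausible sketch, assuming plain `context.WithValue` with unexported key types:

```go
package kubernetesprometheus

import "context"

type nodeNameKey struct{}
type nodeLocalKey struct{}

func withNodeName(ctx context.Context, name string) context.Context {
	return context.WithValue(ctx, nodeNameKey{}, name)
}

// nodeNameFrom returns the local node name and whether it was set; startScrape
// uses it to drop endpoint addresses that belong to other nodes.
func nodeNameFrom(ctx context.Context) (string, bool) {
	name, ok := ctx.Value(nodeNameKey{}).(string)
	return name, ok
}

func withNodeLocal(ctx context.Context, enabled bool) context.Context {
	return context.WithValue(ctx, nodeLocalKey{}, enabled)
}

// nodeLocalFrom reports whether NodeLocal mode is on; the scrape workers use it
// (via maxConcurrent) to size their goroutine pools.
func nodeLocalFrom(ctx context.Context) bool {
	enabled, ok := ctx.Value(nodeLocalKey{}).(bool)
	return ok && enabled
}
```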