Skip to content

Commit

Permalink
修复 KubernetesPrometheus 采集器在特殊情况下的异常
Browse files Browse the repository at this point in the history
  • Loading branch information
liguozhuang authored and 谭彪 committed Sep 11, 2024
1 parent 6bfce6e commit 63fa8fe
Show file tree
Hide file tree
Showing 10 changed files with 315 additions and 210 deletions.
4 changes: 3 additions & 1 deletion internal/plugins/inputs/kubernetesprometheus/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ var (
inputName = "kubernetesprometheus"
klog = logger.DefaultSLogger(inputName)

// Maximum: 1 (inputs.Run) + 4 * 2 (resource manager) + N (Services Number).
managerGo = datakit.G("kubernetesprometheus_manager")
workerGo = datakit.G("kubernetesprometheus_worker")
// Maximum: 4 * maxConcurrent.
workerGo = datakit.G("kubernetesprometheus_worker")
)

const (
Expand Down
18 changes: 10 additions & 8 deletions internal/plugins/inputs/kubernetesprometheus/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Endpoints struct {
store cache.Store

instances []*Instance
scrape *scrapeWorker
scrape *scrapeManager
feeder dkio.Feeder
}

Expand All @@ -41,15 +41,15 @@ func NewEndpoints(informerFactory informers.SharedInformerFactory, instances []*
store: informer.Informer().GetStore(),

instances: instances,
scrape: newScrapeWorker(RoleEndpoints),
scrape: newScrapeManager(RoleEndpoints),
feeder: feeder,
}, nil
}

func (e *Endpoints) Run(ctx context.Context) {
defer e.queue.ShutDown()

e.scrape.startWorker(ctx, maxConcurrent(nodeLocalFrom(ctx)))
e.scrape.run(ctx, maxConcurrent(nodeLocalFrom(ctx)))

e.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand Down Expand Up @@ -123,6 +123,7 @@ func (e *Endpoints) startScrape(ctx context.Context, key string, item *corev1.En
}

nodeName, nodeNameExists := nodeNameFrom(ctx)
feature := endpointsFeature(item)

for _, ins := range e.instances {
if !ins.validator.Matches(item.Namespace, item.Labels) {
Expand All @@ -136,7 +137,6 @@ func (e *Endpoints) startScrape(ctx context.Context, key string, item *corev1.En

// record key
klog.Infof("added Endpoints %s", key)
e.scrape.registerKey(key, endpointsFeature(item))

cfgs, err := pr.parsePromConfig(ins)
if err != nil {
Expand All @@ -162,21 +162,23 @@ func (e *Endpoints) startScrape(ctx context.Context, key string, item *corev1.En
}

urlstr := cfg.urlstr
election := cfg.nodeName == ""
checkPausedFunc := func() bool {
return checkPaused(ctx, cfg.nodeName == "")
}

prom, err := newPromTarget(ctx, urlstr, interval, election, opts)
prom, err := newPromScraper(RoleEndpoints, key, urlstr, interval, checkPausedFunc, opts)
if err != nil {
klog.Warnf("fail new prom %s for %s", urlstr, err)
continue
}

e.scrape.registerTarget(key, prom)
e.scrape.registerScrape(key, feature, prom)
}
}
}

// terminateScrape stops and unregisters the scraper associated with the
// given Endpoints object key (namespace/name).
func (e *Endpoints) terminateScrape(key string) {
	e.scrape.terminateScrape(key)
}

func endpointsFeature(item *corev1.Endpoints) string {
Expand Down
8 changes: 8 additions & 0 deletions internal/plugins/inputs/kubernetesprometheus/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,11 @@ func pauseFrom(ctx context.Context) (bool, bool) {
p := pause.Load()
return p, true
}

// checkPaused reports whether scraping should be skipped for this round.
// It only applies to election-based targets: when election is disabled the
// scrape always proceeds; otherwise the pause flag carried in ctx decides.
func checkPaused(ctx context.Context, election bool) bool {
	if !election {
		return false
	}
	if paused, exists := pauseFrom(ctx); exists {
		return paused
	}
	return false
}
37 changes: 37 additions & 0 deletions internal/plugins/inputs/kubernetesprometheus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
var (
collectPtsCounter *prometheus.CounterVec
scrapeTargetNumber *prometheus.GaugeVec
scrapeTargetCost *prometheus.SummaryVec
activeWorkerTasks *prometheus.GaugeVec
)

func setupMetrics() {
Expand Down Expand Up @@ -42,8 +44,43 @@ func setupMetrics() {
},
)

scrapeTargetCost = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: "datakit",
Subsystem: "input_kubernetesprometheus",
Name: "resource_scrape_cost_seconds",
Help: "The scrape cost in seconds",

Objectives: map[float64]float64{
0.5: 0.05,
0.9: 0.01,
0.99: 0.001,
},
},
[]string{
"role",
"name",
"url",
},
)

activeWorkerTasks = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "datakit",
Subsystem: "input_kubernetesprometheus",
Name: "worker_number",
Help: "The number of the worker",
},
[]string{
"role",
"worker",
},
)

metrics.MustRegister(
collectPtsCounter,
scrapeTargetNumber,
scrapeTargetCost,
activeWorkerTasks,
)
}
18 changes: 11 additions & 7 deletions internal/plugins/inputs/kubernetesprometheus/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Node struct {
store cache.Store

instances []*Instance
scrape *scrapeWorker
scrape *scrapeManager
feeder dkio.Feeder
}

Expand All @@ -39,15 +39,15 @@ func NewNode(informerFactory informers.SharedInformerFactory, instances []*Insta
store: informer.Informer().GetStore(),

instances: instances,
scrape: newScrapeWorker(RoleNode),
scrape: newScrapeManager(RoleNode),
feeder: feeder,
}, nil
}

func (n *Node) Run(ctx context.Context) {
defer n.queue.ShutDown()

n.scrape.startWorker(ctx, maxConcurrent(nodeLocalFrom(ctx)))
n.scrape.run(ctx, maxConcurrent(nodeLocalFrom(ctx)))

n.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand Down Expand Up @@ -125,6 +125,11 @@ func (n *Node) startScrape(ctx context.Context, key string, item *corev1.Node) {
return
}

feature := nodeFeature(item)
checkPausedFunc := func() bool {
return checkPaused(ctx, false /* not use election */)
}

for _, ins := range n.instances {
if !ins.validator.Matches("", item.Labels) {
continue
Expand All @@ -137,7 +142,6 @@ func (n *Node) startScrape(ctx context.Context, key string, item *corev1.Node) {

// record key
klog.Infof("added Node %s", key)
n.scrape.registerKey(key, nodeFeature(item))

cfg, err := pr.parsePromConfig(ins)
if err != nil {
Expand All @@ -159,18 +163,18 @@ func (n *Node) startScrape(ctx context.Context, key string, item *corev1.Node) {
opts = append(opts, tlsOpts...)
}

prom, err := newPromTarget(ctx, urlstr, interval, false /* not use election */, opts)
prom, err := newPromScraper(RoleNode, key, urlstr, interval, checkPausedFunc, opts)
if err != nil {
klog.Warnf("fail new prom %s for %s", urlstr, err)
continue
}

n.scrape.registerTarget(key, prom)
n.scrape.registerScrape(key, feature, prom)
}
}

// terminateScrape stops and unregisters the scraper associated with the
// given Node object key.
func (n *Node) terminateScrape(key string) {
	n.scrape.terminateScrape(key)
}

func nodeFeature(item *corev1.Node) string {
Expand Down
18 changes: 11 additions & 7 deletions internal/plugins/inputs/kubernetesprometheus/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Pod struct {
store cache.Store

instances []*Instance
scrape *scrapeWorker
scrape *scrapeManager
feeder dkio.Feeder
}

Expand All @@ -39,15 +39,15 @@ func NewPod(informerFactory informers.SharedInformerFactory, instances []*Instan
store: informer.Informer().GetStore(),

instances: instances,
scrape: newScrapeWorker(RolePod),
scrape: newScrapeManager(RolePod),
feeder: feeder,
}, nil
}

func (p *Pod) Run(ctx context.Context) {
defer p.queue.ShutDown()

p.scrape.startWorker(ctx, maxConcurrent(nodeLocalFrom(ctx)))
p.scrape.run(ctx, maxConcurrent(nodeLocalFrom(ctx)))

p.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand Down Expand Up @@ -125,6 +125,11 @@ func (p *Pod) startScrape(ctx context.Context, key string, item *corev1.Pod) {
return
}

feature := podFeature(item)
checkPausedFunc := func() bool {
return checkPaused(ctx, false /* not use election */)
}

for _, ins := range p.instances {
if !ins.validator.Matches(item.Namespace, item.Labels) {
continue
Expand All @@ -137,7 +142,6 @@ func (p *Pod) startScrape(ctx context.Context, key string, item *corev1.Pod) {

// record key
klog.Infof("added Pod %s", key)
p.scrape.registerKey(key, podFeature(item))

cfg, err := pr.parsePromConfig(ins)
if err != nil {
Expand All @@ -159,18 +163,18 @@ func (p *Pod) startScrape(ctx context.Context, key string, item *corev1.Pod) {
opts = append(opts, tlsOpts...)
}

prom, err := newPromTarget(ctx, urlstr, interval, false /* not use election */, opts)
prom, err := newPromScraper(RolePod, key, urlstr, interval, checkPausedFunc, opts)
if err != nil {
klog.Warnf("fail new prom %s for %s", urlstr, err)
continue
}

p.scrape.registerTarget(key, prom)
p.scrape.registerScrape(key, feature, prom)
}
}

// terminateScrape stops and unregisters the scraper associated with the
// given Pod object key (namespace/name).
func (p *Pod) terminateScrape(key string) {
	p.scrape.terminateScrape(key)
}

func podFeature(item *corev1.Pod) string {
Expand Down
74 changes: 44 additions & 30 deletions internal/plugins/inputs/kubernetesprometheus/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,66 +6,80 @@
package kubernetesprometheus

import (
"context"
"os"
"sync/atomic"
"time"

"github.com/GuanceCloud/cliutils/point"
dkio "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io"
iprom "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/prom"
)

type promTarget struct {
urlstr string
pm *iprom.Prom
type promScraper struct {
role, key string
urlstr string
pm *iprom.Prom

shouldScrape func() bool
lastTime time.Time
checkPaused func() bool
terminated atomic.Bool

interval time.Duration
lastTime time.Time
}

func newPromTarget(ctx context.Context, urlstr string, interval time.Duration, election bool, opts []iprom.PromOption) (*promTarget, error) {
func newPromScraper(
role Role,
key string,
urlstr string,
interval time.Duration,
checkPaused func() bool,
opts []iprom.PromOption,
) (*promScraper, error) {
var err error
p := promTarget{urlstr: urlstr}
p := promScraper{
role: string(role),
key: key,
urlstr: urlstr,
checkPaused: checkPaused,
interval: interval,
}

p.pm, err = iprom.NewProm(opts...)
if err != nil {
return nil, err
}

p.shouldScrape = func() bool {
if election {
paused, exists := pauseFrom(ctx)
if exists && paused {
return false
}
}
return &p, nil
}

if p.lastTime.IsZero() {
p.lastTime = time.Now()
return true
}
if time.Since(p.lastTime) < interval {
return false
}
// targetURL returns the URL this scraper collects from.
func (p *promScraper) targetURL() string { return p.urlstr }

// isTerminated reports whether the scraper has been retired.
func (p *promScraper) isTerminated() bool { return p.terminated.Load() }

// markAsTerminated retires the scraper; workers skip it from then on.
func (p *promScraper) markAsTerminated() { p.terminated.Store(true) }

// shouldScrape reports whether a scrape round should run now: the very
// first call always scrapes, later calls require the configured interval
// to have elapsed and the scraper not to be paused (election lost).
func (p *promScraper) shouldScrape() bool {
	// First round: stamp the time and scrape immediately.
	if p.lastTime.IsZero() {
		p.lastTime = time.Now()
		return true
	}

	if time.Since(p.lastTime) < p.interval {
		return false
	}
	if p.checkPaused != nil {
		// Paused (e.g. this instance lost the election): skip this round.
		return !p.checkPaused()
	}
	return true
}

func (p *promTarget) url() string { return p.urlstr }
func (p *promTarget) scrape() error {
if !p.shouldScrape() {
return nil
}
// scrape performs one collection from the target URL and records the
// elapsed cost into the scrapeTargetCost summary, labeled by role/key/url.
func (p *promScraper) scrape() error {
	p.lastTime = time.Now()
	_, err := p.pm.CollectFromHTTPV2(p.urlstr)
	// Duration.Seconds() is the idiomatic form of float64(d)/float64(time.Second).
	scrapeTargetCost.WithLabelValues(p.role, p.key, p.urlstr).Observe(time.Since(p.lastTime).Seconds())
	return err
}

func buildPromOptions(role Role, key string, feeder dkio.Feeder, opts ...iprom.PromOption) []iprom.PromOption {
name := string(role) + "::" + key
name := string(role) + "/" + key

callbackFn := func(pts []*point.Point) error {
if len(pts) == 0 {
Expand Down
Loading

0 comments on commit 63fa8fe

Please sign in to comment.