From 724b9010c7f505d7a0c2a2ffa053093da34e9130 Mon Sep 17 00:00:00 2001 From: liguozhuang Date: Wed, 28 Aug 2024 18:40:53 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20KubernetesPrometheus=20?= =?UTF-8?q?=E5=9C=A8=E7=89=B9=E6=AE=8A=E7=8E=AF=E5=A2=83=E4=B8=8B=E9=87=87?= =?UTF-8?q?=E9=9B=86=E9=87=8D=E5=A4=8D=E7=9A=84=E9=97=AE=E9=A2=98=EF=BC=8C?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=87=87=E9=9B=86=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../doc/en/inputs/kubernetesprometheus.md | 10 ++ .../doc/zh/inputs/kubernetesprometheus.md | 10 ++ .../inputs/kubernetesprometheus/const.go | 9 +- .../inputs/kubernetesprometheus/endpoints.go | 56 ++++---- .../kubernetesprometheus/endpoints_parser.go | 8 +- .../inputs/kubernetesprometheus/input.go | 60 ++++++-- .../inputs/kubernetesprometheus/instance.go | 85 +++++------ .../inputs/kubernetesprometheus/metrics.go | 12 +- .../inputs/kubernetesprometheus/node.go | 56 ++++---- .../inputs/kubernetesprometheus/pod.go | 54 +++---- .../inputs/kubernetesprometheus/prom.go | 64 ++++++--- .../inputs/kubernetesprometheus/scrape.go | 134 ++++++++++++++++++ .../inputs/kubernetesprometheus/scraper.go | 61 -------- .../inputs/kubernetesprometheus/service.go | 84 ++++++----- .../inputs/kubernetesprometheus/utils.go | 70 +++++++++ 15 files changed, 490 insertions(+), 283 deletions(-) create mode 100644 internal/plugins/inputs/kubernetesprometheus/scrape.go delete mode 100644 internal/plugins/inputs/kubernetesprometheus/scraper.go create mode 100644 internal/plugins/inputs/kubernetesprometheus/utils.go diff --git a/internal/export/doc/en/inputs/kubernetesprometheus.md b/internal/export/doc/en/inputs/kubernetesprometheus.md index 75f9183ef4..b3885ac1f7 100644 --- a/internal/export/doc/en/inputs/kubernetesprometheus.md +++ b/internal/export/doc/en/inputs/kubernetesprometheus.md @@ -114,6 +114,16 @@ A brief description of how this collector operates helps in better understanding cert_key = "/opt/nginx/peer.key" ``` +Additionally, there is a type of global configuration, which is the highest-level configuration, mainly responsible for enabling or disabling certain features, such as `node_local` here: + +```yaml +[inputs.kubernetesprometheus] + node_local = true # Whether to enable NodeLocal mode, distributing the collection across nodes + + [[inputs.kubernetesprometheus.instances]] + # ..other +``` + ```markdown ???+ attention diff --git a/internal/export/doc/zh/inputs/kubernetesprometheus.md b/internal/export/doc/zh/inputs/kubernetesprometheus.md index 387ae7e9f7..b9254d4573 100644 --- a/internal/export/doc/zh/inputs/kubernetesprometheus.md +++ b/internal/export/doc/zh/inputs/kubernetesprometheus.md @@ -115,6 +115,16 @@ KubernetesPrometheus 是一个只能应用在 Kubernetes 的采集器,它根 cert_key = "/opt/nginx/peer.key" ``` +此外还有一类全局配置,它是最顶层的配置,主要负责一些功能开启或关闭,例如此处的 `node_local`: + +```yaml +[inputs.kubernetesprometheus] + node_local = true # 是否开启 NodeLocal 模式,将采集分散到各个节点 + + [[inputs.kubernetesprometheus.instances]] + # ..other +``` + ???+ attention diff --git a/internal/plugins/inputs/kubernetesprometheus/const.go b/internal/plugins/inputs/kubernetesprometheus/const.go index fb453a7b94..a76cd13038 100644 --- a/internal/plugins/inputs/kubernetesprometheus/const.go +++ b/internal/plugins/inputs/kubernetesprometheus/const.go @@ -6,26 +6,23 @@ package kubernetesprometheus import ( - "sync/atomic" - "github.com/GuanceCloud/cliutils/logger" "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/datakit" ) var ( inputName = 
"kubernetesprometheus" - klog = logger.DefaultSLogger("inputName") + klog = logger.DefaultSLogger(inputName) managerGo = datakit.G("kubernetesprometheus_manager") workerGo = datakit.G("kubernetesprometheus_worker") - - workerCounter = atomic.Int64{} - maxWorkerNum = int64(1000) ) const ( example = ` [inputs.kubernetesprometheus] + node_local = true + [[inputs.kubernetesprometheus.instances]] role = "node" namespaces = [] diff --git a/internal/plugins/inputs/kubernetesprometheus/endpoints.go b/internal/plugins/inputs/kubernetesprometheus/endpoints.go index 5c4373dd11..1b4380fcf6 100644 --- a/internal/plugins/inputs/kubernetesprometheus/endpoints.go +++ b/internal/plugins/inputs/kubernetesprometheus/endpoints.go @@ -26,8 +26,7 @@ type Endpoints struct { store cache.Store instances []*Instance - scraper *scraper - keys map[string]string + scrape *scrapeWorker feeder dkio.Feeder } @@ -42,8 +41,7 @@ func NewEndpoints(informerFactory informers.SharedInformerFactory, instances []* store: informer.Informer().GetStore(), instances: instances, - scraper: newScraper(), - keys: make(map[string]string), + scrape: newScrapeWorker(RoleEndpoints), feeder: feeder, }, nil } @@ -51,6 +49,8 @@ func NewEndpoints(informerFactory informers.SharedInformerFactory, instances []* func (e *Endpoints) Run(ctx context.Context) { defer e.queue.ShutDown() + e.scrape.startWorker(ctx, maxConcurrent(nodeLocalFrom(ctx))) + e.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { e.enqueue(obj) @@ -63,10 +63,11 @@ func (e *Endpoints) Run(ctx context.Context) { }, }) - go func() { + managerGo.Go(func(_ context.Context) error { for e.process(ctx) { } - }() + return nil + }) <-ctx.Done() } @@ -95,7 +96,7 @@ func (e *Endpoints) process(ctx context.Context) bool { if !exists { klog.Infof("deleted Endpoints %s", key) - e.terminateProms(key) + e.terminateScrape(key) return true } @@ -105,23 +106,24 @@ func (e *Endpoints) process(ctx context.Context) bool { return true } - info, exists := e.keys[key] - if exists && info == joinEndpointsInfo(ep) { + if e.scrape.matchesKey(key, endpointsFeature(ep)) { return true } klog.Infof("found endpoints %s", key) - e.terminateProms(key) - e.runProm(ctx, key, ep) + e.terminateScrape(key) + e.startScrape(ctx, key, ep) return true } -func (e *Endpoints) runProm(ctx context.Context, key string, item *corev1.Endpoints) { +func (e *Endpoints) startScrape(ctx context.Context, key string, item *corev1.Endpoints) { if shouldSkipEndpoints(item) { return } + nodeName, nodeNameExists := nodeNameFrom(ctx) + for _, ins := range e.instances { if !ins.validator.Matches(item.Namespace, item.Labels) { continue @@ -134,7 +136,7 @@ func (e *Endpoints) runProm(ctx context.Context, key string, item *corev1.Endpoi // record key klog.Infof("added Endpoints %s", key) - e.keys[key] = joinEndpointsInfo(item) + e.scrape.registerKey(key, endpointsFeature(item)) cfgs, err := pr.parsePromConfig(ins) if err != nil { @@ -144,6 +146,10 @@ func (e *Endpoints) runProm(ctx context.Context, key string, item *corev1.Endpoi interval := ins.Interval for _, cfg := range cfgs { + if nodeNameExists && cfg.nodeName != "" && cfg.nodeName != nodeName { + continue + } + opts := buildPromOptions( RoleEndpoints, key, e.feeder, iprom.WithMeasurementName(cfg.measurement), @@ -156,26 +162,24 @@ func (e *Endpoints) runProm(ctx context.Context, key string, item *corev1.Endpoi } urlstr := cfg.urlstr + election := cfg.nodeName == "" - workerInc(RoleEndpoints, key) - workerGo.Go(func(_ context.Context) 
error { - defer func() { - workerInc(RoleEndpoints, key) - }() + prom, err := newPromTarget(ctx, urlstr, interval, election, opts) + if err != nil { + klog.Warnf("fail new prom %s for %s", urlstr, err) + continue + } - e.scraper.runProm(ctx, key, urlstr, interval, opts) - return nil - }) + e.scrape.registerTarget(key, prom) } } } -func (e *Endpoints) terminateProms(key string) { - e.scraper.terminateProms(key) - delete(e.keys, key) +func (e *Endpoints) terminateScrape(key string) { + e.scrape.terminate(key) } -func joinEndpointsInfo(item *corev1.Endpoints) string { +func endpointsFeature(item *corev1.Endpoints) string { var ips []string for _, sub := range item.Subsets { for _, address := range sub.Addresses { @@ -186,5 +190,5 @@ func joinEndpointsInfo(item *corev1.Endpoints) string { } func shouldSkipEndpoints(item *corev1.Endpoints) bool { - return maxedOutClients() || len(item.Subsets) == 0 || len(item.Subsets[0].Addresses) == 0 + return len(item.Subsets) == 0 || len(item.Subsets[0].Addresses) == 0 } diff --git a/internal/plugins/inputs/kubernetesprometheus/endpoints_parser.go b/internal/plugins/inputs/kubernetesprometheus/endpoints_parser.go index a793d0aa2a..6753deda61 100644 --- a/internal/plugins/inputs/kubernetesprometheus/endpoints_parser.go +++ b/internal/plugins/inputs/kubernetesprometheus/endpoints_parser.go @@ -168,7 +168,7 @@ func (p *endpointsParser) parsePromConfig(ins *Instance) ([]*basePromConfig, err var configs []*basePromConfig for _, set := range p.item.Subsets { - for addressIdx := range set.Addresses { + for addressIdx, address := range set.Addresses { // length 5 oldElems := []string{ins.Scheme, ins.Address, ins.Port, ins.Path, ins.Measurement} newElems := deepCopySlice(oldElems) @@ -224,10 +224,16 @@ func (p *endpointsParser) parsePromConfig(ins *Instance) ([]*basePromConfig, err } } + nodeName := "" + if address.NodeName != nil { + nodeName = *address.NodeName + } + configs = append(configs, &basePromConfig{ urlstr: u.String(), measurement: measurement, tags: tags, + nodeName: nodeName, }) } } diff --git a/internal/plugins/inputs/kubernetesprometheus/input.go b/internal/plugins/inputs/kubernetesprometheus/input.go index c8be3c9c44..9ec5e01079 100644 --- a/internal/plugins/inputs/kubernetesprometheus/input.go +++ b/internal/plugins/inputs/kubernetesprometheus/input.go @@ -9,6 +9,8 @@ package kubernetesprometheus import ( "context" "fmt" + "sync" + "sync/atomic" "time" "github.com/GuanceCloud/cliutils/logger" @@ -19,13 +21,15 @@ import ( ) type Input struct { + NodeLocal bool `toml:"node_local"` InstanceManager chPause chan bool - pause bool + pause *atomic.Bool feeder dkio.Feeder + cancel context.CancelFunc - cancel context.CancelFunc + runOnce sync.Once } func (*Input) SampleConfig() string { return example } @@ -39,12 +43,17 @@ func (ipt *Input) Run() { klog = logger.SLogger("kubernetesprometheus") for { - if !ipt.pause { - if err := ipt.start(); err != nil { - klog.Warn(err) - } - } else { - ipt.stop() + // enable nodeLocal model or election success + if ipt.NodeLocal || !ipt.pause.Load() { + ipt.runOnce.Do( + func() { + managerGo.Go(func(_ context.Context) error { + if err := ipt.start(); err != nil { + klog.Warn(err) + } + return nil + }) + }) } select { @@ -53,8 +62,16 @@ func (ipt *Input) Run() { klog.Info("exit") return - case ipt.pause = <-ipt.chPause: - // nil + case pause := <-ipt.chPause: + ipt.pause.Store(pause) + + // disable nodeLocal model and election defeat + if !ipt.NodeLocal && pause { + ipt.stop() + ipt.runOnce = sync.Once{} // reset runOnce + 
} + default: + // next } } } @@ -67,21 +84,34 @@ func (ipt *Input) start() error { return err } + nodeName, err := getLocalNodeName() + if err != nil { + return err + } + ctx, cancel := context.WithCancel(context.Background()) ipt.cancel = cancel + if ipt.NodeLocal { + ctx = withNodeName(ctx, nodeName) + ctx = withNodeLocal(ctx, ipt.NodeLocal) + } + ctx = withPause(ctx, ipt.pause) + ipt.InstanceManager.Run(ctx, apiClient.Clientset, apiClient.InformerFactory, ipt.feeder) apiClient.InformerFactory.Start(ctx.Done()) apiClient.InformerFactory.WaitForCacheSync(ctx.Done()) <-ctx.Done() + klog.Info("end") return nil } func (ipt *Input) stop() { - ipt.cancel() - klog.Info("stop") + if ipt.cancel != nil { + ipt.cancel() + } } func (ipt *Input) Pause() error { @@ -108,8 +138,10 @@ func init() { //nolint:gochecknoinits setupMetrics() inputs.Add(inputName, func() inputs.Input { return &Input{ - chPause: make(chan bool, inputs.ElectionPauseChannelLength), - feeder: dkio.DefaultFeeder(), + NodeLocal: true, + chPause: make(chan bool, inputs.ElectionPauseChannelLength), + pause: &atomic.Bool{}, + feeder: dkio.DefaultFeeder(), } }) } diff --git a/internal/plugins/inputs/kubernetesprometheus/instance.go b/internal/plugins/inputs/kubernetesprometheus/instance.go index e4381efb97..2e77d2a61f 100644 --- a/internal/plugins/inputs/kubernetesprometheus/instance.go +++ b/internal/plugins/inputs/kubernetesprometheus/instance.go @@ -7,11 +7,11 @@ package kubernetesprometheus import ( "context" - "net" - "net/url" "regexp" + "sync/atomic" "time" + "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/datakit" dkio "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io" dknet "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/net" "k8s.io/client-go/informers" @@ -74,6 +74,8 @@ type ( urlstr string measurement string tags map[string]string + // Only used on Endpoints. 
+ nodeName string } ) @@ -204,63 +206,48 @@ func (k keyMatcher) matches(str string) (matched bool, args []string) { return false, nil } -func buildURLWithParams(scheme, address, port, path, params string) (*url.URL, error) { - u := &url.URL{ - Scheme: scheme, - Host: address + ":" + port, - Path: path, +func maxConcurrent(nodeLocal bool) int { + if nodeLocal { + return datakit.AvailableCPUs + 1 } - if params != "" { - query, err := url.ParseQuery(params) - if err != nil { - return nil, err - } else { - u.RawQuery = query.Encode() - } - } - - if _, err := url.Parse(u.String()); err != nil { - return nil, err - } - return u, nil + return datakit.AvailableCPUs*3 + 1 } -func maxedOutClients() bool { - if x := workerCounter.Load(); x >= maxWorkerNum { - klog.Warnf("maxed out clients %s > %d, cannot create prom collection", x, maxWorkerNum) - return true - } - return false +type ctxKey int + +const ( + ctxKeyNodeName ctxKey = iota + 1 + ctxKeyNodeLocal + ctxKeyPause +) + +func withNodeName(ctx context.Context, nodeName string) context.Context { + return context.WithValue(ctx, ctxKeyNodeName, nodeName) } -func workerInc(role Role, key string) { - _ = workerCounter.Add(1) - forkedWorkerGauge.WithLabelValues(string(role), key).Inc() +func withNodeLocal(ctx context.Context, nodeLocal bool) context.Context { + return context.WithValue(ctx, ctxKeyNodeLocal, nodeLocal) } -func workerDec(role Role, key string) { - _ = workerCounter.Add(-1) - forkedWorkerGauge.WithLabelValues(string(role), key).Dec() +func withPause(ctx context.Context, pause *atomic.Bool) context.Context { + return context.WithValue(ctx, ctxKeyPause, pause) } -func splitHost(remote string) string { - host := remote +func nodeNameFrom(ctx context.Context) (string, bool) { + nodeName, ok := ctx.Value(ctxKeyNodeName).(string) + return nodeName, ok +} - // try get 'host' tag from remote URL. - if u, err := url.Parse(remote); err == nil && u.Host != "" { // like scheme://host:[port]/... 
- host = u.Host - if ip, _, err := net.SplitHostPort(u.Host); err == nil { - host = ip - } - } else { // not URL, only IP:Port - if ip, _, err := net.SplitHostPort(remote); err == nil { - host = ip - } - } +func nodeLocalFrom(ctx context.Context) bool { + nodeLocal, ok := ctx.Value(ctxKeyNodeLocal).(bool) + return nodeLocal && ok +} - if host == "localhost" || net.ParseIP(host).IsLoopback() { - return "" +func pauseFrom(ctx context.Context) (bool, bool) { + pause, ok := ctx.Value(ctxKeyPause).(*atomic.Bool) + if !ok { + return false, false } - - return host + p := pause.Load() + return p, true } diff --git a/internal/plugins/inputs/kubernetesprometheus/metrics.go b/internal/plugins/inputs/kubernetesprometheus/metrics.go index a5c133a4b2..3640f29c93 100644 --- a/internal/plugins/inputs/kubernetesprometheus/metrics.go +++ b/internal/plugins/inputs/kubernetesprometheus/metrics.go @@ -11,8 +11,8 @@ import ( ) var ( - collectPtsCounter *prometheus.CounterVec - forkedWorkerGauge *prometheus.GaugeVec + collectPtsCounter *prometheus.CounterVec + scrapeTargetNumber *prometheus.GaugeVec ) func setupMetrics() { @@ -29,12 +29,12 @@ func setupMetrics() { }, ) - forkedWorkerGauge = prometheus.NewGaugeVec( + scrapeTargetNumber = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "datakit", Subsystem: "input_kubernetesprometheus", - Name: "forked_worker_number", - Help: "The number of the worker", + Name: "resource_target_number", + Help: "The number of the target", }, []string{ "role", @@ -44,6 +44,6 @@ func setupMetrics() { metrics.MustRegister( collectPtsCounter, - forkedWorkerGauge, + scrapeTargetNumber, ) } diff --git a/internal/plugins/inputs/kubernetesprometheus/node.go b/internal/plugins/inputs/kubernetesprometheus/node.go index b95bd4e35a..c3542bf96d 100644 --- a/internal/plugins/inputs/kubernetesprometheus/node.go +++ b/internal/plugins/inputs/kubernetesprometheus/node.go @@ -24,8 +24,7 @@ type Node struct { store cache.Store instances []*Instance - scraper *scraper - keys map[string]string + scrape *scrapeWorker feeder dkio.Feeder } @@ -40,8 +39,7 @@ func NewNode(informerFactory informers.SharedInformerFactory, instances []*Insta store: informer.Informer().GetStore(), instances: instances, - scraper: newScraper(), - keys: make(map[string]string), + scrape: newScrapeWorker(RoleNode), feeder: feeder, }, nil } @@ -49,6 +47,8 @@ func NewNode(informerFactory informers.SharedInformerFactory, instances []*Insta func (n *Node) Run(ctx context.Context) { defer n.queue.ShutDown() + n.scrape.startWorker(ctx, maxConcurrent(nodeLocalFrom(ctx))) + n.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { n.enqueue(obj) @@ -61,10 +61,11 @@ func (n *Node) Run(ctx context.Context) { }, }) - go func() { + managerGo.Go(func(_ context.Context) error { for n.process(ctx) { } - }() + return nil + }) <-ctx.Done() } @@ -93,7 +94,7 @@ func (n *Node) process(ctx context.Context) bool { if !exists { klog.Infof("deleted Node %s", key) - n.terminateProms(key) + n.terminateScrape(key) return true } @@ -103,19 +104,23 @@ func (n *Node) process(ctx context.Context) bool { return true } - info, exists := n.keys[key] - if exists && info == joinNodeInfo(node) { + nodeName, exists := nodeNameFrom(ctx) + if exists && node.Name != nodeName { + return true + } + + if n.scrape.matchesKey(key, nodeFeature(node)) { return true } klog.Infof("found node %s", key) - n.terminateProms(key) - n.runProm(ctx, key, node) + n.terminateScrape(key) + n.startScrape(ctx, key, node) return true } 
-func (n *Node) runProm(ctx context.Context, key string, item *corev1.Node) { +func (n *Node) startScrape(ctx context.Context, key string, item *corev1.Node) { if shouldSkipNode(item) { return } @@ -132,7 +137,7 @@ func (n *Node) runProm(ctx context.Context, key string, item *corev1.Node) { // record key klog.Infof("added Node %s", key) - n.keys[key] = joinNodeInfo(item) + n.scrape.registerKey(key, nodeFeature(item)) cfg, err := pr.parsePromConfig(ins) if err != nil { @@ -154,23 +159,21 @@ func (n *Node) runProm(ctx context.Context, key string, item *corev1.Node) { opts = append(opts, tlsOpts...) } - workerInc(RoleNode, key) - workerGo.Go(func(_ context.Context) error { - defer func() { - workerDec(RoleNode, key) - }() - n.scraper.runProm(ctx, key, urlstr, interval, opts) - return nil - }) + prom, err := newPromTarget(ctx, urlstr, interval, false /* not use election */, opts) + if err != nil { + klog.Warnf("fail new prom %s for %s", urlstr, err) + continue + } + + n.scrape.registerTarget(key, prom) } } -func (n *Node) terminateProms(key string) { - n.scraper.terminateProms(key) - delete(n.keys, key) +func (n *Node) terminateScrape(key string) { + n.scrape.terminate(key) } -func joinNodeInfo(item *corev1.Node) string { +func nodeFeature(item *corev1.Node) string { internalIP := "" for _, address := range item.Status.Addresses { if address.Type == corev1.NodeInternalIP { @@ -181,9 +184,6 @@ func joinNodeInfo(item *corev1.Node) string { } func shouldSkipNode(item *corev1.Node) bool { - if maxedOutClients() { - return true - } for _, address := range item.Status.Addresses { if address.Type == corev1.NodeInternalIP && address.Address == "" { return true diff --git a/internal/plugins/inputs/kubernetesprometheus/pod.go b/internal/plugins/inputs/kubernetesprometheus/pod.go index faf669da70..bdc408d744 100644 --- a/internal/plugins/inputs/kubernetesprometheus/pod.go +++ b/internal/plugins/inputs/kubernetesprometheus/pod.go @@ -24,8 +24,7 @@ type Pod struct { store cache.Store instances []*Instance - scraper *scraper - keys map[string]string + scrape *scrapeWorker feeder dkio.Feeder } @@ -40,8 +39,7 @@ func NewPod(informerFactory informers.SharedInformerFactory, instances []*Instan store: informer.Informer().GetStore(), instances: instances, - scraper: newScraper(), - keys: make(map[string]string), + scrape: newScrapeWorker(RolePod), feeder: feeder, }, nil } @@ -49,6 +47,8 @@ func NewPod(informerFactory informers.SharedInformerFactory, instances []*Instan func (p *Pod) Run(ctx context.Context) { defer p.queue.ShutDown() + p.scrape.startWorker(ctx, maxConcurrent(nodeLocalFrom(ctx))) + p.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { p.enqueue(obj) @@ -61,10 +61,11 @@ func (p *Pod) Run(ctx context.Context) { }, }) - go func() { + managerGo.Go(func(_ context.Context) error { for p.process(ctx) { } - }() + return nil + }) <-ctx.Done() } @@ -93,7 +94,7 @@ func (p *Pod) process(ctx context.Context) bool { if !exists { klog.Infof("deleted Pod %s", key) - p.terminateProms(key) + p.terminateScrape(key) return true } @@ -103,19 +104,23 @@ func (p *Pod) process(ctx context.Context) bool { return true } - info, exists := p.keys[key] - if exists && info == joinPodInfo(pod) { + nodeName, exists := nodeNameFrom(ctx) + if exists && pod.Spec.NodeName != nodeName { + return true + } + + if p.scrape.matchesKey(key, podFeature(pod)) { return true } klog.Infof("found pod %s", key) - p.terminateProms(key) - p.runProm(ctx, key, pod) + p.terminateScrape(key) + 
p.startScrape(ctx, key, pod) return true } -func (p *Pod) runProm(ctx context.Context, key string, item *corev1.Pod) { +func (p *Pod) startScrape(ctx context.Context, key string, item *corev1.Pod) { if shouldSkipPod(item) { return } @@ -132,7 +137,7 @@ func (p *Pod) runProm(ctx context.Context, key string, item *corev1.Pod) { // record key klog.Infof("added Pod %s", key) - p.keys[key] = joinPodInfo(item) + p.scrape.registerKey(key, podFeature(item)) cfg, err := pr.parsePromConfig(ins) if err != nil { @@ -154,27 +159,24 @@ func (p *Pod) runProm(ctx context.Context, key string, item *corev1.Pod) { opts = append(opts, tlsOpts...) } - workerInc(RolePod, key) - workerGo.Go(func(_ context.Context) error { - defer func() { - workerInc(RolePod, key) - }() + prom, err := newPromTarget(ctx, urlstr, interval, false /* not use election */, opts) + if err != nil { + klog.Warnf("fail new prom %s for %s", urlstr, err) + continue + } - p.scraper.runProm(ctx, key, urlstr, interval, opts) - return nil - }) + p.scrape.registerTarget(key, prom) } } -func (p *Pod) terminateProms(key string) { - p.scraper.terminateProms(key) - delete(p.keys, key) +func (p *Pod) terminateScrape(key string) { + p.scrape.terminate(key) } -func joinPodInfo(item *corev1.Pod) string { +func podFeature(item *corev1.Pod) string { return item.Status.HostIP + ":" + item.Status.PodIP } func shouldSkipPod(item *corev1.Pod) bool { - return maxedOutClients() || item.Status.PodIP == "" || item.Status.Phase != corev1.PodRunning + return item.Status.PodIP == "" || item.Status.Phase != corev1.PodRunning } diff --git a/internal/plugins/inputs/kubernetesprometheus/prom.go b/internal/plugins/inputs/kubernetesprometheus/prom.go index ff14d0dce9..548caa129b 100644 --- a/internal/plugins/inputs/kubernetesprometheus/prom.go +++ b/internal/plugins/inputs/kubernetesprometheus/prom.go @@ -11,43 +11,61 @@ import ( "time" "github.com/GuanceCloud/cliutils/point" - "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/datakit" dkio "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io" iprom "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/prom" ) -func runPromCollect(ctx context.Context, interval time.Duration, urlstr string, opts []iprom.PromOption) error { - pm, err := iprom.NewProm(opts...) +type promTarget struct { + urlstr string + pm *iprom.Prom + + shouldScrape func() bool + lastTime time.Time +} + +func newPromTarget(ctx context.Context, urlstr string, interval time.Duration, election bool, opts []iprom.PromOption) (*promTarget, error) { + var err error + p := promTarget{urlstr: urlstr} + + p.pm, err = iprom.NewProm(opts...) 
if err != nil { - return err + return nil, err } - tick := time.NewTicker(interval) - defer tick.Stop() + p.shouldScrape = func() bool { + if election { + paused, exists := pauseFrom(ctx) + if exists && paused { + return false + } + } - for { - if _, err := pm.CollectFromHTTPV2(urlstr); err != nil { - klog.Warn(err) - } else { - klog.Debugf("collect once %s", urlstr) + if p.lastTime.IsZero() { + p.lastTime = time.Now() + return true + } + if time.Since(p.lastTime) < interval { + return false } - select { - case <-datakit.Exit.Wait(): - klog.Infof("prom %s exit", urlstr) - return nil + return true + } - case <-ctx.Done(): - klog.Debugf("prom %s stop", urlstr) - return nil + return &p, nil +} - case <-tick.C: - // next - } +func (p *promTarget) url() string { return p.urlstr } +func (p *promTarget) scrape() error { + if !p.shouldScrape() { + return nil } + _, err := p.pm.CollectFromHTTPV2(p.urlstr) + return err } func buildPromOptions(role Role, key string, feeder dkio.Feeder, opts ...iprom.PromOption) []iprom.PromOption { + name := string(role) + "::" + key + callbackFn := func(pts []*point.Point) error { if len(pts) == 0 { return nil @@ -56,7 +74,7 @@ func buildPromOptions(role Role, key string, feeder dkio.Feeder, opts ...iprom.P if err := feeder.FeedV2( point.Metric, pts, - dkio.WithInputName(inputName), + dkio.WithInputName(name), dkio.DisableGlobalTags(true), dkio.WithElection(true), ); err != nil { @@ -69,7 +87,7 @@ func buildPromOptions(role Role, key string, feeder dkio.Feeder, opts ...iprom.P res := []iprom.PromOption{ iprom.WithLogger(klog), // WithLogger must in the first - iprom.WithSource(string(role) + "/" + key), + iprom.WithSource(name), iprom.WithMaxBatchCallback(1, callbackFn), } res = append(res, opts...) diff --git a/internal/plugins/inputs/kubernetesprometheus/scrape.go b/internal/plugins/inputs/kubernetesprometheus/scrape.go new file mode 100644 index 0000000000..fad3aea623 --- /dev/null +++ b/internal/plugins/inputs/kubernetesprometheus/scrape.go @@ -0,0 +1,134 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. 
+ +package kubernetesprometheus + +import ( + "context" + "sync" + "time" + + "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/datakit" +) + +const workInterval = time.Second * 5 + +type scrapeTarget interface { + url() string + scrape() error +} + +type scrapeWorker struct { + role Role + keys map[string]string + targets map[string][]scrapeTarget + + targetChan chan scrapeTarget + mu sync.Mutex +} + +func newScrapeWorker(role Role) *scrapeWorker { + return &scrapeWorker{ + role: role, + keys: make(map[string]string), + targets: make(map[string][]scrapeTarget), + } +} + +func (s *scrapeWorker) startWorker(ctx context.Context, workerNum int) { + if s.targetChan == nil { + s.targetChan = make(chan scrapeTarget, workerNum) + } + + for i := 0; i < workerNum; i++ { + workerGo.Go(func(_ context.Context) error { + for target := range s.targetChan { + if err := target.scrape(); err != nil { + klog.Warn(err) + } + } + return nil + }) + } + + workerGo.Go(func(_ context.Context) error { + defer func() { + close(s.targetChan) + klog.Infof("role-%s worker exit", s.role) + }() + + tick := time.NewTicker(workInterval) + defer tick.Stop() + klog.Infof("role-%s worker start", s.role) + + for { + s.mu.Lock() + for _, targets := range s.targets { + for _, target := range targets { + select { + case <-datakit.Exit.Wait(): + return nil + + case <-ctx.Done(): + return nil + + default: + // next + } + s.targetChan <- target + } + } + s.mu.Unlock() + + select { + case <-datakit.Exit.Wait(): + return nil + + case <-ctx.Done(): + return nil + + case <-tick.C: + // next + } + } + }) +} + +func (s *scrapeWorker) registerKey(key, feature string) { + s.mu.Lock() + s.keys[key] = feature + s.mu.Unlock() +} + +func (s *scrapeWorker) matchesKey(key, feature string) bool { + s.mu.Lock() + defer s.mu.Unlock() + + f, exists := s.keys[key] + return exists && f == feature +} + +func (s *scrapeWorker) registerTarget(key string, target scrapeTarget) { + s.mu.Lock() + defer s.mu.Unlock() + + s.targets[key] = append(s.targets[key], target) + klog.Infof("added prom url %s for %s, current len(%d)", target.url(), key, len(s.targets[key])) + scrapeTargetNumber.WithLabelValues(string(s.role), key).Add(1) +} + +func (s *scrapeWorker) terminate(key string) { + s.mu.Lock() + defer s.mu.Unlock() + + delete(s.keys, key) + + if _, ok := s.targets[key]; ok { + length := len(s.targets[key]) + delete(s.targets, key) + klog.Infof("terminated prom len(%d) for key %s", length, key) + scrapeTargetNumber.WithLabelValues(string(s.role), key).Sub(float64(length)) + } +} diff --git a/internal/plugins/inputs/kubernetesprometheus/scraper.go b/internal/plugins/inputs/kubernetesprometheus/scraper.go deleted file mode 100644 index ac768acfe2..0000000000 --- a/internal/plugins/inputs/kubernetesprometheus/scraper.go +++ /dev/null @@ -1,61 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the MIT License. -// This product includes software developed at Guance Cloud (https://www.guance.com/). -// Copyright 2021-present Guance, Inc. 
- -package kubernetesprometheus - -import ( - "context" - "sync" - "time" - - iprom "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/prom" -) - -type scraper struct { - proms map[string]map[string]context.CancelFunc - mu sync.Mutex -} - -func newScraper() *scraper { - return &scraper{ - proms: make(map[string]map[string]context.CancelFunc), - } -} - -func (s *scraper) runProm(ctx context.Context, key, urlstr string, interval time.Duration, opts []iprom.PromOption) { - // lock - s.mu.Lock() - if _, exist := s.proms[key]; !exist { - s.proms[key] = make(map[string]context.CancelFunc) - } - ctx, cancel := context.WithCancel(ctx) - s.proms[key][urlstr] = cancel - length := len(s.proms[key]) - // unlock - s.mu.Unlock() - - klog.Infof("create prom url %s for %s, interval %s, current len(%d)", urlstr, key, interval, length) - - if err := runPromCollect(ctx, interval, urlstr, opts); err != nil { - klog.Warnf("failed of prom %s", err) - } -} - -func (s *scraper) terminateProms(key string) { - s.mu.Lock() - defer s.mu.Unlock() - - cancels, exist := s.proms[key] - if !exist { - return - } - - klog.Infof("terminate prom len(%d) from key %s", len(cancels), key) - - for _, cancel := range cancels { - cancel() - } - delete(s.proms, key) -} diff --git a/internal/plugins/inputs/kubernetesprometheus/service.go b/internal/plugins/inputs/kubernetesprometheus/service.go index 9cd5e5378c..cba19bf389 100644 --- a/internal/plugins/inputs/kubernetesprometheus/service.go +++ b/internal/plugins/inputs/kubernetesprometheus/service.go @@ -29,9 +29,7 @@ type Service struct { store cache.Store instances []*Instance - scraper *scraper - keys map[string]string - runners map[string]context.CancelFunc + scrape *scrapeWorker feeder dkio.Feeder } @@ -52,9 +50,7 @@ func NewService( store: informer.Informer().GetStore(), instances: instances, - scraper: newScraper(), - keys: make(map[string]string), - runners: make(map[string]context.CancelFunc), + scrape: newScrapeWorker(RoleService), feeder: feeder, }, nil } @@ -62,6 +58,8 @@ func NewService( func (s *Service) Run(ctx context.Context) { defer s.queue.ShutDown() + s.scrape.startWorker(ctx, maxConcurrent(nodeLocalFrom(ctx))) + s.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { s.enqueue(obj) @@ -74,10 +72,11 @@ func (s *Service) Run(ctx context.Context) { }, }) - go func() { + managerGo.Go(func(_ context.Context) error { for s.process(ctx) { } - }() + return nil + }) <-ctx.Done() } @@ -106,7 +105,7 @@ func (s *Service) process(ctx context.Context) bool { if !exists { klog.Infof("deleted Service %s", key) - s.terminateRunner(key) + s.terminateScrape(key) return true } @@ -116,23 +115,25 @@ func (s *Service) process(ctx context.Context) bool { return true } - info, exists := s.keys[key] - if exists && info == joinServiceInfo(svc) { + if s.scrape.matchesKey(key, serviceFeature(svc)) { return true } klog.Infof("found service %s", key) - s.terminateRunner(key) - s.startRunner(ctx, key, svc) + s.terminateScrape(key) + s.startScrape(ctx, key, svc) return true } -func (s *Service) startRunner(ctx context.Context, key string, item *corev1.Service) { +func (s *Service) startScrape(ctx context.Context, key string, item *corev1.Service) { if shouldSkipService(item) { return } + nodeName, nodeNameExist := nodeNameFrom(ctx) + svcFeature := serviceFeature(item) + for _, ins := range s.instances { if !ins.validator.Matches(item.Namespace, item.Labels) { continue @@ -145,29 +146,27 @@ func (s *Service) startRunner(ctx context.Context, 
key string, item *corev1.Serv // record key klog.Infof("added Service %s", key) - s.keys[key] = joinServiceInfo(item) - - ctx, cancel := context.WithCancel(ctx) - s.runners[key] = cancel + s.scrape.registerKey(key, svcFeature) epIns := pr.transToEndpointsInstance(ins) - interval := ins.Interval managerGo.Go(func(_ context.Context) error { - tick := time.NewTicker(time.Second * 10) + tick := time.NewTicker(time.Second * 20) defer tick.Stop() - var epInfo string + var oldEndpointsFeature string for { select { case <-datakit.Exit.Wait(): klog.Info("svc-ep prom exit") return nil + case <-ctx.Done(): klog.Info("svc-ep return") return nil + case <-tick.C: // next } @@ -179,14 +178,15 @@ func (s *Service) startRunner(ctx context.Context, key string, item *corev1.Serv continue } - info := joinEndpointsInfo(ep) - if epInfo == info { + newEndpointsFeature := endpointsFeature(ep) + if newEndpointsFeature == oldEndpointsFeature { // no change continue } - // set epInfo - epInfo = info - s.scraper.terminateProms(key) + // reset oldEndpointsFeature + oldEndpointsFeature = newEndpointsFeature + s.scrape.terminate(key) + s.scrape.registerKey(key, svcFeature) pr := newEndpointsParser(ep) cfgs, err := pr.parsePromConfig(epIns) @@ -196,6 +196,10 @@ func (s *Service) startRunner(ctx context.Context, key string, item *corev1.Serv } for _, cfg := range cfgs { + if nodeNameExist && cfg.nodeName != "" && cfg.nodeName != nodeName { + continue + } + opts := buildPromOptions( RoleService, key, s.feeder, iprom.WithMeasurementName(cfg.measurement), @@ -208,35 +212,29 @@ func (s *Service) startRunner(ctx context.Context, key string, item *corev1.Serv } urlstr := cfg.urlstr + election := cfg.nodeName == "" - workerInc(RoleService, key) - workerGo.Go(func(_ context.Context) error { - defer func() { - workerDec(RoleService, key) - }() + prom, err := newPromTarget(ctx, urlstr, interval, election, opts) + if err != nil { + klog.Warnf("fail new prom %s for %s", cfg.urlstr, err) + continue + } - s.scraper.runProm(ctx, key, urlstr, interval, opts) - return nil - }) + s.scrape.registerTarget(key, prom) } } }) } } -func (s *Service) terminateRunner(key string) { - if cancel, exist := s.runners[key]; exist { - cancel() - delete(s.runners, key) - } - s.scraper.terminateProms(key) - delete(s.keys, key) +func (s *Service) terminateScrape(key string) { + s.scrape.terminate(key) } -func joinServiceInfo(item *corev1.Service) string { +func serviceFeature(item *corev1.Service) string { return item.Spec.ClusterIP } func shouldSkipService(_ *corev1.Service) bool { - return maxedOutClients() + return false } diff --git a/internal/plugins/inputs/kubernetesprometheus/utils.go b/internal/plugins/inputs/kubernetesprometheus/utils.go new file mode 100644 index 0000000000..da3b831998 --- /dev/null +++ b/internal/plugins/inputs/kubernetesprometheus/utils.go @@ -0,0 +1,70 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. 
+ +package kubernetesprometheus + +import ( + "fmt" + "net" + "net/url" + "os" +) + +func buildURLWithParams(scheme, address, port, path, params string) (*url.URL, error) { + u := &url.URL{ + Scheme: scheme, + Host: address + ":" + port, + Path: path, + } + if params != "" { + query, err := url.ParseQuery(params) + if err != nil { + return nil, err + } else { + u.RawQuery = query.Encode() + } + } + + if _, err := url.Parse(u.String()); err != nil { + return nil, err + } + return u, nil +} + +func splitHost(remote string) string { + host := remote + + // try get 'host' tag from remote URL. + if u, err := url.Parse(remote); err == nil && u.Host != "" { // like scheme://host:[port]/... + host = u.Host + if ip, _, err := net.SplitHostPort(u.Host); err == nil { + host = ip + } + } else { // not URL, only IP:Port + if ip, _, err := net.SplitHostPort(remote); err == nil { + host = ip + } + } + + if host == "localhost" || net.ParseIP(host).IsLoopback() { + return "" + } + + return host +} + +func getLocalNodeName() (string, error) { + var e string + if os.Getenv("NODE_NAME") != "" { + e = os.Getenv("NODE_NAME") + } + if os.Getenv("ENV_K8S_NODE_NAME") != "" { + e = os.Getenv("ENV_K8S_NODE_NAME") + } + if e != "" { + return e, nil + } + return "", fmt.Errorf("invalid ENV_K8S_NODE_NAME environment, cannot be empty") +}
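
The core of the change is the scrape scheduling model: instead of forking one long-lived goroutine (each with its own ticker) per scrape URL and capping them with a global worker counter, every role now keeps a registry of targets keyed by resource and re-enqueues them to a bounded worker pool once per `workInterval`. Below is a minimal, self-contained sketch of that producer/worker-pool pattern, not the actual `scrapeWorker`/`promTarget` code; the `demoTarget`/`demoPool` names and the intervals are illustrative only.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// demoTarget stands in for promTarget: it remembers when it last ran and
// skips the scrape if its own interval has not elapsed yet.
type demoTarget struct {
	mu       sync.Mutex
	url      string
	interval time.Duration
	lastRun  time.Time
}

func (t *demoTarget) scrape() {
	t.mu.Lock()
	defer t.mu.Unlock()
	if !t.lastRun.IsZero() && time.Since(t.lastRun) < t.interval {
		return // too early, wait for the next round
	}
	t.lastRun = time.Now()
	fmt.Printf("scraping %s\n", t.url)
}

// demoPool stands in for scrapeWorker: a registry of targets per resource key
// plus a fixed number of workers draining a shared channel.
type demoPool struct {
	mu      sync.Mutex
	targets map[string][]*demoTarget
	ch      chan *demoTarget
}

func newDemoPool(workers int) *demoPool {
	p := &demoPool{
		targets: make(map[string][]*demoTarget),
		ch:      make(chan *demoTarget, workers),
	}
	for i := 0; i < workers; i++ {
		go func() {
			for t := range p.ch {
				t.scrape()
			}
		}()
	}
	return p
}

func (p *demoPool) register(key string, t *demoTarget) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.targets[key] = append(p.targets[key], t)
}

// terminate drops every target belonging to a deleted or changed resource.
func (p *demoPool) terminate(key string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.targets, key)
}

// run re-enqueues all registered targets on every tick; the targets themselves
// decide whether enough time has passed for a real scrape.
func (p *demoPool) run(ctx context.Context, tickEvery time.Duration) {
	tick := time.NewTicker(tickEvery)
	defer tick.Stop()

	for {
		p.mu.Lock()
		for _, targets := range p.targets {
			for _, t := range targets {
				p.ch <- t
			}
		}
		p.mu.Unlock()

		select {
		case <-ctx.Done():
			close(p.ch) // stops the workers
			return
		case <-tick.C:
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
	defer cancel()

	pool := newDemoPool(4)
	pool.register("default/pod-a", &demoTarget{url: "http://10.0.0.1:9100/metrics", interval: 10 * time.Second})
	pool.register("default/pod-b", &demoTarget{url: "http://10.0.0.2:9100/metrics", interval: 10 * time.Second})
	pool.run(ctx, 5*time.Second) // blocks until the context expires
}
```

In the patch itself the pool size comes from `maxConcurrent`: `AvailableCPUs + 1` workers when node-local collection is enabled, `AvailableCPUs*3 + 1` otherwise. Per-target pacing stays with the target (`shouldScrape` there, `scrape` above), so re-enqueueing more often than the collection interval is harmless.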
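Duplicate collection is avoided by pinning node-bound targets to the Datakit running on that node: with `node_local = true` the local node name is placed into the context, Pod and Node events from other nodes are dropped early in `process`, and Endpoints addresses whose `NodeName` points elsewhere are skipped, while only targets without a node name fall back to the election path. The sketch below mirrors the Endpoints-level decision; it reimplements simplified context helpers rather than reusing the real ones, and `shouldCollect` is an illustrative name, not a function from the patch.

```go
package main

import (
	"context"
	"fmt"
)

type ctxKey int

const ctxKeyNodeName ctxKey = 1

func withNodeName(ctx context.Context, name string) context.Context {
	return context.WithValue(ctx, ctxKeyNodeName, name)
}

func nodeNameFrom(ctx context.Context) (string, bool) {
	name, ok := ctx.Value(ctxKeyNodeName).(string)
	return name, ok
}

// shouldCollect mirrors the Endpoints decision: a target that carries a node
// name is scraped only by the Datakit on that node, while a target without a
// node name is left to the elected Datakit.
func shouldCollect(ctx context.Context, targetNode string) (collect, useElection bool) {
	localNode, nodeLocal := nodeNameFrom(ctx)
	if targetNode == "" {
		return true, true // no node affinity: handled by the election winner
	}
	if nodeLocal && targetNode != localNode {
		return false, false // another node's Datakit owns this target
	}
	return true, false
}

func main() {
	ctx := withNodeName(context.Background(), "node-01")
	fmt.Println(shouldCollect(ctx, "node-01")) // true false
	fmt.Println(shouldCollect(ctx, "node-02")) // false false
	fmt.Println(shouldCollect(ctx, ""))        // true true
}
```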
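`getLocalNodeName` resolves the local node name from the `NODE_NAME` or `ENV_K8S_NODE_NAME` environment variables and returns an error when both are empty, so node-local mode relies on one of them being injected into the Datakit Pod. The stock deployment presumably provides this already; for a hand-rolled DaemonSet, a downward-API entry along these lines (an assumption for illustration, not part of this patch) is sufficient:

```yaml
        env:
          - name: NODE_NAME
            valueFrom:
              fieldRef:
                fieldPath: spec.nodeName
```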