Skip to content

Commit

Permalink
Prom v2
Browse files Browse the repository at this point in the history
  • Loading branch information
谭彪 committed Oct 18, 2024
1 parent f180eea commit c2f4c6b
Show file tree
Hide file tree
Showing 11 changed files with 361 additions and 21 deletions.
12 changes: 7 additions & 5 deletions internal/httpcli/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,12 @@ func (s *httpClientTraceStat) Trace() *httptrace.ClientTrace {
}

// NewHTTPClientTraceStat create a hook for HTTP client running metrics.
func NewHTTPClientTraceStat(from string) *httpClientTraceStat {
func NewHTTPClientTraceStat(from string, remote string) *httpClientTraceStat {
s := &httpClientTraceStat{
from: from,
from: from,
remoteAddr: remote,
}

s.addTrace()

return s
}

Expand All @@ -261,6 +260,7 @@ func (s *httpClientTraceStat) Metrics() {
httpClientGotFirstResponseByteCost.WithLabelValues(s.from).Observe(float64(s.ttfb) / float64(time.Second))

httpClientConnIdleTime.WithLabelValues(s.from).Observe(float64(s.idleTime) / float64(time.Second))

if s.reuseConn {
httpClientTCPConn.WithLabelValues(s.from, s.remoteAddr, "reused").Add(1)
} else {
Expand All @@ -278,7 +278,9 @@ func (s *httpClientTraceStat) addTrace() {
s.reuseConn = ci.Reused
s.idle = ci.WasIdle
s.idleTime = ci.IdleTime
s.remoteAddr = ci.Conn.RemoteAddr().String()
if s.remoteAddr == "" {
s.remoteAddr = ci.Conn.RemoteAddr().String()
}
},

DNSStart: func(httptrace.DNSStartInfo) { s.dnsStart = time.Now() },
Expand Down
2 changes: 1 addition & 1 deletion internal/io/dataway/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ func (ep *endPoint) doSendReq(req *http.Request) (*http.Response, error) {
}()

if ep.httpTrace {
s := httpcli.NewHTTPClientTraceStat("dataway")
s := httpcli.NewHTTPClientTraceStat("dataway", "")
defer s.Metrics()

req = req.WithContext(httptrace.WithClientTrace(req.Context(), s.Trace()))
Expand Down
1 change: 1 addition & 0 deletions internal/plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ import (
_ "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/prom"
_ "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/promremote"
_ "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/promtail"
_ "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/promv2"
_ "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/proxy"
_ "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/pushgateway"
_ "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/pythond"
Expand Down
9 changes: 5 additions & 4 deletions internal/plugins/inputs/kubernetesprometheus/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ func (p *promScraper) scrape() error {
}

func buildPromOptions(role Role, key string, feeder dkio.Feeder, opts ...promscrape.Option) []promscrape.Option {
name := string(role) + "/" + key
source := "kubernetesprometheus/" + string(role)
remote := key

callbackFn := func(pts []*point.Point) error {
if len(pts) == 0 {
Expand All @@ -96,7 +97,7 @@ func buildPromOptions(role Role, key string, feeder dkio.Feeder, opts ...promscr
if err := feeder.FeedV2(
point.Metric,
pts,
dkio.WithInputName(name),
dkio.WithInputName(source),
dkio.DisableGlobalTags(true),
dkio.WithElection(true),
); err != nil {
Expand All @@ -108,8 +109,8 @@ func buildPromOptions(role Role, key string, feeder dkio.Feeder, opts ...promscr
}

res := []promscrape.Option{
// promscrape.WithLogger(klog), // WithLogger must in the first
promscrape.WithSource(name),
promscrape.WithSource(source),
promscrape.WithRemote(remote),
promscrape.WithCallback(callbackFn),
}
res = append(res, opts...)
Expand Down
44 changes: 44 additions & 0 deletions internal/plugins/inputs/promv2/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the MIT License.
// This product includes software developed at Guance Cloud (https://www.guance.com/).
// Copyright 2021-present Guance, Inc.

package promv2

const sampleConfig = `
[[inputs.promv2]]
## Collector alias.
source = "prom"
urls = ["http://127.0.0.1:9100/metrics", "http://127.0.0.1:9200/metrics"]
## (Optional) Collect interval: (defaults to "30s").
interval = "30s"
## Measurement name.
## If measurement_name is empty, split metric name by '_', the first field after split as measurement set name, the rest as current metric name.
## If measurement_name is not empty, using this as measurement set name.
measurement_name = ""
## Keep Exist Metric Name
## If the keep_exist_metric_name is true, keep the raw value for field names.
keep_exist_metric_name = false
## TLS config
# insecure_skip_verify = true
## Following ca_certs/cert/cert_key are optional, if insecure_skip_verify = true.
# ca_certs = ["/opt/tls/ca.crt"]
# cert = "/opt/tls/client.root.crt"
# cert_key = "/opt/tls/client.root.key"
## Set to 'true' to enable election.
election = true
## Add HTTP headers to data pulling (Example basic authentication).
# [inputs.promv2.http_headers]
# Authorization = ""
[inputs.promv2.tags]
# some_tag = "some_value"
# more_tag = "some_other_value"
`
238 changes: 238 additions & 0 deletions internal/plugins/inputs/promv2/input.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the MIT License.
// This product includes software developed at Guance Cloud (https://www.guance.com/).
// Copyright 2021-present Guance, Inc.

// Package promv2 scrape prometheus exporter metrics.
package promv2

import (
"fmt"
"net/url"
"time"

"github.com/GuanceCloud/cliutils/logger"
"github.com/GuanceCloud/cliutils/point"

"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/datakit"
dkio "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io"
dknet "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/net"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs"
"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/promscrape"
)

var _ inputs.ElectionInput = (*Input)(nil)

const (
inputName = "promv2"
)

type Input struct {
Source string `toml:"source"`
MeasurementName string `toml:"measurement_name"`

URL string `toml:"url"`
endpoint *url.URL // parsed URL

KeepExistMetricName bool `toml:"keep_exist_metric_name"`
DisableInstanceTag bool `toml:"disable_instance_tag"`

dknet.TLSClientConfig

HTTPHeaders map[string]string `toml:"http_headers"`
Tags map[string]string `toml:"tags"`

Interval time.Duration `toml:"interval"`
Election bool `toml:"election"`

chPause chan bool
pause bool

Feeder dkio.Feeder
Tagger datakit.GlobalTagger

lastStart time.Time

scraper *promscrape.PromScraper
count int
log *logger.Logger
}

func (*Input) SampleConfig() string { return sampleConfig }

func (*Input) SampleMeasurement() []inputs.Measurement { return nil }

func (*Input) AvailableArchs() []string { return datakit.AllOSWithElection }

func (*Input) Catalog() string { return "prom" }

func (ipt *Input) ElectionEnabled() bool {
return ipt.Election
}

func (ipt *Input) Run() {
ipt.log = logger.SLogger(inputName + "/" + ipt.Source)
ipt.log.Info("start")

if err := ipt.setup(); err != nil {
ipt.log.Infof("init failure: %s", err)
ipt.log.Info("exit")
return
}

tick := time.NewTicker(ipt.Interval)
defer tick.Stop()

for {
if !ipt.pause {
ipt.scrape()
}

select {
case <-datakit.Exit.Wait():
ipt.log.Info("prom exit")
return
case ipt.pause = <-ipt.chPause:
// nil
case <-tick.C:
// next
}
}
}

func (ipt *Input) setup() error {
// parse url
u, err := url.Parse(ipt.URL)
if err != nil {
return err
}
ipt.endpoint = u

tags := make(map[string]string)

// on production, ipt.Tagger is empty, but we may add testing tagger here.
var globalTags map[string]string
if ipt.Election {
globalTags = ipt.Tagger.ElectionTags()
ipt.log.Infof("add global election tags %q", globalTags)
} else {
globalTags = ipt.Tagger.HostTags()
ipt.log.Infof("add global host tags %q", globalTags)
}

mergedTags := inputs.MergeTags(globalTags, ipt.Tags, ipt.URL)
for k, v := range mergedTags {
tags[k] = v
}

// set instance tag.
// The `instance' tag should not override.
if !ipt.DisableInstanceTag {
if _, ok := mergedTags["instance"]; !ok {
tags["instance"] = u.Host
}
}

opts := []promscrape.Option{
promscrape.WithSource("promv2/" + ipt.Source),
promscrape.WithMeasurement(ipt.MeasurementName),
promscrape.WithExtraTags(tags),
promscrape.KeepExistMetricName(ipt.KeepExistMetricName),
promscrape.WithCallback(ipt.callback),
}

if len(ipt.CaCerts) != 0 ||
ipt.CertKey != "" ||
ipt.Cert != "" ||
ipt.InsecureSkipVerify {
opts = append(opts,
promscrape.WithTLSOpen(true),
promscrape.WithCacertFiles(ipt.CaCerts),
promscrape.WithCertFile(ipt.Cert),
promscrape.WithKeyFile(ipt.CertKey),
promscrape.WithInsecureSkipVerify(ipt.InsecureSkipVerify),
)
}

ps, err := promscrape.NewPromScraper(opts...)
if err != nil {
return err
}

ipt.scraper = ps

return nil
}

func (ipt *Input) scrape() {
ipt.lastStart = time.Now()
// reset count
ipt.count = 0

if err := ipt.scraper.ScrapeURL(ipt.URL); err != nil {
ipt.log.Warn(err)
}

scrapeTotal.WithLabelValues(ipt.Source,
fmt.Sprintf(":%s%s", ipt.endpoint.Port(), ipt.endpoint.Path)).Observe(float64(ipt.count))
}

func (ipt *Input) callback(pts []*point.Point) error {
if len(pts) == 0 {
return nil
}

cost := time.Since(ipt.lastStart)

if err := ipt.Feeder.FeedV2(
point.Metric,
pts,
dkio.WithCollectCost(cost),
dkio.WithInputName(inputName+"/"+ipt.Source),
dkio.WithElection(ipt.Election),
); err != nil {
ipt.log.Warnf("failed to feed prom metrics: %s", err)
}
ipt.count += len(pts)

return nil
}

func (ipt *Input) Pause() error {
tick := time.NewTicker(inputs.ElectionPauseTimeout)
select {
case ipt.chPause <- true:
return nil
case <-tick.C:
return fmt.Errorf("pause %s failed", inputName)
}
}

func (ipt *Input) Resume() error {
tick := time.NewTicker(inputs.ElectionResumeTimeout)
select {
case ipt.chPause <- false:
return nil
case <-tick.C:
return fmt.Errorf("resume %s failed", inputName)
}
}

func newProm() *Input {
return &Input{
Source: "not-set",
pause: false,
chPause: make(chan bool, inputs.ElectionPauseChannelLength),
Tags: make(map[string]string),
Feeder: dkio.DefaultFeeder(),
Tagger: datakit.DefaultGlobalTagger(),
Interval: time.Second * 30,
}
}

func init() { //nolint:gochecknoinits
setupMetrics()
inputs.Add(inputName, func() inputs.Input {
return newProm()
})
}
Loading

0 comments on commit c2f4c6b

Please sign in to comment.