diff --git a/internal/httpcli/http.go b/internal/httpcli/http.go index 8b53405c34..7f168d6d56 100644 --- a/internal/httpcli/http.go +++ b/internal/httpcli/http.go @@ -244,13 +244,12 @@ func (s *httpClientTraceStat) Trace() *httptrace.ClientTrace { } // NewHTTPClientTraceStat create a hook for HTTP client running metrics. -func NewHTTPClientTraceStat(from string) *httpClientTraceStat { +func NewHTTPClientTraceStat(from string, remote string) *httpClientTraceStat { s := &httpClientTraceStat{ - from: from, + from: from, + remoteAddr: remote, } - s.addTrace() - return s } @@ -261,6 +260,7 @@ func (s *httpClientTraceStat) Metrics() { httpClientGotFirstResponseByteCost.WithLabelValues(s.from).Observe(float64(s.ttfb) / float64(time.Second)) httpClientConnIdleTime.WithLabelValues(s.from).Observe(float64(s.idleTime) / float64(time.Second)) + if s.reuseConn { httpClientTCPConn.WithLabelValues(s.from, s.remoteAddr, "reused").Add(1) } else { @@ -278,7 +278,9 @@ func (s *httpClientTraceStat) addTrace() { s.reuseConn = ci.Reused s.idle = ci.WasIdle s.idleTime = ci.IdleTime - s.remoteAddr = ci.Conn.RemoteAddr().String() + if s.remoteAddr == "" { + s.remoteAddr = ci.Conn.RemoteAddr().String() + } }, DNSStart: func(httptrace.DNSStartInfo) { s.dnsStart = time.Now() }, diff --git a/internal/io/dataway/endpoint.go b/internal/io/dataway/endpoint.go index c8878c7225..c28c6c0d75 100644 --- a/internal/io/dataway/endpoint.go +++ b/internal/io/dataway/endpoint.go @@ -615,7 +615,7 @@ func (ep *endPoint) doSendReq(req *http.Request) (*http.Response, error) { }() if ep.httpTrace { - s := httpcli.NewHTTPClientTraceStat("dataway") + s := httpcli.NewHTTPClientTraceStat("dataway", "") defer s.Metrics() req = req.WithContext(httptrace.WithClientTrace(req.Context(), s.Trace())) diff --git a/internal/plugins/inputs/all/all.go b/internal/plugins/inputs/all/all.go index ba25ba0c18..b3771295ce 100644 --- a/internal/plugins/inputs/all/all.go +++ b/internal/plugins/inputs/all/all.go @@ -82,6 +82,7 @@ import ( _ "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/prom" _ "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/promremote" _ "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/promtail" + _ "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/promv2" _ "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/proxy" _ "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/pushgateway" _ "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/pythond" diff --git a/internal/plugins/inputs/kubernetesprometheus/prom.go b/internal/plugins/inputs/kubernetesprometheus/prom.go index 1563b6fcdf..d9a63e7dc4 100644 --- a/internal/plugins/inputs/kubernetesprometheus/prom.go +++ b/internal/plugins/inputs/kubernetesprometheus/prom.go @@ -86,7 +86,8 @@ func (p *promScraper) scrape() error { } func buildPromOptions(role Role, key string, feeder dkio.Feeder, opts ...promscrape.Option) []promscrape.Option { - name := string(role) + "/" + key + source := "kubernetesprometheus/" + string(role) + remote := key callbackFn := func(pts []*point.Point) error { if len(pts) == 0 { @@ -96,7 +97,7 @@ func buildPromOptions(role Role, key string, feeder dkio.Feeder, opts ...promscr if err := feeder.FeedV2( point.Metric, pts, - dkio.WithInputName(name), + dkio.WithInputName(source), dkio.DisableGlobalTags(true), dkio.WithElection(true), ); err != nil { @@ -108,8 +109,8 @@ func buildPromOptions(role Role, key string, feeder dkio.Feeder, opts ...promscr } res := []promscrape.Option{ - // promscrape.WithLogger(klog), // WithLogger must in the first - promscrape.WithSource(name), + promscrape.WithSource(source), + promscrape.WithRemote(remote), promscrape.WithCallback(callbackFn), } res = append(res, opts...) diff --git a/internal/plugins/inputs/promv2/const.go b/internal/plugins/inputs/promv2/const.go new file mode 100644 index 0000000000..b70b4c4110 --- /dev/null +++ b/internal/plugins/inputs/promv2/const.go @@ -0,0 +1,44 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +package promv2 + +const sampleConfig = ` +[[inputs.promv2]] + ## Collector alias. + source = "prom" + + urls = ["http://127.0.0.1:9100/metrics", "http://127.0.0.1:9200/metrics"] + + ## (Optional) Collect interval: (defaults to "30s"). + interval = "30s" + + ## Measurement name. + ## If measurement_name is empty, split metric name by '_', the first field after split as measurement set name, the rest as current metric name. + ## If measurement_name is not empty, using this as measurement set name. + measurement_name = "" + + ## Keep Exist Metric Name + ## If the keep_exist_metric_name is true, keep the raw value for field names. + keep_exist_metric_name = false + + ## TLS config + # insecure_skip_verify = true + ## Following ca_certs/cert/cert_key are optional, if insecure_skip_verify = true. + # ca_certs = ["/opt/tls/ca.crt"] + # cert = "/opt/tls/client.root.crt" + # cert_key = "/opt/tls/client.root.key" + + ## Set to 'true' to enable election. + election = true + + ## Add HTTP headers to data pulling (Example basic authentication). + # [inputs.promv2.http_headers] + # Authorization = "" + + [inputs.promv2.tags] + # some_tag = "some_value" + # more_tag = "some_other_value" +` diff --git a/internal/plugins/inputs/promv2/input.go b/internal/plugins/inputs/promv2/input.go new file mode 100644 index 0000000000..b08f490ba5 --- /dev/null +++ b/internal/plugins/inputs/promv2/input.go @@ -0,0 +1,238 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +// Package promv2 scrape prometheus exporter metrics. +package promv2 + +import ( + "fmt" + "net/url" + "time" + + "github.com/GuanceCloud/cliutils/logger" + "github.com/GuanceCloud/cliutils/point" + + "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/datakit" + dkio "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/io" + dknet "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/net" + "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs" + "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/promscrape" +) + +var _ inputs.ElectionInput = (*Input)(nil) + +const ( + inputName = "promv2" +) + +type Input struct { + Source string `toml:"source"` + MeasurementName string `toml:"measurement_name"` + + URL string `toml:"url"` + endpoint *url.URL // parsed URL + + KeepExistMetricName bool `toml:"keep_exist_metric_name"` + DisableInstanceTag bool `toml:"disable_instance_tag"` + + dknet.TLSClientConfig + + HTTPHeaders map[string]string `toml:"http_headers"` + Tags map[string]string `toml:"tags"` + + Interval time.Duration `toml:"interval"` + Election bool `toml:"election"` + + chPause chan bool + pause bool + + Feeder dkio.Feeder + Tagger datakit.GlobalTagger + + lastStart time.Time + + scraper *promscrape.PromScraper + count int + log *logger.Logger +} + +func (*Input) SampleConfig() string { return sampleConfig } + +func (*Input) SampleMeasurement() []inputs.Measurement { return nil } + +func (*Input) AvailableArchs() []string { return datakit.AllOSWithElection } + +func (*Input) Catalog() string { return "prom" } + +func (ipt *Input) ElectionEnabled() bool { + return ipt.Election +} + +func (ipt *Input) Run() { + ipt.log = logger.SLogger(inputName + "/" + ipt.Source) + ipt.log.Info("start") + + if err := ipt.setup(); err != nil { + ipt.log.Infof("init failure: %s", err) + ipt.log.Info("exit") + return + } + + tick := time.NewTicker(ipt.Interval) + defer tick.Stop() + + for { + if !ipt.pause { + ipt.scrape() + } + + select { + case <-datakit.Exit.Wait(): + ipt.log.Info("prom exit") + return + case ipt.pause = <-ipt.chPause: + // nil + case <-tick.C: + // next + } + } +} + +func (ipt *Input) setup() error { + // parse url + u, err := url.Parse(ipt.URL) + if err != nil { + return err + } + ipt.endpoint = u + + tags := make(map[string]string) + + // on production, ipt.Tagger is empty, but we may add testing tagger here. + var globalTags map[string]string + if ipt.Election { + globalTags = ipt.Tagger.ElectionTags() + ipt.log.Infof("add global election tags %q", globalTags) + } else { + globalTags = ipt.Tagger.HostTags() + ipt.log.Infof("add global host tags %q", globalTags) + } + + mergedTags := inputs.MergeTags(globalTags, ipt.Tags, ipt.URL) + for k, v := range mergedTags { + tags[k] = v + } + + // set instance tag. + // The `instance' tag should not override. + if !ipt.DisableInstanceTag { + if _, ok := mergedTags["instance"]; !ok { + tags["instance"] = u.Host + } + } + + opts := []promscrape.Option{ + promscrape.WithSource("promv2/" + ipt.Source), + promscrape.WithMeasurement(ipt.MeasurementName), + promscrape.WithExtraTags(tags), + promscrape.KeepExistMetricName(ipt.KeepExistMetricName), + promscrape.WithCallback(ipt.callback), + } + + if len(ipt.CaCerts) != 0 || + ipt.CertKey != "" || + ipt.Cert != "" || + ipt.InsecureSkipVerify { + opts = append(opts, + promscrape.WithTLSOpen(true), + promscrape.WithCacertFiles(ipt.CaCerts), + promscrape.WithCertFile(ipt.Cert), + promscrape.WithKeyFile(ipt.CertKey), + promscrape.WithInsecureSkipVerify(ipt.InsecureSkipVerify), + ) + } + + ps, err := promscrape.NewPromScraper(opts...) + if err != nil { + return err + } + + ipt.scraper = ps + + return nil +} + +func (ipt *Input) scrape() { + ipt.lastStart = time.Now() + // reset count + ipt.count = 0 + + if err := ipt.scraper.ScrapeURL(ipt.URL); err != nil { + ipt.log.Warn(err) + } + + scrapeTotal.WithLabelValues(ipt.Source, + fmt.Sprintf(":%s%s", ipt.endpoint.Port(), ipt.endpoint.Path)).Observe(float64(ipt.count)) +} + +func (ipt *Input) callback(pts []*point.Point) error { + if len(pts) == 0 { + return nil + } + + cost := time.Since(ipt.lastStart) + + if err := ipt.Feeder.FeedV2( + point.Metric, + pts, + dkio.WithCollectCost(cost), + dkio.WithInputName(inputName+"/"+ipt.Source), + dkio.WithElection(ipt.Election), + ); err != nil { + ipt.log.Warnf("failed to feed prom metrics: %s", err) + } + ipt.count += len(pts) + + return nil +} + +func (ipt *Input) Pause() error { + tick := time.NewTicker(inputs.ElectionPauseTimeout) + select { + case ipt.chPause <- true: + return nil + case <-tick.C: + return fmt.Errorf("pause %s failed", inputName) + } +} + +func (ipt *Input) Resume() error { + tick := time.NewTicker(inputs.ElectionResumeTimeout) + select { + case ipt.chPause <- false: + return nil + case <-tick.C: + return fmt.Errorf("resume %s failed", inputName) + } +} + +func newProm() *Input { + return &Input{ + Source: "not-set", + pause: false, + chPause: make(chan bool, inputs.ElectionPauseChannelLength), + Tags: make(map[string]string), + Feeder: dkio.DefaultFeeder(), + Tagger: datakit.DefaultGlobalTagger(), + Interval: time.Second * 30, + } +} + +func init() { //nolint:gochecknoinits + setupMetrics() + inputs.Add(inputName, func() inputs.Input { + return newProm() + }) +} diff --git a/internal/plugins/inputs/promv2/metrics.go b/internal/plugins/inputs/promv2/metrics.go new file mode 100644 index 0000000000..4eec1e8cd4 --- /dev/null +++ b/internal/plugins/inputs/promv2/metrics.go @@ -0,0 +1,37 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +package promv2 + +import ( + "github.com/GuanceCloud/cliutils/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +var scrapeTotal *prometheus.SummaryVec + +func setupMetrics() { + scrapeTotal = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: "datakit", + Subsystem: "input_promv2", + Name: "scrape_points", + Help: "The number of points scrape from endpoint", + Objectives: map[float64]float64{ + 0.5: 0.05, + 0.9: 0.01, + 0.99: 0.001, + }, + }, + []string{ + "source", + "remote", + }, + ) + + metrics.MustRegister( + scrapeTotal, + ) +} diff --git a/internal/prom/prom.go b/internal/prom/prom.go index 56318e11b0..b8350baf2b 100644 --- a/internal/prom/prom.go +++ b/internal/prom/prom.go @@ -151,7 +151,7 @@ func (p *Prom) Request(url string) (*http.Response, error) { } // trace - s := httpcli.NewHTTPClientTraceStat("prom/" + p.opt.source) + s := httpcli.NewHTTPClientTraceStat("prom/"+p.opt.source, "") defer s.Metrics() req = req.WithContext(httptrace.WithClientTrace(req.Context(), s.Trace())) diff --git a/internal/promscrape/options.go b/internal/promscrape/options.go index 374390233a..94c4053775 100644 --- a/internal/promscrape/options.go +++ b/internal/promscrape/options.go @@ -16,7 +16,7 @@ import ( type option struct { optionClientConn - source string + source, remote string measurement string keepExistMetricName bool @@ -35,7 +35,7 @@ type optionClientConn struct { insecureSkipVerify bool tlsClientConfig *dknet.TLSClientConfig - headers map[string]string + httpHeaders map[string]string } type Option func(opt *option) @@ -46,9 +46,10 @@ var discardPointsFn = func([]*point.Point) error { func defaultOption() *option { return &option{ + source: "promscrape", optionClientConn: optionClientConn{ - timeout: time.Second * 10, - headers: make(map[string]string), + timeout: time.Second * 10, + httpHeaders: make(map[string]string), }, extraTags: make(map[string]string), callback: discardPointsFn, @@ -56,6 +57,7 @@ func defaultOption() *option { } func WithSource(str string) Option { return func(opt *option) { opt.source = str } } +func WithRemote(str string) Option { return func(opt *option) { opt.remote = str } } func WithMeasurement(str string) Option { return func(opt *option) { opt.measurement = str } } func KeepExistMetricName(b bool) Option { return func(opt *option) { opt.keepExistMetricName = b } @@ -90,9 +92,17 @@ func WithInsecureSkipVerify(b bool) Option { return func(opt *option) { opt.insecureSkipVerify = b } } +func WithHTTPHeader(m map[string]string) Option { + return func(opt *option) { + for k, v := range m { + opt.httpHeaders[k] = v + } + } +} + func WithBearerToken(str string) Option { return func(opt *option) { - opt.headers["Authorization"] = "Bearer " + str + opt.httpHeaders["Authorization"] = "Bearer " + str } } diff --git a/internal/promscrape/scraper.go b/internal/promscrape/scraper.go index f3d31e1873..e1037fc7e2 100644 --- a/internal/promscrape/scraper.go +++ b/internal/promscrape/scraper.go @@ -10,6 +10,7 @@ import ( "fmt" "io" "net/http" + "net/http/httptrace" "strings" "time" @@ -53,7 +54,7 @@ func buildHTTPClient(opt *optionClientConn) (*http.Client, error) { CaCerts: opt.cacertFiles, Cert: opt.certFile, CertKey: opt.keyFile, - InsecureSkipVerify: false, + InsecureSkipVerify: opt.insecureSkipVerify, } conf, err := tlsconfig.TLSConfig() if err != nil { @@ -114,10 +115,15 @@ func (p *PromScraper) callbackForRow(rows []Row) error { func (p *PromScraper) newRequest(u string) (*http.Request, error) { req, err := http.NewRequest("GET", u, nil) - for k, v := range p.opt.headers { + req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1") + for k, v := range p.opt.httpHeaders { req.Header.Set(k, v) } - req.Header.Set("Accept", "text/plain;version=0.0.4;q=1,*/*;q=0.1") + + s := httpcli.NewHTTPClientTraceStat(p.opt.source, p.opt.remote) + defer s.Metrics() + req = req.WithContext(httptrace.WithClientTrace(req.Context(), s.Trace())) + return req, err } diff --git a/vendor/github.com/GuanceCloud/cliutils/diskcache/drop.go b/vendor/github.com/GuanceCloud/cliutils/diskcache/drop.go index 262d00459c..678173e782 100644 --- a/vendor/github.com/GuanceCloud/cliutils/diskcache/drop.go +++ b/vendor/github.com/GuanceCloud/cliutils/diskcache/drop.go @@ -8,8 +8,9 @@ package diskcache import "os" const ( - reasonExceedCapacity = "exceed-max-capacity" - reasonBadDataFile = "bad-data-file" + reasonExceedCapacity = "exceed-max-capacity" + reasonBadDataFile = "bad-data-file" + reasonTooSmallReadBuffer = "too-small-read-buffer" ) func (c *DiskCache) dropBatch() error {