From 770fb3872aee693faa5797638d75810dd9a43ea2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=AD=E5=BD=AA?= Date: Wed, 25 Sep 2024 12:09:45 +0800 Subject: [PATCH] fix: fix up status is 1 when mysql/redis init failed. --- .../plugins/inputs/elasticsearch/input.go | 32 +++++++++++-------- .../inputs/elasticsearch/metric_up_helpers.go | 20 +++++++++++- internal/plugins/inputs/mysql/input.go | 5 ++- internal/plugins/inputs/nsq/input.go | 23 +++++++------ internal/plugins/inputs/oracle/input.go | 3 ++ internal/plugins/inputs/postgresql/input.go | 2 ++ internal/plugins/inputs/redis/input.go | 2 -- 7 files changed, 60 insertions(+), 27 deletions(-) diff --git a/internal/plugins/inputs/elasticsearch/input.go b/internal/plugins/inputs/elasticsearch/input.go index 1e775e525b..d0889a766a 100644 --- a/internal/plugins/inputs/elasticsearch/input.go +++ b/internal/plugins/inputs/elasticsearch/input.go @@ -433,30 +433,34 @@ func (ipt *Input) Collect() error { for _, serv := range ipt.Servers { func(s string) { g.Go(func(ctx context.Context) error { - var clusterName string - var err error - url := ipt.nodeStatsURL(s) - ipt.setUpState(url) + var ( + clusterName string + err error + nodeURL = ipt.nodeStatsURL(s) + ) + + ipt.setUpState(nodeURL) + // Always gather node stats - if clusterName, err = ipt.gatherNodeStats(url); err != nil { - ipt.setErrUpState(url) + if clusterName, err = ipt.gatherNodeStats(nodeURL); err != nil { + ipt.setErrUpState(nodeURL) l.Warn(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")) } if ipt.ClusterHealth { - url = s + "/_cluster/health" + url := s + "/_cluster/health" if ipt.ClusterHealthLevel != "" { url = url + "?level=" + ipt.ClusterHealthLevel } if err := ipt.gatherClusterHealth(url, s); err != nil { - ipt.setErrUpState(url) + ipt.setErrUpState(nodeURL) l.Warn(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")) } } if ipt.ClusterStats && (ipt.serverInfo[s].isMaster() || !ipt.ClusterStatsOnlyFromMaster || !ipt.Local) { if err := ipt.gatherClusterStats(s + "/_cluster/stats"); err != nil { - ipt.setErrUpState(url) + ipt.setErrUpState(nodeURL) l.Warn(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")) } } @@ -471,7 +475,7 @@ func (ipt *Input) Collect() error { "/"+ strings.Join(ipt.IndicesInclude, ",")+ "/_stats?ignore_unavailable=true", clusterName); err != nil { - ipt.setErrUpState(url) + ipt.setErrUpState(nodeURL) l.Warn(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")) } } else { @@ -479,22 +483,24 @@ func (ipt *Input) Collect() error { "/"+ strings.Join(ipt.IndicesInclude, ",")+ "/_stats?level=shards&ignore_unavailable=true", clusterName); err != nil { - ipt.setErrUpState(url) + ipt.setErrUpState(nodeURL) l.Warn(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")) } } // get settings if err := ipt.gatherIndicesSettings(s, clusterName); err != nil { - ipt.setErrUpState(url) + ipt.setErrUpState(nodeURL) l.Warn(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")) } } - ipt.FeedUpMetric(url) + + ipt.FeedUpMetric(nodeURL) return nil }) }(serv) } + ipt.collectCustomerObjectMeasurement() return g.Wait() } diff --git a/internal/plugins/inputs/elasticsearch/metric_up_helpers.go b/internal/plugins/inputs/elasticsearch/metric_up_helpers.go index d8e5579bda..ffae8faba1 100644 --- a/internal/plugins/inputs/elasticsearch/metric_up_helpers.go +++ b/internal/plugins/inputs/elasticsearch/metric_up_helpers.go @@ -20,11 +20,29 @@ import ( ) func (ipt *Input) setUpState(server string) { + l.Debugf("set up metric for %q to 1", server) + ipt.serverInfoMutex.Lock() ipt.upStates[server] = 1 + ipt.serverInfoMutex.Unlock() } func (ipt *Input) setErrUpState(server string) { + l.Debugf("set up metric for %q to 0", server) + ipt.serverInfoMutex.Lock() ipt.upStates[server] = 0 + ipt.serverInfoMutex.Unlock() +} + +func (ipt *Input) getUpState(server string) int { + ipt.serverInfoMutex.Lock() + defer ipt.serverInfoMutex.Unlock() + + if x, ok := ipt.upStates[server]; !ok { + l.Errorf("up status for server %q not set, should not been here", server) + return -1 // not set yet, should not been here + } else { + return x + } } func (ipt *Input) getUpJob() string { @@ -54,7 +72,7 @@ func (ipt *Input) buildUpPoints(server string) ([]*point.Point, error) { "instance": ipt.getUpInstance(server), } fields := map[string]interface{}{ - "up": ipt.upStates[server], + "up": ipt.getUpState(server), } m := &upMeasurement{ name: "collector", diff --git a/internal/plugins/inputs/mysql/input.go b/internal/plugins/inputs/mysql/input.go index 687a508eda..bdaab84379 100644 --- a/internal/plugins/inputs/mysql/input.go +++ b/internal/plugins/inputs/mysql/input.go @@ -723,6 +723,9 @@ func (ipt *Input) Run() { metrics.WithLastErrorInput(inputName), metrics.WithLastErrorCategory(point.Metric), ) + + // On init failing, we still upload up metric to show that the mysql input not working. + ipt.FeedUpMetric() } else { break } @@ -879,7 +882,7 @@ func defaultInput() *Input { feeder: dkio.DefaultFeeder(), tagger: datakit.DefaultGlobalTagger(), semStop: cliutils.NewSem(), - UpState: 1, + UpState: 0, } } diff --git a/internal/plugins/inputs/nsq/input.go b/internal/plugins/inputs/nsq/input.go index 7ed89df691..d405e9604e 100644 --- a/internal/plugins/inputs/nsq/input.go +++ b/internal/plugins/inputs/nsq/input.go @@ -125,9 +125,13 @@ func (ipt *Input) Run() { l.Debugf("not leader, skipped") continue } + ipt.setUpState() + l.Debugf("feed nsq pts") + ipt.FeedCoPts() + start := time.Now() pts, err := ipt.gather() if err != nil { @@ -135,24 +139,23 @@ func (ipt *Input) Run() { ipt.setErrUpState() } - if len(pts) == 0 { - ipt.FeedUpMetric() - continue + if len(pts) > 0 { + if err := ipt.feeder.FeedV2(point.Metric, pts, + dkio.WithCollectCost(time.Since(start)), + dkio.WithElection(ipt.Election), + dkio.WithInputName(inputName), + ); err != nil { + l.Errorf("io.Feed: %s, ignored", err) + } } - if err := ipt.feeder.FeedV2(point.Metric, pts, - dkio.WithCollectCost(time.Since(start)), - dkio.WithElection(ipt.Election), - dkio.WithInputName(inputName), - ); err != nil { - l.Errorf("io.Feed: %s, ignored", err) - } ipt.FeedUpMetric() case <-updateListTicker.C: if ipt.pause { l.Debugf("not leader, skipped") continue } + if ipt.isLookupd() { if err := ipt.updateEndpointListByLookupd(ipt.lookupdEndpoint); err != nil { l.Error(err) diff --git a/internal/plugins/inputs/oracle/input.go b/internal/plugins/inputs/oracle/input.go index a0fa110b85..c0a98effee 100644 --- a/internal/plugins/inputs/oracle/input.go +++ b/internal/plugins/inputs/oracle/input.go @@ -251,6 +251,9 @@ func (ipt *Input) Init() { case <-tick.C: } + + // on init failing, we still upload up metric to show that the oracle input not working. + ipt.FeedUpMetric() } } diff --git a/internal/plugins/inputs/postgresql/input.go b/internal/plugins/inputs/postgresql/input.go index cfd8ba0228..2a78882ac2 100644 --- a/internal/plugins/inputs/postgresql/input.go +++ b/internal/plugins/inputs/postgresql/input.go @@ -988,6 +988,8 @@ func (ipt *Input) Run() { metrics.WithLastErrorInput(inputName), metrics.WithLastErrorCategory(point.Metric), ) + + ipt.FeedUpMetric() } else { break } diff --git a/internal/plugins/inputs/redis/input.go b/internal/plugins/inputs/redis/input.go index 676cc8de32..fea368a76a 100644 --- a/internal/plugins/inputs/redis/input.go +++ b/internal/plugins/inputs/redis/input.go @@ -586,8 +586,6 @@ func defaultInput() *Input { Election: true, feeder: dkio.DefaultFeeder(), tagger: datakit.DefaultGlobalTagger(), - - UpState: 1, } }