Skip to content

Commit

Permalink
fix: fix up status is 1 when mysql/redis init failed.
Browse files Browse the repository at this point in the history
  • Loading branch information
谭彪 committed Sep 25, 2024
1 parent 3087146 commit 770fb38
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 27 deletions.
32 changes: 19 additions & 13 deletions internal/plugins/inputs/elasticsearch/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,30 +433,34 @@ func (ipt *Input) Collect() error {
for _, serv := range ipt.Servers {
func(s string) {
g.Go(func(ctx context.Context) error {
var clusterName string
var err error
url := ipt.nodeStatsURL(s)
ipt.setUpState(url)
var (
clusterName string
err error
nodeURL = ipt.nodeStatsURL(s)
)

ipt.setUpState(nodeURL)

// Always gather node stats
if clusterName, err = ipt.gatherNodeStats(url); err != nil {
ipt.setErrUpState(url)
if clusterName, err = ipt.gatherNodeStats(nodeURL); err != nil {
ipt.setErrUpState(nodeURL)
l.Warn(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))
}

if ipt.ClusterHealth {
url = s + "/_cluster/health"
url := s + "/_cluster/health"
if ipt.ClusterHealthLevel != "" {
url = url + "?level=" + ipt.ClusterHealthLevel
}
if err := ipt.gatherClusterHealth(url, s); err != nil {
ipt.setErrUpState(url)
ipt.setErrUpState(nodeURL)
l.Warn(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))
}
}

if ipt.ClusterStats && (ipt.serverInfo[s].isMaster() || !ipt.ClusterStatsOnlyFromMaster || !ipt.Local) {
if err := ipt.gatherClusterStats(s + "/_cluster/stats"); err != nil {
ipt.setErrUpState(url)
ipt.setErrUpState(nodeURL)
l.Warn(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))
}
}
Expand All @@ -471,30 +475,32 @@ func (ipt *Input) Collect() error {
"/"+
strings.Join(ipt.IndicesInclude, ",")+
"/_stats?ignore_unavailable=true", clusterName); err != nil {
ipt.setErrUpState(url)
ipt.setErrUpState(nodeURL)
l.Warn(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))
}
} else {
if err := ipt.gatherIndicesStats(s+
"/"+
strings.Join(ipt.IndicesInclude, ",")+
"/_stats?level=shards&ignore_unavailable=true", clusterName); err != nil {
ipt.setErrUpState(url)
ipt.setErrUpState(nodeURL)
l.Warn(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))
}
}

// get settings
if err := ipt.gatherIndicesSettings(s, clusterName); err != nil {
ipt.setErrUpState(url)
ipt.setErrUpState(nodeURL)
l.Warn(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))
}
}
ipt.FeedUpMetric(url)

ipt.FeedUpMetric(nodeURL)
return nil
})
}(serv)
}

ipt.collectCustomerObjectMeasurement()
return g.Wait()
}
Expand Down
20 changes: 19 additions & 1 deletion internal/plugins/inputs/elasticsearch/metric_up_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,29 @@ import (
)

func (ipt *Input) setUpState(server string) {
l.Debugf("set up metric for %q to 1", server)
ipt.serverInfoMutex.Lock()
ipt.upStates[server] = 1
ipt.serverInfoMutex.Unlock()
}

func (ipt *Input) setErrUpState(server string) {
l.Debugf("set up metric for %q to 0", server)
ipt.serverInfoMutex.Lock()
ipt.upStates[server] = 0
ipt.serverInfoMutex.Unlock()
}

func (ipt *Input) getUpState(server string) int {
ipt.serverInfoMutex.Lock()
defer ipt.serverInfoMutex.Unlock()

if x, ok := ipt.upStates[server]; !ok {
l.Errorf("up status for server %q not set, should not been here", server)
return -1 // not set yet, should not been here
} else {
return x
}
}

func (ipt *Input) getUpJob() string {
Expand Down Expand Up @@ -54,7 +72,7 @@ func (ipt *Input) buildUpPoints(server string) ([]*point.Point, error) {
"instance": ipt.getUpInstance(server),
}
fields := map[string]interface{}{
"up": ipt.upStates[server],
"up": ipt.getUpState(server),
}
m := &upMeasurement{
name: "collector",
Expand Down
5 changes: 4 additions & 1 deletion internal/plugins/inputs/mysql/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,9 @@ func (ipt *Input) Run() {
metrics.WithLastErrorInput(inputName),
metrics.WithLastErrorCategory(point.Metric),
)

// On init failing, we still upload up metric to show that the mysql input not working.
ipt.FeedUpMetric()
} else {
break
}
Expand Down Expand Up @@ -879,7 +882,7 @@ func defaultInput() *Input {
feeder: dkio.DefaultFeeder(),
tagger: datakit.DefaultGlobalTagger(),
semStop: cliutils.NewSem(),
UpState: 1,
UpState: 0,
}
}

Expand Down
23 changes: 13 additions & 10 deletions internal/plugins/inputs/nsq/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,34 +125,37 @@ func (ipt *Input) Run() {
l.Debugf("not leader, skipped")
continue
}

ipt.setUpState()

l.Debugf("feed nsq pts")

ipt.FeedCoPts()

start := time.Now()
pts, err := ipt.gather()
if err != nil {
l.Errorf("gather: %s, ignored", err)
ipt.setErrUpState()
}

if len(pts) == 0 {
ipt.FeedUpMetric()
continue
if len(pts) > 0 {
if err := ipt.feeder.FeedV2(point.Metric, pts,
dkio.WithCollectCost(time.Since(start)),
dkio.WithElection(ipt.Election),
dkio.WithInputName(inputName),
); err != nil {
l.Errorf("io.Feed: %s, ignored", err)
}
}

if err := ipt.feeder.FeedV2(point.Metric, pts,
dkio.WithCollectCost(time.Since(start)),
dkio.WithElection(ipt.Election),
dkio.WithInputName(inputName),
); err != nil {
l.Errorf("io.Feed: %s, ignored", err)
}
ipt.FeedUpMetric()
case <-updateListTicker.C:
if ipt.pause {
l.Debugf("not leader, skipped")
continue
}

if ipt.isLookupd() {
if err := ipt.updateEndpointListByLookupd(ipt.lookupdEndpoint); err != nil {
l.Error(err)
Expand Down
3 changes: 3 additions & 0 deletions internal/plugins/inputs/oracle/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ func (ipt *Input) Init() {

case <-tick.C:
}

// on init failing, we still upload up metric to show that the oracle input not working.
ipt.FeedUpMetric()
}
}

Expand Down
2 changes: 2 additions & 0 deletions internal/plugins/inputs/postgresql/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,8 @@ func (ipt *Input) Run() {
metrics.WithLastErrorInput(inputName),
metrics.WithLastErrorCategory(point.Metric),
)

ipt.FeedUpMetric()
} else {
break
}
Expand Down
2 changes: 0 additions & 2 deletions internal/plugins/inputs/redis/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,6 @@ func defaultInput() *Input {
Election: true,
feeder: dkio.DefaultFeeder(),
tagger: datakit.DefaultGlobalTagger(),

UpState: 1,
}
}

Expand Down

0 comments on commit 770fb38

Please sign in to comment.