diff --git a/internal/errchan/errchan.go b/internal/errchan/errchan.go deleted file mode 100644 index 467a0f4a74d35..0000000000000 --- a/internal/errchan/errchan.go +++ /dev/null @@ -1,37 +0,0 @@ -package errchan - -import ( - "fmt" - "strings" -) - -type ErrChan struct { - C chan error -} - -// New returns an error channel of max length 'n' -// errors can be sent to the ErrChan.C channel, and will be returned when -// ErrChan.Error() is called. -func New(n int) *ErrChan { - return &ErrChan{ - C: make(chan error, n), - } -} - -// Error closes the ErrChan.C channel and returns an error if there are any -// non-nil errors, otherwise returns nil. -func (e *ErrChan) Error() error { - close(e.C) - - var out string - for err := range e.C { - if err != nil { - out += "[" + err.Error() + "], " - } - } - - if out != "" { - return fmt.Errorf("Errors encountered: " + strings.TrimRight(out, ", ")) - } - return nil -} diff --git a/plugins/inputs/aerospike/aerospike.go b/plugins/inputs/aerospike/aerospike.go index 10f7fcd40a6bb..10cccc68957b1 100644 --- a/plugins/inputs/aerospike/aerospike.go +++ b/plugins/inputs/aerospike/aerospike.go @@ -10,7 +10,6 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" as "github.com/aerospike/aerospike-client-go" @@ -41,17 +40,16 @@ func (a *Aerospike) Gather(acc telegraf.Accumulator) error { } var wg sync.WaitGroup - errChan := errchan.New(len(a.Servers)) wg.Add(len(a.Servers)) for _, server := range a.Servers { go func(serv string) { defer wg.Done() - errChan.C <- a.gatherServer(serv, acc) + acc.AddError(a.gatherServer(serv, acc)) }(server) } wg.Wait() - return errChan.Error() + return nil } func (a *Aerospike) gatherServer(hostport string, acc telegraf.Accumulator) error { diff --git a/plugins/inputs/aerospike/aerospike_test.go b/plugins/inputs/aerospike/aerospike_test.go index b20af16570b6c..2055da3bbf2cb 100644 --- a/plugins/inputs/aerospike/aerospike_test.go +++ b/plugins/inputs/aerospike/aerospike_test.go @@ -19,7 +19,7 @@ func TestAerospikeStatistics(t *testing.T) { var acc testutil.Accumulator - err := a.Gather(&acc) + err := acc.GatherError(a.Gather) require.NoError(t, err) assert.True(t, acc.HasMeasurement("aerospike_node")) @@ -41,8 +41,7 @@ func TestAerospikeStatisticsPartialErr(t *testing.T) { var acc testutil.Accumulator - err := a.Gather(&acc) - require.Error(t, err) + require.Error(t, acc.GatherError(a.Gather)) assert.True(t, acc.HasMeasurement("aerospike_node")) assert.True(t, acc.HasMeasurement("aerospike_namespace")) diff --git a/plugins/inputs/apache/apache.go b/plugins/inputs/apache/apache.go index 85e7ee4d9c136..b50860cfb3b64 100644 --- a/plugins/inputs/apache/apache.go +++ b/plugins/inputs/apache/apache.go @@ -8,6 +8,7 @@ import ( "net/url" "strconv" "strings" + "sync" "time" "github.com/influxdata/telegraf" @@ -65,28 +66,23 @@ func (n *Apache) Gather(acc telegraf.Accumulator) error { n.ResponseTimeout.Duration = time.Second * 5 } - var outerr error - var errch = make(chan error) - + var wg sync.WaitGroup + wg.Add(len(n.Urls)) for _, u := range n.Urls { addr, err := url.Parse(u) if err != nil { - return fmt.Errorf("Unable to parse address '%s': %s", u, err) + acc.AddError(fmt.Errorf("Unable to parse address '%s': %s", u, err)) + continue } go func(addr *url.URL) { - errch <- n.gatherUrl(addr, acc) + defer wg.Done() + acc.AddError(n.gatherUrl(addr, acc)) }(addr) } - // Drain channel, waiting for all requests to finish and save last error. - for range n.Urls { - if err := <-errch; err != nil { - outerr = err - } - } - - return outerr + wg.Wait() + return nil } func (n *Apache) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error { diff --git a/plugins/inputs/apache/apache_test.go b/plugins/inputs/apache/apache_test.go index 2a80b38685687..ca8f4733c6bc5 100644 --- a/plugins/inputs/apache/apache_test.go +++ b/plugins/inputs/apache/apache_test.go @@ -41,7 +41,7 @@ func TestHTTPApache(t *testing.T) { } var acc testutil.Accumulator - err := a.Gather(&acc) + err := acc.GatherError(a.Gather) require.NoError(t, err) fields := map[string]interface{}{ diff --git a/plugins/inputs/cassandra/cassandra.go b/plugins/inputs/cassandra/cassandra.go index 710a2b661c878..88bb3f577b090 100644 --- a/plugins/inputs/cassandra/cassandra.go +++ b/plugins/inputs/cassandra/cassandra.go @@ -7,7 +7,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "io/ioutil" - "log" "net/http" "net/url" "strings" @@ -123,8 +122,8 @@ func (j javaMetric) addTagsFields(out map[string]interface{}) { } j.acc.AddFields(tokens["class"]+tokens["type"], fields, tags) } else { - fmt.Printf("Missing key 'value' in '%s' output response\n%v\n", - j.metric, out) + j.acc.AddError(fmt.Errorf("Missing key 'value' in '%s' output response\n%v\n", + j.metric, out)) } } @@ -155,8 +154,8 @@ func (c cassandraMetric) addTagsFields(out map[string]interface{}) { addCassandraMetric(k, c, v.(map[string]interface{})) } } else { - fmt.Printf("Missing key 'value' in '%s' output response\n%v\n", - c.metric, out) + c.acc.AddError(fmt.Errorf("Missing key 'value' in '%s' output response\n%v\n", + c.metric, out)) return } } else { @@ -164,8 +163,8 @@ func (c cassandraMetric) addTagsFields(out map[string]interface{}) { addCassandraMetric(r.(map[string]interface{})["mbean"].(string), c, values.(map[string]interface{})) } else { - fmt.Printf("Missing key 'value' in '%s' output response\n%v\n", - c.metric, out) + c.acc.AddError(fmt.Errorf("Missing key 'value' in '%s' output response\n%v\n", + c.metric, out)) return } } @@ -274,8 +273,8 @@ func (c *Cassandra) Gather(acc telegraf.Accumulator) error { m = newCassandraMetric(serverTokens["host"], metric, acc) } else { // unsupported metric type - log.Printf("I! Unsupported Cassandra metric [%s], skipping", - metric) + acc.AddError(fmt.Errorf("E! Unsupported Cassandra metric [%s], skipping", + metric)) continue } @@ -283,7 +282,8 @@ func (c *Cassandra) Gather(acc telegraf.Accumulator) error { requestUrl, err := url.Parse("http://" + serverTokens["host"] + ":" + serverTokens["port"] + context + metric) if err != nil { - return err + acc.AddError(err) + continue } if serverTokens["user"] != "" && serverTokens["passwd"] != "" { requestUrl.User = url.UserPassword(serverTokens["user"], @@ -291,8 +291,12 @@ func (c *Cassandra) Gather(acc telegraf.Accumulator) error { } out, err := c.getAttr(requestUrl) + if err != nil { + acc.AddError(err) + continue + } if out["status"] != 200.0 { - fmt.Printf("URL returned with status %v\n", out["status"]) + acc.AddError(fmt.Errorf("URL returned with status %v\n", out["status"])) continue } m.addTagsFields(out) diff --git a/plugins/inputs/cassandra/cassandra_test.go b/plugins/inputs/cassandra/cassandra_test.go index aa39017fe7353..1a7e5a657816e 100644 --- a/plugins/inputs/cassandra/cassandra_test.go +++ b/plugins/inputs/cassandra/cassandra_test.go @@ -151,7 +151,7 @@ func TestHttpJsonJavaMultiValue(t *testing.T) { var acc testutil.Accumulator acc.SetDebug(true) - err := cassandra.Gather(&acc) + err := acc.GatherError(cassandra.Gather) assert.Nil(t, err) assert.Equal(t, 2, len(acc.Metrics)) @@ -180,7 +180,7 @@ func TestHttpJsonJavaMultiType(t *testing.T) { var acc testutil.Accumulator acc.SetDebug(true) - err := cassandra.Gather(&acc) + err := acc.GatherError(cassandra.Gather) assert.Nil(t, err) assert.Equal(t, 2, len(acc.Metrics)) @@ -197,16 +197,17 @@ func TestHttpJsonJavaMultiType(t *testing.T) { } // Test that the proper values are ignored or collected -func TestHttpJsonOn404(t *testing.T) { +func TestHttp404(t *testing.T) { - jolokia := genJolokiaClientStub(validJavaMultiValueJSON, 404, Servers, + jolokia := genJolokiaClientStub(invalidJSON, 404, Servers, []string{HeapMetric}) var acc testutil.Accumulator - err := jolokia.Gather(&acc) + err := acc.GatherError(jolokia.Gather) - assert.Nil(t, err) + assert.Error(t, err) assert.Equal(t, 0, len(acc.Metrics)) + assert.Contains(t, err.Error(), "has status code 404") } // Test that the proper values are ignored or collected for class=Cassandra @@ -214,7 +215,7 @@ func TestHttpJsonCassandraMultiValue(t *testing.T) { cassandra := genJolokiaClientStub(validCassandraMultiValueJSON, 200, Servers, []string{ReadLatencyMetric}) var acc testutil.Accumulator - err := cassandra.Gather(&acc) + err := acc.GatherError(cassandra.Gather) assert.Nil(t, err) assert.Equal(t, 1, len(acc.Metrics)) @@ -246,7 +247,7 @@ func TestHttpJsonCassandraNestedMultiValue(t *testing.T) { var acc testutil.Accumulator acc.SetDebug(true) - err := cassandra.Gather(&acc) + err := acc.GatherError(cassandra.Gather) assert.Nil(t, err) assert.Equal(t, 2, len(acc.Metrics)) diff --git a/plugins/inputs/ceph/ceph.go b/plugins/inputs/ceph/ceph.go index 7c03b626250fb..0de9cb13b5688 100644 --- a/plugins/inputs/ceph/ceph.go +++ b/plugins/inputs/ceph/ceph.go @@ -101,12 +101,12 @@ func (c *Ceph) gatherAdminSocketStats(acc telegraf.Accumulator) error { for _, s := range sockets { dump, err := perfDump(c.CephBinary, s) if err != nil { - log.Printf("E! error reading from socket '%s': %v", s.socket, err) + acc.AddError(fmt.Errorf("E! error reading from socket '%s': %v", s.socket, err)) continue } data, err := parseDump(dump) if err != nil { - log.Printf("E! error parsing dump from socket '%s': %v", s.socket, err) + acc.AddError(fmt.Errorf("E! error parsing dump from socket '%s': %v", s.socket, err)) continue } for tag, metrics := range data { diff --git a/plugins/inputs/cgroup/cgroup_linux.go b/plugins/inputs/cgroup/cgroup_linux.go index e8ba6f8819df8..f43b589754238 100644 --- a/plugins/inputs/cgroup/cgroup_linux.go +++ b/plugins/inputs/cgroup/cgroup_linux.go @@ -22,10 +22,11 @@ func (g *CGroup) Gather(acc telegraf.Accumulator) error { for dir := range list { if dir.err != nil { - return dir.err + acc.AddError(dir.err) + continue } if err := g.gatherDir(dir.path, acc); err != nil { - return err + acc.AddError(err) } } diff --git a/plugins/inputs/cgroup/cgroup_test.go b/plugins/inputs/cgroup/cgroup_test.go index 206b51f6d28f4..0ad3a5b242ca3 100644 --- a/plugins/inputs/cgroup/cgroup_test.go +++ b/plugins/inputs/cgroup/cgroup_test.go @@ -24,7 +24,7 @@ var cg1 = &CGroup{ func TestCgroupStatistics_1(t *testing.T) { var acc testutil.Accumulator - err := cg1.Gather(&acc) + err := acc.GatherError(cg1.Gather) require.NoError(t, err) tags := map[string]string{ @@ -56,7 +56,7 @@ var cg2 = &CGroup{ func TestCgroupStatistics_2(t *testing.T) { var acc testutil.Accumulator - err := cg2.Gather(&acc) + err := acc.GatherError(cg2.Gather) require.NoError(t, err) tags := map[string]string{ @@ -81,7 +81,7 @@ var cg3 = &CGroup{ func TestCgroupStatistics_3(t *testing.T) { var acc testutil.Accumulator - err := cg3.Gather(&acc) + err := acc.GatherError(cg3.Gather) require.NoError(t, err) tags := map[string]string{ @@ -108,7 +108,7 @@ var cg4 = &CGroup{ func TestCgroupStatistics_4(t *testing.T) { var acc testutil.Accumulator - err := cg4.Gather(&acc) + err := acc.GatherError(cg4.Gather) require.NoError(t, err) tags := map[string]string{ @@ -140,7 +140,7 @@ var cg5 = &CGroup{ func TestCgroupStatistics_5(t *testing.T) { var acc testutil.Accumulator - err := cg5.Gather(&acc) + err := acc.GatherError(cg5.Gather) require.NoError(t, err) tags := map[string]string{ @@ -167,7 +167,7 @@ var cg6 = &CGroup{ func TestCgroupStatistics_6(t *testing.T) { var acc testutil.Accumulator - err := cg6.Gather(&acc) + err := acc.GatherError(cg6.Gather) require.NoError(t, err) tags := map[string]string{ diff --git a/plugins/inputs/cloudwatch/cloudwatch.go b/plugins/inputs/cloudwatch/cloudwatch.go index f0a067001bd85..2e8bee6714a2c 100644 --- a/plugins/inputs/cloudwatch/cloudwatch.go +++ b/plugins/inputs/cloudwatch/cloudwatch.go @@ -13,7 +13,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" internalaws "github.com/influxdata/telegraf/internal/config/aws" - "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/internal/limiter" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -186,8 +185,6 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { if err != nil { return err } - metricCount := len(metrics) - errChan := errchan.New(metricCount) now := time.Now() @@ -202,12 +199,12 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { <-lmtr.C go func(inm *cloudwatch.Metric) { defer wg.Done() - c.gatherMetric(acc, inm, now, errChan.C) + acc.AddError(c.gatherMetric(acc, inm, now)) }(m) } wg.Wait() - return errChan.Error() + return nil } func init() { @@ -285,13 +282,11 @@ func (c *CloudWatch) gatherMetric( acc telegraf.Accumulator, metric *cloudwatch.Metric, now time.Time, - errChan chan error, -) { +) error { params := c.getStatisticsInput(metric, now) resp, err := c.client.GetMetricStatistics(params) if err != nil { - errChan <- err - return + return err } for _, point := range resp.Datapoints { @@ -326,7 +321,7 @@ func (c *CloudWatch) gatherMetric( acc.AddFields(formatMeasurement(c.Namespace), fields, tags, *point.Timestamp) } - errChan <- nil + return nil } /* diff --git a/plugins/inputs/cloudwatch/cloudwatch_test.go b/plugins/inputs/cloudwatch/cloudwatch_test.go index 3aaab7d455783..c52b3a35313f7 100644 --- a/plugins/inputs/cloudwatch/cloudwatch_test.go +++ b/plugins/inputs/cloudwatch/cloudwatch_test.go @@ -64,7 +64,7 @@ func TestGather(t *testing.T) { var acc testutil.Accumulator c.client = &mockGatherCloudWatchClient{} - c.Gather(&acc) + acc.GatherError(c.Gather) fields := map[string]interface{}{} fields["latency_minimum"] = 0.1 diff --git a/plugins/inputs/conntrack/conntrack.go b/plugins/inputs/conntrack/conntrack.go index 841aedb545a03..4df01a31f5a4b 100644 --- a/plugins/inputs/conntrack/conntrack.go +++ b/plugins/inputs/conntrack/conntrack.go @@ -11,7 +11,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" - "log" "path/filepath" ) @@ -93,15 +92,15 @@ func (c *Conntrack) Gather(acc telegraf.Accumulator) error { contents, err := ioutil.ReadFile(fName) if err != nil { - log.Printf("E! failed to read file '%s': %v", fName, err) + acc.AddError(fmt.Errorf("E! failed to read file '%s': %v", fName, err)) continue } v := strings.TrimSpace(string(contents)) fields[metricKey], err = strconv.ParseFloat(v, 64) if err != nil { - log.Printf("E! failed to parse metric, expected number but "+ - " found '%s': %v", v, err) + acc.AddError(fmt.Errorf("E! failed to parse metric, expected number but "+ + " found '%s': %v", v, err)) } } } diff --git a/plugins/inputs/couchbase/couchbase.go b/plugins/inputs/couchbase/couchbase.go index 48e0c1a75e135..3c3bb36717034 100644 --- a/plugins/inputs/couchbase/couchbase.go +++ b/plugins/inputs/couchbase/couchbase.go @@ -42,19 +42,17 @@ func (r *Couchbase) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup - var outerr error - for _, serv := range r.Servers { wg.Add(1) go func(serv string) { defer wg.Done() - outerr = r.gatherServer(serv, acc, nil) + acc.AddError(r.gatherServer(serv, acc, nil)) }(serv) } wg.Wait() - return outerr + return nil } func (r *Couchbase) gatherServer(addr string, acc telegraf.Accumulator, pool *couchbase.Pool) error { diff --git a/plugins/inputs/couchdb/couchdb.go b/plugins/inputs/couchdb/couchdb.go index bf241649a49ce..da6ba67dcaf91 100644 --- a/plugins/inputs/couchdb/couchdb.go +++ b/plugins/inputs/couchdb/couchdb.go @@ -2,13 +2,11 @@ package couchdb import ( "encoding/json" - "errors" "fmt" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "net/http" "reflect" - "strings" "sync" "time" ) @@ -83,34 +81,20 @@ func (*CouchDB) SampleConfig() string { } func (c *CouchDB) Gather(accumulator telegraf.Accumulator) error { - errorChannel := make(chan error, len(c.HOSTs)) var wg sync.WaitGroup for _, u := range c.HOSTs { wg.Add(1) go func(host string) { defer wg.Done() if err := c.fetchAndInsertData(accumulator, host); err != nil { - errorChannel <- fmt.Errorf("[host=%s]: %s", host, err) + accumulator.AddError(fmt.Errorf("[host=%s]: %s", host, err)) } }(u) } wg.Wait() - close(errorChannel) - - // If there weren't any errors, we can return nil now. - if len(errorChannel) == 0 { - return nil - } - - // There were errors, so join them all together as one big error. - errorStrings := make([]string, 0, len(errorChannel)) - for err := range errorChannel { - errorStrings = append(errorStrings, err.Error()) - } - - return errors.New(strings.Join(errorStrings, "\n")) + return nil } var tr = &http.Transport{ diff --git a/plugins/inputs/couchdb/couchdb_test.go b/plugins/inputs/couchdb/couchdb_test.go index 7b824e7483c99..4c03708523ce1 100644 --- a/plugins/inputs/couchdb/couchdb_test.go +++ b/plugins/inputs/couchdb/couchdb_test.go @@ -316,5 +316,5 @@ func TestBasic(t *testing.T) { } var acc testutil.Accumulator - require.NoError(t, plugin.Gather(&acc)) + require.NoError(t, acc.GatherError(plugin.Gather)) } diff --git a/plugins/inputs/disque/disque.go b/plugins/inputs/disque/disque.go index 0e4baf9cb8d42..6585ab88eb587 100644 --- a/plugins/inputs/disque/disque.go +++ b/plugins/inputs/disque/disque.go @@ -75,12 +75,11 @@ func (g *Disque) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup - var outerr error - for _, serv := range g.Servers { u, err := url.Parse(serv) if err != nil { - return fmt.Errorf("Unable to parse to address '%s': %s", serv, err) + acc.AddError(fmt.Errorf("Unable to parse to address '%s': %s", serv, err)) + continue } else if u.Scheme == "" { // fallback to simple string based address (i.e. "10.0.0.1:10000") u.Scheme = "tcp" @@ -90,13 +89,13 @@ func (g *Disque) Gather(acc telegraf.Accumulator) error { wg.Add(1) go func(serv string) { defer wg.Done() - outerr = g.gatherServer(u, acc) + acc.AddError(g.gatherServer(u, acc)) }(serv) } wg.Wait() - return outerr + return nil } const defaultPort = "7711" diff --git a/plugins/inputs/disque/disque_test.go b/plugins/inputs/disque/disque_test.go index f060e956845b3..1e5b764f9c820 100644 --- a/plugins/inputs/disque/disque_test.go +++ b/plugins/inputs/disque/disque_test.go @@ -51,7 +51,7 @@ func TestDisqueGeneratesMetrics(t *testing.T) { var acc testutil.Accumulator - err = r.Gather(&acc) + err = acc.GatherError(r.Gather) require.NoError(t, err) fields := map[string]interface{}{ @@ -117,7 +117,7 @@ func TestDisqueCanPullStatsFromMultipleServers(t *testing.T) { var acc testutil.Accumulator - err = r.Gather(&acc) + err = acc.GatherError(r.Gather) require.NoError(t, err) fields := map[string]interface{}{ diff --git a/plugins/inputs/dns_query/dns_query.go b/plugins/inputs/dns_query/dns_query.go index 1bccc52c0b5b9..2f1fc54827d3f 100644 --- a/plugins/inputs/dns_query/dns_query.go +++ b/plugins/inputs/dns_query/dns_query.go @@ -9,7 +9,6 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -58,11 +57,10 @@ func (d *DnsQuery) Description() string { func (d *DnsQuery) Gather(acc telegraf.Accumulator) error { d.setDefaultValues() - errChan := errchan.New(len(d.Domains) * len(d.Servers)) for _, domain := range d.Domains { for _, server := range d.Servers { dnsQueryTime, err := d.getDnsQueryTime(domain, server) - errChan.C <- err + acc.AddError(err) tags := map[string]string{ "server": server, "domain": domain, @@ -74,7 +72,7 @@ func (d *DnsQuery) Gather(acc telegraf.Accumulator) error { } } - return errChan.Error() + return nil } func (d *DnsQuery) setDefaultValues() { diff --git a/plugins/inputs/dns_query/dns_query_test.go b/plugins/inputs/dns_query/dns_query_test.go index aeeb7656fc21b..3f70153e4e422 100644 --- a/plugins/inputs/dns_query/dns_query_test.go +++ b/plugins/inputs/dns_query/dns_query_test.go @@ -24,7 +24,7 @@ func TestGathering(t *testing.T) { } var acc testutil.Accumulator - err := dnsConfig.Gather(&acc) + err := acc.GatherError(dnsConfig.Gather) assert.NoError(t, err) metric, ok := acc.Get("dns_query") require.True(t, ok) @@ -44,7 +44,7 @@ func TestGatheringMxRecord(t *testing.T) { var acc testutil.Accumulator dnsConfig.RecordType = "MX" - err := dnsConfig.Gather(&acc) + err := acc.GatherError(dnsConfig.Gather) assert.NoError(t, err) metric, ok := acc.Get("dns_query") require.True(t, ok) @@ -70,7 +70,7 @@ func TestGatheringRootDomain(t *testing.T) { } fields := map[string]interface{}{} - err := dnsConfig.Gather(&acc) + err := acc.GatherError(dnsConfig.Gather) assert.NoError(t, err) metric, ok := acc.Get("dns_query") require.True(t, ok) @@ -96,7 +96,7 @@ func TestMetricContainsServerAndDomainAndRecordTypeTags(t *testing.T) { } fields := map[string]interface{}{} - err := dnsConfig.Gather(&acc) + err := acc.GatherError(dnsConfig.Gather) assert.NoError(t, err) metric, ok := acc.Get("dns_query") require.True(t, ok) @@ -121,7 +121,7 @@ func TestGatheringTimeout(t *testing.T) { channel := make(chan error, 1) go func() { - channel <- dnsConfig.Gather(&acc) + channel <- acc.GatherError(dnsConfig.Gather) }() select { case res := <-channel: diff --git a/plugins/inputs/docker/docker.go b/plugins/inputs/docker/docker.go index 47d1db14b09ec..a439af0686ed2 100644 --- a/plugins/inputs/docker/docker.go +++ b/plugins/inputs/docker/docker.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "io" - "log" "regexp" "strconv" "strings" @@ -159,7 +158,7 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error { // Get daemon info err := d.gatherInfo(acc) if err != nil { - fmt.Println(err.Error()) + acc.AddError(err) } // List containers @@ -179,8 +178,8 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error { defer wg.Done() err := d.gatherContainer(c, acc) if err != nil { - log.Printf("E! Error gathering container %s stats: %s\n", - c.Names, err.Error()) + acc.AddError(fmt.Errorf("E! Error gathering container %s stats: %s\n", + c.Names, err.Error())) } }(container) } diff --git a/plugins/inputs/docker/docker_test.go b/plugins/inputs/docker/docker_test.go index 3e2e1607b4c08..cc385bd5d0311 100644 --- a/plugins/inputs/docker/docker_test.go +++ b/plugins/inputs/docker/docker_test.go @@ -302,7 +302,7 @@ func TestDockerGatherInfo(t *testing.T) { testing: true, } - err := d.Gather(&acc) + err := acc.GatherError(d.Gather) require.NoError(t, err) acc.AssertContainsTaggedFields(t, diff --git a/plugins/inputs/dovecot/dovecot.go b/plugins/inputs/dovecot/dovecot.go index 56290e75924a1..a621252e5dd4b 100644 --- a/plugins/inputs/dovecot/dovecot.go +++ b/plugins/inputs/dovecot/dovecot.go @@ -12,7 +12,6 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -66,19 +65,18 @@ func (d *Dovecot) Gather(acc telegraf.Accumulator) error { } var wg sync.WaitGroup - errChan := errchan.New(len(d.Servers) * len(d.Filters)) for _, server := range d.Servers { for _, filter := range d.Filters { wg.Add(1) go func(s string, f string) { defer wg.Done() - errChan.C <- d.gatherServer(s, acc, d.Type, f) + acc.AddError(d.gatherServer(s, acc, d.Type, f)) }(server, filter) } } wg.Wait() - return errChan.Error() + return nil } func (d *Dovecot) gatherServer(addr string, acc telegraf.Accumulator, qtype string, filter string) error { diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index 370e3fbdd3567..c11bfdd2f4d0f 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -10,7 +10,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" "io/ioutil" @@ -153,7 +152,6 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { e.client = client } - errChan := errchan.New(len(e.Servers) * 3) var wg sync.WaitGroup wg.Add(len(e.Servers)) @@ -176,24 +174,21 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { // Always gather node states if err := e.gatherNodeStats(url, acc); err != nil { - err = fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")) - errChan.C <- err + acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))) return } if e.ClusterHealth { url = s + "/_cluster/health?level=indices" if err := e.gatherClusterHealth(url, acc); err != nil { - err = fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")) - errChan.C <- err + acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))) return } } if e.ClusterStats && e.isMaster { if err := e.gatherClusterStats(s+"/_cluster/stats", acc); err != nil { - err = fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")) - errChan.C <- err + acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@"))) return } } @@ -201,7 +196,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { } wg.Wait() - return errChan.Error() + return nil } func (e *Elasticsearch) createHttpClient() (*http.Client, error) { diff --git a/plugins/inputs/elasticsearch/elasticsearch_test.go b/plugins/inputs/elasticsearch/elasticsearch_test.go index 59caa4306b1e9..f057cfd8bbb3b 100644 --- a/plugins/inputs/elasticsearch/elasticsearch_test.go +++ b/plugins/inputs/elasticsearch/elasticsearch_test.go @@ -71,7 +71,7 @@ func TestGather(t *testing.T) { es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse) var acc testutil.Accumulator - if err := es.Gather(&acc); err != nil { + if err := acc.GatherError(es.Gather); err != nil { t.Fatal(err) } diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index 154080138c88f..e9ce4551e7463 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -15,7 +15,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/nagios" @@ -49,8 +48,7 @@ type Exec struct { parser parsers.Parser - runner Runner - errChan chan error + runner Runner } func NewExec() *Exec { @@ -150,13 +148,13 @@ func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync out, err := e.runner.Run(e, command, acc) if err != nil { - e.errChan <- err + acc.AddError(err) return } metrics, err := e.parser.Parse(out) if err != nil { - e.errChan <- err + acc.AddError(err) } else { for _, metric := range metrics { acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) @@ -193,7 +191,8 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error { matches, err := filepath.Glob(cmdAndArgs[0]) if err != nil { - return err + acc.AddError(err) + continue } if len(matches) == 0 { @@ -214,15 +213,12 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error { } } - errChan := errchan.New(len(commands)) - e.errChan = errChan.C - wg.Add(len(commands)) for _, command := range commands { go e.ProcessCommand(command, acc, &wg) } wg.Wait() - return errChan.Error() + return nil } func init() { diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index 975eb9642668d..52bcd1fb42876 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -101,7 +101,7 @@ func TestExec(t *testing.T) { } var acc testutil.Accumulator - err := e.Gather(&acc) + err := acc.GatherError(e.Gather) require.NoError(t, err) assert.Equal(t, acc.NFields(), 8, "non-numeric measurements should be ignored") @@ -127,8 +127,7 @@ func TestExecMalformed(t *testing.T) { } var acc testutil.Accumulator - err := e.Gather(&acc) - require.Error(t, err) + require.Error(t, acc.GatherError(e.Gather)) assert.Equal(t, acc.NFields(), 0, "No new points should have been added") } @@ -141,8 +140,7 @@ func TestCommandError(t *testing.T) { } var acc testutil.Accumulator - err := e.Gather(&acc) - require.Error(t, err) + require.Error(t, acc.GatherError(e.Gather)) assert.Equal(t, acc.NFields(), 0, "No new points should have been added") } @@ -155,8 +153,7 @@ func TestLineProtocolParse(t *testing.T) { } var acc testutil.Accumulator - err := e.Gather(&acc) - require.NoError(t, err) + require.NoError(t, acc.GatherError(e.Gather)) fields := map[string]interface{}{ "usage_idle": float64(99), @@ -191,7 +188,7 @@ func TestLineProtocolShortParse(t *testing.T) { } var acc testutil.Accumulator - err := e.Gather(&acc) + err := acc.GatherError(e.Gather) require.Error(t, err) assert.Contains(t, err.Error(), "buffer too short", "A buffer too short error was expected") } @@ -205,7 +202,7 @@ func TestLineProtocolParseMultiple(t *testing.T) { } var acc testutil.Accumulator - err := e.Gather(&acc) + err := acc.GatherError(e.Gather) require.NoError(t, err) fields := map[string]interface{}{ @@ -231,7 +228,7 @@ func TestExecCommandWithGlob(t *testing.T) { e.SetParser(parser) var acc testutil.Accumulator - err := e.Gather(&acc) + err := acc.GatherError(e.Gather) require.NoError(t, err) fields := map[string]interface{}{ @@ -247,7 +244,7 @@ func TestExecCommandWithoutGlob(t *testing.T) { e.SetParser(parser) var acc testutil.Accumulator - err := e.Gather(&acc) + err := acc.GatherError(e.Gather) require.NoError(t, err) fields := map[string]interface{}{ @@ -263,7 +260,7 @@ func TestExecCommandWithoutGlobAndPath(t *testing.T) { e.SetParser(parser) var acc testutil.Accumulator - err := e.Gather(&acc) + err := acc.GatherError(e.Gather) require.NoError(t, err) fields := map[string]interface{}{ diff --git a/plugins/inputs/filestat/filestat.go b/plugins/inputs/filestat/filestat.go index 83f511a84ee06..7519e295cd788 100644 --- a/plugins/inputs/filestat/filestat.go +++ b/plugins/inputs/filestat/filestat.go @@ -48,7 +48,6 @@ func (_ *FileStat) Description() string { func (_ *FileStat) SampleConfig() string { return sampleConfig } func (f *FileStat) Gather(acc telegraf.Accumulator) error { - var errS string var err error for _, filepath := range f.Files { @@ -56,7 +55,7 @@ func (f *FileStat) Gather(acc telegraf.Accumulator) error { g, ok := f.globs[filepath] if !ok { if g, err = globpath.Compile(filepath); err != nil { - errS += err.Error() + " " + acc.AddError(err) continue } f.globs[filepath] = g @@ -92,7 +91,7 @@ func (f *FileStat) Gather(acc telegraf.Accumulator) error { if f.Md5 { md5, err := getMd5(fileName) if err != nil { - errS += err.Error() + " " + acc.AddError(err) } else { fields["md5_sum"] = md5 } @@ -102,9 +101,6 @@ func (f *FileStat) Gather(acc telegraf.Accumulator) error { } } - if errS != "" { - return fmt.Errorf(errS) - } return nil } diff --git a/plugins/inputs/filestat/filestat_test.go b/plugins/inputs/filestat/filestat_test.go index a404869d995ee..83d3c87cc9448 100644 --- a/plugins/inputs/filestat/filestat_test.go +++ b/plugins/inputs/filestat/filestat_test.go @@ -19,7 +19,7 @@ func TestGatherNoMd5(t *testing.T) { } acc := testutil.Accumulator{} - fs.Gather(&acc) + acc.GatherError(fs.Gather) tags1 := map[string]string{ "file": dir + "log1.log", @@ -59,7 +59,7 @@ func TestGatherExplicitFiles(t *testing.T) { } acc := testutil.Accumulator{} - fs.Gather(&acc) + acc.GatherError(fs.Gather) tags1 := map[string]string{ "file": dir + "log1.log", @@ -99,7 +99,7 @@ func TestGatherGlob(t *testing.T) { } acc := testutil.Accumulator{} - fs.Gather(&acc) + acc.GatherError(fs.Gather) tags1 := map[string]string{ "file": dir + "log1.log", @@ -131,7 +131,7 @@ func TestGatherSuperAsterisk(t *testing.T) { } acc := testutil.Accumulator{} - fs.Gather(&acc) + acc.GatherError(fs.Gather) tags1 := map[string]string{ "file": dir + "log1.log", diff --git a/plugins/inputs/graylog/graylog.go b/plugins/inputs/graylog/graylog.go index 52e2ef42a5f03..6dcc9b979b7ea 100644 --- a/plugins/inputs/graylog/graylog.go +++ b/plugins/inputs/graylog/graylog.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/base64" "encoding/json" - "errors" "fmt" "io/ioutil" "net" @@ -149,31 +148,17 @@ func (h *GrayLog) Gather(acc telegraf.Accumulator) error { h.client.SetHTTPClient(client) } - errorChannel := make(chan error, len(h.Servers)) - for _, server := range h.Servers { wg.Add(1) go func(server string) { defer wg.Done() - if err := h.gatherServer(acc, server); err != nil { - errorChannel <- err - } + acc.AddError(h.gatherServer(acc, server)) }(server) } wg.Wait() - close(errorChannel) - - // Get all errors and return them as one giant error - errorStrings := []string{} - for err := range errorChannel { - errorStrings = append(errorStrings, err.Error()) - } - if len(errorStrings) == 0 { - return nil - } - return errors.New(strings.Join(errorStrings, "\n")) + return nil } // Gathers data from a particular server diff --git a/plugins/inputs/graylog/graylog_test.go b/plugins/inputs/graylog/graylog_test.go index 09bca454db935..a5088cf7da5b1 100644 --- a/plugins/inputs/graylog/graylog_test.go +++ b/plugins/inputs/graylog/graylog_test.go @@ -157,7 +157,7 @@ func TestNormalResponse(t *testing.T) { for _, service := range graylog { var acc testutil.Accumulator - err := service.Gather(&acc) + err := acc.GatherError(service.Gather) require.NoError(t, err) for k, v := range expectedFields { acc.AssertContainsTaggedFields(t, k, v, validTags[k]) @@ -170,9 +170,9 @@ func TestHttpJson500(t *testing.T) { graylog := genMockGrayLog(validJSON, 500) var acc testutil.Accumulator - err := graylog[0].Gather(&acc) + err := acc.GatherError(graylog[0].Gather) - assert.NotNil(t, err) + assert.Error(t, err) assert.Equal(t, 0, acc.NFields()) } @@ -181,9 +181,9 @@ func TestHttpJsonBadJson(t *testing.T) { graylog := genMockGrayLog(invalidJSON, 200) var acc testutil.Accumulator - err := graylog[0].Gather(&acc) + err := acc.GatherError(graylog[0].Gather) - assert.NotNil(t, err) + assert.Error(t, err) assert.Equal(t, 0, acc.NFields()) } @@ -192,8 +192,8 @@ func TestHttpJsonEmptyResponse(t *testing.T) { graylog := genMockGrayLog(empty, 200) var acc testutil.Accumulator - err := graylog[0].Gather(&acc) + err := acc.GatherError(graylog[0].Gather) - assert.NotNil(t, err) + assert.Error(t, err) assert.Equal(t, 0, acc.NFields()) } diff --git a/plugins/inputs/httpjson/httpjson.go b/plugins/inputs/httpjson/httpjson.go index 8bfe22bff9ab4..ba5d7734db6eb 100644 --- a/plugins/inputs/httpjson/httpjson.go +++ b/plugins/inputs/httpjson/httpjson.go @@ -1,7 +1,6 @@ package httpjson import ( - "errors" "fmt" "io/ioutil" "net/http" @@ -145,31 +144,17 @@ func (h *HttpJson) Gather(acc telegraf.Accumulator) error { h.client.SetHTTPClient(client) } - errorChannel := make(chan error, len(h.Servers)) - for _, server := range h.Servers { wg.Add(1) go func(server string) { defer wg.Done() - if err := h.gatherServer(acc, server); err != nil { - errorChannel <- err - } + acc.AddError(h.gatherServer(acc, server)) }(server) } wg.Wait() - close(errorChannel) - - // Get all errors and return them as one giant error - errorStrings := []string{} - for err := range errorChannel { - errorStrings = append(errorStrings, err.Error()) - } - if len(errorStrings) == 0 { - return nil - } - return errors.New(strings.Join(errorStrings, "\n")) + return nil } // Gathers data from a particular server diff --git a/plugins/inputs/httpjson/httpjson_test.go b/plugins/inputs/httpjson/httpjson_test.go index 0029eb3e93ffc..629da8b5f7b51 100644 --- a/plugins/inputs/httpjson/httpjson_test.go +++ b/plugins/inputs/httpjson/httpjson_test.go @@ -210,7 +210,7 @@ func TestHttpJson200(t *testing.T) { for _, service := range httpjson { var acc testutil.Accumulator - err := service.Gather(&acc) + err := acc.GatherError(service.Gather) require.NoError(t, err) assert.Equal(t, 12, acc.NFields()) // Set responsetime @@ -245,7 +245,7 @@ func TestHttpJsonGET_URL(t *testing.T) { } var acc testutil.Accumulator - err := a.Gather(&acc) + err := acc.GatherError(a.Gather) require.NoError(t, err) // remove response_time from gathered fields because it's non-deterministic @@ -318,7 +318,7 @@ func TestHttpJsonGET(t *testing.T) { } var acc testutil.Accumulator - err := a.Gather(&acc) + err := acc.GatherError(a.Gather) require.NoError(t, err) // remove response_time from gathered fields because it's non-deterministic @@ -392,7 +392,7 @@ func TestHttpJsonPOST(t *testing.T) { } var acc testutil.Accumulator - err := a.Gather(&acc) + err := acc.GatherError(a.Gather) require.NoError(t, err) // remove response_time from gathered fields because it's non-deterministic @@ -448,9 +448,9 @@ func TestHttpJson500(t *testing.T) { httpjson := genMockHttpJson(validJSON, 500) var acc testutil.Accumulator - err := httpjson[0].Gather(&acc) + err := acc.GatherError(httpjson[0].Gather) - assert.NotNil(t, err) + assert.Error(t, err) assert.Equal(t, 0, acc.NFields()) } @@ -460,9 +460,9 @@ func TestHttpJsonBadMethod(t *testing.T) { httpjson[0].Method = "NOT_A_REAL_METHOD" var acc testutil.Accumulator - err := httpjson[0].Gather(&acc) + err := acc.GatherError(httpjson[0].Gather) - assert.NotNil(t, err) + assert.Error(t, err) assert.Equal(t, 0, acc.NFields()) } @@ -471,9 +471,9 @@ func TestHttpJsonBadJson(t *testing.T) { httpjson := genMockHttpJson(invalidJSON, 200) var acc testutil.Accumulator - err := httpjson[0].Gather(&acc) + err := acc.GatherError(httpjson[0].Gather) - assert.NotNil(t, err) + assert.Error(t, err) assert.Equal(t, 0, acc.NFields()) } @@ -482,9 +482,9 @@ func TestHttpJsonEmptyResponse(t *testing.T) { httpjson := genMockHttpJson(empty, 200) var acc testutil.Accumulator - err := httpjson[0].Gather(&acc) + err := acc.GatherError(httpjson[0].Gather) - assert.NotNil(t, err) + assert.Error(t, err) assert.Equal(t, 0, acc.NFields()) } @@ -495,7 +495,7 @@ func TestHttpJson200Tags(t *testing.T) { for _, service := range httpjson { if service.Name == "other_webapp" { var acc testutil.Accumulator - err := service.Gather(&acc) + err := acc.GatherError(service.Gather) // Set responsetime for _, p := range acc.Metrics { p.Fields["response_time"] = 1.0 @@ -533,7 +533,7 @@ func TestHttpJsonArray200Tags(t *testing.T) { for _, service := range httpjson { if service.Name == "other_webapp" { var acc testutil.Accumulator - err := service.Gather(&acc) + err := acc.GatherError(service.Gather) // Set responsetime for _, p := range acc.Metrics { p.Fields["response_time"] = 1.0 diff --git a/plugins/inputs/influxdb/influxdb.go b/plugins/inputs/influxdb/influxdb.go index 3c98eead34a38..fc84d7c928b4e 100644 --- a/plugins/inputs/influxdb/influxdb.go +++ b/plugins/inputs/influxdb/influxdb.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "net/http" - "strings" "sync" "time" @@ -57,34 +56,20 @@ func (i *InfluxDB) Gather(acc telegraf.Accumulator) error { } } - errorChannel := make(chan error, len(i.URLs)) - var wg sync.WaitGroup for _, u := range i.URLs { wg.Add(1) go func(url string) { defer wg.Done() if err := i.gatherURL(acc, url); err != nil { - errorChannel <- fmt.Errorf("[url=%s]: %s", url, err) + acc.AddError(fmt.Errorf("[url=%s]: %s", url, err)) } }(u) } wg.Wait() - close(errorChannel) - - // If there weren't any errors, we can return nil now. - if len(errorChannel) == 0 { - return nil - } - // There were errors, so join them all together as one big error. - errorStrings := make([]string, 0, len(errorChannel)) - for err := range errorChannel { - errorStrings = append(errorStrings, err.Error()) - } - - return errors.New(strings.Join(errorStrings, "\n")) + return nil } type point struct { diff --git a/plugins/inputs/influxdb/influxdb_test.go b/plugins/inputs/influxdb/influxdb_test.go index c27aa77dc33f9..f24ecc24c11cf 100644 --- a/plugins/inputs/influxdb/influxdb_test.go +++ b/plugins/inputs/influxdb/influxdb_test.go @@ -25,7 +25,7 @@ func TestBasic(t *testing.T) { } var acc testutil.Accumulator - require.NoError(t, plugin.Gather(&acc)) + require.NoError(t, acc.GatherError(plugin.Gather)) require.Len(t, acc.Metrics, 3) fields := map[string]interface{}{ @@ -72,7 +72,7 @@ func TestInfluxDB(t *testing.T) { } var acc testutil.Accumulator - require.NoError(t, plugin.Gather(&acc)) + require.NoError(t, acc.GatherError(plugin.Gather)) require.Len(t, acc.Metrics, 34) @@ -132,7 +132,7 @@ func TestInfluxDB2(t *testing.T) { } var acc testutil.Accumulator - require.NoError(t, plugin.Gather(&acc)) + require.NoError(t, acc.GatherError(plugin.Gather)) require.Len(t, acc.Metrics, 34) @@ -157,7 +157,7 @@ func TestErrorHandling(t *testing.T) { } var acc testutil.Accumulator - require.Error(t, plugin.Gather(&acc)) + require.Error(t, acc.GatherError(plugin.Gather)) } func TestErrorHandling404(t *testing.T) { @@ -175,7 +175,7 @@ func TestErrorHandling404(t *testing.T) { } var acc testutil.Accumulator - require.Error(t, plugin.Gather(&acc)) + require.Error(t, acc.GatherError(plugin.Gather)) } const basicJSON = ` diff --git a/plugins/inputs/ipmi_sensor/ipmi.go b/plugins/inputs/ipmi_sensor/ipmi.go index a3beeb29702f8..73f22b3938e99 100644 --- a/plugins/inputs/ipmi_sensor/ipmi.go +++ b/plugins/inputs/ipmi_sensor/ipmi.go @@ -52,7 +52,8 @@ func (m *Ipmi) Gather(acc telegraf.Accumulator) error { for _, server := range m.Servers { err := m.parse(acc, server) if err != nil { - return err + acc.AddError(err) + continue } } } else { diff --git a/plugins/inputs/ipmi_sensor/ipmi_test.go b/plugins/inputs/ipmi_sensor/ipmi_test.go index 84bcdcac01b93..9e846065e151e 100644 --- a/plugins/inputs/ipmi_sensor/ipmi_test.go +++ b/plugins/inputs/ipmi_sensor/ipmi_test.go @@ -20,7 +20,7 @@ func TestGather(t *testing.T) { execCommand = fakeExecCommand var acc testutil.Accumulator - err := i.Gather(&acc) + err := acc.GatherError(i.Gather) require.NoError(t, err) @@ -121,7 +121,7 @@ func TestGather(t *testing.T) { Path: "ipmitool", } - err = i.Gather(&acc) + err = acc.GatherError(i.Gather) var testsWithoutServer = []struct { fields map[string]interface{} diff --git a/plugins/inputs/iptables/iptables.go b/plugins/inputs/iptables/iptables.go index eab33bf9f21f1..af39417183dc0 100644 --- a/plugins/inputs/iptables/iptables.go +++ b/plugins/inputs/iptables/iptables.go @@ -54,20 +54,19 @@ func (ipt *Iptables) Gather(acc telegraf.Accumulator) error { } // best effort : we continue through the chains even if an error is encountered, // but we keep track of the last error. - var err error for _, chain := range ipt.Chains { data, e := ipt.lister(ipt.Table, chain) if e != nil { - err = e + acc.AddError(e) continue } e = ipt.parseAndGather(data, acc) if e != nil { - err = e + acc.AddError(e) continue } } - return err + return nil } func (ipt *Iptables) chainList(table, chain string) (string, error) { diff --git a/plugins/inputs/iptables/iptables_test.go b/plugins/inputs/iptables/iptables_test.go index bd8a2a726b9f0..96190713fb35b 100644 --- a/plugins/inputs/iptables/iptables_test.go +++ b/plugins/inputs/iptables/iptables_test.go @@ -141,7 +141,7 @@ func TestIptables_Gather(t *testing.T) { }, } acc := new(testutil.Accumulator) - err := ipt.Gather(acc) + err := acc.GatherError(ipt.Gather) if !reflect.DeepEqual(tt.err, err) { t.Errorf("%d: expected error '%#v' got '%#v'", i, tt.err, err) } @@ -199,7 +199,7 @@ func TestIptables_Gather_listerError(t *testing.T) { }, } acc := new(testutil.Accumulator) - err := ipt.Gather(acc) + err := acc.GatherError(ipt.Gather) if !reflect.DeepEqual(err, errFoo) { t.Errorf("Expected error %#v got\n%#v\n", errFoo, err) } diff --git a/plugins/inputs/jolokia/jolokia_test.go b/plugins/inputs/jolokia/jolokia_test.go index cf415f36f6efb..b47ffbc26d42b 100644 --- a/plugins/inputs/jolokia/jolokia_test.go +++ b/plugins/inputs/jolokia/jolokia_test.go @@ -158,7 +158,7 @@ func TestHttpJsonMultiValue(t *testing.T) { jolokia := genJolokiaClientStub(validMultiValueJSON, 200, Servers, []Metric{HeapMetric}) var acc testutil.Accumulator - err := jolokia.Gather(&acc) + err := acc.GatherError(jolokia.Gather) assert.Nil(t, err) assert.Equal(t, 1, len(acc.Metrics)) @@ -210,7 +210,7 @@ func TestHttpJsonThreeLevelMultiValue(t *testing.T) { jolokia := genJolokiaClientStub(validThreeLevelMultiValueJSON, 200, Servers, []Metric{HeapMetric}) var acc testutil.Accumulator - err := jolokia.Gather(&acc) + err := acc.GatherError(jolokia.Gather) assert.Nil(t, err) assert.Equal(t, 1, len(acc.Metrics)) @@ -238,17 +238,18 @@ func TestHttpJsonThreeLevelMultiValue(t *testing.T) { } // Test that the proper values are ignored or collected -func TestHttpJsonOn404(t *testing.T) { +func TestHttp404(t *testing.T) { - jolokia := genJolokiaClientStub(validMultiValueJSON, 404, Servers, + jolokia := genJolokiaClientStub(invalidJSON, 404, Servers, []Metric{UsedHeapMetric}) var acc testutil.Accumulator acc.SetDebug(true) - err := jolokia.Gather(&acc) + err := acc.GatherError(jolokia.Gather) - assert.Nil(t, err) + assert.Error(t, err) assert.Equal(t, 0, len(acc.Metrics)) + assert.Contains(t, err.Error(), "has status code 404") } // Test that the proper values are ignored or collected @@ -259,8 +260,9 @@ func TestHttpInvalidJson(t *testing.T) { var acc testutil.Accumulator acc.SetDebug(true) - err := jolokia.Gather(&acc) + err := acc.GatherError(jolokia.Gather) - assert.Nil(t, err) + assert.Error(t, err) assert.Equal(t, 0, len(acc.Metrics)) + assert.Contains(t, err.Error(), "Error decoding JSON response") } diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go index c1c93e7cb4155..41ce101570946 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go @@ -59,7 +59,7 @@ func TestReadsMetricsFromKafka(t *testing.T) { waitForPoint(&acc, t) // Gather points - err = k.Gather(&acc) + err = acc.GatherError(k.Gather) require.NoError(t, err) if len(acc.Metrics) == 1 { point := acc.Metrics[0] diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index 04498261ca69f..5519dd0d17b44 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -92,7 +92,7 @@ func TestRunParserAndGather(t *testing.T) { in <- saramaMsg(testMsg) acc.Wait(1) - k.Gather(&acc) + acc.GatherError(k.Gather) assert.Equal(t, acc.NFields(), 1) acc.AssertContainsFields(t, "cpu_load_short", @@ -111,7 +111,7 @@ func TestRunParserAndGatherGraphite(t *testing.T) { in <- saramaMsg(testMsgGraphite) acc.Wait(1) - k.Gather(&acc) + acc.GatherError(k.Gather) assert.Equal(t, acc.NFields(), 1) acc.AssertContainsFields(t, "cpu_load_short_graphite", @@ -130,7 +130,7 @@ func TestRunParserAndGatherJSON(t *testing.T) { in <- saramaMsg(testMsgJSON) acc.Wait(1) - k.Gather(&acc) + acc.GatherError(k.Gather) assert.Equal(t, acc.NFields(), 2) acc.AssertContainsFields(t, "kafka_json_test", diff --git a/plugins/inputs/kubernetes/kubernetes.go b/plugins/inputs/kubernetes/kubernetes.go index ee95d560f426c..87d08aa9435e1 100644 --- a/plugins/inputs/kubernetes/kubernetes.go +++ b/plugins/inputs/kubernetes/kubernetes.go @@ -11,7 +11,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -72,14 +71,13 @@ func (k *Kubernetes) Description() string { //Gather collects kubernetes metrics from a given URL func (k *Kubernetes) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup - errChan := errchan.New(1) wg.Add(1) go func(k *Kubernetes) { defer wg.Done() - errChan.C <- k.gatherSummary(k.URL, acc) + acc.AddError(k.gatherSummary(k.URL, acc)) }(k) wg.Wait() - return errChan.Error() + return nil } func buildURL(endpoint string, base string) (*url.URL, error) { diff --git a/plugins/inputs/kubernetes/kubernetes_test.go b/plugins/inputs/kubernetes/kubernetes_test.go index 528299be1b9b2..289e36ae498dc 100644 --- a/plugins/inputs/kubernetes/kubernetes_test.go +++ b/plugins/inputs/kubernetes/kubernetes_test.go @@ -22,7 +22,7 @@ func TestKubernetesStats(t *testing.T) { } var acc testutil.Accumulator - err := k.Gather(&acc) + err := acc.GatherError(k.Gather) require.NoError(t, err) fields := map[string]interface{}{ diff --git a/plugins/inputs/leofs/leofs.go b/plugins/inputs/leofs/leofs.go index 06c71e932ace2..e1daf7034a86e 100644 --- a/plugins/inputs/leofs/leofs.go +++ b/plugins/inputs/leofs/leofs.go @@ -154,15 +154,16 @@ func (l *LeoFS) Gather(acc telegraf.Accumulator) error { return nil } var wg sync.WaitGroup - var outerr error for _, endpoint := range l.Servers { _, err := url.Parse(endpoint) if err != nil { - return fmt.Errorf("Unable to parse the address:%s, err:%s", endpoint, err) + acc.AddError(fmt.Errorf("Unable to parse the address:%s, err:%s", endpoint, err)) + continue } port, err := retrieveTokenAfterColon(endpoint) if err != nil { - return err + acc.AddError(err) + continue } st, ok := serverTypeMapping[port] if !ok { @@ -171,11 +172,11 @@ func (l *LeoFS) Gather(acc telegraf.Accumulator) error { wg.Add(1) go func(endpoint string, st ServerType) { defer wg.Done() - outerr = l.gatherServer(endpoint, st, acc) + acc.AddError(l.gatherServer(endpoint, st, acc)) }(endpoint, st) } wg.Wait() - return outerr + return nil } func (l *LeoFS) gatherServer( diff --git a/plugins/inputs/leofs/leofs_test.go b/plugins/inputs/leofs/leofs_test.go index 292cd15d0e738..a5ca30432984a 100644 --- a/plugins/inputs/leofs/leofs_test.go +++ b/plugins/inputs/leofs/leofs_test.go @@ -146,7 +146,7 @@ func testMain(t *testing.T, code string, endpoint string, serverType ServerType) var acc testutil.Accumulator acc.SetDebug(true) - err := l.Gather(&acc) + err := acc.GatherError(l.Gather) require.NoError(t, err) floatMetrics := KeyMapping[serverType] diff --git a/plugins/inputs/logparser/logparser.go b/plugins/inputs/logparser/logparser.go index c5641ba28ad05..0b5bcb811b678 100644 --- a/plugins/inputs/logparser/logparser.go +++ b/plugins/inputs/logparser/logparser.go @@ -9,7 +9,6 @@ import ( "github.com/influxdata/tail" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/plugins/inputs" @@ -118,14 +117,15 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { } // compile log parser patterns: - errChan := errchan.New(len(l.parsers)) + var haveError bool for _, parser := range l.parsers { if err := parser.Compile(); err != nil { - errChan.C <- err + acc.AddError(err) + haveError = true } } - if err := errChan.Error(); err != nil { - return err + if haveError { + return nil } l.wg.Add(1) @@ -143,8 +143,6 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error { seek.Offset = 0 } - errChan := errchan.New(len(l.Files)) - // Create a "tailer" for each file for _, filepath := range l.Files { g, err := globpath.Compile(filepath) @@ -153,7 +151,6 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error { continue } files := g.Match() - errChan = errchan.New(len(files)) for file, _ := range files { if _, ok := l.tailers[file]; ok { @@ -168,7 +165,7 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error { Location: &seek, MustExist: true, }) - errChan.C <- err + l.acc.AddError(err) // create a goroutine for each "tailer" l.wg.Add(1) @@ -177,7 +174,7 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error { } } - return errChan.Error() + return nil } // receiver is launched as a goroutine to continuously watch a tailed logfile diff --git a/plugins/inputs/logparser/logparser_test.go b/plugins/inputs/logparser/logparser_test.go index db9795f286fa0..66802da4fb0d4 100644 --- a/plugins/inputs/logparser/logparser_test.go +++ b/plugins/inputs/logparser/logparser_test.go @@ -38,7 +38,10 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) { } acc := testutil.Accumulator{} - assert.Error(t, logparser.Start(&acc)) + logparser.Start(&acc) + if assert.NotEmpty(t, acc.Errors) { + assert.Error(t, acc.Errors[0]) + } logparser.Stop() } @@ -106,7 +109,7 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) { os.Symlink( thisdir+"grok/testdata/test_a.log", emptydir+"/test_a.log") - assert.NoError(t, logparser.Gather(&acc)) + assert.NoError(t, acc.GatherError(logparser.Gather)) acc.Wait(1) logparser.Stop() diff --git a/plugins/inputs/mailchimp/mailchimp_test.go b/plugins/inputs/mailchimp/mailchimp_test.go index 0c4dab56d5d12..ed6898e6029e0 100644 --- a/plugins/inputs/mailchimp/mailchimp_test.go +++ b/plugins/inputs/mailchimp/mailchimp_test.go @@ -140,7 +140,7 @@ func TestMailChimpGatherReport(t *testing.T) { } -func TestMailChimpGatherError(t *testing.T) { +func TestMailChimpGatherErroror(t *testing.T) { ts := httptest.NewServer( http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { diff --git a/plugins/inputs/memcached/memcached.go b/plugins/inputs/memcached/memcached.go index d174abedafbdb..2b6b120c83254 100644 --- a/plugins/inputs/memcached/memcached.go +++ b/plugins/inputs/memcached/memcached.go @@ -9,7 +9,6 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -73,16 +72,15 @@ func (m *Memcached) Gather(acc telegraf.Accumulator) error { return m.gatherServer(":11211", false, acc) } - errChan := errchan.New(len(m.Servers) + len(m.UnixSockets)) for _, serverAddress := range m.Servers { - errChan.C <- m.gatherServer(serverAddress, false, acc) + acc.AddError(m.gatherServer(serverAddress, false, acc)) } for _, unixAddress := range m.UnixSockets { - errChan.C <- m.gatherServer(unixAddress, true, acc) + acc.AddError(m.gatherServer(unixAddress, true, acc)) } - return errChan.Error() + return nil } func (m *Memcached) gatherServer( diff --git a/plugins/inputs/memcached/memcached_test.go b/plugins/inputs/memcached/memcached_test.go index 436c978f7be92..aae59dbc8f436 100644 --- a/plugins/inputs/memcached/memcached_test.go +++ b/plugins/inputs/memcached/memcached_test.go @@ -21,7 +21,7 @@ func TestMemcachedGeneratesMetrics(t *testing.T) { var acc testutil.Accumulator - err := m.Gather(&acc) + err := acc.GatherError(m.Gather) require.NoError(t, err) intMetrics := []string{"get_hits", "get_misses", "evictions", diff --git a/plugins/inputs/mesos/mesos.go b/plugins/inputs/mesos/mesos.go index e6c68bd7dd321..26b55c35295c3 100644 --- a/plugins/inputs/mesos/mesos.go +++ b/plugins/inputs/mesos/mesos.go @@ -8,7 +8,6 @@ import ( "net" "net/http" "strconv" - "strings" "sync" "time" @@ -96,16 +95,13 @@ func (m *Mesos) SetDefaults() { // Gather() metrics from given list of Mesos Masters func (m *Mesos) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup - var errorChannel chan error m.SetDefaults() - errorChannel = make(chan error, len(m.Masters)+2*len(m.Slaves)) - for _, v := range m.Masters { wg.Add(1) go func(c string) { - errorChannel <- m.gatherMainMetrics(c, ":5050", MASTER, acc) + acc.AddError(m.gatherMainMetrics(c, ":5050", MASTER, acc)) wg.Done() return }(v) @@ -114,7 +110,7 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error { for _, v := range m.Slaves { wg.Add(1) go func(c string) { - errorChannel <- m.gatherMainMetrics(c, ":5051", SLAVE, acc) + acc.AddError(m.gatherMainMetrics(c, ":5051", SLAVE, acc)) wg.Done() return }(v) @@ -125,26 +121,14 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error { // wg.Add(1) // go func(c string) { - // errorChannel <- m.gatherSlaveTaskMetrics(c, ":5051", acc) + // acc.AddError(m.gatherSlaveTaskMetrics(c, ":5051", acc)) // wg.Done() // return // }(v) } wg.Wait() - close(errorChannel) - errorStrings := []string{} - - // Gather all errors for returning them at once - for err := range errorChannel { - if err != nil { - errorStrings = append(errorStrings, err.Error()) - } - } - if len(errorStrings) > 0 { - return errors.New(strings.Join(errorStrings, "\n")) - } return nil } diff --git a/plugins/inputs/mesos/mesos_test.go b/plugins/inputs/mesos/mesos_test.go index 5c83e294c1c33..a7705d11ee436 100644 --- a/plugins/inputs/mesos/mesos_test.go +++ b/plugins/inputs/mesos/mesos_test.go @@ -282,7 +282,7 @@ func TestMesosMaster(t *testing.T) { Timeout: 10, } - err := m.Gather(&acc) + err := acc.GatherError(m.Gather) if err != nil { t.Errorf(err.Error()) @@ -330,7 +330,7 @@ func TestMesosSlave(t *testing.T) { Timeout: 10, } - err := m.Gather(&acc) + err := acc.GatherError(m.Gather) if err != nil { t.Errorf(err.Error()) diff --git a/plugins/inputs/mongodb/mongodb.go b/plugins/inputs/mongodb/mongodb.go index a80b94690d359..510a313c17c46 100644 --- a/plugins/inputs/mongodb/mongodb.go +++ b/plugins/inputs/mongodb/mongodb.go @@ -11,7 +11,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" "gopkg.in/mgo.v2" ) @@ -73,11 +72,11 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error { } var wg sync.WaitGroup - errChan := errchan.New(len(m.Servers)) for _, serv := range m.Servers { u, err := url.Parse(serv) if err != nil { - return fmt.Errorf("Unable to parse to address '%s': %s", serv, err) + acc.AddError(fmt.Errorf("Unable to parse to address '%s': %s", serv, err)) + continue } else if u.Scheme == "" { u.Scheme = "mongodb" // fallback to simple string based address (i.e. "10.0.0.1:10000") @@ -89,12 +88,12 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error { wg.Add(1) go func(srv *Server) { defer wg.Done() - errChan.C <- m.gatherServer(srv, acc) + acc.AddError(m.gatherServer(srv, acc)) }(m.getMongoServer(u)) } wg.Wait() - return errChan.Error() + return nil } func (m *MongoDB) getMongoServer(url *url.URL) *Server { diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index 1ff7c34215059..e3861c82f6877 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -11,7 +11,6 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" "github.com/go-sql-driver/mysql" @@ -137,19 +136,18 @@ func (m *Mysql) Gather(acc telegraf.Accumulator) error { m.InitMysql() } var wg sync.WaitGroup - errChan := errchan.New(len(m.Servers)) // Loop through each server and collect metrics for _, server := range m.Servers { wg.Add(1) go func(s string) { defer wg.Done() - errChan.C <- m.gatherServer(s, acc) + acc.AddError(m.gatherServer(s, acc)) }(server) } wg.Wait() - return errChan.Error() + return nil } type mapping struct { diff --git a/plugins/inputs/nginx/nginx.go b/plugins/inputs/nginx/nginx.go index 3fe8c04d1c341..f439c1eeb23b2 100644 --- a/plugins/inputs/nginx/nginx.go +++ b/plugins/inputs/nginx/nginx.go @@ -12,7 +12,6 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -35,23 +34,22 @@ func (n *Nginx) Description() string { func (n *Nginx) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup - errChan := errchan.New(len(n.Urls)) for _, u := range n.Urls { addr, err := url.Parse(u) if err != nil { - return fmt.Errorf("Unable to parse address '%s': %s", u, err) + acc.AddError(fmt.Errorf("Unable to parse address '%s': %s", u, err)) } wg.Add(1) go func(addr *url.URL) { defer wg.Done() - errChan.C <- n.gatherUrl(addr, acc) + acc.AddError(n.gatherUrl(addr, acc)) }(addr) } wg.Wait() - return errChan.Error() + return nil } var tr = &http.Transport{ diff --git a/plugins/inputs/nginx/nginx_test.go b/plugins/inputs/nginx/nginx_test.go index 4c8fabfe04cad..7eb9e90b653ef 100644 --- a/plugins/inputs/nginx/nginx_test.go +++ b/plugins/inputs/nginx/nginx_test.go @@ -64,8 +64,8 @@ func TestNginxGeneratesMetrics(t *testing.T) { var acc_nginx testutil.Accumulator var acc_tengine testutil.Accumulator - err_nginx := n.Gather(&acc_nginx) - err_tengine := nt.Gather(&acc_tengine) + err_nginx := acc_nginx.GatherError(n.Gather) + err_tengine := acc_tengine.GatherError(nt.Gather) require.NoError(t, err_nginx) require.NoError(t, err_tengine) diff --git a/plugins/inputs/nsq/nsq.go b/plugins/inputs/nsq/nsq.go index 8bfd72788593b..b59420c2617f7 100644 --- a/plugins/inputs/nsq/nsq.go +++ b/plugins/inputs/nsq/nsq.go @@ -32,7 +32,6 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -66,17 +65,16 @@ func (n *NSQ) Description() string { func (n *NSQ) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup - errChan := errchan.New(len(n.Endpoints)) for _, e := range n.Endpoints { wg.Add(1) go func(e string) { defer wg.Done() - errChan.C <- n.gatherEndpoint(e, acc) + acc.AddError(n.gatherEndpoint(e, acc)) }(e) } wg.Wait() - return errChan.Error() + return nil } var tr = &http.Transport{ diff --git a/plugins/inputs/nsq/nsq_test.go b/plugins/inputs/nsq/nsq_test.go index 23fd19a428ed6..926f99638503a 100644 --- a/plugins/inputs/nsq/nsq_test.go +++ b/plugins/inputs/nsq/nsq_test.go @@ -24,7 +24,7 @@ func TestNSQStats(t *testing.T) { } var acc testutil.Accumulator - err := n.Gather(&acc) + err := acc.GatherError(n.Gather) require.NoError(t, err) u, err := url.Parse(ts.URL) diff --git a/plugins/inputs/nsq_consumer/nsq_consumer.go b/plugins/inputs/nsq_consumer/nsq_consumer.go index d4f4e9679d884..b82b005c952e1 100644 --- a/plugins/inputs/nsq_consumer/nsq_consumer.go +++ b/plugins/inputs/nsq_consumer/nsq_consumer.go @@ -1,7 +1,7 @@ package nsq_consumer import ( - "log" + "fmt" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" @@ -62,7 +62,7 @@ func (n *NSQConsumer) Start(acc telegraf.Accumulator) error { n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error { metrics, err := n.parser.Parse(message.Body) if err != nil { - log.Printf("E! NSQConsumer Parse Error\nmessage:%s\nerror:%s", string(message.Body), err.Error()) + acc.AddError(fmt.Errorf("E! NSQConsumer Parse Error\nmessage:%s\nerror:%s", string(message.Body), err.Error())) return nil } for _, metric := range metrics { diff --git a/plugins/inputs/ntpq/ntpq.go b/plugins/inputs/ntpq/ntpq.go index 601d5b2df4f94..8280e51c1dc81 100644 --- a/plugins/inputs/ntpq/ntpq.go +++ b/plugins/inputs/ntpq/ntpq.go @@ -5,7 +5,7 @@ package ntpq import ( "bufio" "bytes" - "log" + "fmt" "os/exec" "strconv" "strings" @@ -132,7 +132,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error { case strings.HasSuffix(when, "h"): m, err := strconv.Atoi(strings.TrimSuffix(fields[index], "h")) if err != nil { - log.Printf("E! Error ntpq: parsing int: %s", fields[index]) + acc.AddError(fmt.Errorf("E! Error ntpq: parsing int: %s", fields[index])) continue } // seconds in an hour @@ -141,7 +141,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error { case strings.HasSuffix(when, "d"): m, err := strconv.Atoi(strings.TrimSuffix(fields[index], "d")) if err != nil { - log.Printf("E! Error ntpq: parsing int: %s", fields[index]) + acc.AddError(fmt.Errorf("E! Error ntpq: parsing int: %s", fields[index])) continue } // seconds in a day @@ -150,7 +150,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error { case strings.HasSuffix(when, "m"): m, err := strconv.Atoi(strings.TrimSuffix(fields[index], "m")) if err != nil { - log.Printf("E! Error ntpq: parsing int: %s", fields[index]) + acc.AddError(fmt.Errorf("E! Error ntpq: parsing int: %s", fields[index])) continue } // seconds in a day @@ -161,7 +161,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error { m, err := strconv.Atoi(fields[index]) if err != nil { - log.Printf("E! Error ntpq: parsing int: %s", fields[index]) + acc.AddError(fmt.Errorf("E! Error ntpq: parsing int: %s", fields[index])) continue } mFields[key] = int64(m) @@ -178,7 +178,7 @@ func (n *NTPQ) Gather(acc telegraf.Accumulator) error { m, err := strconv.ParseFloat(fields[index], 64) if err != nil { - log.Printf("E! Error ntpq: parsing float: %s", fields[index]) + acc.AddError(fmt.Errorf("E! Error ntpq: parsing float: %s", fields[index])) continue } mFields[key] = m diff --git a/plugins/inputs/ntpq/ntpq_test.go b/plugins/inputs/ntpq/ntpq_test.go index 68abab7beed67..4b356e1f180af 100644 --- a/plugins/inputs/ntpq/ntpq_test.go +++ b/plugins/inputs/ntpq/ntpq_test.go @@ -21,7 +21,7 @@ func TestSingleNTPQ(t *testing.T) { } acc := testutil.Accumulator{} - assert.NoError(t, n.Gather(&acc)) + assert.NoError(t, acc.GatherError(n.Gather)) fields := map[string]interface{}{ "when": int64(101), @@ -51,7 +51,7 @@ func TestMissingJitterField(t *testing.T) { } acc := testutil.Accumulator{} - assert.NoError(t, n.Gather(&acc)) + assert.NoError(t, acc.GatherError(n.Gather)) fields := map[string]interface{}{ "when": int64(101), @@ -80,7 +80,7 @@ func TestBadIntNTPQ(t *testing.T) { } acc := testutil.Accumulator{} - assert.NoError(t, n.Gather(&acc)) + assert.Error(t, acc.GatherError(n.Gather)) fields := map[string]interface{}{ "when": int64(101), @@ -109,7 +109,7 @@ func TestBadFloatNTPQ(t *testing.T) { } acc := testutil.Accumulator{} - assert.NoError(t, n.Gather(&acc)) + assert.Error(t, acc.GatherError(n.Gather)) fields := map[string]interface{}{ "when": int64(2), @@ -138,7 +138,7 @@ func TestDaysNTPQ(t *testing.T) { } acc := testutil.Accumulator{} - assert.NoError(t, n.Gather(&acc)) + assert.NoError(t, acc.GatherError(n.Gather)) fields := map[string]interface{}{ "when": int64(172800), @@ -168,7 +168,7 @@ func TestHoursNTPQ(t *testing.T) { } acc := testutil.Accumulator{} - assert.NoError(t, n.Gather(&acc)) + assert.NoError(t, acc.GatherError(n.Gather)) fields := map[string]interface{}{ "when": int64(7200), @@ -198,7 +198,7 @@ func TestMinutesNTPQ(t *testing.T) { } acc := testutil.Accumulator{} - assert.NoError(t, n.Gather(&acc)) + assert.NoError(t, acc.GatherError(n.Gather)) fields := map[string]interface{}{ "when": int64(120), @@ -228,7 +228,7 @@ func TestBadWhenNTPQ(t *testing.T) { } acc := testutil.Accumulator{} - assert.NoError(t, n.Gather(&acc)) + assert.Error(t, acc.GatherError(n.Gather)) fields := map[string]interface{}{ "poll": int64(256), @@ -257,7 +257,7 @@ func TestMultiNTPQ(t *testing.T) { } acc := testutil.Accumulator{} - assert.NoError(t, n.Gather(&acc)) + assert.NoError(t, acc.GatherError(n.Gather)) fields := map[string]interface{}{ "delay": float64(54.033), @@ -303,7 +303,7 @@ func TestBadHeaderNTPQ(t *testing.T) { } acc := testutil.Accumulator{} - assert.NoError(t, n.Gather(&acc)) + assert.NoError(t, acc.GatherError(n.Gather)) fields := map[string]interface{}{ "when": int64(101), @@ -333,7 +333,7 @@ func TestMissingDelayColumnNTPQ(t *testing.T) { } acc := testutil.Accumulator{} - assert.NoError(t, n.Gather(&acc)) + assert.NoError(t, acc.GatherError(n.Gather)) fields := map[string]interface{}{ "when": int64(101), @@ -361,7 +361,7 @@ func TestFailedNTPQ(t *testing.T) { } acc := testutil.Accumulator{} - assert.Error(t, n.Gather(&acc)) + assert.Error(t, acc.GatherError(n.Gather)) } type tester struct { diff --git a/plugins/inputs/phpfpm/phpfpm.go b/plugins/inputs/phpfpm/phpfpm.go index d7a14d0eee81a..36cd02f59138b 100644 --- a/plugins/inputs/phpfpm/phpfpm.go +++ b/plugins/inputs/phpfpm/phpfpm.go @@ -80,19 +80,17 @@ func (g *phpfpm) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup - var outerr error - for _, serv := range g.Urls { wg.Add(1) go func(serv string) { defer wg.Done() - outerr = g.gatherServer(serv, acc) + acc.AddError(g.gatherServer(serv, acc)) }(serv) } wg.Wait() - return outerr + return nil } // Request status page to get stat raw data and import it diff --git a/plugins/inputs/phpfpm/phpfpm_test.go b/plugins/inputs/phpfpm/phpfpm_test.go index c965e5a138411..80a5d4bcfdf97 100644 --- a/plugins/inputs/phpfpm/phpfpm_test.go +++ b/plugins/inputs/phpfpm/phpfpm_test.go @@ -35,7 +35,7 @@ func TestPhpFpmGeneratesMetrics_From_Http(t *testing.T) { var acc testutil.Accumulator - err := r.Gather(&acc) + err := acc.GatherError(r.Gather) require.NoError(t, err) tags := map[string]string{ @@ -75,7 +75,7 @@ func TestPhpFpmGeneratesMetrics_From_Fcgi(t *testing.T) { } var acc testutil.Accumulator - err = r.Gather(&acc) + err = acc.GatherError(r.Gather) require.NoError(t, err) tags := map[string]string{ @@ -119,7 +119,7 @@ func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) { var acc testutil.Accumulator - err = r.Gather(&acc) + err = acc.GatherError(r.Gather) require.NoError(t, err) tags := map[string]string{ @@ -163,7 +163,7 @@ func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) { var acc testutil.Accumulator - err = r.Gather(&acc) + err = acc.GatherError(r.Gather) require.NoError(t, err) tags := map[string]string{ @@ -193,7 +193,7 @@ func TestPhpFpmDefaultGetFromLocalhost(t *testing.T) { var acc testutil.Accumulator - err := r.Gather(&acc) + err := acc.GatherError(r.Gather) require.Error(t, err) assert.Contains(t, err.Error(), "127.0.0.1/status") } @@ -205,7 +205,7 @@ func TestPhpFpmGeneratesMetrics_Throw_Error_When_Fpm_Status_Is_Not_Responding(t var acc testutil.Accumulator - err := r.Gather(&acc) + err := acc.GatherError(r.Gather) require.Error(t, err) assert.Contains(t, err.Error(), `Unable to connect to phpfpm status page 'http://aninvalidone': Get http://aninvalidone: dial tcp: lookup aninvalidone`) } @@ -217,7 +217,7 @@ func TestPhpFpmGeneratesMetrics_Throw_Error_When_Socket_Path_Is_Invalid(t *testi var acc testutil.Accumulator - err := r.Gather(&acc) + err := acc.GatherError(r.Gather) require.Error(t, err) assert.Equal(t, `Socket doesn't exist '/tmp/invalid.sock': stat /tmp/invalid.sock: no such file or directory`, err.Error()) diff --git a/plugins/inputs/ping/ping.go b/plugins/inputs/ping/ping.go index f5256750d5e40..de33ebee146f1 100644 --- a/plugins/inputs/ping/ping.go +++ b/plugins/inputs/ping/ping.go @@ -68,7 +68,6 @@ func (_ *Ping) SampleConfig() string { func (p *Ping) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup - errorChannel := make(chan error, len(p.Urls)*2) // Spin off a go routine for each url to ping for _, url := range p.Urls { @@ -80,14 +79,14 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error { out, err := p.pingHost(totalTimeout, args...) if err != nil { // Combine go err + stderr output - errorChannel <- errors.New( - strings.TrimSpace(out) + ", " + err.Error()) + acc.AddError(errors.New( + strings.TrimSpace(out) + ", " + err.Error())) } tags := map[string]string{"url": u} trans, rec, avg, stddev, err := processPingOutput(out) if err != nil { // fatal error - errorChannel <- err + acc.AddError(err) return } // Calculate packet loss percentage @@ -108,18 +107,8 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error { } wg.Wait() - close(errorChannel) - // Get all errors and return them as one giant error - errorStrings := []string{} - for err := range errorChannel { - errorStrings = append(errorStrings, err.Error()) - } - - if len(errorStrings) == 0 { - return nil - } - return errors.New(strings.Join(errorStrings, "\n")) + return nil } func hostPinger(timeout float64, args ...string) (string, error) { diff --git a/plugins/inputs/ping/ping_test.go b/plugins/inputs/ping/ping_test.go index a7a6931f50bdb..22dac4e65a264 100644 --- a/plugins/inputs/ping/ping_test.go +++ b/plugins/inputs/ping/ping_test.go @@ -144,7 +144,7 @@ func TestPingGather(t *testing.T) { pingHost: mockHostPinger, } - p.Gather(&acc) + acc.GatherError(p.Gather) tags := map[string]string{"url": "www.google.com"} fields := map[string]interface{}{ "packets_transmitted": 5, @@ -182,7 +182,7 @@ func TestLossyPingGather(t *testing.T) { pingHost: mockLossyHostPinger, } - p.Gather(&acc) + acc.GatherError(p.Gather) tags := map[string]string{"url": "www.google.com"} fields := map[string]interface{}{ "packets_transmitted": 5, @@ -215,7 +215,7 @@ func TestBadPingGather(t *testing.T) { pingHost: mockErrorHostPinger, } - p.Gather(&acc) + acc.GatherError(p.Gather) tags := map[string]string{"url": "www.amazon.com"} fields := map[string]interface{}{ "packets_transmitted": 2, @@ -237,7 +237,7 @@ func TestFatalPingGather(t *testing.T) { pingHost: mockFatalHostPinger, } - p.Gather(&acc) + acc.GatherError(p.Gather) assert.False(t, acc.HasMeasurement("packets_transmitted"), "Fatal ping should not have packet measurements") assert.False(t, acc.HasMeasurement("packets_received"), diff --git a/plugins/inputs/ping/ping_windows_test.go b/plugins/inputs/ping/ping_windows_test.go index b55b7955b4110..70ff78f75c904 100644 --- a/plugins/inputs/ping/ping_windows_test.go +++ b/plugins/inputs/ping/ping_windows_test.go @@ -69,7 +69,7 @@ func TestPingGather(t *testing.T) { pingHost: mockHostPinger, } - p.Gather(&acc) + acc.GatherError(p.Gather) tags := map[string]string{"url": "www.google.com"} fields := map[string]interface{}{ "packets_transmitted": 4, @@ -112,7 +112,7 @@ func TestBadPingGather(t *testing.T) { pingHost: mockErrorHostPinger, } - p.Gather(&acc) + acc.GatherError(p.Gather) tags := map[string]string{"url": "www.amazon.com"} fields := map[string]interface{}{ "packets_transmitted": 4, @@ -155,7 +155,7 @@ func TestLossyPingGather(t *testing.T) { pingHost: mockLossyHostPinger, } - p.Gather(&acc) + acc.GatherError(p.Gather) tags := map[string]string{"url": "www.google.com"} fields := map[string]interface{}{ "packets_transmitted": 9, @@ -214,7 +214,7 @@ func TestFatalPingGather(t *testing.T) { pingHost: mockFatalHostPinger, } - p.Gather(&acc) + acc.GatherError(p.Gather) assert.True(t, acc.HasFloatField("ping", "errors"), "Fatal ping should have packet measurements") assert.False(t, acc.HasIntField("ping", "packets_transmitted"), @@ -259,7 +259,7 @@ func TestUnreachablePingGather(t *testing.T) { pingHost: mockUnreachableHostPinger, } - p.Gather(&acc) + acc.GatherError(p.Gather) tags := map[string]string{"url": "www.google.com"} fields := map[string]interface{}{ @@ -305,7 +305,7 @@ func TestTTLExpiredPingGather(t *testing.T) { pingHost: mockTTLExpiredPinger, } - p.Gather(&acc) + acc.GatherError(p.Gather) tags := map[string]string{"url": "www.google.com"} fields := map[string]interface{}{ diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible.go b/plugins/inputs/postgresql_extensible/postgresql_extensible.go index b8d3be625c8e5..24326511e0b03 100644 --- a/plugins/inputs/postgresql_extensible/postgresql_extensible.go +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible.go @@ -170,7 +170,8 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error { if p.Query[i].Version <= db_version { rows, err := db.Query(sql_query) if err != nil { - return err + acc.AddError(err) + continue } defer rows.Close() @@ -178,7 +179,8 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error { // grab the column information from the result p.OrderedColumns, err = rows.Columns() if err != nil { - return err + acc.AddError(err) + continue } else { for _, v := range p.OrderedColumns { p.AllColumns = append(p.AllColumns, v) @@ -195,7 +197,8 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error { for rows.Next() { err = p.accRow(meas_name, rows, acc) if err != nil { - return err + acc.AddError(err) + break } } } diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go index f92284ee43929..0b2785cad8096 100644 --- a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go @@ -26,7 +26,7 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { }, } var acc testutil.Accumulator - err := p.Gather(&acc) + err := acc.GatherError(p.Gather) require.NoError(t, err) availableColumns := make(map[string]bool) @@ -114,7 +114,7 @@ func TestPostgresqlIgnoresUnwantedColumns(t *testing.T) { var acc testutil.Accumulator - err := p.Gather(&acc) + err := acc.GatherError(p.Gather) require.NoError(t, err) for col := range p.IgnoredColumns() { diff --git a/plugins/inputs/powerdns/powerdns.go b/plugins/inputs/powerdns/powerdns.go index 68b1696e0611f..e53373baf1ce4 100644 --- a/plugins/inputs/powerdns/powerdns.go +++ b/plugins/inputs/powerdns/powerdns.go @@ -41,7 +41,7 @@ func (p *Powerdns) Gather(acc telegraf.Accumulator) error { for _, serverSocket := range p.UnixSockets { if err := p.gatherServer(serverSocket, acc); err != nil { - return err + acc.AddError(err) } } diff --git a/plugins/inputs/powerdns/powerdns_test.go b/plugins/inputs/powerdns/powerdns_test.go index 78845c23d7bac..c2d4413e2a8db 100644 --- a/plugins/inputs/powerdns/powerdns_test.go +++ b/plugins/inputs/powerdns/powerdns_test.go @@ -90,7 +90,7 @@ func TestMemcachedGeneratesMetrics(t *testing.T) { var acc testutil.Accumulator - err = p.Gather(&acc) + err = acc.GatherError(p.Gather) require.NoError(t, err) intMetrics := []string{"corrupt-packets", "deferred-cache-inserts", diff --git a/plugins/inputs/procstat/procstat.go b/plugins/inputs/procstat/procstat.go index d689ecf3ed630..3715d390be0dd 100644 --- a/plugins/inputs/procstat/procstat.go +++ b/plugins/inputs/procstat/procstat.go @@ -71,9 +71,8 @@ func (p *Procstat) Gather(acc telegraf.Accumulator) error { procs, err := p.updateProcesses(p.procs) if err != nil { - return fmt.Errorf( - "E! Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] user: [%s] %s", - p.Exe, p.PidFile, p.Pattern, p.User, err.Error()) + acc.AddError(fmt.Errorf("E! Error: procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] user: [%s] %s", + p.Exe, p.PidFile, p.Pattern, p.User, err.Error())) } p.procs = procs diff --git a/plugins/inputs/procstat/procstat_test.go b/plugins/inputs/procstat/procstat_test.go index 1f6f2764253fd..97cf3458270e4 100644 --- a/plugins/inputs/procstat/procstat_test.go +++ b/plugins/inputs/procstat/procstat_test.go @@ -108,7 +108,7 @@ func TestGather_CreateProcessErrorOk(t *testing.T) { return nil, fmt.Errorf("createProcess error") }, } - require.NoError(t, p.Gather(&acc)) + require.NoError(t, acc.GatherError(p.Gather)) } func TestGather_CreatePIDFinderError(t *testing.T) { @@ -120,7 +120,7 @@ func TestGather_CreatePIDFinderError(t *testing.T) { }, createProcess: newTestProc, } - require.Error(t, p.Gather(&acc)) + require.Error(t, acc.GatherError(p.Gather)) } func TestGather_ProcessName(t *testing.T) { @@ -132,7 +132,7 @@ func TestGather_ProcessName(t *testing.T) { createPIDFinder: pidFinder([]PID{pid}, nil), createProcess: newTestProc, } - require.NoError(t, p.Gather(&acc)) + require.NoError(t, acc.GatherError(p.Gather)) assert.Equal(t, "custom_name", acc.TagValue("procstat", "process_name")) } @@ -146,7 +146,7 @@ func TestGather_NoProcessNameUsesReal(t *testing.T) { createPIDFinder: pidFinder([]PID{pid}, nil), createProcess: newTestProc, } - require.NoError(t, p.Gather(&acc)) + require.NoError(t, acc.GatherError(p.Gather)) assert.True(t, acc.HasTag("procstat", "process_name")) } @@ -159,7 +159,7 @@ func TestGather_NoPidTag(t *testing.T) { createPIDFinder: pidFinder([]PID{pid}, nil), createProcess: newTestProc, } - require.NoError(t, p.Gather(&acc)) + require.NoError(t, acc.GatherError(p.Gather)) assert.True(t, acc.HasInt32Field("procstat", "pid")) assert.False(t, acc.HasTag("procstat", "pid")) } @@ -173,7 +173,7 @@ func TestGather_PidTag(t *testing.T) { createPIDFinder: pidFinder([]PID{pid}, nil), createProcess: newTestProc, } - require.NoError(t, p.Gather(&acc)) + require.NoError(t, acc.GatherError(p.Gather)) assert.Equal(t, "42", acc.TagValue("procstat", "pid")) assert.False(t, acc.HasInt32Field("procstat", "pid")) } @@ -187,7 +187,7 @@ func TestGather_Prefix(t *testing.T) { createPIDFinder: pidFinder([]PID{pid}, nil), createProcess: newTestProc, } - require.NoError(t, p.Gather(&acc)) + require.NoError(t, acc.GatherError(p.Gather)) assert.True(t, acc.HasInt32Field("procstat", "custom_prefix_num_fds")) } @@ -199,7 +199,7 @@ func TestGather_Exe(t *testing.T) { createPIDFinder: pidFinder([]PID{pid}, nil), createProcess: newTestProc, } - require.NoError(t, p.Gather(&acc)) + require.NoError(t, acc.GatherError(p.Gather)) assert.Equal(t, exe, acc.TagValue("procstat", "exe")) } @@ -213,7 +213,7 @@ func TestGather_User(t *testing.T) { createPIDFinder: pidFinder([]PID{pid}, nil), createProcess: newTestProc, } - require.NoError(t, p.Gather(&acc)) + require.NoError(t, acc.GatherError(p.Gather)) assert.Equal(t, user, acc.TagValue("procstat", "user")) } @@ -227,7 +227,7 @@ func TestGather_Pattern(t *testing.T) { createPIDFinder: pidFinder([]PID{pid}, nil), createProcess: newTestProc, } - require.NoError(t, p.Gather(&acc)) + require.NoError(t, acc.GatherError(p.Gather)) assert.Equal(t, pattern, acc.TagValue("procstat", "pattern")) } @@ -239,7 +239,7 @@ func TestGather_MissingPidMethod(t *testing.T) { createPIDFinder: pidFinder([]PID{pid}, nil), createProcess: newTestProc, } - require.Error(t, p.Gather(&acc)) + require.Error(t, acc.GatherError(p.Gather)) } func TestGather_PidFile(t *testing.T) { @@ -251,7 +251,7 @@ func TestGather_PidFile(t *testing.T) { createPIDFinder: pidFinder([]PID{pid}, nil), createProcess: newTestProc, } - require.NoError(t, p.Gather(&acc)) + require.NoError(t, acc.GatherError(p.Gather)) assert.Equal(t, pidfile, acc.TagValue("procstat", "pidfile")) } @@ -266,7 +266,7 @@ func TestGather_PercentFirstPass(t *testing.T) { createPIDFinder: pidFinder([]PID{pid}, nil), createProcess: NewProc, } - require.NoError(t, p.Gather(&acc)) + require.NoError(t, acc.GatherError(p.Gather)) assert.True(t, acc.HasFloatField("procstat", "cpu_time_user")) assert.False(t, acc.HasFloatField("procstat", "cpu_usage")) @@ -282,8 +282,8 @@ func TestGather_PercentSecondPass(t *testing.T) { createPIDFinder: pidFinder([]PID{pid}, nil), createProcess: NewProc, } - require.NoError(t, p.Gather(&acc)) - require.NoError(t, p.Gather(&acc)) + require.NoError(t, acc.GatherError(p.Gather)) + require.NoError(t, acc.GatherError(p.Gather)) assert.True(t, acc.HasFloatField("procstat", "cpu_time_user")) assert.True(t, acc.HasFloatField("procstat", "cpu_usage")) diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index c1212796534d5..2e613e2c0a924 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -67,19 +67,17 @@ var ErrProtocolError = errors.New("prometheus protocol error") func (p *Prometheus) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup - var outerr error - for _, serv := range p.Urls { wg.Add(1) go func(serv string) { defer wg.Done() - outerr = p.gatherURL(serv, acc) + acc.AddError(p.gatherURL(serv, acc)) }(serv) } wg.Wait() - return outerr + return nil } var tr = &http.Transport{ diff --git a/plugins/inputs/prometheus/prometheus_test.go b/plugins/inputs/prometheus/prometheus_test.go index 4b316a3b48c24..64959b2e2aec0 100644 --- a/plugins/inputs/prometheus/prometheus_test.go +++ b/plugins/inputs/prometheus/prometheus_test.go @@ -41,7 +41,7 @@ func TestPrometheusGeneratesMetrics(t *testing.T) { var acc testutil.Accumulator - err := p.Gather(&acc) + err := acc.GatherError(p.Gather) require.NoError(t, err) assert.True(t, acc.HasFloatField("go_gc_duration_seconds", "count")) diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go index c33b11e66d3f4..d8bf459133fff 100644 --- a/plugins/inputs/rabbitmq/rabbitmq.go +++ b/plugins/inputs/rabbitmq/rabbitmq.go @@ -10,7 +10,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -136,7 +135,7 @@ type Node struct { } // gatherFunc ... -type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) +type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator) var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues} @@ -198,16 +197,15 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup wg.Add(len(gatherFunctions)) - errChan := errchan.New(len(gatherFunctions)) for _, f := range gatherFunctions { go func(gf gatherFunc) { defer wg.Done() - gf(r, acc, errChan.C) + gf(r, acc) }(f) } wg.Wait() - return errChan.Error() + return nil } func (r *RabbitMQ) requestJSON(u string, target interface{}) error { @@ -245,17 +243,17 @@ func (r *RabbitMQ) requestJSON(u string, target interface{}) error { return nil } -func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) { +func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) { overview := &OverviewResponse{} err := r.requestJSON("/api/overview", &overview) if err != nil { - errChan <- err + acc.AddError(err) return } if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil { - errChan <- fmt.Errorf("Wrong answer from rabbitmq. Probably auth issue") + acc.AddError(fmt.Errorf("Wrong answer from rabbitmq. Probably auth issue")) return } @@ -277,16 +275,14 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) { "messages_published": overview.MessageStats.Publish, } acc.AddFields("rabbitmq_overview", fields, tags) - - errChan <- nil } -func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) { +func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) { nodes := make([]Node, 0) // Gather information about nodes err := r.requestJSON("/api/nodes", &nodes) if err != nil { - errChan <- err + acc.AddError(err) return } now := time.Now() @@ -314,16 +310,14 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) { } acc.AddFields("rabbitmq_node", fields, tags, now) } - - errChan <- nil } -func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) { +func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) { // Gather information about queues queues := make([]Queue, 0) err := r.requestJSON("/api/queues", &queues) if err != nil { - errChan <- err + acc.AddError(err) return } @@ -371,8 +365,6 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) { tags, ) } - - errChan <- nil } func (r *RabbitMQ) shouldGatherNode(node Node) bool { diff --git a/plugins/inputs/rabbitmq/rabbitmq_test.go b/plugins/inputs/rabbitmq/rabbitmq_test.go index 4bdc980dbd53b..7ff0a5d5781a9 100644 --- a/plugins/inputs/rabbitmq/rabbitmq_test.go +++ b/plugins/inputs/rabbitmq/rabbitmq_test.go @@ -399,7 +399,7 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) { var acc testutil.Accumulator - err := r.Gather(&acc) + err := acc.GatherError(r.Gather) require.NoError(t, err) intMetrics := []string{ diff --git a/plugins/inputs/raindrops/raindrops.go b/plugins/inputs/raindrops/raindrops.go index 6851f5d93b52e..bcbf773689f33 100644 --- a/plugins/inputs/raindrops/raindrops.go +++ b/plugins/inputs/raindrops/raindrops.go @@ -35,24 +35,24 @@ func (r *Raindrops) Description() string { func (r *Raindrops) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup - var outerr error for _, u := range r.Urls { addr, err := url.Parse(u) if err != nil { - return fmt.Errorf("Unable to parse address '%s': %s", u, err) + acc.AddError(fmt.Errorf("Unable to parse address '%s': %s", u, err)) + continue } wg.Add(1) go func(addr *url.URL) { defer wg.Done() - outerr = r.gatherUrl(addr, acc) + acc.AddError(r.gatherUrl(addr, acc)) }(addr) } wg.Wait() - return outerr + return nil } func (r *Raindrops) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error { diff --git a/plugins/inputs/raindrops/raindrops_test.go b/plugins/inputs/raindrops/raindrops_test.go index 0dee9b1cc95ad..b0b601cec49cc 100644 --- a/plugins/inputs/raindrops/raindrops_test.go +++ b/plugins/inputs/raindrops/raindrops_test.go @@ -68,7 +68,7 @@ func TestRaindropsGeneratesMetrics(t *testing.T) { var acc testutil.Accumulator - err := n.Gather(&acc) + err := acc.GatherError(n.Gather) require.NoError(t, err) fields := map[string]interface{}{ diff --git a/plugins/inputs/redis/redis.go b/plugins/inputs/redis/redis.go index 2dd947a2aec80..cb976cadac071 100644 --- a/plugins/inputs/redis/redis.go +++ b/plugins/inputs/redis/redis.go @@ -12,7 +12,6 @@ import ( "time" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal/errchan" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -66,7 +65,6 @@ func (r *Redis) Gather(acc telegraf.Accumulator) error { } var wg sync.WaitGroup - errChan := errchan.New(len(r.Servers)) for _, serv := range r.Servers { if !strings.HasPrefix(serv, "tcp://") && !strings.HasPrefix(serv, "unix://") { serv = "tcp://" + serv @@ -74,7 +72,8 @@ func (r *Redis) Gather(acc telegraf.Accumulator) error { u, err := url.Parse(serv) if err != nil { - return fmt.Errorf("Unable to parse to address '%s': %s", serv, err) + acc.AddError(fmt.Errorf("Unable to parse to address '%s': %s", serv, err)) + continue } else if u.Scheme == "" { // fallback to simple string based address (i.e. "10.0.0.1:10000") u.Scheme = "tcp" @@ -91,12 +90,12 @@ func (r *Redis) Gather(acc telegraf.Accumulator) error { wg.Add(1) go func(serv string) { defer wg.Done() - errChan.C <- r.gatherServer(u, acc) + acc.AddError(r.gatherServer(u, acc)) }(serv) } wg.Wait() - return errChan.Error() + return nil } func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error { diff --git a/plugins/inputs/redis/redis_test.go b/plugins/inputs/redis/redis_test.go index 0c378150026df..904a80d3720bb 100644 --- a/plugins/inputs/redis/redis_test.go +++ b/plugins/inputs/redis/redis_test.go @@ -25,7 +25,7 @@ func TestRedisConnect(t *testing.T) { var acc testutil.Accumulator - err := r.Gather(&acc) + err := acc.GatherError(r.Gather) require.NoError(t, err) } diff --git a/plugins/inputs/rethinkdb/rethinkdb.go b/plugins/inputs/rethinkdb/rethinkdb.go index 32237a80f372c..e32f79731d699 100644 --- a/plugins/inputs/rethinkdb/rethinkdb.go +++ b/plugins/inputs/rethinkdb/rethinkdb.go @@ -44,12 +44,11 @@ func (r *RethinkDB) Gather(acc telegraf.Accumulator) error { var wg sync.WaitGroup - var outerr error - for _, serv := range r.Servers { u, err := url.Parse(serv) if err != nil { - return fmt.Errorf("Unable to parse to address '%s': %s", serv, err) + acc.AddError(fmt.Errorf("Unable to parse to address '%s': %s", serv, err)) + continue } else if u.Scheme == "" { // fallback to simple string based address (i.e. "10.0.0.1:10000") u.Host = serv @@ -57,13 +56,13 @@ func (r *RethinkDB) Gather(acc telegraf.Accumulator) error { wg.Add(1) go func(serv string) { defer wg.Done() - outerr = r.gatherServer(&Server{Url: u}, acc) + acc.AddError(r.gatherServer(&Server{Url: u}, acc)) }(serv) } wg.Wait() - return outerr + return nil } func (r *RethinkDB) gatherServer(server *Server, acc telegraf.Accumulator) error { diff --git a/plugins/inputs/riak/riak.go b/plugins/inputs/riak/riak.go index 19bf7df04a118..9ddbbfa651351 100644 --- a/plugins/inputs/riak/riak.go +++ b/plugins/inputs/riak/riak.go @@ -104,9 +104,7 @@ func (r *Riak) Gather(acc telegraf.Accumulator) error { // Range over all servers, gathering stats. Returns early in case of any error. for _, s := range r.Servers { - if err := r.gatherServer(s, acc); err != nil { - return err - } + acc.AddError(r.gatherServer(s, acc)) } return nil diff --git a/plugins/inputs/snmp_legacy/snmp_legacy.go b/plugins/inputs/snmp_legacy/snmp_legacy.go index e5dbbc459dbc4..57f9f4fe24738 100644 --- a/plugins/inputs/snmp_legacy/snmp_legacy.go +++ b/plugins/inputs/snmp_legacy/snmp_legacy.go @@ -1,6 +1,7 @@ package snmp_legacy import ( + "fmt" "io/ioutil" "log" "net" @@ -394,16 +395,16 @@ func (s *Snmp) Gather(acc telegraf.Accumulator) error { // only if len(s.OidInstanceMapping) == 0 if len(host.OidInstanceMapping) >= 0 { if err := host.SNMPMap(acc, s.nameToOid, s.subTableMap); err != nil { - log.Printf("E! SNMP Mapping error for host '%s': %s", host.Address, err) + acc.AddError(fmt.Errorf("E! SNMP Mapping error for host '%s': %s", host.Address, err)) continue } } // Launch Get requests if err := host.SNMPGet(acc, s.initNode); err != nil { - log.Printf("E! SNMP Error for host '%s': %s", host.Address, err) + acc.AddError(fmt.Errorf("E! SNMP Error for host '%s': %s", host.Address, err)) } if err := host.SNMPBulk(acc, s.initNode); err != nil { - log.Printf("E! SNMP Error for host '%s': %s", host.Address, err) + acc.AddError(fmt.Errorf("E! SNMP Error for host '%s': %s", host.Address, err)) } } return nil diff --git a/plugins/inputs/sqlserver/sqlserver.go b/plugins/inputs/sqlserver/sqlserver.go index 5afbb067eae8f..1e7f92dff3a8a 100644 --- a/plugins/inputs/sqlserver/sqlserver.go +++ b/plugins/inputs/sqlserver/sqlserver.go @@ -79,20 +79,19 @@ func (s *SQLServer) Gather(acc telegraf.Accumulator) error { } var wg sync.WaitGroup - var outerr error for _, serv := range s.Servers { for _, query := range queries { wg.Add(1) go func(serv string, query Query) { defer wg.Done() - outerr = s.gatherServer(serv, query, acc) + acc.AddError(s.gatherServer(serv, query, acc)) }(serv, query) } } wg.Wait() - return outerr + return nil } func (s *SQLServer) gatherServer(server string, query Query, acc telegraf.Accumulator) error { diff --git a/plugins/inputs/sysstat/sysstat.go b/plugins/inputs/sysstat/sysstat.go index 27e18100263fc..551cd37aad53d 100644 --- a/plugins/inputs/sysstat/sysstat.go +++ b/plugins/inputs/sysstat/sysstat.go @@ -5,7 +5,6 @@ package sysstat import ( "bufio" "encoding/csv" - "errors" "fmt" "io" "log" @@ -149,34 +148,20 @@ func (s *Sysstat) Gather(acc telegraf.Accumulator) error { return err } var wg sync.WaitGroup - errorChannel := make(chan error, len(s.Options)*2) for option := range s.Options { wg.Add(1) go func(acc telegraf.Accumulator, option string) { defer wg.Done() - if err := s.parse(acc, option, ts); err != nil { - errorChannel <- err - } + acc.AddError(s.parse(acc, option, ts)) }(acc, option) } wg.Wait() - close(errorChannel) - - errorStrings := []string{} - for err := range errorChannel { - errorStrings = append(errorStrings, err.Error()) - } if _, err := os.Stat(s.tmpFile); err == nil { - if err := os.Remove(s.tmpFile); err != nil { - errorStrings = append(errorStrings, err.Error()) - } + acc.AddError(os.Remove(s.tmpFile)) } - if len(errorStrings) == 0 { - return nil - } - return errors.New(strings.Join(errorStrings, "\n")) + return nil } // collect collects sysstat data with the collector utility sadc. diff --git a/plugins/inputs/sysstat/sysstat_interval_test.go b/plugins/inputs/sysstat/sysstat_interval_test.go index 747158392bd80..972eb9af936de 100644 --- a/plugins/inputs/sysstat/sysstat_interval_test.go +++ b/plugins/inputs/sysstat/sysstat_interval_test.go @@ -26,14 +26,14 @@ func TestInterval(t *testing.T) { s.interval = 0 wantedInterval := 3 - err := s.Gather(&acc) + err := acc.GatherError(s.Gather) if err != nil { t.Fatal(err) } time.Sleep(time.Duration(wantedInterval) * time.Second) - err = s.Gather(&acc) + err = acc.GatherError(s.Gather) if err != nil { t.Fatal(err) } diff --git a/plugins/inputs/sysstat/sysstat_test.go b/plugins/inputs/sysstat/sysstat_test.go index e344d307ddc44..876e6d2c80169 100644 --- a/plugins/inputs/sysstat/sysstat_test.go +++ b/plugins/inputs/sysstat/sysstat_test.go @@ -37,7 +37,7 @@ func TestGather(t *testing.T) { defer func() { execCommand = exec.Command }() var acc testutil.Accumulator - err := s.Gather(&acc) + err := acc.GatherError(s.Gather) if err != nil { t.Fatal(err) } @@ -160,7 +160,7 @@ func TestGatherGrouped(t *testing.T) { defer func() { execCommand = exec.Command }() var acc testutil.Accumulator - err := s.Gather(&acc) + err := acc.GatherError(s.Gather) if err != nil { t.Fatal(err) } diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index f57d970cf3a56..151b3c685211b 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -80,7 +80,6 @@ func (t *Tail) Start(acc telegraf.Accumulator) error { } } - var errS string // Create a "tailer" for each file for _, filepath := range t.Files { g, err := globpath.Compile(filepath) @@ -97,7 +96,7 @@ func (t *Tail) Start(acc telegraf.Accumulator) error { Pipe: t.Pipe, }) if err != nil { - errS += err.Error() + " " + acc.AddError(err) continue } // create a goroutine for each "tailer" @@ -107,9 +106,6 @@ func (t *Tail) Start(acc telegraf.Accumulator) error { } } - if errS != "" { - return fmt.Errorf(errS) - } return nil } diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index 7ddb502f946dc..9e3888a1d9b60 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -30,7 +30,7 @@ func TestTailFromBeginning(t *testing.T) { acc := testutil.Accumulator{} require.NoError(t, tt.Start(&acc)) - require.NoError(t, tt.Gather(&acc)) + require.NoError(t, acc.GatherError(tt.Gather)) acc.Wait(1) acc.AssertContainsTaggedFields(t, "cpu", @@ -67,7 +67,7 @@ func TestTailFromEnd(t *testing.T) { _, err = tmpfile.WriteString("cpu,othertag=foo usage_idle=100\n") require.NoError(t, err) - require.NoError(t, tt.Gather(&acc)) + require.NoError(t, acc.GatherError(tt.Gather)) acc.Wait(1) acc.AssertContainsTaggedFields(t, "cpu", @@ -98,7 +98,7 @@ func TestTailBadLine(t *testing.T) { _, err = tmpfile.WriteString("cpu mytag= foo usage_idle= 100\n") require.NoError(t, err) - require.NoError(t, tt.Gather(&acc)) + require.NoError(t, acc.GatherError(tt.Gather)) acc.WaitError(1) assert.Contains(t, acc.Errors[0].Error(), "E! Malformed log line") diff --git a/plugins/inputs/varnish/varnish.go b/plugins/inputs/varnish/varnish.go index c750412219d8d..896215aa021e3 100644 --- a/plugins/inputs/varnish/varnish.go +++ b/plugins/inputs/varnish/varnish.go @@ -6,7 +6,6 @@ import ( "bufio" "bytes" "fmt" - "os" "os/exec" "strconv" "strings" @@ -124,8 +123,8 @@ func (s *Varnish) Gather(acc telegraf.Accumulator) error { sectionMap[section][field], err = strconv.ParseUint(value, 10, 64) if err != nil { - fmt.Fprintf(os.Stderr, "Expected a numeric value for %s = %v\n", - stat, value) + acc.AddError(fmt.Errorf("Expected a numeric value for %s = %v\n", + stat, value)) } } diff --git a/plugins/inputs/webhooks/webhooks.go b/plugins/inputs/webhooks/webhooks.go index 7ed1ccd5191c2..9a86b7effe690 100644 --- a/plugins/inputs/webhooks/webhooks.go +++ b/plugins/inputs/webhooks/webhooks.go @@ -79,7 +79,7 @@ func (wb *Webhooks) Listen(acc telegraf.Accumulator) { err := http.ListenAndServe(fmt.Sprintf("%s", wb.ServiceAddress), r) if err != nil { - log.Printf("E! Error starting server: %v", err) + acc.AddError(fmt.Errorf("E! Error starting server: %v", err)) } } diff --git a/plugins/inputs/zookeeper/zookeeper.go b/plugins/inputs/zookeeper/zookeeper.go index c11b55f68822a..977a1f1edae08 100644 --- a/plugins/inputs/zookeeper/zookeeper.go +++ b/plugins/inputs/zookeeper/zookeeper.go @@ -47,9 +47,7 @@ func (z *Zookeeper) Gather(acc telegraf.Accumulator) error { } for _, serverAddress := range z.Servers { - if err := z.gatherServer(serverAddress, acc); err != nil { - return err - } + acc.AddError(z.gatherServer(serverAddress, acc)) } return nil } diff --git a/plugins/inputs/zookeeper/zookeeper_test.go b/plugins/inputs/zookeeper/zookeeper_test.go index bc02ffb9d62b1..e8bcc11d5f671 100644 --- a/plugins/inputs/zookeeper/zookeeper_test.go +++ b/plugins/inputs/zookeeper/zookeeper_test.go @@ -19,8 +19,7 @@ func TestZookeeperGeneratesMetrics(t *testing.T) { var acc testutil.Accumulator - err := z.Gather(&acc) - require.NoError(t, err) + require.NoError(t, acc.GatherError(z.Gather)) intMetrics := []string{ "avg_latency", diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 9ebf77cf729c5..75472f944d500 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -187,6 +187,17 @@ func (a *Accumulator) TagValue(measurement string, key string) string { return "" } +// Calls the given Gather function and returns the first error found. +func (a *Accumulator) GatherError(gf func(telegraf.Accumulator) error) error { + if err := gf(a); err != nil { + return err + } + if len(a.Errors) > 0 { + return a.Errors[0] + } + return nil +} + // NFields returns the total number of fields in the accumulator, across all // measurements func (a *Accumulator) NFields() int {