From f74687dcc0ceb128027414c7099f7c70df86d1ad Mon Sep 17 00:00:00 2001 From: Sebastian Borza Date: Fri, 12 May 2017 16:42:18 -0500 Subject: [PATCH] split metrics based on UDPPayload size (#2795) --- plugins/outputs/influxdb/client/udp.go | 10 +++++++--- plugins/outputs/influxdb/influxdb.go | 16 +++++++++++----- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/plugins/outputs/influxdb/client/udp.go b/plugins/outputs/influxdb/client/udp.go index d542ecf634b2f..89d6894a15baf 100644 --- a/plugins/outputs/influxdb/client/udp.go +++ b/plugins/outputs/influxdb/client/udp.go @@ -25,6 +25,7 @@ type UDPConfig struct { PayloadSize int } +// NewUDP will return an instance of the telegraf UDP output plugin for influxdb func NewUDP(config UDPConfig) (Client, error) { p, err := url.Parse(config.URL) if err != nil { @@ -55,20 +56,22 @@ type udpClient struct { buffer []byte } +// Query will send the provided query command to the client, returning an error if any issues arise func (c *udpClient) Query(command string) error { return nil } +// Write will send the byte stream to the given UDP client endpoint func (c *udpClient) Write(b []byte) (int, error) { return c.WriteStream(bytes.NewReader(b), -1) } -// write params are ignored by the UDP client +// WriteWithParams are ignored by the UDP client, will forward to WriteStream func (c *udpClient) WriteWithParams(b []byte, wp WriteParams) (int, error) { return c.WriteStream(bytes.NewReader(b), -1) } -// contentLength is ignored by the UDP client. +// WriteStream will send the provided data through to the client, contentLength is ignored by the UDP client func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) { var totaln int for { @@ -88,12 +91,13 @@ func (c *udpClient) WriteStream(r io.Reader, contentLength int) (int, error) { return totaln, nil } -// contentLength is ignored by the UDP client. +// WriteStreamWithParams will forward the stream to the client backend, contentLength is ignored by the UDP client // write params are ignored by the UDP client func (c *udpClient) WriteStreamWithParams(r io.Reader, contentLength int, wp WriteParams) (int, error) { return c.WriteStream(r, -1) } +// Close will terminate the provided client connection func (c *udpClient) Close() error { return c.conn.Close() } diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 2dac7a00aa488..d127d3fb4b8ef 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/telegraf/plugins/outputs/influxdb/client" ) +// InfluxDB struct is the primary data structure for the plugin type InfluxDB struct { // URL is only for backwards compatability URL string @@ -79,11 +80,10 @@ var sampleConfig = ` # insecure_skip_verify = false ` +// Connect initiates the primary connection to the range of provided URLs func (i *InfluxDB) Connect() error { var urls []string - for _, u := range i.URLs { - urls = append(urls, u) - } + urls = append(urls, i.URLs...) // Backward-compatability with single Influx URL config files // This could eventually be removed in favor of specifying the urls as a list @@ -144,26 +144,32 @@ func (i *InfluxDB) Connect() error { return nil } +// Close will terminate the session to the backend, returning error if an issue arises func (i *InfluxDB) Close() error { return nil } +// SampleConfig returns the formatted sample configuration for the plugin func (i *InfluxDB) SampleConfig() string { return sampleConfig } +// Description returns the human-readable function definition of the plugin func (i *InfluxDB) Description() string { return "Configuration for influxdb server to send metrics to" } -// Choose a random server in the cluster to write to until a successful write +// Write will choose a random server in the cluster to write to until a successful write // occurs, logging each unsuccessful. If all servers fail, return error. func (i *InfluxDB) Write(metrics []telegraf.Metric) error { bufsize := 0 + splitData := make([]telegraf.Metric, 0) + for _, m := range metrics { bufsize += m.Len() + splitData = append(splitData, m.Split(i.UDPPayload)...) } - r := metric.NewReader(metrics) + r := metric.NewReader(splitData) // This will get set to nil if a successful write occurs err := fmt.Errorf("Could not write to any InfluxDB server in cluster")