From 67f588cbce90b04bb98518033a61fbc6238db6b6 Mon Sep 17 00:00:00 2001 From: Helen Weller <38860767+helenosheaa@users.noreply.github.com> Date: Thu, 18 Mar 2021 11:33:58 -0400 Subject: [PATCH] New prometheus remote write parser (#8967) --- docs/DATA_FORMATS_INPUT.md | 1 + .../parsers/prometheusremotewrite/README.md | 44 +++++ .../parsers/prometheusremotewrite/parser.go | 88 ++++++++++ .../prometheusremotewrite/parser_test.go | 157 ++++++++++++++++++ plugins/parsers/registry.go | 9 + .../prometheusremotewrite_test.go | 7 +- 6 files changed, 303 insertions(+), 3 deletions(-) create mode 100644 plugins/parsers/prometheusremotewrite/README.md create mode 100644 plugins/parsers/prometheusremotewrite/parser.go create mode 100644 plugins/parsers/prometheusremotewrite/parser_test.go diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index 3e7dd107becf5..2550e7e1044cc 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -15,6 +15,7 @@ Protocol or in JSON format. - [Logfmt](/plugins/parsers/logfmt) - [Nagios](/plugins/parsers/nagios) - [Prometheus](/plugins/parsers/prometheus) +- [PrometheusRemoteWrite](/plugins/parsers/prometheusremotewrite) - [Value](/plugins/parsers/value), ie: 45 or "booyah" - [Wavefront](/plugins/parsers/wavefront) - [XML](/plugins/parsers/xml) diff --git a/plugins/parsers/prometheusremotewrite/README.md b/plugins/parsers/prometheusremotewrite/README.md new file mode 100644 index 0000000000000..1bad5bd6004ea --- /dev/null +++ b/plugins/parsers/prometheusremotewrite/README.md @@ -0,0 +1,44 @@ +# Prometheus remote write + +Converts prometheus remote write samples directly into Telegraf metrics. It can be used with [http_listener_v2](/plugins/inputs/http_listener_v2). There are no additional configuration options for Prometheus Remote Write Samples. + +### Configuration + +```toml +[[inputs.http_listener_v2]] + ## Address and port to host HTTP listener on + service_address = ":1234" + + ## Path to listen to. + path = "/recieve" + + ## Data format to consume. + data_format = "prometheusremotewrite" +``` + +### Example + +**Example Input** +``` +prompb.WriteRequest{ + Timeseries: []*prompb.TimeSeries{ + { + Labels: []*prompb.Label{ + {Name: "__name__", Value: "go_gc_duration_seconds"}, + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "prometheus"}, + {Name: "quantile", Value: "0.99"}, + }, + Samples: []prompb.Sample{ + {Value: 4.63, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()}, + }, + }, + }, + } + +``` + +**Example Output** +``` +prometheus_remote_write,instance=localhost:9090,job=prometheus,quantile=0.99 go_gc_duration_seconds=4.63 1614889298859000000 +``` diff --git a/plugins/parsers/prometheusremotewrite/parser.go b/plugins/parsers/prometheusremotewrite/parser.go new file mode 100644 index 0000000000000..90921dfb14e7a --- /dev/null +++ b/plugins/parsers/prometheusremotewrite/parser.go @@ -0,0 +1,88 @@ +package prometheusremotewrite + +import ( + "fmt" + "math" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + + "github.com/gogo/protobuf/proto" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/prompb" +) + +type Parser struct { + DefaultTags map[string]string +} + +func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { + var err error + var metrics []telegraf.Metric + var req prompb.WriteRequest + + if err := proto.Unmarshal(buf, &req); err != nil { + return nil, fmt.Errorf("unable to unmarshal request body: %s", err) + } + + now := time.Now() + + for _, ts := range req.Timeseries { + tags := map[string]string{} + for key, value := range p.DefaultTags { + tags[key] = value + } + + for _, l := range ts.Labels { + tags[l.Name] = l.Value + } + + metricName := tags[model.MetricNameLabel] + if metricName == "" { + return nil, fmt.Errorf("metric name %q not found in tag-set or empty", model.MetricNameLabel) + } + delete(tags, model.MetricNameLabel) + + for _, s := range ts.Samples { + fields := make(map[string]interface{}) + if !math.IsNaN(s.Value) { + fields[metricName] = s.Value + } + // converting to telegraf metric + if len(fields) > 0 { + t := now + if s.Timestamp > 0 { + t = time.Unix(0, s.Timestamp*1000000) + } + m, err := metric.New("prometheus_remote_write", tags, fields, t) + if err != nil { + return nil, fmt.Errorf("unable to convert to telegraf metric: %s", err) + } + metrics = append(metrics, m) + } + } + } + return metrics, err +} + +func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { + metrics, err := p.Parse([]byte(line)) + if err != nil { + return nil, err + } + + if len(metrics) < 1 { + return nil, fmt.Errorf("No metrics in line") + } + + if len(metrics) > 1 { + return nil, fmt.Errorf("More than one metric in line") + } + + return metrics[0], nil +} + +func (p *Parser) SetDefaultTags(tags map[string]string) { + p.DefaultTags = tags +} diff --git a/plugins/parsers/prometheusremotewrite/parser_test.go b/plugins/parsers/prometheusremotewrite/parser_test.go new file mode 100644 index 0000000000000..d32b90673fdb3 --- /dev/null +++ b/plugins/parsers/prometheusremotewrite/parser_test.go @@ -0,0 +1,157 @@ +package prometheusremotewrite + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" +) + +func TestParse(t *testing.T) { + prompbInput := prompb.WriteRequest{ + Timeseries: []*prompb.TimeSeries{ + { + Labels: []*prompb.Label{ + {Name: "__name__", Value: "go_gc_duration_seconds"}, + {Name: "quantile", Value: "0.99"}, + }, + Samples: []prompb.Sample{ + {Value: 4.63, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()}, + }, + }, + { + Labels: []*prompb.Label{ + {Name: "__name__", Value: "prometheus_target_interval_length_seconds"}, + {Name: "job", Value: "prometheus"}, + }, + Samples: []prompb.Sample{ + {Value: 14.99, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()}, + }, + }, + }, + } + + inoutBytes, err := prompbInput.Marshal() + assert.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "prometheus_remote_write", + map[string]string{ + "quantile": "0.99", + }, + map[string]interface{}{ + "go_gc_duration_seconds": float64(4.63), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "prometheus_remote_write", + map[string]string{ + "job": "prometheus", + }, + map[string]interface{}{ + "prometheus_target_interval_length_seconds": float64(14.99), + }, + time.Unix(0, 0), + ), + } + + parser := Parser{ + DefaultTags: map[string]string{}, + } + + metrics, err := parser.Parse(inoutBytes) + assert.NoError(t, err) + assert.Len(t, metrics, 2) + testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics()) +} + +func TestDefaultTags(t *testing.T) { + prompbInput := prompb.WriteRequest{ + Timeseries: []*prompb.TimeSeries{ + { + Labels: []*prompb.Label{ + {Name: "__name__", Value: "foo"}, + {Name: "__eg__", Value: "bar"}, + }, + Samples: []prompb.Sample{ + {Value: 1, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()}, + }, + }, + }, + } + + inoutBytes, err := prompbInput.Marshal() + assert.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "prometheus_remote_write", + map[string]string{ + "defaultTag": "defaultTagValue", + "__eg__": "bar", + }, + map[string]interface{}{ + "foo": float64(1), + }, + time.Unix(0, 0), + ), + } + + parser := Parser{ + DefaultTags: map[string]string{ + "defaultTag": "defaultTagValue", + }, + } + + metrics, err := parser.Parse(inoutBytes) + assert.NoError(t, err) + assert.Len(t, metrics, 1) + testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics()) +} + +func TestMetricsWithTimestamp(t *testing.T) { + testTime := time.Date(2020, time.October, 4, 17, 0, 0, 0, time.UTC) + testTimeUnix := testTime.UnixNano() / int64(time.Millisecond) + prompbInput := prompb.WriteRequest{ + Timeseries: []*prompb.TimeSeries{ + { + Labels: []*prompb.Label{ + {Name: "__name__", Value: "foo"}, + {Name: "__eg__", Value: "bar"}, + }, + Samples: []prompb.Sample{ + {Value: 1, Timestamp: testTimeUnix}, + }, + }, + }, + } + + inoutBytes, err := prompbInput.Marshal() + assert.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "prometheus_remote_write", + map[string]string{ + "__eg__": "bar", + }, + map[string]interface{}{ + "foo": float64(1), + }, + testTime, + ), + } + parser := Parser{ + DefaultTags: map[string]string{}, + } + + metrics, err := parser.Parse(inoutBytes) + assert.NoError(t, err) + assert.Len(t, metrics, 1) + testutil.RequireMetricsEqual(t, expected, metrics, testutil.SortMetrics()) +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index d01b0ee676565..b2e66636cb1b8 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/logfmt" "github.com/influxdata/telegraf/plugins/parsers/nagios" "github.com/influxdata/telegraf/plugins/parsers/prometheus" + "github.com/influxdata/telegraf/plugins/parsers/prometheusremotewrite" "github.com/influxdata/telegraf/plugins/parsers/value" "github.com/influxdata/telegraf/plugins/parsers/wavefront" "github.com/influxdata/telegraf/plugins/parsers/xml" @@ -248,6 +249,8 @@ func NewParser(config *Config) (Parser, error) { ) case "prometheus": parser, err = NewPrometheusParser(config.DefaultTags) + case "prometheusremotewrite": + parser, err = NewPrometheusRemoteWriteParser(config.DefaultTags) case "xml": parser, err = NewXMLParser(config.MetricName, config.DefaultTags, config.XMLConfig) default: @@ -361,6 +364,12 @@ func NewPrometheusParser(defaultTags map[string]string) (Parser, error) { }, nil } +func NewPrometheusRemoteWriteParser(defaultTags map[string]string) (Parser, error) { + return &prometheusremotewrite.Parser{ + DefaultTags: defaultTags, + }, nil +} + func NewXMLParser(metricName string, defaultTags map[string]string, xmlConfigs []XMLConfig) (Parser, error) { // Convert the config formats which is a one-to-one copy configs := make([]xml.Config, len(xmlConfigs)) diff --git a/plugins/serializers/prometheusremotewrite/prometheusremotewrite_test.go b/plugins/serializers/prometheusremotewrite/prometheusremotewrite_test.go index 8aecd8ebca9bf..32aba632082b6 100644 --- a/plugins/serializers/prometheusremotewrite/prometheusremotewrite_test.go +++ b/plugins/serializers/prometheusremotewrite/prometheusremotewrite_test.go @@ -3,13 +3,14 @@ package prometheusremotewrite import ( "bytes" "fmt" + "strings" + "testing" + "time" + "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" - "strings" - "testing" - "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil"