Skip to content

Commit

Permalink
New prometheus remote write parser (influxdata#8967)
Browse files Browse the repository at this point in the history
  • Loading branch information
helenosheaa authored Mar 18, 2021
1 parent cc6c51c commit 67f588c
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 3 deletions.
1 change: 1 addition & 0 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Protocol or in JSON format.
- [Logfmt](/plugins/parsers/logfmt)
- [Nagios](/plugins/parsers/nagios)
- [Prometheus](/plugins/parsers/prometheus)
- [PrometheusRemoteWrite](/plugins/parsers/prometheusremotewrite)
- [Value](/plugins/parsers/value), ie: 45 or "booyah"
- [Wavefront](/plugins/parsers/wavefront)
- [XML](/plugins/parsers/xml)
Expand Down
44 changes: 44 additions & 0 deletions plugins/parsers/prometheusremotewrite/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Prometheus remote write

Converts prometheus remote write samples directly into Telegraf metrics. It can be used with [http_listener_v2](/plugins/inputs/http_listener_v2). There are no additional configuration options for Prometheus Remote Write Samples.

### Configuration

```toml
[[inputs.http_listener_v2]]
## Address and port to host HTTP listener on
service_address = ":1234"

## Path to listen to.
path = "/recieve"

## Data format to consume.
data_format = "prometheusremotewrite"
```

### Example

**Example Input**
```
prompb.WriteRequest{
Timeseries: []*prompb.TimeSeries{
{
Labels: []*prompb.Label{
{Name: "__name__", Value: "go_gc_duration_seconds"},
{Name: "instance", Value: "localhost:9090"},
{Name: "job", Value: "prometheus"},
{Name: "quantile", Value: "0.99"},
},
Samples: []prompb.Sample{
{Value: 4.63, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()},
},
},
},
}
```

**Example Output**
```
prometheus_remote_write,instance=localhost:9090,job=prometheus,quantile=0.99 go_gc_duration_seconds=4.63 1614889298859000000
```
88 changes: 88 additions & 0 deletions plugins/parsers/prometheusremotewrite/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package prometheusremotewrite

import (
"fmt"
"math"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"

"github.com/gogo/protobuf/proto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
)

type Parser struct {
DefaultTags map[string]string
}

func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
var err error
var metrics []telegraf.Metric
var req prompb.WriteRequest

if err := proto.Unmarshal(buf, &req); err != nil {
return nil, fmt.Errorf("unable to unmarshal request body: %s", err)
}

now := time.Now()

for _, ts := range req.Timeseries {
tags := map[string]string{}
for key, value := range p.DefaultTags {
tags[key] = value
}

for _, l := range ts.Labels {
tags[l.Name] = l.Value
}

metricName := tags[model.MetricNameLabel]
if metricName == "" {
return nil, fmt.Errorf("metric name %q not found in tag-set or empty", model.MetricNameLabel)
}
delete(tags, model.MetricNameLabel)

for _, s := range ts.Samples {
fields := make(map[string]interface{})
if !math.IsNaN(s.Value) {
fields[metricName] = s.Value
}
// converting to telegraf metric
if len(fields) > 0 {
t := now
if s.Timestamp > 0 {
t = time.Unix(0, s.Timestamp*1000000)
}
m, err := metric.New("prometheus_remote_write", tags, fields, t)
if err != nil {
return nil, fmt.Errorf("unable to convert to telegraf metric: %s", err)
}
metrics = append(metrics, m)
}
}
}
return metrics, err
}

func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line))
if err != nil {
return nil, err
}

if len(metrics) < 1 {
return nil, fmt.Errorf("No metrics in line")
}

if len(metrics) > 1 {
return nil, fmt.Errorf("More than one metric in line")
}

return metrics[0], nil
}

func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
157 changes: 157 additions & 0 deletions plugins/parsers/prometheusremotewrite/parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package prometheusremotewrite

import (
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
)

func TestParse(t *testing.T) {
prompbInput := prompb.WriteRequest{
Timeseries: []*prompb.TimeSeries{
{
Labels: []*prompb.Label{
{Name: "__name__", Value: "go_gc_duration_seconds"},
{Name: "quantile", Value: "0.99"},
},
Samples: []prompb.Sample{
{Value: 4.63, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()},
},
},
{
Labels: []*prompb.Label{
{Name: "__name__", Value: "prometheus_target_interval_length_seconds"},
{Name: "job", Value: "prometheus"},
},
Samples: []prompb.Sample{
{Value: 14.99, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()},
},
},
},
}

inoutBytes, err := prompbInput.Marshal()
assert.NoError(t, err)

expected := []telegraf.Metric{
testutil.MustMetric(
"prometheus_remote_write",
map[string]string{
"quantile": "0.99",
},
map[string]interface{}{
"go_gc_duration_seconds": float64(4.63),
},
time.Unix(0, 0),
),
testutil.MustMetric(
"prometheus_remote_write",
map[string]string{
"job": "prometheus",
},
map[string]interface{}{
"prometheus_target_interval_length_seconds": float64(14.99),
},
time.Unix(0, 0),
),
}

parser := Parser{
DefaultTags: map[string]string{},
}

metrics, err := parser.Parse(inoutBytes)
assert.NoError(t, err)
assert.Len(t, metrics, 2)
testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics())
}

func TestDefaultTags(t *testing.T) {
prompbInput := prompb.WriteRequest{
Timeseries: []*prompb.TimeSeries{
{
Labels: []*prompb.Label{
{Name: "__name__", Value: "foo"},
{Name: "__eg__", Value: "bar"},
},
Samples: []prompb.Sample{
{Value: 1, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixNano()},
},
},
},
}

inoutBytes, err := prompbInput.Marshal()
assert.NoError(t, err)

expected := []telegraf.Metric{
testutil.MustMetric(
"prometheus_remote_write",
map[string]string{
"defaultTag": "defaultTagValue",
"__eg__": "bar",
},
map[string]interface{}{
"foo": float64(1),
},
time.Unix(0, 0),
),
}

parser := Parser{
DefaultTags: map[string]string{
"defaultTag": "defaultTagValue",
},
}

metrics, err := parser.Parse(inoutBytes)
assert.NoError(t, err)
assert.Len(t, metrics, 1)
testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics())
}

func TestMetricsWithTimestamp(t *testing.T) {
testTime := time.Date(2020, time.October, 4, 17, 0, 0, 0, time.UTC)
testTimeUnix := testTime.UnixNano() / int64(time.Millisecond)
prompbInput := prompb.WriteRequest{
Timeseries: []*prompb.TimeSeries{
{
Labels: []*prompb.Label{
{Name: "__name__", Value: "foo"},
{Name: "__eg__", Value: "bar"},
},
Samples: []prompb.Sample{
{Value: 1, Timestamp: testTimeUnix},
},
},
},
}

inoutBytes, err := prompbInput.Marshal()
assert.NoError(t, err)

expected := []telegraf.Metric{
testutil.MustMetric(
"prometheus_remote_write",
map[string]string{
"__eg__": "bar",
},
map[string]interface{}{
"foo": float64(1),
},
testTime,
),
}
parser := Parser{
DefaultTags: map[string]string{},
}

metrics, err := parser.Parse(inoutBytes)
assert.NoError(t, err)
assert.Len(t, metrics, 1)
testutil.RequireMetricsEqual(t, expected, metrics, testutil.SortMetrics())
}
9 changes: 9 additions & 0 deletions plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/logfmt"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
"github.com/influxdata/telegraf/plugins/parsers/prometheus"
"github.com/influxdata/telegraf/plugins/parsers/prometheusremotewrite"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/plugins/parsers/wavefront"
"github.com/influxdata/telegraf/plugins/parsers/xml"
Expand Down Expand Up @@ -248,6 +249,8 @@ func NewParser(config *Config) (Parser, error) {
)
case "prometheus":
parser, err = NewPrometheusParser(config.DefaultTags)
case "prometheusremotewrite":
parser, err = NewPrometheusRemoteWriteParser(config.DefaultTags)
case "xml":
parser, err = NewXMLParser(config.MetricName, config.DefaultTags, config.XMLConfig)
default:
Expand Down Expand Up @@ -361,6 +364,12 @@ func NewPrometheusParser(defaultTags map[string]string) (Parser, error) {
}, nil
}

func NewPrometheusRemoteWriteParser(defaultTags map[string]string) (Parser, error) {
return &prometheusremotewrite.Parser{
DefaultTags: defaultTags,
}, nil
}

func NewXMLParser(metricName string, defaultTags map[string]string, xmlConfigs []XMLConfig) (Parser, error) {
// Convert the config formats which is a one-to-one copy
configs := make([]xml.Config, len(xmlConfigs))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package prometheusremotewrite
import (
"bytes"
"fmt"
"strings"
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"strings"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
Expand Down

0 comments on commit 67f588c

Please sign in to comment.