Skip to content

Commit

Permalink
Carbon2 serializer: sanitize metric name (influxdata#9026)
Browse files Browse the repository at this point in the history
  • Loading branch information
pmalek-sumo authored Apr 8, 2021
1 parent f0c8549 commit 2b41a1e
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 26 deletions.
7 changes: 4 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1388,6 +1388,7 @@ func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error)
c.getFieldString(tbl, "template", &sc.Template)
c.getFieldStringSlice(tbl, "templates", &sc.Templates)
c.getFieldString(tbl, "carbon2_format", &sc.Carbon2Format)
c.getFieldString(tbl, "carbon2_sanitize_replace_char", &sc.Carbon2SanitizeReplaceChar)
c.getFieldInt(tbl, "influx_max_line_bytes", &sc.InfluxMaxLineBytes)

c.getFieldBool(tbl, "influx_sort_fields", &sc.InfluxSortFields)
Expand Down Expand Up @@ -1449,9 +1450,9 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig,

func (c *Config) missingTomlField(_ reflect.Type, key string) error {
switch key {
case "alias", "carbon2_format", "collectd_auth_file", "collectd_parse_multivalue",
"collectd_security_level", "collectd_typesdb", "collection_jitter", "csv_column_names",
"csv_column_types", "csv_comment", "csv_delimiter", "csv_header_row_count",
case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file",
"collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter",
"csv_column_names", "csv_column_types", "csv_comment", "csv_delimiter", "csv_header_row_count",
"csv_measurement_column", "csv_skip_columns", "csv_skip_rows", "csv_tag_columns",
"csv_timestamp_column", "csv_timestamp_format", "csv_timezone", "csv_trim_space", "csv_skip_values",
"data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path",
Expand Down
16 changes: 8 additions & 8 deletions plugins/outputs/sumologic/sumologic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestMethod(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)

plugin := tt.plugin()
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestStatusCode(t *testing.T) {
w.WriteHeader(tt.statusCode)
})

serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)

tt.plugin.SetSerializer(serializer)
Expand All @@ -199,7 +199,7 @@ func TestContentType(t *testing.T) {
s.headers = map[string]string{
contentTypeHeader: carbon2ContentType,
}
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)
s.SetSerializer(sr)
return s
Expand All @@ -213,7 +213,7 @@ func TestContentType(t *testing.T) {
s.headers = map[string]string{
contentTypeHeader: carbon2ContentType,
}
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatMetricIncludesField))
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatMetricIncludesField), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)
s.SetSerializer(sr)
return s
Expand Down Expand Up @@ -310,7 +310,7 @@ func TestContentEncodingGzip(t *testing.T) {
w.WriteHeader(http.StatusNoContent)
})

serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)

plugin := tt.plugin()
Expand Down Expand Up @@ -345,7 +345,7 @@ func TestDefaultUserAgent(t *testing.T) {
MaxRequstBodySize: Default().MaxRequstBodySize,
}

serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)

plugin.SetSerializer(serializer)
Expand Down Expand Up @@ -594,7 +594,7 @@ func TestMaxRequestBodySize(t *testing.T) {
w.WriteHeader(http.StatusOK)
})

serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)

plugin := tt.plugin()
Expand Down Expand Up @@ -626,7 +626,7 @@ func TestTryingToSendEmptyMetricsDoesntFail(t *testing.T) {
plugin := Default()
plugin.URL = u.String()

serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)
plugin.SetSerializer(serializer)

Expand Down
16 changes: 16 additions & 0 deletions plugins/serializers/carbon2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ The `carbon2` serializer translates the Telegraf metric format to the [Carbon2 f
## * "metric_includes_field"
## * "" - defaults to "field_separate"
# carbon2_format = "field_separate"

## Character used for replacing sanitized characters. By default ":" is used.
## The following character set is being replaced with sanitize replace char:
## !@#$%^&*()+`'\"[]{};<>,?/\\|=
# carbon2_sanitize_replace_char = ":"
```

Standard form:
Expand Down Expand Up @@ -52,6 +57,17 @@ metric=name_field_2 host=foo 4 1234567890
metric=name_field_N host=foo 59 1234567890
```

### Metric name sanitization

In order to sanitize the metric name one can specify `carbon2_sanitize_replace_char`
in order to replace the following characters in the metric name:

```
!@#$%^&*()+`'\"[]{};<>,?/\\|=
```

By default they will be replaced with `:`.

## Metrics

The serializer converts the metrics by creating `intrinsic_tags` using the combination of metric name and fields.
Expand Down
36 changes: 31 additions & 5 deletions plugins/serializers/carbon2/carbon2.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package carbon2

import (
"bytes"
"errors"
"fmt"
"strconv"
"strings"
Expand All @@ -23,11 +24,23 @@ var formats = map[format]struct{}{
Carbon2FormatMetricIncludesField: {},
}

const (
DefaultSanitizeReplaceChar = ":"
sanitizedChars = "!@#$%^&*()+`'\"[]{};<>,?/\\|="
)

type Serializer struct {
metricsFormat format
metricsFormat format
sanitizeReplacer *strings.Replacer
}

func NewSerializer(metricsFormat string) (*Serializer, error) {
func NewSerializer(metricsFormat string, sanitizeReplaceChar string) (*Serializer, error) {
if sanitizeReplaceChar == "" {
sanitizeReplaceChar = DefaultSanitizeReplaceChar
} else if len(sanitizeReplaceChar) > 1 {
return nil, errors.New("sanitize replace char has to be a singular character")
}

var f = format(metricsFormat)

if _, ok := formats[f]; !ok {
Expand All @@ -40,7 +53,8 @@ func NewSerializer(metricsFormat string) (*Serializer, error) {
}

return &Serializer{
metricsFormat: f,
metricsFormat: f,
sanitizeReplacer: createSanitizeReplacer(sanitizedChars, rune(sanitizeReplaceChar[0])),
}, nil
}

Expand All @@ -65,15 +79,17 @@ func (s *Serializer) createObject(metric telegraf.Metric) []byte {
continue
}

name := s.sanitizeReplacer.Replace(metric.Name())

switch metricsFormat {
case Carbon2FormatFieldSeparate:
m.WriteString(serializeMetricFieldSeparate(
metric.Name(), fieldName,
name, fieldName,
))

case Carbon2FormatMetricIncludesField:
m.WriteString(serializeMetricIncludeField(
metric.Name(), fieldName,
name, fieldName,
))
}

Expand Down Expand Up @@ -152,3 +168,13 @@ func bool2int(b bool) int {
}
return i
}

// createSanitizeReplacer creates string replacer replacing all provided
// characters with the replaceChar.
func createSanitizeReplacer(sanitizedChars string, replaceChar rune) *strings.Replacer {
sanitizeCharPairs := make([]string, 0, 2*len(sanitizedChars))
for _, c := range sanitizedChars {
sanitizeCharPairs = append(sanitizeCharPairs, string(c), string(replaceChar))
}
return strings.NewReplacer(sanitizeCharPairs...)
}
124 changes: 117 additions & 7 deletions plugins/serializers/carbon2/carbon2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestSerializeMetricFloat(t *testing.T) {

for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)

buf, err := s.Serialize(m)
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestSerializeMetricWithEmptyStringTag(t *testing.T) {

for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)

buf, err := s.Serialize(m)
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestSerializeWithSpaces(t *testing.T) {

for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)

buf, err := s.Serialize(m)
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestSerializeMetricInt(t *testing.T) {

for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)

buf, err := s.Serialize(m)
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestSerializeMetricString(t *testing.T) {

for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)

buf, err := s.Serialize(m)
Expand Down Expand Up @@ -255,7 +255,7 @@ func TestSerializeMetricBool(t *testing.T) {

for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)

buf, err := s.Serialize(tc.metric)
Expand Down Expand Up @@ -300,7 +300,7 @@ metric=cpu_value 42 0

for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)

buf, err := s.SerializeBatch(metrics)
Expand All @@ -310,3 +310,113 @@ metric=cpu_value 42 0
})
}
}

func TestSerializeMetricIsProperlySanitized(t *testing.T) {
now := time.Now()

testcases := []struct {
metricFunc func() (telegraf.Metric, error)
format format
expected string
replaceChar string
expectedErr bool
}{
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1", nil, fields, now)
},
format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu:1 field=usage_idle 91.5 %d\n", now.Unix()),
replaceChar: DefaultSanitizeReplaceChar,
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1", nil, fields, now)
},
format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu_1 field=usage_idle 91.5 %d\n", now.Unix()),
replaceChar: "_",
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1=tmp$custom", nil, fields, now)
},
format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu:1:tmp:custom field=usage_idle 91.5 %d\n", now.Unix()),
replaceChar: DefaultSanitizeReplaceChar,
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
},
format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu:1:tmp:custom:namespace field=usage_idle 91.5 %d\n", now.Unix()),
replaceChar: DefaultSanitizeReplaceChar,
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
},
format: Carbon2FormatMetricIncludesField,
expected: fmt.Sprintf("metric=cpu:1:tmp:custom:namespace_usage_idle 91.5 %d\n", now.Unix()),
replaceChar: DefaultSanitizeReplaceChar,
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
},
format: Carbon2FormatMetricIncludesField,
expected: fmt.Sprintf("metric=cpu_1_tmp_custom_namespace_usage_idle 91.5 %d\n", now.Unix()),
replaceChar: "_",
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
},
format: Carbon2FormatMetricIncludesField,
expectedErr: true,
replaceChar: "___",
},
}

for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
m, err := tc.metricFunc()
require.NoError(t, err)

s, err := NewSerializer(string(tc.format), tc.replaceChar)
if tc.expectedErr {
require.Error(t, err)
return
}

require.NoError(t, err)

buf, err := s.Serialize(m)
require.NoError(t, err)

assert.Equal(t, tc.expected, string(buf))
})
}
}
Loading

0 comments on commit 2b41a1e

Please sign in to comment.