diff --git a/config/config.go b/config/config.go index b7c11a95ff8c4..097fff385f531 100644 --- a/config/config.go +++ b/config/config.go @@ -1388,6 +1388,7 @@ func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error) c.getFieldString(tbl, "template", &sc.Template) c.getFieldStringSlice(tbl, "templates", &sc.Templates) c.getFieldString(tbl, "carbon2_format", &sc.Carbon2Format) + c.getFieldString(tbl, "carbon2_sanitize_replace_char", &sc.Carbon2SanitizeReplaceChar) c.getFieldInt(tbl, "influx_max_line_bytes", &sc.InfluxMaxLineBytes) c.getFieldBool(tbl, "influx_sort_fields", &sc.InfluxSortFields) @@ -1449,9 +1450,9 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, func (c *Config) missingTomlField(_ reflect.Type, key string) error { switch key { - case "alias", "carbon2_format", "collectd_auth_file", "collectd_parse_multivalue", - "collectd_security_level", "collectd_typesdb", "collection_jitter", "csv_column_names", - "csv_column_types", "csv_comment", "csv_delimiter", "csv_header_row_count", + case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file", + "collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter", + "csv_column_names", "csv_column_types", "csv_comment", "csv_delimiter", "csv_header_row_count", "csv_measurement_column", "csv_skip_columns", "csv_skip_rows", "csv_tag_columns", "csv_timestamp_column", "csv_timestamp_format", "csv_timezone", "csv_trim_space", "csv_skip_values", "data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path", diff --git a/plugins/outputs/sumologic/sumologic_test.go b/plugins/outputs/sumologic/sumologic_test.go index d6fe2731fcd3e..b7fc917b43368 100644 --- a/plugins/outputs/sumologic/sumologic_test.go +++ b/plugins/outputs/sumologic/sumologic_test.go @@ -96,7 +96,7 @@ func TestMethod(t *testing.T) { w.WriteHeader(http.StatusOK) }) - serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate)) + serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar) require.NoError(t, err) plugin := tt.plugin() @@ -173,7 +173,7 @@ func TestStatusCode(t *testing.T) { w.WriteHeader(tt.statusCode) }) - serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate)) + serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar) require.NoError(t, err) tt.plugin.SetSerializer(serializer) @@ -199,7 +199,7 @@ func TestContentType(t *testing.T) { s.headers = map[string]string{ contentTypeHeader: carbon2ContentType, } - sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate)) + sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar) require.NoError(t, err) s.SetSerializer(sr) return s @@ -213,7 +213,7 @@ func TestContentType(t *testing.T) { s.headers = map[string]string{ contentTypeHeader: carbon2ContentType, } - sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatMetricIncludesField)) + sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatMetricIncludesField), carbon2.DefaultSanitizeReplaceChar) require.NoError(t, err) s.SetSerializer(sr) return s @@ -310,7 +310,7 @@ func TestContentEncodingGzip(t *testing.T) { w.WriteHeader(http.StatusNoContent) }) - serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate)) + serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar) require.NoError(t, err) plugin := tt.plugin() @@ -345,7 +345,7 @@ func TestDefaultUserAgent(t *testing.T) { MaxRequstBodySize: Default().MaxRequstBodySize, } - serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate)) + serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar) require.NoError(t, err) plugin.SetSerializer(serializer) @@ -594,7 +594,7 @@ func TestMaxRequestBodySize(t *testing.T) { w.WriteHeader(http.StatusOK) }) - serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate)) + serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar) require.NoError(t, err) plugin := tt.plugin() @@ -626,7 +626,7 @@ func TestTryingToSendEmptyMetricsDoesntFail(t *testing.T) { plugin := Default() plugin.URL = u.String() - serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate)) + serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar) require.NoError(t, err) plugin.SetSerializer(serializer) diff --git a/plugins/serializers/carbon2/README.md b/plugins/serializers/carbon2/README.md index e32a420aec0af..3ad54a1699d3a 100644 --- a/plugins/serializers/carbon2/README.md +++ b/plugins/serializers/carbon2/README.md @@ -21,6 +21,11 @@ The `carbon2` serializer translates the Telegraf metric format to the [Carbon2 f ## * "metric_includes_field" ## * "" - defaults to "field_separate" # carbon2_format = "field_separate" + + ## Character used for replacing sanitized characters. By default ":" is used. + ## The following character set is being replaced with sanitize replace char: + ## !@#$%^&*()+`'\"[]{};<>,?/\\|= + # carbon2_sanitize_replace_char = ":" ``` Standard form: @@ -52,6 +57,17 @@ metric=name_field_2 host=foo 4 1234567890 metric=name_field_N host=foo 59 1234567890 ``` +### Metric name sanitization + +In order to sanitize the metric name one can specify `carbon2_sanitize_replace_char` +in order to replace the following characters in the metric name: + +``` +!@#$%^&*()+`'\"[]{};<>,?/\\|= +``` + +By default they will be replaced with `:`. + ## Metrics The serializer converts the metrics by creating `intrinsic_tags` using the combination of metric name and fields. diff --git a/plugins/serializers/carbon2/carbon2.go b/plugins/serializers/carbon2/carbon2.go index 1b05d4cb2d4c7..4eb5798d64a69 100644 --- a/plugins/serializers/carbon2/carbon2.go +++ b/plugins/serializers/carbon2/carbon2.go @@ -2,6 +2,7 @@ package carbon2 import ( "bytes" + "errors" "fmt" "strconv" "strings" @@ -23,11 +24,23 @@ var formats = map[format]struct{}{ Carbon2FormatMetricIncludesField: {}, } +const ( + DefaultSanitizeReplaceChar = ":" + sanitizedChars = "!@#$%^&*()+`'\"[]{};<>,?/\\|=" +) + type Serializer struct { - metricsFormat format + metricsFormat format + sanitizeReplacer *strings.Replacer } -func NewSerializer(metricsFormat string) (*Serializer, error) { +func NewSerializer(metricsFormat string, sanitizeReplaceChar string) (*Serializer, error) { + if sanitizeReplaceChar == "" { + sanitizeReplaceChar = DefaultSanitizeReplaceChar + } else if len(sanitizeReplaceChar) > 1 { + return nil, errors.New("sanitize replace char has to be a singular character") + } + var f = format(metricsFormat) if _, ok := formats[f]; !ok { @@ -40,7 +53,8 @@ func NewSerializer(metricsFormat string) (*Serializer, error) { } return &Serializer{ - metricsFormat: f, + metricsFormat: f, + sanitizeReplacer: createSanitizeReplacer(sanitizedChars, rune(sanitizeReplaceChar[0])), }, nil } @@ -65,15 +79,17 @@ func (s *Serializer) createObject(metric telegraf.Metric) []byte { continue } + name := s.sanitizeReplacer.Replace(metric.Name()) + switch metricsFormat { case Carbon2FormatFieldSeparate: m.WriteString(serializeMetricFieldSeparate( - metric.Name(), fieldName, + name, fieldName, )) case Carbon2FormatMetricIncludesField: m.WriteString(serializeMetricIncludeField( - metric.Name(), fieldName, + name, fieldName, )) } @@ -152,3 +168,13 @@ func bool2int(b bool) int { } return i } + +// createSanitizeReplacer creates string replacer replacing all provided +// characters with the replaceChar. +func createSanitizeReplacer(sanitizedChars string, replaceChar rune) *strings.Replacer { + sanitizeCharPairs := make([]string, 0, 2*len(sanitizedChars)) + for _, c := range sanitizedChars { + sanitizeCharPairs = append(sanitizeCharPairs, string(c), string(replaceChar)) + } + return strings.NewReplacer(sanitizeCharPairs...) +} diff --git a/plugins/serializers/carbon2/carbon2_test.go b/plugins/serializers/carbon2/carbon2_test.go index 1d6359858dd9e..4afc0932cc7ba 100644 --- a/plugins/serializers/carbon2/carbon2_test.go +++ b/plugins/serializers/carbon2/carbon2_test.go @@ -46,7 +46,7 @@ func TestSerializeMetricFloat(t *testing.T) { for _, tc := range testcases { t.Run(string(tc.format), func(t *testing.T) { - s, err := NewSerializer(string(tc.format)) + s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar) require.NoError(t, err) buf, err := s.Serialize(m) @@ -84,7 +84,7 @@ func TestSerializeMetricWithEmptyStringTag(t *testing.T) { for _, tc := range testcases { t.Run(string(tc.format), func(t *testing.T) { - s, err := NewSerializer(string(tc.format)) + s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar) require.NoError(t, err) buf, err := s.Serialize(m) @@ -122,7 +122,7 @@ func TestSerializeWithSpaces(t *testing.T) { for _, tc := range testcases { t.Run(string(tc.format), func(t *testing.T) { - s, err := NewSerializer(string(tc.format)) + s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar) require.NoError(t, err) buf, err := s.Serialize(m) @@ -160,7 +160,7 @@ func TestSerializeMetricInt(t *testing.T) { for _, tc := range testcases { t.Run(string(tc.format), func(t *testing.T) { - s, err := NewSerializer(string(tc.format)) + s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar) require.NoError(t, err) buf, err := s.Serialize(m) @@ -198,7 +198,7 @@ func TestSerializeMetricString(t *testing.T) { for _, tc := range testcases { t.Run(string(tc.format), func(t *testing.T) { - s, err := NewSerializer(string(tc.format)) + s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar) require.NoError(t, err) buf, err := s.Serialize(m) @@ -255,7 +255,7 @@ func TestSerializeMetricBool(t *testing.T) { for _, tc := range testcases { t.Run(string(tc.format), func(t *testing.T) { - s, err := NewSerializer(string(tc.format)) + s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar) require.NoError(t, err) buf, err := s.Serialize(tc.metric) @@ -300,7 +300,7 @@ metric=cpu_value 42 0 for _, tc := range testcases { t.Run(string(tc.format), func(t *testing.T) { - s, err := NewSerializer(string(tc.format)) + s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar) require.NoError(t, err) buf, err := s.SerializeBatch(metrics) @@ -310,3 +310,113 @@ metric=cpu_value 42 0 }) } } + +func TestSerializeMetricIsProperlySanitized(t *testing.T) { + now := time.Now() + + testcases := []struct { + metricFunc func() (telegraf.Metric, error) + format format + expected string + replaceChar string + expectedErr bool + }{ + { + metricFunc: func() (telegraf.Metric, error) { + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + } + return metric.New("cpu=1", nil, fields, now) + }, + format: Carbon2FormatFieldSeparate, + expected: fmt.Sprintf("metric=cpu:1 field=usage_idle 91.5 %d\n", now.Unix()), + replaceChar: DefaultSanitizeReplaceChar, + }, + { + metricFunc: func() (telegraf.Metric, error) { + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + } + return metric.New("cpu=1", nil, fields, now) + }, + format: Carbon2FormatFieldSeparate, + expected: fmt.Sprintf("metric=cpu_1 field=usage_idle 91.5 %d\n", now.Unix()), + replaceChar: "_", + }, + { + metricFunc: func() (telegraf.Metric, error) { + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + } + return metric.New("cpu=1=tmp$custom", nil, fields, now) + }, + format: Carbon2FormatFieldSeparate, + expected: fmt.Sprintf("metric=cpu:1:tmp:custom field=usage_idle 91.5 %d\n", now.Unix()), + replaceChar: DefaultSanitizeReplaceChar, + }, + { + metricFunc: func() (telegraf.Metric, error) { + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + } + return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now) + }, + format: Carbon2FormatFieldSeparate, + expected: fmt.Sprintf("metric=cpu:1:tmp:custom:namespace field=usage_idle 91.5 %d\n", now.Unix()), + replaceChar: DefaultSanitizeReplaceChar, + }, + { + metricFunc: func() (telegraf.Metric, error) { + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + } + return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now) + }, + format: Carbon2FormatMetricIncludesField, + expected: fmt.Sprintf("metric=cpu:1:tmp:custom:namespace_usage_idle 91.5 %d\n", now.Unix()), + replaceChar: DefaultSanitizeReplaceChar, + }, + { + metricFunc: func() (telegraf.Metric, error) { + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + } + return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now) + }, + format: Carbon2FormatMetricIncludesField, + expected: fmt.Sprintf("metric=cpu_1_tmp_custom_namespace_usage_idle 91.5 %d\n", now.Unix()), + replaceChar: "_", + }, + { + metricFunc: func() (telegraf.Metric, error) { + fields := map[string]interface{}{ + "usage_idle": float64(91.5), + } + return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now) + }, + format: Carbon2FormatMetricIncludesField, + expectedErr: true, + replaceChar: "___", + }, + } + + for _, tc := range testcases { + t.Run(string(tc.format), func(t *testing.T) { + m, err := tc.metricFunc() + require.NoError(t, err) + + s, err := NewSerializer(string(tc.format), tc.replaceChar) + if tc.expectedErr { + require.Error(t, err) + return + } + + require.NoError(t, err) + + buf, err := s.Serialize(m) + require.NoError(t, err) + + assert.Equal(t, tc.expected, string(buf)) + }) + } +} diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index f6c62fc12cbda..247324d4ab4f5 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -53,6 +53,9 @@ type Config struct { // Carbon2 metric format. Carbon2Format string `toml:"carbon2_format"` + // Character used for metric name sanitization in Carbon2. + Carbon2SanitizeReplaceChar string `toml:"carbon2_sanitize_replace_char"` + // Support tags in graphite protocol GraphiteTagSupport bool `toml:"graphite_tag_support"` @@ -123,7 +126,7 @@ func NewSerializer(config *Config) (Serializer, error) { case "nowmetric": serializer, err = NewNowSerializer() case "carbon2": - serializer, err = NewCarbon2Serializer(config.Carbon2Format) + serializer, err = NewCarbon2Serializer(config.Carbon2Format, config.Carbon2SanitizeReplaceChar) case "wavefront": serializer, err = NewWavefrontSerializer(config.Prefix, config.WavefrontUseStrict, config.WavefrontSourceOverride) case "prometheus": @@ -186,8 +189,8 @@ func NewJSONSerializer(timestampUnits time.Duration) (Serializer, error) { return json.NewSerializer(timestampUnits) } -func NewCarbon2Serializer(carbon2format string) (Serializer, error) { - return carbon2.NewSerializer(carbon2format) +func NewCarbon2Serializer(carbon2format string, carbon2SanitizeReplaceChar string) (Serializer, error) { + return carbon2.NewSerializer(carbon2format, carbon2SanitizeReplaceChar) } func NewSplunkmetricSerializer(splunkmetricHecRouting bool, splunkmetricMultimetric bool) (Serializer, error) {