diff --git a/CHANGELOG.md b/CHANGELOG.md index 1202c163dfd..168aea16a09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ * [ENHANCEMENT] Add a max flush attempts and metric to the metrics generator [#4254](https://github.com/grafana/tempo/pull/4254) (@joe-elliott) * [ENHANCEMENT] Collection of query-frontend changes to reduce allocs. [#4242]https://github.com/grafana/tempo/pull/4242 (@joe-elliott) * [ENHANCEMENT] Added `insecure-skip-verify` option in tempo-cli to skip SSL certificate validation when connecting to the S3 backend. [#44236](https://github.com/grafana/tempo/pull/4259) (@faridtmammadov) +* [ENHANCEMENT] Add `invalid_utf8` to reasons spanmetrics will discard spans. [#4293](https://github.com/grafana/tempo/pull/4293) (@zalegrala) * [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen) * [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott) * [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno) diff --git a/docs/sources/tempo/troubleshooting/metrics-generator.md b/docs/sources/tempo/troubleshooting/metrics-generator.md index c3bcd276336..f6d36f0ca67 100644 --- a/docs/sources/tempo/troubleshooting/metrics-generator.md +++ b/docs/sources/tempo/troubleshooting/metrics-generator.md @@ -57,6 +57,8 @@ If spans are regularly exceeding this value you may want to consider reviewing y Note that increasing this value allows the generator to consume more spans, but does reduce the accuracy of metrics because spans farther away from "now" are included. +Spans could also be discarded if the attributes are not valid UTF-8 characters when those attributes are converted to metric labels. + ### Max active series The generator protects itself and your remote-write target by having a maximum number of series the generator produces. diff --git a/integration/e2e/metrics_generator_test.go b/integration/e2e/metrics_generator_test.go index c660cf5a773..d6c5d5b9e42 100644 --- a/integration/e2e/metrics_generator_test.go +++ b/integration/e2e/metrics_generator_test.go @@ -130,6 +130,24 @@ func TestMetricsGenerator(t *testing.T) { }) require.NoError(t, err) + // also send one with an invalid label value + err = c.EmitBatch(context.Background(), &thrift.Batch{ + Process: &thrift.Process{ServiceName: "app"}, + Spans: []*thrift.Span{ + { + TraceIdLow: traceIDLow, + TraceIdHigh: traceIDHigh, + SpanId: r.Int63(), + ParentSpanId: parentSpanID, + OperationName: "\xff\xff", + StartTime: time.Now().UnixMicro(), + Duration: int64(2 * time.Second / time.Microsecond), + Tags: []*thrift.Tag{{Key: "span.kind", VStr: stringPtr("server")}}, + }, + }, + }) + require.NoError(t, err) + // Fetch metrics from Prometheus once they are received var metricFamilies map[string]*io_prometheus_client.MetricFamily for { @@ -191,8 +209,8 @@ func TestMetricsGenerator(t *testing.T) { assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_latency_sum", lbls)) // Verify metrics - assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(4), "tempo_metrics_generator_spans_received_total")) - assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(2), "tempo_metrics_generator_spans_discarded_total")) + assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(5), "tempo_metrics_generator_spans_received_total")) + assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(3), "tempo_metrics_generator_spans_discarded_total")) assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(25), "tempo_metrics_generator_registry_active_series")) assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(1000), "tempo_metrics_generator_registry_max_active_series")) assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(25), "tempo_metrics_generator_registry_series_added_total")) diff --git a/modules/generator/instance.go b/modules/generator/instance.go index 3056a63bf65..a3c8a0db6a3 100644 --- a/modules/generator/instance.go +++ b/modules/generator/instance.go @@ -61,6 +61,7 @@ var ( const ( reasonOutsideTimeRangeSlack = "outside_metrics_ingestion_slack" reasonSpanMetricsFiltered = "span_metrics_filtered" + reasonInvalidUTF8 = "invalid_utf8" ) type instance struct { @@ -290,7 +291,8 @@ func (i *instance) addProcessor(processorName string, cfg ProcessorConfig) error switch processorName { case spanmetrics.Name: filteredSpansCounter := metricSpansDiscarded.WithLabelValues(i.instanceID, reasonSpanMetricsFiltered) - newProcessor, err = spanmetrics.New(cfg.SpanMetrics, i.registry, filteredSpansCounter) + invalidUTF8Counter := metricSpansDiscarded.WithLabelValues(i.instanceID, reasonInvalidUTF8) + newProcessor, err = spanmetrics.New(cfg.SpanMetrics, i.registry, filteredSpansCounter, invalidUTF8Counter) if err != nil { return err } diff --git a/modules/generator/processor/spanmetrics/spanmetrics.go b/modules/generator/processor/spanmetrics/spanmetrics.go index 492edabd7c5..954c2ddf6e2 100644 --- a/modules/generator/processor/spanmetrics/spanmetrics.go +++ b/modules/generator/processor/spanmetrics/spanmetrics.go @@ -2,7 +2,9 @@ package spanmetrics import ( "context" + "fmt" "time" + "unicode/utf8" "github.com/prometheus/prometheus/util/strutil" "go.opentelemetry.io/otel" @@ -40,12 +42,13 @@ type Processor struct { filter *spanfilter.SpanFilter filteredSpansCounter prometheus.Counter + invalidUTF8Counter prometheus.Counter // for testing now func() time.Time } -func New(cfg Config, reg registry.Registry, spanDiscardCounter prometheus.Counter) (gen.Processor, error) { +func New(cfg Config, reg registry.Registry, filteredSpansCounter, invalidUTF8Counter prometheus.Counter) (gen.Processor, error) { labels := make([]string, 0, 4+len(cfg.Dimensions)) if cfg.IntrinsicDimensions.Service { @@ -72,13 +75,19 @@ func New(cfg Config, reg registry.Registry, spanDiscardCounter prometheus.Counte labels = append(labels, sanitizeLabelNameWithCollisions(m.Name)) } + err := validateLabelValues(labels) + if err != nil { + return nil, err + } + p := &Processor{ Cfg: cfg, registry: reg, spanMetricsTargetInfo: reg.NewGauge(targetInfo), now: time.Now, labels: labels, - filteredSpansCounter: spanDiscardCounter, + filteredSpansCounter: filteredSpansCounter, + invalidUTF8Counter: invalidUTF8Counter, } if cfg.Subprocessors[Latency] { @@ -96,7 +105,6 @@ func New(cfg Config, reg registry.Registry, spanDiscardCounter prometheus.Counte return nil, err } - p.filteredSpansCounter = spanDiscardCounter p.filter = filter return p, nil } @@ -203,6 +211,12 @@ func (p *Processor) aggregateMetricsForSpan(svcName string, jobName string, inst spanMultiplier := processor_util.GetSpanMultiplier(p.Cfg.SpanMultiplierKey, span, rs) + err := validateLabelValues(labelValues) + if err != nil { + p.invalidUTF8Counter.Inc() + return + } + registryLabelValues := p.registry.NewLabelValueCombo(labels, labelValues) if p.Cfg.Subprocessors[Count] { @@ -259,6 +273,16 @@ func sanitizeLabelNameWithCollisions(name string) string { return sanitized } +func validateLabelValues(v []string) error { + for _, value := range v { + if !utf8.ValidString(value) { + return fmt.Errorf("invalid utf8 string: %s", value) + } + } + + return nil +} + func isIntrinsicDimension(name string) bool { return processor_util.Contains(name, []string{dimJob, dimSpanName, dimSpanKind, dimStatusCode, dimStatusMessage, dimInstance}) } diff --git a/modules/generator/processor/spanmetrics/spanmetrics_test.go b/modules/generator/processor/spanmetrics/spanmetrics_test.go index 6b8dc937044..325c6f81c5a 100644 --- a/modules/generator/processor/spanmetrics/spanmetrics_test.go +++ b/modules/generator/processor/spanmetrics/spanmetrics_test.go @@ -35,12 +35,13 @@ func TestSpanMetrics(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -72,13 +73,14 @@ func TestSpanMetrics(t *testing.T) { func TestSpanMetricsTargetInfoEnabled(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} cfg.EnableTargetInfo = true - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -110,6 +112,7 @@ func TestSpanMetrics_dimensions(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) @@ -118,7 +121,7 @@ func TestSpanMetrics_dimensions(t *testing.T) { cfg.IntrinsicDimensions.StatusMessage = true cfg.Dimensions = []string{"foo", "bar", "does-not-exist"} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -166,6 +169,7 @@ func TestSpanMetrics_collisions(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) @@ -173,7 +177,7 @@ func TestSpanMetrics_collisions(t *testing.T) { cfg.Dimensions = []string{"span.kind", "span_name"} cfg.IntrinsicDimensions.SpanKind = false - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -215,12 +219,14 @@ func TestSpanMetrics_collisions(t *testing.T) { func TestJobLabelWithNamespaceAndInstanceID(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") + cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} cfg.EnableTargetInfo = true - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -263,6 +269,7 @@ func TestJobLabelWithNamespaceAndInstanceID(t *testing.T) { func TestSpanMetrics_applyFilterPolicy(t *testing.T) { filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cases := []struct { filterPolicies []filterconfig.FilterPolicy @@ -353,7 +360,7 @@ func TestSpanMetrics_applyFilterPolicy(t *testing.T) { cfg.FilterPolicies = tc.filterPolicies testRegistry := registry.NewTestRegistry() - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -407,13 +414,14 @@ func TestJobLabelWithNamespaceAndNoServiceName(t *testing.T) { // but service will still be there testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} cfg.EnableTargetInfo = true - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -459,13 +467,14 @@ func TestJobLabelWithNamespaceAndNoServiceName(t *testing.T) { func TestLabelsWithDifferentBatches(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} cfg.EnableTargetInfo = true - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -537,13 +546,14 @@ func TestTargetInfoEnabled(t *testing.T) { // if the only labels are job and instance then target_info should not exist testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.EnableTargetInfo = true cfg.HistogramBuckets = []float64{0.5, 1} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -584,13 +594,14 @@ func TestTargetInfoEnabled(t *testing.T) { func TestTargetInfoDisabled(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.EnableTargetInfo = false cfg.HistogramBuckets = []float64{0.5, 1} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -629,6 +640,7 @@ func TestTargetInfoWithExclusion(t *testing.T) { // if the only labels are job and instance then target_info should not exist testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) @@ -636,7 +648,7 @@ func TestTargetInfoWithExclusion(t *testing.T) { cfg.TargetInfoExcludedDimensions = []string{"container", "container.id"} cfg.HistogramBuckets = []float64{0.5, 1} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -690,13 +702,14 @@ func TestTargetInfoSanitizeLabelName(t *testing.T) { // if the only labels are job and instance then target_info should not exist testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.EnableTargetInfo = true cfg.HistogramBuckets = []float64{0.5, 1} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -739,13 +752,14 @@ func TestTargetInfoWithJobAndInstanceOnly(t *testing.T) { // if the only labels are job and instance then target_info should not exist testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} cfg.EnableTargetInfo = true - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -773,12 +787,13 @@ func TestTargetInfoNoJobAndNoInstance(t *testing.T) { // if both job and instance are missing, target info should not exist testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -820,13 +835,14 @@ func TestTargetInfoNoJobAndNoInstance(t *testing.T) { func TestTargetInfoWithDifferentBatches(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.EnableTargetInfo = true cfg.HistogramBuckets = []float64{0.5, 1} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -890,6 +906,7 @@ func TestTargetInfoWithDifferentBatches(t *testing.T) { func TestSpanMetricsDimensionMapping(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) @@ -904,7 +921,7 @@ func TestSpanMetricsDimensionMapping(t *testing.T) { }, } - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -949,6 +966,7 @@ func TestSpanMetricsDimensionMapping(t *testing.T) { func TestSpanMetricsDimensionMappingMissingLabels(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) @@ -976,7 +994,7 @@ func TestSpanMetricsDimensionMappingMissingLabels(t *testing.T) { }, } - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -1027,12 +1045,13 @@ func TestSpanMetricsDimensionMappingMissingLabels(t *testing.T) { func TestSpanMetricsNegativeLatency(t *testing.T) { testRegistry := registry.NewTestRegistry() filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.HistogramBuckets = []float64{0.5, 1} - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(t, err) defer p.Shutdown(context.Background()) @@ -1158,13 +1177,14 @@ func BenchmarkSpanMetrics_applyFilterPolicyMedium(b *testing.B) { func benchmarkFilterPolicy(b *testing.B, policies []filterconfig.FilterPolicy, batch *trace_v1.ResourceSpans) { filteredSpansCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "filtered") + invalidSpanLabelsCounter := metricSpansDiscarded.WithLabelValues("test-tenant", "invalid") testRegistry := registry.NewTestRegistry() cfg := Config{} cfg.RegisterFlagsAndApplyDefaults("", nil) cfg.FilterPolicies = policies - p, err := New(cfg, testRegistry, filteredSpansCounter) + p, err := New(cfg, testRegistry, filteredSpansCounter, invalidSpanLabelsCounter) require.NoError(b, err) defer p.Shutdown(context.Background()) b.ResetTimer()