Skip to content

Commit

Permalink
Add invalid_utf8 to reasons spans could be rejected
Browse files Browse the repository at this point in the history
  • Loading branch information
zalegrala committed Nov 6, 2024
1 parent 3449ef6 commit a854b4a
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 25 deletions.
22 changes: 20 additions & 2 deletions integration/e2e/metrics_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,24 @@ func TestMetricsGenerator(t *testing.T) {
})
require.NoError(t, err)

// also send one with an invalid label value
err = c.EmitBatch(context.Background(), &thrift.Batch{
Process: &thrift.Process{ServiceName: "app"},
Spans: []*thrift.Span{
{
TraceIdLow: traceIDLow,
TraceIdHigh: traceIDHigh,
SpanId: r.Int63(),
ParentSpanId: parentSpanID,
OperationName: "\xff\xff",
StartTime: time.Now().Add(10 * 24 * time.Hour).UnixMicro(),
Duration: int64(1 * time.Second / time.Microsecond),
Tags: []*thrift.Tag{{Key: "span.kind", VStr: stringPtr("server")}},
},
},
})
require.NoError(t, err)

// Fetch metrics from Prometheus once they are received
var metricFamilies map[string]*io_prometheus_client.MetricFamily
for {
Expand Down Expand Up @@ -191,8 +209,8 @@ func TestMetricsGenerator(t *testing.T) {
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_latency_sum", lbls))

// Verify metrics
assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(4), "tempo_metrics_generator_spans_received_total"))
assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(2), "tempo_metrics_generator_spans_discarded_total"))
assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(5), "tempo_metrics_generator_spans_received_total"))
assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(3), "tempo_metrics_generator_spans_discarded_total"))
assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(25), "tempo_metrics_generator_registry_active_series"))
assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(1000), "tempo_metrics_generator_registry_max_active_series"))
assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(25), "tempo_metrics_generator_registry_series_added_total"))
Expand Down
4 changes: 3 additions & 1 deletion modules/generator/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var (
// Values used as the second label passed to metricSpansDiscarded.WithLabelValues
// (after the instance ID) to say why a span was discarded.
const (
// NOTE(review): the name suggests spans whose timestamps fall outside the
// ingestion slack window; its use is not visible in this hunk — confirm.
reasonOutsideTimeRangeSlack = "outside_metrics_ingestion_slack"
// Label value for the counter handed to spanmetrics.New as the filtered-spans counter.
reasonSpanMetricsFiltered = "span_metrics_filtered"
// Label value for the counter handed to spanmetrics.New as the invalid-UTF-8 counter.
reasonInvalidUTF8 = "invalid_utf8"
)

type instance struct {
Expand Down Expand Up @@ -290,7 +291,8 @@ func (i *instance) addProcessor(processorName string, cfg ProcessorConfig) error
switch processorName {
case spanmetrics.Name:
filteredSpansCounter := metricSpansDiscarded.WithLabelValues(i.instanceID, reasonSpanMetricsFiltered)
newProcessor, err = spanmetrics.New(cfg.SpanMetrics, i.registry, filteredSpansCounter)
invalidUTF8Counter := metricSpansDiscarded.WithLabelValues(i.instanceID, reasonInvalidUTF8)
newProcessor, err = spanmetrics.New(cfg.SpanMetrics, i.registry, filteredSpansCounter, invalidUTF8Counter)
if err != nil {
return err
}
Expand Down
30 changes: 27 additions & 3 deletions modules/generator/processor/spanmetrics/spanmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package spanmetrics

import (
"context"
"fmt"
"time"
"unicode/utf8"

"github.com/prometheus/prometheus/util/strutil"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -40,12 +42,13 @@ type Processor struct {

filter *spanfilter.SpanFilter
filteredSpansCounter prometheus.Counter
invalidUTF8Counter prometheus.Counter

// for testing
now func() time.Time
}

func New(cfg Config, reg registry.Registry, spanDiscardCounter prometheus.Counter) (gen.Processor, error) {
func New(cfg Config, reg registry.Registry, filteredSpansCounter, invalidUTF8Counter prometheus.Counter) (gen.Processor, error) {
labels := make([]string, 0, 4+len(cfg.Dimensions))

if cfg.IntrinsicDimensions.Service {
Expand All @@ -72,13 +75,19 @@ func New(cfg Config, reg registry.Registry, spanDiscardCounter prometheus.Counte
labels = append(labels, sanitizeLabelNameWithCollisions(m.Name))
}

err := validateLabelValues(labels)
if err != nil {
return nil, err
}

p := &Processor{
Cfg: cfg,
registry: reg,
spanMetricsTargetInfo: reg.NewGauge(targetInfo),
now: time.Now,
labels: labels,
filteredSpansCounter: spanDiscardCounter,
filteredSpansCounter: filteredSpansCounter,
invalidUTF8Counter: invalidUTF8Counter,
}

if cfg.Subprocessors[Latency] {
Expand All @@ -96,7 +105,6 @@ func New(cfg Config, reg registry.Registry, spanDiscardCounter prometheus.Counte
return nil, err
}

p.filteredSpansCounter = spanDiscardCounter
p.filter = filter
return p, nil
}
Expand Down Expand Up @@ -203,6 +211,12 @@ func (p *Processor) aggregateMetricsForSpan(svcName string, jobName string, inst

spanMultiplier := processor_util.GetSpanMultiplier(p.Cfg.SpanMultiplierKey, span, rs)

err := validateLabelValues(labelValues)
if err != nil {
p.invalidUTF8Counter.Inc()
return
}

registryLabelValues := p.registry.NewLabelValueCombo(labels, labelValues)

if p.Cfg.Subprocessors[Count] {
Expand Down Expand Up @@ -259,6 +273,16 @@ func sanitizeLabelNameWithCollisions(name string) string {
return sanitized
}

// validateLabelValues checks that every string in v is valid UTF-8.
//
// It returns nil when all values are valid (including when v is nil or
// empty), and an error describing the first invalid value otherwise.
func validateLabelValues(v []string) error {
	for _, value := range v {
		if !utf8.ValidString(value) {
			// Use %q rather than %s: the value is known to contain invalid
			// UTF-8, and %s would copy those raw bytes into the error text,
			// making the error message itself invalid UTF-8. %q escapes the
			// offending bytes (e.g. "\xff") so the message stays printable.
			return fmt.Errorf("invalid utf8 string: %q", value)
		}
	}

	return nil
}

func isIntrinsicDimension(name string) bool {
return processor_util.Contains(name, []string{dimJob, dimSpanName, dimSpanKind, dimStatusCode, dimStatusMessage, dimInstance})
}
Loading

0 comments on commit a854b4a

Please sign in to comment.