Skip to content

Commit

Permalink
Exemplar fixes (#4366)
Browse files Browse the repository at this point in the history
* Fix exemplars based on duration to convert to seconds, fix various other issues

* changelog
  • Loading branch information
mdisibio authored Nov 22, 2024
1 parent 1a21818 commit 29ef5ab
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
* [BUGFIX] Pushes a 0 to classic histogram's counter when the series is new to allow Prometheus to start from a non-null value. [#4140](https://github.com/grafana/tempo/pull/4140) (@mapno)
* [BUGFIX] Fix counter samples being downsampled: backdate the initial sample to the previous minute when the series is new [#4236](https://github.com/grafana/tempo/pull/4236) (@javiermolinar)
* [BUGFIX] Fix traceql metrics time range handling at the cutoff between recent and backend data [#4257](https://github.com/grafana/tempo/issues/4257) (@mdisibio)
* [BUGFIX] Fix several issues with exemplar values for traceql metrics [#4366](https://github.com/grafana/tempo/pull/4366) (@mdisibio)
* [BUGFIX] Skip computing exemplars for instant queries. [#4204](https://github.com/grafana/tempo/pull/4204) (@javiermolinar)
* [BUGFIX] Gave context to orphaned spans related to various maintenance processes. [#4260](https://github.com/grafana/tempo/pull/4260) (@joe-elliott)
* [BUGFIX] Utilize S3Pass and S3User parameters in tempo-cli options, which were previously unused in the code. [#4259](https://github.com/grafana/tempo/pull/4259) (@faridtmammadov)
Expand Down
62 changes: 37 additions & 25 deletions pkg/traceql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,33 +1106,23 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
var innerAgg func() VectorAggregator
var byFunc func(Span) (Static, bool)
var byFuncLabel string
var exemplarFn getExemplar

switch a.op {
case metricsAggregateCountOverTime:
innerAgg = func() VectorAggregator { return NewCountOverTimeAggregator() }
a.simpleAggregationOp = sumAggregation
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}

case metricsAggregateMinOverTime:
innerAgg = func() VectorAggregator { return NewOverTimeAggregator(a.attr, minAggregation) }
a.simpleAggregationOp = minAggregation
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}

case metricsAggregateMaxOverTime:
innerAgg = func() VectorAggregator { return NewOverTimeAggregator(a.attr, maxAggregation) }
a.simpleAggregationOp = maxAggregation
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}

case metricsAggregateRate:
innerAgg = func() VectorAggregator { return NewRateAggregator(1.0 / time.Duration(q.Step).Seconds()) }
a.simpleAggregationOp = sumAggregation
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}

case metricsAggregateHistogramOverTime, metricsAggregateQuantileOverTime:
// Histograms and quantiles are implemented as count_over_time() by(2^log2(attr)) for now
Expand All @@ -1144,16 +1134,9 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
case IntrinsicDurationAttribute:
// Optimal implementation for duration attribute
byFunc = a.bucketizeSpanDuration
exemplarFn = func(s Span) (float64, uint64) {
return float64(s.DurationNanos()), a.spanStartTimeMs(s)
}
default:
// Basic implementation for all other attributes
byFunc = a.bucketizeAttribute
exemplarFn = func(s Span) (float64, uint64) {
v, _ := FloatizeAttribute(s, a.attr)
return v, a.spanStartTimeMs(s)
}
}
}

Expand All @@ -1170,11 +1153,7 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
a.agg = NewGroupingAggregator(a.op.String(), func() RangeAggregator {
return NewStepAggregator(q.Start, q.End, q.Step, innerAgg)
}, a.by, byFunc, byFuncLabel)
a.exemplarFn = exemplarFn
}

func (a *MetricsAggregate) spanStartTimeMs(s Span) uint64 {
return s.StartTimeUnixNanos() / uint64(time.Millisecond)
a.exemplarFn = exemplarFnFor(a.attr)
}

func (a *MetricsAggregate) bucketizeSpanDuration(s Span) (Static, bool) {
Expand Down Expand Up @@ -1209,6 +1188,39 @@ func (a *MetricsAggregate) bucketizeAttribute(s Span) (Static, bool) {
}
}

// exemplarFnFor returns the exemplar extraction function appropriate for the
// given attribute. Duration gets a dedicated conversion to seconds, the
// zero-value attribute produces value-less exemplars, and every other
// attribute is floatized per span.
func exemplarFnFor(a Attribute) func(Span) (float64, uint64) {
	if a == IntrinsicDurationAttribute {
		return exemplarDuration
	}
	if a == (Attribute{}) {
		// Value-less exemplars: these are recorded without a value and
		// attached to the series at the end.
		return exemplarNaN
	}
	return exemplarAttribute(a)
}

// exemplarNaN records a value-less exemplar: NaN paired with the span start
// time truncated to milliseconds. The real value is filled in later from the
// series itself.
func exemplarNaN(s Span) (float64, uint64) {
	ts := s.StartTimeUnixNanos() / uint64(time.Millisecond)
	return math.NaN(), ts
}

// exemplarDuration records the span duration converted from nanoseconds to
// seconds, timestamped with the span start time in milliseconds.
func exemplarDuration(s Span) (float64, uint64) {
	seconds := float64(s.DurationNanos()) / float64(time.Second)
	startMs := s.StartTimeUnixNanos() / uint64(time.Millisecond)
	return seconds, startMs
}

// exemplarAttribute builds an exemplar function that closes over the
// attribute, so it doesn't have to be passed along with every span.
// This should be more efficient.
func exemplarAttribute(a Attribute) func(Span) (float64, uint64) {
	return func(s Span) (float64, uint64) {
		value, _ := FloatizeAttribute(s, a)
		startMs := s.StartTimeUnixNanos() / uint64(time.Millisecond)
		return value, startMs
	}
}

func (a *MetricsAggregate) initSum(q *tempopb.QueryRangeRequest) {
// Currently all metrics are summed by job to produce
// intermediate results. This will change when adding min/max/topk/etc
Expand Down
19 changes: 13 additions & 6 deletions pkg/traceql/engine_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math"
"math/rand"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -1053,7 +1054,7 @@ func (e *MetricsEvalulator) Do(ctx context.Context, f SpansetFetcher, fetcherSta
}

if len(ss.Spans) > 0 && e.sampleExemplar(ss.TraceID) {
e.metricsPipeline.observeExemplar(ss.Spans[0]) // Randomly sample the first span
e.metricsPipeline.observeExemplar(ss.Spans[rand.Intn(len(ss.Spans))])
}

e.mtx.Unlock()
Expand Down Expand Up @@ -1232,19 +1233,25 @@ func (b *SimpleAggregator) aggregateExemplars(ts *tempopb.TimeSeries, existing *
Value: StaticFromAnyValue(l.Value),
})
}
value := exemplar.Value
if math.IsNaN(value) {
value = 0 // TODO: Use the value of the series at the same timestamp
}
existing.Exemplars = append(existing.Exemplars, Exemplar{
Labels: labels,
Value: value,
Value: exemplar.Value,
TimestampMs: uint64(exemplar.TimestampMs),
})
}
}

// Results finalizes and returns the aggregated series set. Exemplars that
// were recorded without a value (NaN placeholder, see exemplarNaN) are
// backfilled here with the value of their series at the same timestamp.
func (b *SimpleAggregator) Results() SeriesSet {
	// Attach placeholder exemplars to the output
	for _, ts := range b.ss {
		for i, e := range ts.Exemplars {
			if !math.IsNaN(e.Value) {
				continue
			}
			interval := IntervalOfMs(int64(e.TimestampMs), b.start, b.end, b.step)
			// Guard against exemplars whose timestamp falls outside the query
			// range: an out-of-range interval (e.g. -1) would panic on index.
			if interval < 0 || interval >= len(ts.Values) {
				continue
			}
			ts.Exemplars[i].Value = ts.Values[interval]
		}
	}

	return b.ss
}

Expand Down
19 changes: 6 additions & 13 deletions pkg/traceql/engine_metrics_average.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,8 @@ func newAverageOverTimeMetricsAggregator(attr Attribute, by []Attribute) *averag
}

func (a *averageOverTimeAggregator) init(q *tempopb.QueryRangeRequest, mode AggregateMode) {
exemplarFn := func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}

a.seriesAgg = &averageOverTimeSeriesAggregator{
weightedAverageSeries: make(map[string]averageSeries),
weightedAverageSeries: make(map[string]*averageSeries),
len: IntervalCount(q.Start, q.End, q.Step),
start: q.Start,
end: q.End,
Expand All @@ -50,8 +46,8 @@ func (a *averageOverTimeAggregator) init(q *tempopb.QueryRangeRequest, mode Aggr
a.agg = newAvgOverTimeSpanAggregator(a.attr, a.by, q.Start, q.End, q.Step)
}

a.exemplarFn = exemplarFn
a.mode = mode
a.exemplarFn = exemplarFnFor(a.attr)
}

func (a *averageOverTimeAggregator) observe(span Span) {
Expand Down Expand Up @@ -110,10 +106,6 @@ func (a *averageOverTimeAggregator) validate() error {
return nil
}

func (a *averageOverTimeAggregator) spanStartTimeMs(s Span) uint64 {
return s.StartTimeUnixNanos() / uint64(time.Millisecond)
}

func (a *averageOverTimeAggregator) String() string {
s := strings.Builder{}

Expand All @@ -138,7 +130,7 @@ func (a *averageOverTimeAggregator) String() string {
}

type averageOverTimeSeriesAggregator struct {
weightedAverageSeries map[string]averageSeries
weightedAverageSeries map[string]*averageSeries
len int
start, end, step uint64
exemplarBuckets *bucketSet
Expand Down Expand Up @@ -279,7 +271,8 @@ func (b *averageOverTimeSeriesAggregator) Combine(in []*tempopb.TimeSeries) {
countPosMapper[avgSeriesPromLabel] = i
} else if !ok {
promLabels := getLabels(ts.Labels, "")
b.weightedAverageSeries[ts.PromLabels] = newAverageSeries(b.len, len(ts.Exemplars), promLabels)
s := newAverageSeries(b.len, len(ts.Exemplars), promLabels)
b.weightedAverageSeries[ts.PromLabels] = &s
}
}
for _, ts := range in {
Expand All @@ -302,7 +295,7 @@ func (b *averageOverTimeSeriesAggregator) Combine(in []*tempopb.TimeSeries) {
}
}

func (b *averageOverTimeSeriesAggregator) aggregateExemplars(ts *tempopb.TimeSeries, existing averageSeries) {
func (b *averageOverTimeSeriesAggregator) aggregateExemplars(ts *tempopb.TimeSeries, existing *averageSeries) {
for _, exemplar := range ts.Exemplars {
if b.exemplarBuckets.testTotal() {
break
Expand Down

0 comments on commit 29ef5ab

Please sign in to comment.