Skip to content

Commit

Permalink
add exemplar to baseline and selection series
Browse files Browse the repository at this point in the history
  • Loading branch information
javiermolinar committed Nov 6, 2024
1 parent e717218 commit 0800d12
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 21 deletions.
25 changes: 19 additions & 6 deletions pkg/traceql/engine_metrics_compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,24 +262,37 @@ func (m *MetricsCompare) result() SeriesSet {
addTotals(internalLabelTypeSelectionTotal, m.selectionTotals)

// Add exemplars
addExemplar := func(prefix Label, e Exemplar) {
addExemplar := func(prefix Label, e Exemplar, usedExemplars map[string]bool) {
for _, l := range e.Labels {
seriesLabels := Labels{
prefix,
{Name: l.Name},
{Name: l.Name, Value: l.Value},
}
if ts, ok := ss[seriesLabels.String()]; ok {
seriesLabelsProm := seriesLabels.String()
usedExemplarKey := fmt.Sprintf("%s-%d", seriesLabelsProm, e.TimestampMs)

if usedExemplars[usedExemplarKey] {
continue
}
usedExemplars[usedExemplarKey] = true

if ts, ok := ss[seriesLabelsProm]; ok {
ts.Exemplars = append(ts.Exemplars, e)
ss[seriesLabels.String()] = ts
ss[seriesLabelsProm] = ts
}
}
}

alreadyUsedExemplars := make(map[string]bool, len(m.baselineExemplars)+len(m.selectionExemplars))

for _, e := range m.baselineExemplars {
addExemplar(internalLabelTypeBaselineTotal, e)
addExemplar(internalLabelTypeBaseline, e, alreadyUsedExemplars)
}

clear(alreadyUsedExemplars)

for _, e := range m.selectionExemplars {
addExemplar(internalLabelTypeSelectionTotal, e)
addExemplar(internalLabelTypeSelection, e, alreadyUsedExemplars)
}

return ss
Expand Down
82 changes: 67 additions & 15 deletions pkg/traceql/engine_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func TestQuantileOverTime(t *testing.T) {
},
}

result := runTraceQLMetric(t, req, in)
result := runTraceQLMetric(t, 0, req, in)
require.Equal(t, out, result)
}

Expand Down Expand Up @@ -470,7 +470,7 @@ func TestCountOverTime(t *testing.T) {
},
}

result := runTraceQLMetric(t, req, in)
result := runTraceQLMetric(t, 0, req, in)
require.Equal(t, out, result)
}

Expand Down Expand Up @@ -498,7 +498,7 @@ func TestMinOverTimeForDuration(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
}

result := runTraceQLMetric(t, req, in)
result := runTraceQLMetric(t, 0, req, in)

fooBaz := result[`{span.foo="baz"}`]
fooBar := result[`{span.foo="bar"}`]
Expand Down Expand Up @@ -539,7 +539,7 @@ func TestMinOverTimeWithNoMatch(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 500).WithDuration(512),
}

result := runTraceQLMetric(t, req, in)
result := runTraceQLMetric(t, 0, req, in)

// Test that empty timeseries are not included
ts := result.ToProto(req)
Expand Down Expand Up @@ -587,7 +587,7 @@ func TestMinOverTimeForSpanAttribute(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 400).WithDuration(512),
}

result := runTraceQLMetric(t, req, in, in2)
result := runTraceQLMetric(t, 0, req, in, in2)

fooBaz := result[`{span.foo="baz"}`]
fooBar := result[`{span.foo="bar"}`]
Expand Down Expand Up @@ -641,7 +641,7 @@ func TestAvgOverTimeForDuration(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(300),
}

result := runTraceQLMetric(t, req, in)
result := runTraceQLMetric(t, 0, req, in)

fooBaz := result[`{span.foo="baz"}`]
fooBar := result[`{span.foo="bar"}`]
Expand Down Expand Up @@ -680,7 +680,7 @@ func TestAvgOverTimeForDurationWithoutAggregation(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "bar").WithDuration(300),
}

result := runTraceQLMetric(t, req, in)
result := runTraceQLMetric(t, 0, req, in)
avg := result[`{__name__="avg_over_time"}`]

assert.Equal(t, 100., avg.Values[0]*float64(time.Second))
Expand Down Expand Up @@ -727,7 +727,7 @@ func TestAvgOverTimeForSpanAttribute(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 200).WithDuration(512),
}

result := runTraceQLMetric(t, req, in, in2)
result := runTraceQLMetric(t, 0, req, in, in2)

fooBaz := result[`{span.foo="baz"}`]
fooBar := result[`{span.foo="bar"}`]
Expand Down Expand Up @@ -757,6 +757,55 @@ func TestAvgOverTimeForSpanAttribute(t *testing.T) {
}
}

func TestCompareForSpanAttribute(t *testing.T) {
req := &tempopb.QueryRangeRequest{
Start: uint64(1 * time.Second),
End: uint64(3 * time.Second),
Step: uint64(1 * time.Second),
Query: "{span.http.status_code = 200} | compare({span.http.status_code >= 300}) with (exemplars=true)",
}

// A variety of spans across times, durations, and series. All durations are powers of 2 for simplicity
in := []Span{
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(128),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 404).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(512),

newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(64),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(8),

newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 200).WithDuration(512),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 400).WithDuration(1024),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 300).WithDuration(512),
}

in2 := []Span{
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(128),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(512),

newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(64),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithSpanInt("http.status_code", 200).WithDuration(8),

newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 200).WithDuration(512),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 200).WithDuration(1024),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 200).WithDuration(512),
}

result := runTraceQLMetric(t, 100, req, in, in2)
exemplarCount := 0
for _, s := range result {
exemplarCount += len(s.Exemplars)
}

assert.NotNil(t, result)
assert.GreaterOrEqual(t, exemplarCount, 1)
}

func TestAvgOverTimeWithNoMatch(t *testing.T) {
req := &tempopb.QueryRangeRequest{
Start: uint64(1 * time.Second),
Expand All @@ -781,7 +830,7 @@ func TestAvgOverTimeWithNoMatch(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 500).WithDuration(512),
}

result := runTraceQLMetric(t, req, in)
result := runTraceQLMetric(t, 0, req, in)

// Test that empty timeseries are not included
ts := result.ToProto(req)
Expand Down Expand Up @@ -888,7 +937,7 @@ func TestMaxOverTimeForDuration(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
}

result := runTraceQLMetric(t, req, in)
result := runTraceQLMetric(t, 0, req, in)

fooBaz := result[`{span.foo="baz"}`]
fooBar := result[`{span.foo="bar"}`]
Expand Down Expand Up @@ -929,7 +978,7 @@ func TestMaxOverTimeWithNoMatch(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 500).WithDuration(512),
}

result := runTraceQLMetric(t, req, in)
result := runTraceQLMetric(t, 0, req, in)

// Test that empty timeseries are not included
ts := result.ToProto(req)
Expand Down Expand Up @@ -977,7 +1026,7 @@ func TestMaxOverTimeForSpanAttribute(t *testing.T) {
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithSpanInt("http.status_code", 400).WithDuration(512),
}

result := runTraceQLMetric(t, req, in, in2)
result := runTraceQLMetric(t, 0, req, in, in2)

fooBaz := result[`{span.foo="baz"}`]
fooBar := result[`{span.foo="bar"}`]
Expand Down Expand Up @@ -1074,11 +1123,11 @@ func TestHistogramOverTime(t *testing.T) {
},
}

result := runTraceQLMetric(t, req, in)
result := runTraceQLMetric(t, 0, req, in)
require.Equal(t, out, result)
}

func runTraceQLMetric(t *testing.T, req *tempopb.QueryRangeRequest, inSpans ...[]Span) SeriesSet {
func runTraceQLMetric(t *testing.T, maxExemplars int, req *tempopb.QueryRangeRequest, inSpans ...[]Span) SeriesSet {
e := NewEngine()

layer2, err := e.CompileMetricsQueryRangeNonRaw(req, AggregateModeSum)
Expand All @@ -1088,10 +1137,13 @@ func runTraceQLMetric(t *testing.T, req *tempopb.QueryRangeRequest, inSpans ...[
require.NoError(t, err)

for _, spanSet := range inSpans {
layer1, err := e.CompileMetricsQueryRange(req, 0, 0, false)
layer1, err := e.CompileMetricsQueryRange(req, maxExemplars, 0, false)
require.NoError(t, err)
for _, s := range spanSet {
layer1.metricsPipeline.observe(s)
if maxExemplars > 0 {
layer1.metricsPipeline.observeExemplar(s)
}
}
res := layer1.Results()
// Pass layer 1 to layer 2
Expand Down

0 comments on commit 0800d12

Please sign in to comment.