Skip to content

Commit

Permalink
[Frontend] Two fixes for gRPC query range streaming (#4576)
Browse files Browse the repository at this point in the history
* Revert "Bugfix: Default step for gRPC streaming query range queries (#4546)"

This reverts commit 424274a.

* default step if not set

Signed-off-by: Joe Elliott <[email protected]>

* fix dumping of output to json

Signed-off-by: Joe Elliott <[email protected]>

* attachExemplars in gRPC diff

Signed-off-by: Joe Elliott <[email protected]>

* added test and bench

Signed-off-by: Joe Elliott <[email protected]>

* remove step

Signed-off-by: Joe Elliott <[email protected]>

* changelog

Signed-off-by: Joe Elliott <[email protected]>

* fix default step

Signed-off-by: Joe Elliott <[email protected]>

* lint

Signed-off-by: Joe Elliott <[email protected]>

* corrected proto import

Signed-off-by: Joe Elliott <[email protected]>

---------

Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott authored Jan 17, 2025
1 parent 89b9f7e commit 9acc16d
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 13 deletions.
5 changes: 2 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
* [ENHANCEMENT] Prevent queries in the ingester from blocking flushing traces to disk and memory spikes. [#4483](https://github.com/grafana/tempo/pull/4483) (@joe-elliott)
* [ENHANCEMENT] Update tempo operational dashboard for new block-builder and v2 traces api [#4559](https://github.com/grafana/tempo/pull/4559) (@mdisibio)
* [ENHANCEMENT] Improve block-builder performance by flushing blocks concurrently [#4565](https://github.com/grafana/tempo/pull/4565) (@mdisibio)
* [BUGFIX] Choose a default step for a gRPC streaming query range request if none is provided. [#4546](https://github.com/grafana/tempo/pull/4546) (@joe-elliott)
Fix an issue where the tempo-cli was not correctly dumping exemplar results.
* [BUGFIX] Choose a default step for a gRPC streaming query range request if none is provided. [#4546](https://github.com/grafana/tempo/pull/4576) (@joe-elliott)
Correctly copy exemplars for metrics like `| rate()` when gRPC streaming.
* [BUGFIX] Fix performance bottleneck and file cleanup in block builder [#4550](https://github.com/grafana/tempo/pull/4550) (@mdisibio)
* [BUGFIX] TraceQL results caching bug for floats ending in .0 [#4539](https://github.com/grafana/tempo/pull/4539) (@carles-grafana)


# v2.7.0

* [CHANGE] Disable gRPC compression in the querier and distributor for performance reasons [#4429](https://github.com/grafana/tempo/pull/4429) (@carles-grafana)
Expand Down
5 changes: 2 additions & 3 deletions cmd/tempo-cli/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,9 @@ func loadBlock(r backend.Reader, c backend.Compactor, tenantID string, id backen
}, nil
}

func printAsJSON(value proto.Message) error {
func printAsJSON(pb proto.Message) error {
m := jsonpb.Marshaler{}

traceJSON, err := m.MarshalToString(value)
traceJSON, err := m.MarshalToString(pb)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions modules/frontend/combiner/metrics_query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func NewQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (Combiner, e
}
sortResponse(resp)
attachExemplars(req, resp)

return resp, nil
},
diff: func(_ *tempopb.QueryRangeResponse) (*tempopb.QueryRangeResponse, error) {
Expand All @@ -53,6 +54,8 @@ func NewQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (Combiner, e
resp = &tempopb.QueryRangeResponse{}
}
sortResponse(resp)
attachExemplars(req, resp)

return resp, nil
},
}
Expand Down
123 changes: 123 additions & 0 deletions modules/frontend/combiner/metrics_query_range_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package combiner

import (
"math"
"math/rand/v2"
"testing"
"time"

"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/traceql"
"github.com/stretchr/testify/require"
)

func TestAttachExemplars(t *testing.T) {
start := uint64(10 * time.Second)
end := uint64(20 * time.Second)
step := traceql.DefaultQueryRangeStep(start, end)

req := &tempopb.QueryRangeRequest{
Start: start,
End: end,
Step: step,
}

tcs := []struct {
name string
include func(i int) bool
}{
{
name: "include all",
include: func(_ int) bool { return true },
},
{
name: "include none",
include: func(_ int) bool { return false },
},
{
name: "include every other",
include: func(i int) bool { return i%2 == 0 },
},
{
name: "include rando",
include: func(_ int) bool { return rand.Int()%2 == 0 },
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
resp, expectedSeries := buildSeriesForExemplarTest(start, end, step, tc.include)

attachExemplars(req, resp)
require.Equal(t, expectedSeries, resp.Series)
})
}
}

func BenchmarkAttachExemplars(b *testing.B) {
start := uint64(1 * time.Second)
end := uint64(10000 * time.Second)
step := uint64(time.Second)

req := &tempopb.QueryRangeRequest{
Start: start,
End: end,
Step: step,
}

resp, _ := buildSeriesForExemplarTest(start, end, step, func(_ int) bool { return true })

b.ResetTimer()
for i := 0; i < b.N; i++ {
attachExemplars(req, resp)
}
}

func buildSeriesForExemplarTest(start, end, step uint64, include func(i int) bool) (*tempopb.QueryRangeResponse, []*tempopb.TimeSeries) {
resp := &tempopb.QueryRangeResponse{
Series: []*tempopb.TimeSeries{
{},
},
}

expectedSeries := []*tempopb.TimeSeries{
{},
}

// populate series and expected series based on step
idx := 0
for i := start; i < end; i += step {
idx++
tsMS := int64(i / uint64(time.Millisecond))
val := float64(idx)

sample := tempopb.Sample{
TimestampMs: tsMS,
Value: val,
}
nanExemplar := tempopb.Exemplar{
TimestampMs: tsMS,
Value: math.NaN(),
}
valExamplar := tempopb.Exemplar{
TimestampMs: tsMS,
Value: val,
}

includeExemplar := include(idx)

// copy the sample and nan exemplar into the response. the nan exemplar should be overwritten
resp.Series[0].Samples = append(resp.Series[0].Samples, sample)
if includeExemplar {
resp.Series[0].Exemplars = append(resp.Series[0].Exemplars, nanExemplar)
}

// copy the sample and val exemplar into the expected response
expectedSeries[0].Samples = append(expectedSeries[0].Samples, sample)
if includeExemplar {
expectedSeries[0].Exemplars = append(expectedSeries[0].Exemplars, valExamplar)
}
}

return resp, expectedSeries
}
6 changes: 6 additions & 0 deletions modules/frontend/metrics_query_range_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/traceql"
)

// newQueryRangeStreamingGRPCHandler returns a handler that streams results from the HTTP handler
Expand All @@ -29,6 +30,11 @@ func newQueryRangeStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripp

headers := headersFromGrpcContext(ctx)

// default step if not set
if req.Step == 0 {
req.Step = traceql.DefaultQueryRangeStep(req.Start, req.End)
}

httpReq := api.BuildQueryRangeRequest(&http.Request{
URL: &url.URL{Path: downstreamPath},
Header: headers,
Expand Down
10 changes: 3 additions & 7 deletions pkg/api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,16 +456,12 @@ func BuildQueryRangeRequest(req *http.Request, searchReq *tempopb.QueryRangeRequ
return req
}

// 0 is an invalid step, so we need to calculate it if it's not provided
step := searchReq.Step
if step == 0 {
step = traceql.DefaultQueryRangeStep(searchReq.Start, searchReq.End)
}

qb := newQueryBuilder("")
qb.addParam(urlParamStart, strconv.FormatUint(searchReq.Start, 10))
qb.addParam(urlParamEnd, strconv.FormatUint(searchReq.End, 10))
qb.addParam(urlParamStep, time.Duration(step).String())
if searchReq.Step != 0 { // if step != 0 leave the param out and Tempo will calculate it
qb.addParam(urlParamStep, time.Duration(searchReq.Step).String())
}
qb.addParam(QueryModeKey, searchReq.QueryMode)
// New RF1 params
qb.addParam(urlParamBlockID, searchReq.BlockID)
Expand Down

0 comments on commit 9acc16d

Please sign in to comment.