Add TraceQL query hint to retrieve most recent results ordered by trace start time #4238

Open · wants to merge 10 commits into base: main
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -46,6 +46,7 @@
* [ENHANCEMENT] Speedup collection of results from ingesters in the querier [#4100](https://github.com/grafana/tempo/pull/4100) (@electron0zero)
* [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero)
* [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero)
* [ENHANCEMENT] Added most_recent=true query hint to TraceQL to return most recent results. [#4238](https://github.com/grafana/tempo/pull/4238) (@joe-elliott)
* [ENHANCEMENT] chore: remove gofakeit dependency [#4274](https://github.com/grafana/tempo/pull/4274) (@javiermolinar)
* [ENHANCEMENT] Add a max flush attempts and metric to the metrics generator [#4254](https://github.com/grafana/tempo/pull/4254) (@joe-elliott)
* [ENHANCEMENT] Collection of query-frontend changes to reduce allocs. [#4242](https://github.com/grafana/tempo/pull/4242) (@joe-elliott)
8 changes: 6 additions & 2 deletions docs/sources/tempo/configuration/_index.md
@@ -639,8 +639,12 @@ query_frontend:
[throughput_bytes_slo: <float> | default = 0 ]

# The number of shards to break ingester queries into.
[ingester_shards]: <int> | default = 1]

[ingester_shards: <int> | default = 1]

# The number of time windows to break a search up into when doing a most-recent TraceQL search. This only impacts
# TraceQL searches with (most_recent=true).
[most_recent_shards: <int> | default = 200]

# SLO configuration for Metadata (tags and tag values) endpoints.
metadata_slo:
# If set to a non-zero value, it's value will be used to decide if metadata query is within SLO or not.
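To make the new `most_recent_shards` option concrete, here is a minimal Go sketch of the idea behind it: the overall search range is cut into N windows and worked newest-first, so results can be returned as soon as the most recent windows are fully searched. The `splitRange` helper and its exact splitting rule are assumptions for illustration, not Tempo's actual sharder.

```go
package main

import (
	"fmt"
	"time"
)

// window is one time slice of the overall search range.
type window struct {
	start, end time.Time
}

// splitRange is a hypothetical helper: it breaks (start, end] into n
// windows ordered newest-first, mirroring the idea behind
// most_recent_shards (default 200).
func splitRange(start, end time.Time, n int) []window {
	step := end.Sub(start) / time.Duration(n)
	out := make([]window, 0, n)
	for e := end; e.After(start); e = e.Add(-step) {
		s := e.Add(-step)
		if s.Before(start) {
			s = start
		}
		out = append(out, window{start: s, end: e})
	}
	return out
}

func main() {
	end := time.Now()
	start := end.Add(-time.Hour)
	// four shards for readability; a real config would use many more
	for i, w := range splitRange(start, end, 4) {
		fmt.Printf("shard %d: %s -> %s\n", i, w.start.Format(time.Kitchen), w.end.Format(time.Kitchen))
	}
}
```

Searching the newest windows first is what lets the combiner stream back the most recent traces before older data has been touched.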
9 changes: 9 additions & 0 deletions docs/sources/tempo/traceql/_index.md
@@ -451,6 +451,15 @@ TraceQL can select arbitrary fields from spans. This is particularly performant
{ status=error } | select(span.http.status_code, span.http.url)
```

## Retrieving most recent results (experimental)

The TraceQL query hint `most_recent=true` can be used with any TraceQL selection query to force Tempo to return the most recent results ordered by time. Examples:

```
{} with (most_recent=true)
{ span.foo = "bar" } >> { status = error } with (most_recent=true)
```

## Experimental TraceQL metrics

TraceQL metrics are experimental, but easy to get started with. Refer to [the TraceQL metrics]({{< relref "../operations/traceql-metrics.md" >}}) documentation for more information.
13 changes: 13 additions & 0 deletions modules/frontend/combiner/common.go
@@ -23,6 +23,8 @@ type TResponse interface {
type PipelineResponse interface {
HTTPResponse() *http.Response
RequestData() any

	IsMetadata() bool // TODO: search and query range pass metadata responses back through a normal HTTP response. Update them to use this instead.
}

type genericCombiner[T TResponse] struct {
@@ -32,6 +34,7 @@ type genericCombiner[T TResponse] struct {

new func() T
combine func(partial T, final T, resp PipelineResponse) error
metadata func(resp PipelineResponse, final T) error
finalize func(T) (T, error)
diff func(T) (T, error) // currently only implemented by the search combiner. required for streaming
quit func(T) bool
@@ -51,6 +54,16 @@

// AddResponse is used to add a http response to the combiner.
func (c *genericCombiner[T]) AddResponse(r PipelineResponse) error {
if r.IsMetadata() && c.metadata != nil {
c.mu.Lock()
defer c.mu.Unlock()

if err := c.metadata(r, c.current); err != nil {
return fmt.Errorf("error processing metadata: %w", err)
}
return nil
}

res := r.HTTPResponse()
if res == nil {
return nil
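A self-contained sketch of the dispatch `AddResponse` now performs: responses flagged as metadata update job totals out of band and return early, never entering the normal combine path. The `dataResp`/`metaResp` types and the simplified combiner below are illustrations, not the real Tempo types.

```go
package main

import "fmt"

// pipelineResponse mirrors the interface above: a response either
// carries a normal payload or out-of-band metadata.
type pipelineResponse interface {
	IsMetadata() bool
}

type dataResp struct{ traces int }

func (dataResp) IsMetadata() bool { return false }

type metaResp struct{ totalJobs int }

func (metaResp) IsMetadata() bool { return true }

// combiner sketches the AddResponse dispatch: metadata responses update
// totals and return early, analogous to the metadata hook above.
type combiner struct {
	totalJobs, completedJobs, traces int
}

func (c *combiner) AddResponse(r pipelineResponse) {
	if r.IsMetadata() {
		if m, ok := r.(metaResp); ok {
			c.totalJobs += m.totalJobs // metadata path, no combine
		}
		return
	}
	if d, ok := r.(dataResp); ok {
		c.completedJobs++ // normal combine path
		c.traces += d.traces
	}
}

func main() {
	c := &combiner{}
	c.AddResponse(metaResp{totalJobs: 3}) // sharder announces totals up front
	c.AddResponse(dataResp{traces: 2})
	c.AddResponse(dataResp{traces: 1})
	fmt.Printf("jobs %d/%d, traces %d\n", c.completedJobs, c.totalJobs, c.traces)
}
```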
9 changes: 7 additions & 2 deletions modules/frontend/combiner/common_test.go
@@ -198,7 +198,8 @@ func TestGenericCombinerDoesntRace(t *testing.T) {
}

type testPipelineResponse struct {
r *http.Response
r *http.Response
responseData any
}

func newTestResponse(t *testing.T) *testPipelineResponse {
@@ -230,7 +231,11 @@ func (p *testPipelineResponse) HTTPResponse() *http.Response {
}

func (p *testPipelineResponse) RequestData() any {
return nil
return p.responseData
}

func (p *testPipelineResponse) IsMetadata() bool {
return false
}

func newTestCombiner() *genericCombiner[*tempopb.ServiceStats] {
193 changes: 153 additions & 40 deletions modules/frontend/combiner/search.go
@@ -1,54 +1,80 @@
package combiner

import (
"sort"
"net/http"

"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/search"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/traceql"
)

var _ PipelineResponse = (*SearchJobResponse)(nil)

type SearchShards struct {
TotalJobs uint32
CompletedThroughSeconds uint32
}

type SearchJobResponse struct {
TotalBlocks int
TotalJobs int
TotalBytes uint64
Shards []SearchShards
}

func (s *SearchJobResponse) HTTPResponse() *http.Response {
return nil
}

func (s *SearchJobResponse) RequestData() any {
return nil
}

func (s *SearchJobResponse) IsMetadata() bool {
return true
}

var _ GRPCCombiner[*tempopb.SearchResponse] = (*genericCombiner[*tempopb.SearchResponse])(nil)

// NewSearch returns a search combiner
func NewSearch(limit int) Combiner {
metadataCombiner := traceql.NewMetadataCombiner()
func NewSearch(limit int, keepMostRecent bool) Combiner {
metadataCombiner := traceql.NewMetadataCombiner(limit, keepMostRecent)
diffTraces := map[string]struct{}{}
completedThroughTracker := &ShardCompletionTracker{}

c := &genericCombiner[*tempopb.SearchResponse]{
httpStatusCode: 200,
new: func() *tempopb.SearchResponse { return &tempopb.SearchResponse{} },
current: &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}},
combine: func(partial *tempopb.SearchResponse, final *tempopb.SearchResponse, _ PipelineResponse) error {
combine: func(partial *tempopb.SearchResponse, final *tempopb.SearchResponse, resp PipelineResponse) error {
requestIdx, ok := resp.RequestData().(int)
if ok {
completedThroughTracker.addShardIdx(requestIdx)
}

for _, t := range partial.Traces {
// if we've reached the limit and this is NOT a new trace then skip it
if limit > 0 &&
metadataCombiner.Count() >= limit &&
!metadataCombiner.Exists(t.TraceID) {
continue
if metadataCombiner.AddMetadata(t) {
// record modified traces
diffTraces[t.TraceID] = struct{}{}
}

metadataCombiner.AddMetadata(t)
// record modified traces
diffTraces[t.TraceID] = struct{}{}
}

if partial.Metrics != nil {
// there is a coordination with the search sharder here. normal responses
// will never have total jobs set, but they will have valid Inspected* values
// a special response is sent back from the sharder with no traces but valid Total* values
// if TotalJobs is nonzero then assume it's the special response
if partial.Metrics.TotalJobs == 0 {
final.Metrics.CompletedJobs++

final.Metrics.InspectedBytes += partial.Metrics.InspectedBytes
final.Metrics.InspectedTraces += partial.Metrics.InspectedTraces
} else {
final.Metrics.TotalBlocks += partial.Metrics.TotalBlocks
final.Metrics.TotalJobs += partial.Metrics.TotalJobs
final.Metrics.TotalBlockBytes += partial.Metrics.TotalBlockBytes
}
final.Metrics.CompletedJobs++
final.Metrics.InspectedBytes += partial.Metrics.InspectedBytes
final.Metrics.InspectedTraces += partial.Metrics.InspectedTraces
}

return nil
},
metadata: func(resp PipelineResponse, final *tempopb.SearchResponse) error {
if sj, ok := resp.(*SearchJobResponse); ok && sj != nil {
final.Metrics.TotalBlocks += uint32(sj.TotalBlocks)
final.Metrics.TotalJobs += uint32(sj.TotalJobs)
final.Metrics.TotalBlockBytes += sj.TotalBytes

completedThroughTracker.addShards(sj.Shards)
}

return nil
@@ -67,34 +93,48 @@ func NewSearch(limit int) Combiner {
Metrics: current.Metrics,
}

for _, tr := range metadataCombiner.Metadata() {
metadataFn := metadataCombiner.Metadata
if keepMostRecent {
metadataFn = func() []*tempopb.TraceSearchMetadata {
completedThroughSeconds := completedThroughTracker.completedThroughSeconds
// if all jobs are completed then let's just return everything the combiner has
if current.Metrics.CompletedJobs == current.Metrics.TotalJobs && current.Metrics.TotalJobs > 0 {
completedThroughSeconds = 1
}

// if we've not completed any shards, then return nothing
if completedThroughSeconds == 0 {
return nil
}

return metadataCombiner.MetadataAfter(completedThroughSeconds)
}
}

for _, tr := range metadataFn() {
// if not in the map, skip. we haven't seen an update
if _, ok := diffTraces[tr.TraceID]; !ok {
continue
}

delete(diffTraces, tr.TraceID)
diff.Traces = append(diff.Traces, tr)
}

sort.Slice(diff.Traces, func(i, j int) bool {
return diff.Traces[i].StartTimeUnixNano > diff.Traces[j].StartTimeUnixNano
})

addRootSpanNotReceivedText(diff.Traces)

// wipe out diff traces for the next time
clear(diffTraces)

return diff, nil
},
// search combiner doesn't use current in the way I would have expected. It only tracks metrics through current and uses the results map for the actual traces.
// Should we change this?
quit: func(_ *tempopb.SearchResponse) bool {
if limit <= 0 {
return false
completedThroughSeconds := completedThroughTracker.completedThroughSeconds
// have we completed any shards?
if completedThroughSeconds == 0 {
completedThroughSeconds = traceql.TimestampNever
}

return metadataCombiner.Count() >= limit
return metadataCombiner.IsCompleteFor(completedThroughSeconds)
},
}
initHTTPCombiner(c, api.HeaderAcceptJSON)
@@ -109,6 +149,79 @@ func addRootSpanNotReceivedText(results []*tempopb.TraceSearchMetadata) {
}
}

func NewTypedSearch(limit int) GRPCCombiner[*tempopb.SearchResponse] {
return NewSearch(limit).(GRPCCombiner[*tempopb.SearchResponse])
func NewTypedSearch(limit int, keepMostRecent bool) GRPCCombiner[*tempopb.SearchResponse] {
return NewSearch(limit, keepMostRecent).(GRPCCombiner[*tempopb.SearchResponse])
}

// ShardCompletionTracker tracks, per time-window shard, how many job responses have been seen. Once every job in the
// current shard (and all shards before it) has responded, the completed-through timestamp advances to that shard's
// CompletedThroughSeconds.
type ShardCompletionTracker struct {
shards []SearchShards
foundResponses []int

completedThroughSeconds uint32
curShard int
}

func (s *ShardCompletionTracker) addShards(shards []SearchShards) uint32 {
if len(shards) == 0 {
return s.completedThroughSeconds
}

s.shards = shards

// grow foundResponses to match while keeping the existing values
if len(s.shards) > len(s.foundResponses) {
temp := make([]int, len(s.shards))
copy(temp, s.foundResponses)
s.foundResponses = temp
}

s.incrementCurShardIfComplete()

return s.completedThroughSeconds
}

// addShardIdx records a response for the given shard index and returns the allowed completedThroughSeconds
func (s *ShardCompletionTracker) addShardIdx(shardIdx int) uint32 {
// we haven't received shards yet
if len(s.shards) == 0 {
// if shardIdx doesn't fit in foundResponses then alloc a new slice and copy foundResponses forward
if shardIdx >= len(s.foundResponses) {
temp := make([]int, shardIdx+1)
copy(temp, s.foundResponses)
s.foundResponses = temp
}

// and record this idx for when we get shards
s.foundResponses[shardIdx]++

return 0
}

	// shards are known: ignore responses for shard indexes the sharder never announced
if shardIdx >= len(s.foundResponses) {
return s.completedThroughSeconds
}

s.foundResponses[shardIdx]++
s.incrementCurShardIfComplete()

return s.completedThroughSeconds
}

// incrementCurShardIfComplete tests to see if the current shard is complete and increments it if so.
// it does this repeatedly until it finds a shard that is not complete.
func (s *ShardCompletionTracker) incrementCurShardIfComplete() {
for {
if s.curShard >= len(s.shards) {
break
}

if s.foundResponses[s.curShard] == int(s.shards[s.curShard].TotalJobs) {
s.completedThroughSeconds = s.shards[s.curShard].CompletedThroughSeconds
s.curShard++
} else {
break
}
}
}
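To see the watermark semantics of `ShardCompletionTracker` end to end, here is a test-style sketch built directly on the code above (written as a package-internal test since the fields and methods are unexported; the shard values are invented for illustration):

```go
package combiner

import "testing"

// TestShardCompletionWatermark sketches the intended behavior: the
// completed-through timestamp only advances once every job in the
// current shard (and all earlier shards) has responded.
func TestShardCompletionWatermark(t *testing.T) {
	tr := &ShardCompletionTracker{}

	// responses can arrive before the sharder announces the shards
	tr.addShardIdx(0)

	// the sharder announces two shards: shard 0 has 2 jobs covering
	// everything back to t=300s, shard 1 has 1 job back to t=200s
	got := tr.addShards([]SearchShards{
		{TotalJobs: 2, CompletedThroughSeconds: 300},
		{TotalJobs: 1, CompletedThroughSeconds: 200},
	})
	if got != 0 {
		t.Fatalf("shard 0 is still missing a job, want watermark 0, got %d", got)
	}

	// second response for shard 0 completes it: watermark moves to 300
	if got := tr.addShardIdx(0); got != 300 {
		t.Fatalf("want 300, got %d", got)
	}

	// completing shard 1 moves the watermark back to 200
	if got := tr.addShardIdx(1); got != 200 {
		t.Fatalf("want 200, got %d", got)
	}
}
```

Because shards are ordered newest-first, the watermark moves backwards in time as more shards complete, which is what lets the search combiner safely emit any trace newer than the watermark.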