diff --git a/CHANGELOG.md b/CHANGELOG.md index 1202c163dfd..b0f764f8f69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ * [ENHANCEMENT] Speedup collection of results from ingesters in the querier [#4100](https://github.com/grafana/tempo/pull/4100) (@electron0zero) * [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero) * [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero) +* [ENHANCEMENT] Added most_recent=true query hint to TraceQL to return most recent results. [#4238](https://github.com/grafana/tempo/pull/4238) (@joe-elliott) * [ENHANCEMENT] chore: remove gofakeit dependency [#4274](https://github.com/grafana/tempo/pull/4274) (@javiermolinar) * [ENHANCEMENT] Add a max flush attempts and metric to the metrics generator [#4254](https://github.com/grafana/tempo/pull/4254) (@joe-elliott) * [ENHANCEMENT] Collection of query-frontend changes to reduce allocs. [#4242](https://github.com/grafana/tempo/pull/4242) (@joe-elliott) diff --git a/docs/sources/tempo/configuration/_index.md b/docs/sources/tempo/configuration/_index.md index 3e48575f5e6..57d3a5e34fd 100644 --- a/docs/sources/tempo/configuration/_index.md +++ b/docs/sources/tempo/configuration/_index.md @@ -639,8 +639,12 @@ query_frontend: [throughput_bytes_slo: <float> | default = 0 ] # The number of shards to break ingester queries into. - [ingester_shards]: <int> | default = 1] - + [ingester_shards: <int> | default = 1] + + # The number of time windows to break a search up into when doing a most recent TraceQL search. This only impacts TraceQL + # searches with (most_recent=true). + [most_recent_shards: <int> | default = 200] + # SLO configuration for Metadata (tags and tag values) endpoints. metadata_slo: # If set to a non-zero value, its value will be used to decide if a metadata query is within SLO or not. diff --git a/docs/sources/tempo/traceql/_index.md b/docs/sources/tempo/traceql/_index.md index 7bc570be6ae..1f3b51ae0bc 100644 --- a/docs/sources/tempo/traceql/_index.md +++ b/docs/sources/tempo/traceql/_index.md @@ -451,6 +451,15 @@ TraceQL can select arbitrary fields from spans. This is particularly performant { status=error } | select(span.http.status_code, span.http.url) ``` +## Retrieving most recent results (experimental) + +The TraceQL query hint `most_recent=true` can be used with any TraceQL selection query to force Tempo to return the most recent results, ordered by trace start time. Examples: + +``` +{} with (most_recent=true) +{ span.foo = "bar" } >> { status = error } with (most_recent=true) +``` + ## Experimental TraceQL metrics TraceQL metrics are experimental, but easy to get started with. Refer to [the TraceQL metrics]({{< relref "../operations/traceql-metrics.md" >}}) documentation for more information. diff --git a/modules/frontend/combiner/common.go b/modules/frontend/combiner/common.go index 7b73a93ac36..cd2e9adbb81 100644 --- a/modules/frontend/combiner/common.go +++ b/modules/frontend/combiner/common.go @@ -23,6 +23,8 @@ type TResponse interface { type PipelineResponse interface { HTTPResponse() *http.Response RequestData() any + + IsMetadata() bool // todo: search and query range pass back metadata responses through a normal http response. update to use this instead. 
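+ // metadata responses carry job-level details (total jobs, blocks, bytes) rather than an HTTP body; AddResponse routes them to the combiner's metadata func instead of combine.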
} type genericCombiner[T TResponse] struct { @@ -32,6 +34,7 @@ type genericCombiner[T TResponse] struct { new func() T combine func(partial T, final T, resp PipelineResponse) error + metadata func(resp PipelineResponse, final T) error finalize func(T) (T, error) diff func(T) (T, error) // currently only implemented by the search combiner. required for streaming quit func(T) bool @@ -51,6 +54,16 @@ func initHTTPCombiner[T TResponse](c *genericCombiner[T], marshalingFormat strin // AddResponse is used to add a http response to the combiner. func (c *genericCombiner[T]) AddResponse(r PipelineResponse) error { + if r.IsMetadata() && c.metadata != nil { + c.mu.Lock() + defer c.mu.Unlock() + + if err := c.metadata(r, c.current); err != nil { + return fmt.Errorf("error processing metadata: %w", err) + } + return nil + } + res := r.HTTPResponse() if res == nil { return nil diff --git a/modules/frontend/combiner/common_test.go b/modules/frontend/combiner/common_test.go index 8bee69c1bb4..166943e5017 100644 --- a/modules/frontend/combiner/common_test.go +++ b/modules/frontend/combiner/common_test.go @@ -198,7 +198,8 @@ func TestGenericCombinerDoesntRace(t *testing.T) { } type testPipelineResponse struct { - r *http.Response + r *http.Response + responseData any } func newTestResponse(t *testing.T) *testPipelineResponse { @@ -230,7 +231,11 @@ func (p *testPipelineResponse) HTTPResponse() *http.Response { } func (p *testPipelineResponse) RequestData() any { - return nil + return p.responseData +} + +func (p *testPipelineResponse) IsMetadata() bool { + return false } func newTestCombiner() *genericCombiner[*tempopb.ServiceStats] { diff --git a/modules/frontend/combiner/search.go b/modules/frontend/combiner/search.go index a4c44dad859..f4dca2cddde 100644 --- a/modules/frontend/combiner/search.go +++ b/modules/frontend/combiner/search.go @@ -1,7 +1,7 @@ package combiner import ( - "sort" + "net/http" "github.com/grafana/tempo/pkg/api" "github.com/grafana/tempo/pkg/search" @@ -9,46 +9,72 @@ import ( "github.com/grafana/tempo/pkg/traceql" ) +var _ PipelineResponse = (*SearchJobResponse)(nil) + +type SearchShards struct { + TotalJobs uint32 + CompletedThroughSeconds uint32 +} + +type SearchJobResponse struct { + TotalBlocks int + TotalJobs int + TotalBytes uint64 + Shards []SearchShards +} + +func (s *SearchJobResponse) HTTPResponse() *http.Response { + return nil +} + +func (s *SearchJobResponse) RequestData() any { + return nil +} + +func (s *SearchJobResponse) IsMetadata() bool { + return true +} + var _ GRPCCombiner[*tempopb.SearchResponse] = (*genericCombiner[*tempopb.SearchResponse])(nil) // NewSearch returns a search combiner -func NewSearch(limit int) Combiner { - metadataCombiner := traceql.NewMetadataCombiner() +func NewSearch(limit int, keepMostRecent bool) Combiner { + metadataCombiner := traceql.NewMetadataCombiner(limit, keepMostRecent) diffTraces := map[string]struct{}{} + completedThroughTracker := &ShardCompletionTracker{} c := &genericCombiner[*tempopb.SearchResponse]{ httpStatusCode: 200, new: func() *tempopb.SearchResponse { return &tempopb.SearchResponse{} }, current: &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, - combine: func(partial *tempopb.SearchResponse, final *tempopb.SearchResponse, _ PipelineResponse) error { + combine: func(partial *tempopb.SearchResponse, final *tempopb.SearchResponse, resp PipelineResponse) error { + requestIdx, ok := resp.RequestData().(int) + if ok { + completedThroughTracker.addShardIdx(requestIdx) + } + for _, t := range partial.Traces { - 
// if we've reached the limit and this is NOT a new trace then skip it - if limit > 0 && - metadataCombiner.Count() >= limit && - !metadataCombiner.Exists(t.TraceID) { - continue + if metadataCombiner.AddMetadata(t) { + // record modified traces + diffTraces[t.TraceID] = struct{}{} } - - metadataCombiner.AddMetadata(t) - // record modified traces - diffTraces[t.TraceID] = struct{}{} } if partial.Metrics != nil { - // there is a coordination with the search sharder here. normal responses - // will never have total jobs set, but they will have valid Inspected* values - // a special response is sent back from the sharder with no traces but valid Total* values - // if TotalJobs is nonzero then assume its the special response - if partial.Metrics.TotalJobs == 0 { - final.Metrics.CompletedJobs++ - - final.Metrics.InspectedBytes += partial.Metrics.InspectedBytes - final.Metrics.InspectedTraces += partial.Metrics.InspectedTraces - } else { - final.Metrics.TotalBlocks += partial.Metrics.TotalBlocks - final.Metrics.TotalJobs += partial.Metrics.TotalJobs - final.Metrics.TotalBlockBytes += partial.Metrics.TotalBlockBytes - } + final.Metrics.CompletedJobs++ + final.Metrics.InspectedBytes += partial.Metrics.InspectedBytes + final.Metrics.InspectedTraces += partial.Metrics.InspectedTraces + } + + return nil + }, + metadata: func(resp PipelineResponse, final *tempopb.SearchResponse) error { + if sj, ok := resp.(*SearchJobResponse); ok && sj != nil { + final.Metrics.TotalBlocks += uint32(sj.TotalBlocks) + final.Metrics.TotalJobs += uint32(sj.TotalJobs) + final.Metrics.TotalBlockBytes += sj.TotalBytes + + completedThroughTracker.addShards(sj.Shards) } return nil @@ -67,34 +93,48 @@ func NewSearch(limit int) Combiner { Metrics: current.Metrics, } - for _, tr := range metadataCombiner.Metadata() { + metadataFn := metadataCombiner.Metadata + if keepMostRecent { + metadataFn = func() []*tempopb.TraceSearchMetadata { + completedThroughSeconds := completedThroughTracker.completedThroughSeconds + // if all jobs are completed then let's just return everything the combiner has + if current.Metrics.CompletedJobs == current.Metrics.TotalJobs && current.Metrics.TotalJobs > 0 { + completedThroughSeconds = 1 + } + + // if we've not completed any shards, then return nothing + if completedThroughSeconds == 0 { + return nil + } + + return metadataCombiner.MetadataAfter(completedThroughSeconds) + } + } + + for _, tr := range metadataFn() { // if not in the map, skip. we haven't seen an update if _, ok := diffTraces[tr.TraceID]; !ok { continue } + delete(diffTraces, tr.TraceID) diff.Traces = append(diff.Traces, tr) } - sort.Slice(diff.Traces, func(i, j int) bool { - return diff.Traces[i].StartTimeUnixNano > diff.Traces[j].StartTimeUnixNano - }) - addRootSpanNotReceivedText(diff.Traces) - // wipe out diff traces for the next time - clear(diffTraces) - return diff, nil }, // search combiner doesn't use current in the way i would have expected. it only tracks metrics through current and uses the results map for the actual traces. // should we change this? quit: func(_ *tempopb.SearchResponse) bool { - if limit <= 0 { - return false + completedThroughSeconds := completedThroughTracker.completedThroughSeconds + // have we completed any shards? 
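+ // if not, fall back to TimestampNever so we only quit early when the limit is already filled with traces newer than any possible completion point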
+ if completedThroughSeconds == 0 { + completedThroughSeconds = traceql.TimestampNever } - return metadataCombiner.Count() >= limit + return metadataCombiner.IsCompleteFor(completedThroughSeconds) }, } initHTTPCombiner(c, api.HeaderAcceptJSON) @@ -109,6 +149,79 @@ func addRootSpanNotReceivedText(results []*tempopb.TraceSearchMetadata) { } } -func NewTypedSearch(limit int) GRPCCombiner[*tempopb.SearchResponse] { - return NewSearch(limit).(GRPCCombiner[*tempopb.SearchResponse]) +func NewTypedSearch(limit int, keepMostRecent bool) GRPCCombiner[*tempopb.SearchResponse] { + return NewSearch(limit, keepMostRecent).(GRPCCombiner[*tempopb.SearchResponse]) +} + +// ShardCompletionTracker tracks which time shards have received all of their job responses and records the time through which results are complete. +type ShardCompletionTracker struct { + shards []SearchShards + foundResponses []int + + completedThroughSeconds uint32 + curShard int +} + +func (s *ShardCompletionTracker) addShards(shards []SearchShards) uint32 { + if len(shards) == 0 { + return s.completedThroughSeconds + } + + s.shards = shards + + // grow foundResponses to match while keeping the existing values + if len(s.shards) > len(s.foundResponses) { + temp := make([]int, len(s.shards)) + copy(temp, s.foundResponses) + s.foundResponses = temp + } + + s.incrementCurShardIfComplete() + + return s.completedThroughSeconds +} + +// addShardIdx adds a response for the given shard to the tracker and returns the allowed completedThroughSeconds +func (s *ShardCompletionTracker) addShardIdx(shardIdx int) uint32 { + // we haven't received shards yet + if len(s.shards) == 0 { + // if shardIdx doesn't fit in foundResponses then alloc a new slice and copy foundResponses forward + if shardIdx >= len(s.foundResponses) { + temp := make([]int, shardIdx+1) + copy(temp, s.foundResponses) + s.foundResponses = temp + } + + // and record this idx for when we get shards + s.foundResponses[shardIdx]++ + + return 0 + } + + // ignore responses for shard indexes we don't know about + if shardIdx >= len(s.foundResponses) { + return s.completedThroughSeconds + } + + s.foundResponses[shardIdx]++ + s.incrementCurShardIfComplete() + + return s.completedThroughSeconds +} + +// incrementCurShardIfComplete tests to see if the current shard is complete and increments it if so. +// it does this repeatedly until it finds a shard that is not complete. 
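+// for example, given shards that complete through 500s, 400s and 300s, receiving all responses for shards 0 and 1 advances completedThroughSeconds to 400. it cannot advance past an incomplete shard, no matter how many later shards have finished.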
+func (s *ShardCompletionTracker) incrementCurShardIfComplete() { + for { + if s.curShard >= len(s.shards) { + break + } + + if s.foundResponses[s.curShard] == int(s.shards[s.curShard].TotalJobs) { + s.completedThroughSeconds = s.shards[s.curShard].CompletedThroughSeconds + s.curShard++ + } else { + break + } + } } diff --git a/modules/frontend/combiner/search_test.go b/modules/frontend/combiner/search_test.go index e1c23724689..778666bc7c1 100644 --- a/modules/frontend/combiner/search_test.go +++ b/modules/frontend/combiner/search_test.go @@ -2,7 +2,6 @@ package combiner import ( "io" - "math" "net/http" "strings" "testing" @@ -17,35 +16,35 @@ import ( "google.golang.org/grpc/codes" ) -func TestSearchProgressShouldQuit(t *testing.T) { +func TestSearchProgressShouldQuitAny(t *testing.T) { // new combiner should not quit - c := NewSearch(0) + c := NewSearch(0, false) should := c.ShouldQuit() require.False(t, should) // 500 response should quit - c = NewSearch(0) + c = NewSearch(0, false) err := c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{}, 500)) require.NoError(t, err) should = c.ShouldQuit() require.True(t, should) // 429 response should quit - c = NewSearch(0) + c = NewSearch(0, false) err = c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{}, 429)) require.NoError(t, err) should = c.ShouldQuit() require.True(t, should) // unparseable body should not quit, but should return an error - c = NewSearch(0) - err = c.AddResponse(&pipelineResponse{&http.Response{Body: io.NopCloser(strings.NewReader("foo")), StatusCode: 200}}) + c = NewSearch(0, false) + err = c.AddResponse(&testPipelineResponse{r: &http.Response{Body: io.NopCloser(strings.NewReader("foo")), StatusCode: 200}}) require.Error(t, err) should = c.ShouldQuit() require.False(t, should) // under limit should not quit - c = NewSearch(2) + c = NewSearch(2, false) err = c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{ Traces: []*tempopb.TraceSearchMetadata{ { @@ -58,7 +57,7 @@ func TestSearchProgressShouldQuit(t *testing.T) { require.False(t, should) // over limit should quit - c = NewSearch(1) + c = NewSearch(1, false) err = c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{ Traces: []*tempopb.TraceSearchMetadata{ { @@ -74,331 +73,776 @@ func TestSearchProgressShouldQuit(t *testing.T) { require.True(t, should) } -func TestSearchCombinesResults(t *testing.T) { - start := time.Date(1, 2, 3, 4, 5, 6, 7, time.UTC) - traceID := "traceID" +func TestSearchProgressShouldQuitMostRecent(t *testing.T) { + // new combiner should not quit + c := NewSearch(0, true) + should := c.ShouldQuit() + require.False(t, should) - c := NewSearch(10) - sr := toHTTPResponse(t, &tempopb.SearchResponse{ + // 500 response should quit + c = NewSearch(0, true) + err := c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{}, 500)) + require.NoError(t, err) + should = c.ShouldQuit() + require.True(t, should) + + // 429 response should quit + c = NewSearch(0, true) + err = c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{}, 429)) + require.NoError(t, err) + should = c.ShouldQuit() + require.True(t, should) + + // unparseable body should not quit, but should return an error + c = NewSearch(0, true) + err = c.AddResponse(&testPipelineResponse{r: &http.Response{Body: io.NopCloser(strings.NewReader("foo")), StatusCode: 200}}) + require.Error(t, err) + should = c.ShouldQuit() + require.False(t, should) + + // under limit should not quit + c = NewSearch(2, true) + err = c.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{ Traces: 
[]*tempopb.TraceSearchMetadata{ { - TraceID: traceID, - StartTimeUnixNano: uint64(start.Add(time.Second).UnixNano()), - DurationMs: uint32(time.Second.Milliseconds()), - }, // 1 second after start and shorter duration - { - TraceID: traceID, - StartTimeUnixNano: uint64(start.UnixNano()), - DurationMs: uint32(time.Hour.Milliseconds()), - }, // earliest start time and longer duration - { - TraceID: traceID, - StartTimeUnixNano: uint64(start.Add(time.Hour).UnixNano()), - DurationMs: uint32(time.Millisecond.Milliseconds()), - }, // 1 hour after start and shorter duration + TraceID: "1", + }, }, - Metrics: &tempopb.SearchMetrics{}, - }, 200) - err := c.AddResponse(sr) + }, 200)) require.NoError(t, err) + should = c.ShouldQuit() + require.False(t, should) - expected := &tempopb.SearchResponse{ + // over limit but no search job response, should not quit + c = NewSearch(1, true) + err = c.AddResponse(toHTTPResponseWithResponseData(t, &tempopb.SearchResponse{ Traces: []*tempopb.TraceSearchMetadata{ { - TraceID: traceID, - StartTimeUnixNano: uint64(start.UnixNano()), - DurationMs: uint32(time.Hour.Milliseconds()), - RootServiceName: search.RootSpanNotYetReceivedText, + TraceID: "1", + StartTimeUnixNano: uint64(100 * time.Second), + }, + { + TraceID: "2", + StartTimeUnixNano: uint64(200 * time.Second), }, }, - Metrics: &tempopb.SearchMetrics{ - CompletedJobs: 1, + }, 200, 0)) // 0 is the shard index + require.NoError(t, err) + should = c.ShouldQuit() + require.False(t, should) + + // send shards. should not quit b/c completed through is 300 + err = c.AddResponse(&SearchJobResponse{ + TotalJobs: 3, + Shards: []SearchShards{ + { + TotalJobs: 1, + CompletedThroughSeconds: 300, + }, + { + TotalJobs: 1, + CompletedThroughSeconds: 150, + }, + { + TotalJobs: 1, + CompletedThroughSeconds: 50, + }, }, - } + }) + require.NoError(t, err) + should = c.ShouldQuit() + require.False(t, should) - resp, err := c.HTTPFinal() + // add complete the second shard. 
quit should be true b/c completed through is 150, our limit is one and we have a trace at 200 + err = c.AddResponse(toHTTPResponseWithResponseData(t, &tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{ + { + TraceID: "3", + StartTimeUnixNano: uint64(50 * time.Second), + }, + }, + }, 200, 1)) // 1 is the shard index require.NoError(t, err) + should = c.ShouldQuit() + require.True(t, should) +} - actual := &tempopb.SearchResponse{} - fromHTTPResponse(t, resp, actual) +func TestSearchCombinesResults(t *testing.T) { + for _, keepMostRecent := range []bool{true, false} { + start := time.Date(1, 2, 3, 4, 5, 6, 7, time.UTC) + traceID := "traceID" + + c := NewSearch(10, keepMostRecent) + sr := toHTTPResponse(t, &tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{ + { + TraceID: traceID, + StartTimeUnixNano: uint64(start.Add(time.Second).UnixNano()), + DurationMs: uint32(time.Second.Milliseconds()), + }, // 1 second after start and shorter duration + { + TraceID: traceID, + StartTimeUnixNano: uint64(start.UnixNano()), + DurationMs: uint32(time.Hour.Milliseconds()), + }, // earliest start time and longer duration + { + TraceID: traceID, + StartTimeUnixNano: uint64(start.Add(time.Hour).UnixNano()), + DurationMs: uint32(time.Millisecond.Milliseconds()), + }, // 1 hour after start and shorter duration + }, + Metrics: &tempopb.SearchMetrics{}, + }, 200) + err := c.AddResponse(sr) + require.NoError(t, err) + + expected := &tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{ + { + TraceID: traceID, + StartTimeUnixNano: uint64(start.UnixNano()), + DurationMs: uint32(time.Hour.Milliseconds()), + RootServiceName: search.RootSpanNotYetReceivedText, + }, + }, + Metrics: &tempopb.SearchMetrics{ + CompletedJobs: 1, + }, + } - require.Equal(t, expected, actual) + resp, err := c.HTTPFinal() + require.NoError(t, err) + + actual := &tempopb.SearchResponse{} + fromHTTPResponse(t, resp, actual) + + require.Equal(t, expected, actual) + } } func TestSearchResponseCombiner(t *testing.T) { + for _, keepMostRecent := range []bool{true, false} { + tests := []struct { + name string + response1 PipelineResponse + response2 PipelineResponse + + expectedStatus int + expectedResponse *tempopb.SearchResponse + expectedHTTPError error + expectedGRPCError error + }{ + { + name: "empty returns", + response1: toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200), + response2: toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200), + expectedStatus: 200, + expectedResponse: &tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{}, + Metrics: &tempopb.SearchMetrics{ + CompletedJobs: 2, + }, + }, + }, + { + name: "404+200", + response1: toHTTPResponse(t, nil, 404), + response2: toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200), + expectedStatus: 404, + expectedGRPCError: status.Error(codes.NotFound, ""), + }, + { + name: "200+400", + response1: toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200), + response2: toHTTPResponse(t, nil, 400), + expectedStatus: 400, + expectedGRPCError: status.Error(codes.InvalidArgument, ""), + }, + { + name: "200+429", + response1: toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200), + response2: toHTTPResponse(t, nil, 429), + expectedStatus: 429, + expectedGRPCError: status.Error(codes.ResourceExhausted, ""), + }, + { + name: "500+404", + response1: toHTTPResponse(t, nil, 500), + response2: 
toHTTPResponse(t, nil, 404), + expectedStatus: 500, + expectedGRPCError: status.Error(codes.Internal, ""), + }, + { + name: "404+500 - first bad response wins", + response1: toHTTPResponse(t, nil, 404), + response2: toHTTPResponse(t, nil, 500), + expectedStatus: 404, + expectedGRPCError: status.Error(codes.NotFound, ""), + }, + { + name: "500+200", + response1: toHTTPResponse(t, nil, 500), + response2: toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200), + expectedStatus: 500, + expectedGRPCError: status.Error(codes.Internal, ""), + }, + { + name: "200+500", + response1: toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200), + response2: toHTTPResponse(t, nil, 500), + expectedStatus: 500, + expectedGRPCError: status.Error(codes.Internal, ""), + }, + { + name: "respects total blocks message", + response1: &SearchJobResponse{ + TotalBlocks: 5, + TotalJobs: 10, + TotalBytes: 15, + }, + response2: toHTTPResponse(t, &tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{ + { + TraceID: "5678", + StartTimeUnixNano: 0, + }, + }, + Metrics: &tempopb.SearchMetrics{ + InspectedTraces: 5, + InspectedBytes: 7, + }, + }, 200), + expectedStatus: 200, + expectedResponse: &tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{ + { + TraceID: "5678", + StartTimeUnixNano: 0, + RootServiceName: search.RootSpanNotYetReceivedText, + }, + }, + Metrics: &tempopb.SearchMetrics{ + TotalBlocks: 5, + TotalJobs: 10, + TotalBlockBytes: 15, + InspectedTraces: 5, + InspectedBytes: 7, + CompletedJobs: 1, + }, + }, + }, + { + name: "200+200", + response1: toHTTPResponse(t, &tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{ + { + TraceID: "1234", + StartTimeUnixNano: 1, + }, + }, + Metrics: &tempopb.SearchMetrics{ + InspectedTraces: 1, + TotalBlocks: 2, + InspectedBytes: 3, + }, + }, 200), + response2: toHTTPResponse(t, &tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{ + { + TraceID: "5678", + StartTimeUnixNano: 0, + }, + }, + Metrics: &tempopb.SearchMetrics{ + InspectedTraces: 5, + TotalBlocks: 6, + InspectedBytes: 7, + }, + }, 200), + expectedStatus: 200, + expectedResponse: &tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{ + { + TraceID: "1234", + StartTimeUnixNano: 1, + RootServiceName: search.RootSpanNotYetReceivedText, + }, + { + TraceID: "5678", + StartTimeUnixNano: 0, + RootServiceName: search.RootSpanNotYetReceivedText, + }, + }, + Metrics: &tempopb.SearchMetrics{ + InspectedTraces: 6, + InspectedBytes: 10, + CompletedJobs: 2, + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + combiner := NewTypedSearch(20, keepMostRecent) + + err := combiner.AddResponse(tc.response1) + require.NoError(t, err) + err = combiner.AddResponse(tc.response2) + require.NoError(t, err) + + httpResp, err := combiner.HTTPFinal() + require.Equal(t, tc.expectedStatus, httpResp.StatusCode) + require.Equal(t, tc.expectedHTTPError, err) + + grpcresp, err := combiner.GRPCFinal() + require.Equal(t, tc.expectedGRPCError, err) + require.Equal(t, tc.expectedResponse, grpcresp) + }) + } + } +} + +func TestCombinerShards(t *testing.T) { tests := []struct { - name string - response1 PipelineResponse - response2 PipelineResponse - - expectedStatus int - expectedResponse *tempopb.SearchResponse - expectedHTTPError error - expectedGRPCError error + name string + pipelineResponse PipelineResponse + expected *tempopb.SearchResponse }{ { - name: "empty returns", - response1: 
toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200), - response2: toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200), - expectedStatus: 200, - expectedResponse: &tempopb.SearchResponse{ + name: "initial state", + pipelineResponse: nil, + expected: &tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{}, + Metrics: &tempopb.SearchMetrics{}, + }, + }, + { + name: "add job metadata", + pipelineResponse: &SearchJobResponse{ + TotalBlocks: 5, + TotalJobs: 6, + TotalBytes: 15, + Shards: []SearchShards{ // 5 shards, 2 jobs each. starting at 500 seconds and walking back 100 seconds each + { + TotalJobs: 2, + CompletedThroughSeconds: 500, + }, + { + TotalJobs: 1, + CompletedThroughSeconds: 400, + }, + { + TotalJobs: 1, + CompletedThroughSeconds: 300, + }, + { + TotalJobs: 1, + CompletedThroughSeconds: 200, + }, + { + TotalJobs: 1, + CompletedThroughSeconds: 100, + }, + }, + }, + expected: &tempopb.SearchResponse{ Traces: []*tempopb.TraceSearchMetadata{}, Metrics: &tempopb.SearchMetrics{ - CompletedJobs: 2, + TotalBlocks: 5, + TotalJobs: 6, + TotalBlockBytes: 15, }, }, }, { - name: "404+200", - response1: toHTTPResponse(t, nil, 404), - response2: toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200), - expectedStatus: 404, - expectedGRPCError: status.Error(codes.NotFound, ""), - }, - { - name: "200+400", - response1: toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200), - response2: toHTTPResponse(t, nil, 400), - expectedStatus: 400, - expectedGRPCError: status.Error(codes.InvalidArgument, ""), - }, - { - name: "200+429", - response1: toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200), - response2: toHTTPResponse(t, nil, 429), - expectedStatus: 429, - expectedGRPCError: status.Error(codes.ResourceExhausted, ""), - }, - { - name: "500+404", - response1: toHTTPResponse(t, nil, 500), - response2: toHTTPResponse(t, nil, 404), - expectedStatus: 500, - expectedGRPCError: status.Error(codes.Internal, ""), - }, - { - name: "404+500 - first bad response wins", - response1: toHTTPResponse(t, nil, 404), - response2: toHTTPResponse(t, nil, 500), - expectedStatus: 404, - expectedGRPCError: status.Error(codes.NotFound, ""), - }, - { - name: "500+200", - response1: toHTTPResponse(t, nil, 500), - response2: toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200), - expectedStatus: 500, - expectedGRPCError: status.Error(codes.Internal, ""), - }, - { - name: "200+500", - response1: toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200), - response2: toHTTPResponse(t, nil, 500), - expectedStatus: 500, - expectedGRPCError: status.Error(codes.Internal, ""), - }, - { - name: "respects total blocks message", - response1: toHTTPResponse(t, &tempopb.SearchResponse{ - Traces: nil, + name: "add response results", + pipelineResponse: toHTTPResponseWithResponseData(t, &tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{ + { + TraceID: "450", + RootServiceName: "root-450", + StartTimeUnixNano: uint64(450 * time.Second), + }, + { + TraceID: "550", + RootServiceName: "root-550", + StartTimeUnixNano: uint64(550 * time.Second), + }, + }, Metrics: &tempopb.SearchMetrics{ + InspectedTraces: 1, + InspectedBytes: 2, + }, + }, 200, 0), // shard 0 + expected: &tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{}, // no traces b/c only one job has finished and the first shard has 2 jobs + 
Metrics: &tempopb.SearchMetrics{ // metadata is incrementing + CompletedJobs: 1, + InspectedTraces: 1, + InspectedBytes: 2, TotalBlocks: 5, - TotalJobs: 10, + TotalJobs: 6, TotalBlockBytes: 15, }, - }, 200), - response2: toHTTPResponse(t, &tempopb.SearchResponse{ + }, + }, + { + name: "add second job to finish the first shard and get one result", + pipelineResponse: toHTTPResponseWithResponseData(t, &tempopb.SearchResponse{ Traces: []*tempopb.TraceSearchMetadata{ { - TraceID: "5678", - StartTimeUnixNano: 0, + TraceID: "350", + RootServiceName: "root-350", + StartTimeUnixNano: uint64(350 * time.Second), }, }, Metrics: &tempopb.SearchMetrics{ - InspectedTraces: 5, - InspectedBytes: 7, + InspectedTraces: 1, + InspectedBytes: 2, }, - }, 200), - expectedStatus: 200, - expectedResponse: &tempopb.SearchResponse{ + }, 200, 0), // shard 0, + expected: &tempopb.SearchResponse{ Traces: []*tempopb.TraceSearchMetadata{ { - TraceID: "5678", - StartTimeUnixNano: 0, - RootServiceName: search.RootSpanNotYetReceivedText, + TraceID: "550", + RootServiceName: "root-550", + StartTimeUnixNano: uint64(550 * time.Second), }, }, - Metrics: &tempopb.SearchMetrics{ + Metrics: &tempopb.SearchMetrics{ // metadata is incrementing + CompletedJobs: 2, + InspectedTraces: 2, + InspectedBytes: 4, TotalBlocks: 5, - TotalJobs: 10, + TotalJobs: 6, TotalBlockBytes: 15, - InspectedTraces: 5, - InspectedBytes: 7, - CompletedJobs: 1, }, }, }, { - name: "200+200", - response1: toHTTPResponse(t, &tempopb.SearchResponse{ + name: "update response results", + pipelineResponse: toHTTPResponseWithResponseData(t, &tempopb.SearchResponse{ Traces: []*tempopb.TraceSearchMetadata{ { - TraceID: "1234", - StartTimeUnixNano: 1, + TraceID: "550", + RootServiceName: "root-550", + RootTraceName: "root-550", + StartTimeUnixNano: uint64(550 * time.Second), }, }, Metrics: &tempopb.SearchMetrics{ InspectedTraces: 1, - TotalBlocks: 2, - InspectedBytes: 3, + InspectedBytes: 2, }, - }, 200), - response2: toHTTPResponse(t, &tempopb.SearchResponse{ + }, 200, 1), // complete shard 1 + expected: &tempopb.SearchResponse{ Traces: []*tempopb.TraceSearchMetadata{ - { - TraceID: "5678", - StartTimeUnixNano: 0, + { // included b/c updated + TraceID: "550", + RootServiceName: "root-550", + RootTraceName: "root-550", + StartTimeUnixNano: uint64(550 * time.Second), + }, + { // included b/c second shard is done + TraceID: "450", + RootServiceName: "root-450", + StartTimeUnixNano: uint64(450 * time.Second), }, }, - Metrics: &tempopb.SearchMetrics{ - InspectedTraces: 5, - TotalBlocks: 6, - InspectedBytes: 7, + Metrics: &tempopb.SearchMetrics{ // metadata is incrementing + CompletedJobs: 3, + InspectedTraces: 3, + InspectedBytes: 6, + TotalBlocks: 5, + TotalJobs: 6, + TotalBlockBytes: 15, }, - }, 200), - expectedStatus: 200, - expectedResponse: &tempopb.SearchResponse{ + }, + }, + { + name: "skip a shard and see no change", + pipelineResponse: toHTTPResponseWithResponseData(t, &tempopb.SearchResponse{ Traces: []*tempopb.TraceSearchMetadata{ { - TraceID: "1234", - StartTimeUnixNano: 1, - RootServiceName: search.RootSpanNotYetReceivedText, + TraceID: "50", + RootServiceName: "root-50", + StartTimeUnixNano: uint64(50 * time.Second), }, + }, + Metrics: &tempopb.SearchMetrics{ + InspectedTraces: 1, + InspectedBytes: 2, + }, + }, 200, 3), // complete shard 3, + expected: &tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{}, // no traces b/c we skipped shard 2 and we can't include results from 3 until 2 is done + Metrics: &tempopb.SearchMetrics{ // metadata is 
incrementing + CompletedJobs: 4, + InspectedTraces: 4, + InspectedBytes: 8, + TotalBlocks: 5, + TotalJobs: 6, + TotalBlockBytes: 15, + }, + }, + }, + { + name: "fill in shard 2 and see results", + pipelineResponse: toHTTPResponseWithResponseData(t, &tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{}, + Metrics: &tempopb.SearchMetrics{ + InspectedTraces: 1, + InspectedBytes: 2, + }, + }, 200, 2), // complete shard 2, + expected: &tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{ { - TraceID: "5678", - StartTimeUnixNano: 0, - RootServiceName: search.RootSpanNotYetReceivedText, + TraceID: "350", + RootServiceName: "root-350", + StartTimeUnixNano: uint64(350 * time.Second), }, }, + Metrics: &tempopb.SearchMetrics{ // metadata is incrementing + CompletedJobs: 5, + InspectedTraces: 5, + InspectedBytes: 10, + TotalBlocks: 5, + TotalJobs: 6, + TotalBlockBytes: 15, + }, + }, + }, + { + name: "complete all shards which dumps all results", + pipelineResponse: toHTTPResponseWithResponseData(t, &tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{}, Metrics: &tempopb.SearchMetrics{ + InspectedTraces: 1, + InspectedBytes: 2, + }, + }, 200, 4), // complete shard 4, + expected: &tempopb.SearchResponse{ + Traces: []*tempopb.TraceSearchMetadata{ + { + TraceID: "50", + RootServiceName: "root-50", + StartTimeUnixNano: uint64(50 * time.Second), + }, + }, // 50 is BEFORE the earliest trace shard, but it's still returned b/c at this point we have completed all jobs + Metrics: &tempopb.SearchMetrics{ // metadata is incrementing + CompletedJobs: 6, InspectedTraces: 6, - InspectedBytes: 10, - CompletedJobs: 2, + InspectedBytes: 12, + TotalBlocks: 5, + TotalJobs: 6, + TotalBlockBytes: 15, }, }, }, } + // apply tests one at a time to the combiner and check expected results + + combiner := NewTypedSearch(5, true) for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - combiner := NewTypedSearch(20) + if tc.pipelineResponse != nil { + err := combiner.AddResponse(tc.pipelineResponse) + require.NoError(t, err) + } - err := combiner.AddResponse(tc.response1) - require.NoError(t, err) - err = combiner.AddResponse(tc.response2) + resp, err := combiner.GRPCDiff() require.NoError(t, err) - - httpResp, err := combiner.HTTPFinal() - require.Equal(t, tc.expectedStatus, httpResp.StatusCode) - require.Equal(t, tc.expectedHTTPError, err) - - grpcresp, err := combiner.GRPCFinal() - require.Equal(t, tc.expectedGRPCError, err) - require.Equal(t, tc.expectedResponse, grpcresp) + require.Equal(t, tc.expected, resp) }) } } -func TestSearchDiffsResults(t *testing.T) { - traceID := "traceID" - - c := NewTypedSearch(10) - sr := toHTTPResponse(t, &tempopb.SearchResponse{ - Traces: []*tempopb.TraceSearchMetadata{ - { - TraceID: traceID, +func TestCompletionTracker(t *testing.T) { + tcs := []struct { + name string + add []int // -1 means send shards + shards []SearchShards + exp uint32 + }{ + // shards only + { + name: "shards only", + add: []int{-1}, + shards: []SearchShards{ + { + TotalJobs: 1, + CompletedThroughSeconds: 100, + }, }, + exp: 0, }, - Metrics: &tempopb.SearchMetrics{}, - }, 200) - expectedDiff := &tempopb.SearchResponse{ - Traces: []*tempopb.TraceSearchMetadata{ - { - TraceID: traceID, - RootServiceName: search.RootSpanNotYetReceivedText, + // indexes only + { + name: "indexes only", + add: []int{1, 0, 1, 3, 2, 0, 1, 1}, + shards: []SearchShards{ + { + TotalJobs: 1, + CompletedThroughSeconds: 100, + }, }, + exp: 0, }, - Metrics: &tempopb.SearchMetrics{ - CompletedJobs: 1, + 
// first shard complete, shards first + { + name: "first shard complete, shards first", + add: []int{-1, 0}, + shards: []SearchShards{ + { + TotalJobs: 1, + CompletedThroughSeconds: 100, + }, + }, + exp: 100, }, - } - expectedNoDiff := &tempopb.SearchResponse{ - Traces: []*tempopb.TraceSearchMetadata{}, - Metrics: &tempopb.SearchMetrics{}, - } - - // haven't added anything yet - actual, err := c.GRPCDiff() - require.NoError(t, err) - require.Equal(t, expectedNoDiff, actual) - - // add a trace and get it back in diff - err = c.AddResponse(sr) - require.NoError(t, err) - - actual, err = c.GRPCDiff() - require.NoError(t, err) - require.Equal(t, expectedDiff, actual) - - // now we should get no diff again (with 1 completed job) - expectedNoDiff.Metrics.CompletedJobs = 1 - actual, err = c.GRPCDiff() - require.NoError(t, err) - require.Equal(t, expectedNoDiff, actual) - - // let's add a different trace and get it back in diff - traceID2 := "traceID2" - sr2 := toHTTPResponse(t, &tempopb.SearchResponse{ - Traces: []*tempopb.TraceSearchMetadata{ - { - TraceID: traceID2, + // first shard complete, index first + { + name: "first shard complete, index first", + add: []int{0, -1}, + shards: []SearchShards{ + { + TotalJobs: 1, + CompletedThroughSeconds: 100, + }, }, + exp: 100, }, - Metrics: &tempopb.SearchMetrics{}, - }, 200) - expectedDiff2 := &tempopb.SearchResponse{ - Traces: []*tempopb.TraceSearchMetadata{ - { - TraceID: traceID2, - RootServiceName: search.RootSpanNotYetReceivedText, + // shards received at various times + { + name: "shards received at various times", + add: []int{-1, 0, 0, 1, 1}, + shards: []SearchShards{ + { + TotalJobs: 2, + CompletedThroughSeconds: 100, + }, + { + TotalJobs: 2, + CompletedThroughSeconds: 200, + }, }, + exp: 200, }, - Metrics: &tempopb.SearchMetrics{ - CompletedJobs: 2, // we will have 2 completed jobs at this point + { + name: "shards received at various times", + add: []int{0, -1, 0, 1, 1}, + shards: []SearchShards{ + { + TotalJobs: 2, + CompletedThroughSeconds: 100, + }, + { + TotalJobs: 2, + CompletedThroughSeconds: 200, + }, + }, + exp: 200, + }, + { + name: "shards received at various times", + add: []int{0, 0, 1, -1, 1}, + shards: []SearchShards{ + { + TotalJobs: 2, + CompletedThroughSeconds: 100, + }, + { + TotalJobs: 2, + CompletedThroughSeconds: 200, + }, + }, + exp: 200, + }, + { + name: "shards received at various times", + add: []int{0, 0, 1, 1, -1}, + shards: []SearchShards{ + { + TotalJobs: 2, + CompletedThroughSeconds: 100, + }, + { + TotalJobs: 2, + CompletedThroughSeconds: 200, + }, + }, + exp: 200, + }, + // bad data received + { + name: "bad data received last", + add: []int{-1, 0, 0, 2}, + shards: []SearchShards{ + { + TotalJobs: 2, + CompletedThroughSeconds: 100, + }, + { + TotalJobs: 2, + CompletedThroughSeconds: 200, + }, + }, + exp: 100, + }, + { + name: "bad data immediately after shards", + add: []int{-1, 2, 0, 0}, + shards: []SearchShards{ + { + TotalJobs: 2, + CompletedThroughSeconds: 100, + }, + { + TotalJobs: 2, + CompletedThroughSeconds: 200, + }, + }, + exp: 100, + }, + { + name: "bad data immediately before shards", + add: []int{0, 0, 2, -1}, + shards: []SearchShards{ + { + TotalJobs: 2, + CompletedThroughSeconds: 100, + }, + { + TotalJobs: 2, + CompletedThroughSeconds: 200, + }, + }, + exp: 100, }, } - err = c.AddResponse(sr2) - require.NoError(t, err) - - actual, err = c.GRPCDiff() - require.NoError(t, err) - require.Equal(t, expectedDiff2, actual) -} - -type pipelineResponse struct { - r *http.Response -} + for _, tc := range 
tcs { + t.Run(tc.name, func(t *testing.T) { + tracker := &ShardCompletionTracker{} -func (p *pipelineResponse) HTTPResponse() *http.Response { - return p.r -} + ct := uint32(0) + for _, sc := range tc.add { + if sc == -1 { + ct = tracker.addShards(tc.shards) + continue + } -func (p *pipelineResponse) RequestData() any { - return nil + ct = tracker.addShardIdx(sc) + } + require.Equal(t, int(tc.exp), int(ct)) + }) + } } -func toHTTPResponse(t *testing.T, pb proto.Message, statusCode int) PipelineResponse { +func toHTTPResponseWithResponseData(t *testing.T, pb proto.Message, statusCode int, responseData any) PipelineResponse { var body string if pb != nil { @@ -408,198 +852,20 @@ func toHTTPResponse(t *testing.T, pb proto.Message, statusCode int) PipelineResp require.NoError(t, err) } - return &pipelineResponse{&http.Response{ - Body: io.NopCloser(strings.NewReader(body)), - StatusCode: statusCode, - }} + return &testPipelineResponse{ + responseData: responseData, + r: &http.Response{ + Body: io.NopCloser(strings.NewReader(body)), + StatusCode: statusCode, + }, + } } -func fromHTTPResponse(t *testing.T, r *http.Response, pb proto.Message) { - err := jsonpb.Unmarshal(r.Body, pb) - require.NoError(t, err) +func toHTTPResponse(t *testing.T, pb proto.Message, statusCode int) PipelineResponse { + return toHTTPResponseWithResponseData(t, pb, statusCode, nil) } -func TestCombinerDiffs(t *testing.T) { - combiner := NewTypedSearch(100) - - // first request should be empty - resp, err := combiner.GRPCDiff() - require.NoError(t, err) - require.Equal(t, &tempopb.SearchResponse{ - Traces: []*tempopb.TraceSearchMetadata{}, - Metrics: &tempopb.SearchMetrics{}, - }, resp) - - err = combiner.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{ - Traces: []*tempopb.TraceSearchMetadata{ - { - TraceID: "1234", - RootServiceName: "root", - }, - }, - Metrics: &tempopb.SearchMetrics{ - InspectedTraces: 1, - InspectedBytes: 2, - }, - }, 200)) - require.NoError(t, err) - - // now we should get the same metadata as above - resp, err = combiner.GRPCDiff() - require.NoError(t, err) - require.Equal(t, &tempopb.SearchResponse{ - Traces: []*tempopb.TraceSearchMetadata{ - { - TraceID: "1234", - RootServiceName: "root", - }, - }, - Metrics: &tempopb.SearchMetrics{ - CompletedJobs: 1, - InspectedTraces: 1, - InspectedBytes: 2, - }, - }, resp) - - // metrics, but the trace hasn't change - resp, err = combiner.GRPCDiff() - require.NoError(t, err) - require.Equal(t, &tempopb.SearchResponse{ - Traces: []*tempopb.TraceSearchMetadata{}, - Metrics: &tempopb.SearchMetrics{ - CompletedJobs: 1, - InspectedTraces: 1, - InspectedBytes: 2, - }, - }, resp) - - // new traces - err = combiner.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{ - Traces: []*tempopb.TraceSearchMetadata{ - { - TraceID: "5678", - RootServiceName: "root", - StartTimeUnixNano: 1, // forces order - }, - { - TraceID: "9011", - RootServiceName: "root", - StartTimeUnixNano: 2, - }, - }, - Metrics: &tempopb.SearchMetrics{ - InspectedTraces: 1, - InspectedBytes: 2, - }, - }, 200)) - require.NoError(t, err) - - resp, err = combiner.GRPCDiff() - require.NoError(t, err) - require.Equal(t, &tempopb.SearchResponse{ - Traces: []*tempopb.TraceSearchMetadata{ - { - TraceID: "9011", - RootServiceName: "root", - StartTimeUnixNano: 2, - }, - { - TraceID: "5678", - RootServiceName: "root", - StartTimeUnixNano: 1, - }, - }, - Metrics: &tempopb.SearchMetrics{ - CompletedJobs: 2, - InspectedTraces: 2, - InspectedBytes: 4, - }, - }, resp) - - // write over existing trace - err = 
combiner.AddResponse(toHTTPResponse(t, &tempopb.SearchResponse{ - Traces: []*tempopb.TraceSearchMetadata{ - { - TraceID: "1234", - DurationMs: 100, - }, - }, - Metrics: &tempopb.SearchMetrics{ - InspectedTraces: 1, - InspectedBytes: 2, - }, - }, 200)) - require.NoError(t, err) - - resp, err = combiner.GRPCDiff() +func fromHTTPResponse(t *testing.T, r *http.Response, pb proto.Message) { + err := jsonpb.Unmarshal(r.Body, pb) require.NoError(t, err) - require.Equal(t, &tempopb.SearchResponse{ - Traces: []*tempopb.TraceSearchMetadata{ - { - TraceID: "1234", - RootServiceName: "root", - DurationMs: 100, - }, - }, - Metrics: &tempopb.SearchMetrics{ - CompletedJobs: 3, - InspectedTraces: 3, - InspectedBytes: 6, - }, - }, resp) -} - -func TestSearchCombinerDoesNotRace(t *testing.T) { - end := make(chan struct{}) - concurrent := func(f func()) { - for { - select { - case <-end: - return - default: - f() - } - } - } - - traceID := "1234" - combiner := NewTypedSearch(10) - i := 0 - go concurrent(func() { - i++ - resp := toHTTPResponse(t, &tempopb.SearchResponse{ - Traces: []*tempopb.TraceSearchMetadata{ - { - TraceID: traceID, - StartTimeUnixNano: math.MaxUint64 - uint64(i), - DurationMs: uint32(i), - SpanSets: []*tempopb.SpanSet{{ - Matched: uint32(i), - }}, - }, - }, - Metrics: &tempopb.SearchMetrics{ - InspectedTraces: 1, - InspectedBytes: 1, - TotalBlocks: 1, - TotalJobs: 1, - CompletedJobs: 1, - }, - }, 200) - _ = combiner.AddResponse(resp) - }) - - go concurrent(func() { - _, _ = combiner.GRPCFinal() - }) - - go concurrent(func() { - _, _ = combiner.HTTPFinal() - }) - - time.Sleep(100 * time.Millisecond) - close(end) - // Wait for go funcs to quit before - // exiting and cleaning up - time.Sleep(2 * time.Second) } diff --git a/modules/frontend/combiner/trace_by_id_test.go b/modules/frontend/combiner/trace_by_id_test.go index ac83e8ead6e..5deaaf598f8 100644 --- a/modules/frontend/combiner/trace_by_id_test.go +++ b/modules/frontend/combiner/trace_by_id_test.go @@ -43,7 +43,7 @@ func TestTraceByIDShouldQuit(t *testing.T) { // unparseable body should not quit, but should return an error c = NewTraceByID(0, api.HeaderAcceptJSON) - err = c.AddResponse(&pipelineResponse{&http.Response{Body: io.NopCloser(strings.NewReader("foo")), StatusCode: 200}}) + err = c.AddResponse(&testPipelineResponse{r: &http.Response{Body: io.NopCloser(strings.NewReader("foo")), StatusCode: 200}}) require.Error(t, err) should = c.ShouldQuit() require.False(t, should) @@ -101,7 +101,7 @@ func toHTTPProtoResponse(t *testing.T, pb proto.Message, statusCode int) Pipelin require.NoError(t, err) } - return &pipelineResponse{&http.Response{ + return &testPipelineResponse{r: &http.Response{ Body: io.NopCloser(bytes.NewReader(body)), StatusCode: statusCode, }} diff --git a/modules/frontend/combiner/trace_by_id_v2_test.go b/modules/frontend/combiner/trace_by_id_v2_test.go index 7df3095d443..6d6d6dc933f 100644 --- a/modules/frontend/combiner/trace_by_id_v2_test.go +++ b/modules/frontend/combiner/trace_by_id_v2_test.go @@ -27,6 +27,10 @@ func (m MockResponse) RequestData() any { return nil } +func (m MockResponse) IsMetadata() bool { + return false +} + func TestNewTraceByIdV2ReturnsAPartialTrace(t *testing.T) { traceResponse := &tempopb.TraceByIDResponse{ Trace: test.MakeTrace(2, []byte{0x01, 0x02}), diff --git a/modules/frontend/config.go b/modules/frontend/config.go index 0ba194df4b2..35e7c2b7a0a 100644 --- a/modules/frontend/config.go +++ b/modules/frontend/config.go @@ -79,6 +79,7 @@ func (cfg *Config) 
RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) { ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, IngesterShards: 1, + MostRecentShards: defaultMostRecentShards, }, SLO: slo, } diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go index 9b6a472f6b7..7315849f264 100644 --- a/modules/frontend/frontend.go +++ b/modules/frontend/frontend.go @@ -78,6 +78,10 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t return nil, fmt.Errorf("query backend after should be less than or equal to query ingester until") } + if cfg.Search.Sharder.MostRecentShards <= 0 { + return nil, fmt.Errorf("most recent shards must be greater than 0") + } + if cfg.Metrics.Sharder.ConcurrentRequests <= 0 { return nil, fmt.Errorf("frontend metrics concurrent requests should be greater than 0") } diff --git a/modules/frontend/frontend_test.go b/modules/frontend/frontend_test.go index d21214af70e..0713f332435 100644 --- a/modules/frontend/frontend_test.go +++ b/modules/frontend/frontend_test.go @@ -26,6 +26,7 @@ func TestFrontendTagSearchRequiresOrgID(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -69,12 +70,12 @@ func TestFrontendBadConfigFails(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, }, nil, nil, nil, nil, "", log.NewNopLogger(), nil) assert.EqualError(t, err, "frontend query shards should be between 2 and 100000 (both inclusive)") - assert.Nil(t, f) f, err = New(Config{ @@ -86,6 +87,7 @@ func TestFrontendBadConfigFails(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -102,6 +104,7 @@ func TestFrontendBadConfigFails(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: 0, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -118,6 +121,7 @@ func TestFrontendBadConfigFails(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: 0, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -134,6 +138,7 @@ func TestFrontendBadConfigFails(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, QueryIngestersUntil: time.Minute, QueryBackendAfter: time.Hour, }, @@ -151,6 +156,7 @@ func TestFrontendBadConfigFails(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -166,6 +172,7 @@ func TestFrontendBadConfigFails(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -188,6 +195,7 @@ func TestFrontendBadConfigFails(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, 
TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -202,4 +210,28 @@ }, nil, nil, nil, nil, "", log.NewNopLogger(), nil) assert.EqualError(t, err, "frontend metrics interval should be greater than 0") assert.Nil(t, f) + + f, err = New(Config{ + TraceByID: TraceByIDConfig{ + QueryShards: maxQueryShards, + }, + Search: SearchConfig{ + Sharder: SearchSharderConfig{ + ConcurrentRequests: defaultConcurrentRequests, + TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: 0, + }, + SLO: testSLOcfg, + }, + Metrics: MetricsConfig{ + Sharder: QueryRangeSharderConfig{ + ConcurrentRequests: defaultConcurrentRequests, + TargetBytesPerRequest: defaultTargetBytesPerRequest, + Interval: 5 * time.Minute, + }, + SLO: testSLOcfg, + }, + }, nil, nil, nil, nil, "", log.NewNopLogger(), nil) + assert.EqualError(t, err, "most recent shards must be greater than 0") + assert.Nil(t, f) } diff --git a/modules/frontend/pipeline/responses.go b/modules/frontend/pipeline/responses.go index 8aabb3f67c1..6f9b7ac7513 100644 --- a/modules/frontend/pipeline/responses.go +++ b/modules/frontend/pipeline/responses.go @@ -30,11 +30,22 @@ func (p pipelineResponse) RequestData() any { return p.requestData } +func (p pipelineResponse) IsMetadata() bool { + return false +} + // syncResponse is a single http.Response that implements the Responses[*http.Response] interface. type syncResponse struct { r combiner.PipelineResponse } +// NewAsyncResponse creates a new AsyncResponse that wraps a single PipelineResponse. +func NewAsyncResponse(r combiner.PipelineResponse) Responses[combiner.PipelineResponse] { + return syncResponse{ + r: r, + } +} + // NewHTTPToAsyncResponse creates a new AsyncResponse that wraps a single http.Response. 
func NewHTTPToAsyncResponse(r *http.Response) Responses[combiner.PipelineResponse] { return syncResponse{ diff --git a/modules/frontend/pipeline/responses_test.go b/modules/frontend/pipeline/responses_test.go index 966f5c1f56f..04f1ffbcaeb 100644 --- a/modules/frontend/pipeline/responses_test.go +++ b/modules/frontend/pipeline/responses_test.go @@ -304,7 +304,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) { bridge := &pipelineBridge{ next: tc.finalRT(cancel), } - httpCollector := NewHTTPCollector(sharder{next: bridge}, 0, combiner.NewSearch(0)) + httpCollector := NewHTTPCollector(sharder{next: bridge}, 0, combiner.NewSearch(0, false)) _, _ = httpCollector.RoundTrip(req) @@ -326,7 +326,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) { bridge := &pipelineBridge{ next: tc.finalRT(cancel), } - grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](sharder{next: bridge}, 0, combiner.NewTypedSearch(0), func(_ *tempopb.SearchResponse) error { return nil }) + grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](sharder{next: bridge}, 0, combiner.NewTypedSearch(0, false), func(_ *tempopb.SearchResponse) error { return nil }) _ = grpcCollector.RoundTrip(req) @@ -350,7 +350,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) { } s := sharder{next: sharder{next: bridge}, funcSharder: true} - grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0), func(_ *tempopb.SearchResponse) error { return nil }) + grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0, false), func(_ *tempopb.SearchResponse) error { return nil }) _ = grpcCollector.RoundTrip(req) @@ -373,7 +373,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) { } s := sharder{next: sharder{next: bridge, funcSharder: true}} - grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0), func(_ *tempopb.SearchResponse) error { return nil }) + grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0, false), func(_ *tempopb.SearchResponse) error { return nil }) _ = grpcCollector.RoundTrip(req) diff --git a/modules/frontend/search_handlers.go b/modules/frontend/search_handlers.go index 1675271a27f..a2e2af843e1 100644 --- a/modules/frontend/search_handlers.go +++ b/modules/frontend/search_handlers.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/tempo/pkg/api" "github.com/grafana/tempo/pkg/tempopb" + "github.com/grafana/tempo/pkg/traceql" ) // newSearchStreamingGRPCHandler returns a handler that streams results from the HTTP handler @@ -43,14 +44,14 @@ func newSearchStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripper[c tenant, _ := user.ExtractOrgID(ctx) start := time.Now() - limit, err := adjustLimit(req.Limit, cfg.Search.Sharder.DefaultLimit, cfg.Search.Sharder.MaxLimit) + comb, err := newCombiner(req, cfg.Search.Sharder) if err != nil { - level.Error(logger).Log("msg", "search streaming: adjust limit failed", "err", err) - return status.Errorf(codes.InvalidArgument, "adjust limit: %s", err.Error()) + level.Error(logger).Log("msg", "search streaming: could not create combiner", "err", err) + return status.Error(codes.InvalidArgument, err.Error()) + } var finalResponse *tempopb.SearchResponse - comb := combiner.NewTypedSearch(int(limit)) collector := pipeline.NewGRPCCollector[*tempopb.SearchResponse](next, cfg.ResponseConsumers, comb, func(sr *tempopb.SearchResponse) error { finalResponse = sr // sadly we can't srv.Send directly into the collector. 
we need bytesProcessed for the SLO calculations return srv.Send(sr) @@ -89,10 +90,9 @@ func newSearchHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.P }, nil } - // build combiner with limit - limit, err := adjustLimit(searchReq.Limit, cfg.Search.Sharder.DefaultLimit, cfg.Search.Sharder.MaxLimit) + comb, err := newCombiner(searchReq, cfg.Search.Sharder) if err != nil { - level.Error(logger).Log("msg", "search: adjust limit failed", "err", err) + level.Error(logger).Log("msg", "search: could not create combiner", "err", err) return &http.Response{ StatusCode: http.StatusBadRequest, Status: http.StatusText(http.StatusBadRequest), @@ -103,7 +103,6 @@ func newSearchHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.P logRequest(logger, tenant, searchReq) // build and use roundtripper - comb := combiner.NewTypedSearch(int(limit)) rt := pipeline.NewHTTPCollector(next, cfg.ResponseConsumers, comb) resp, err := rt.RoundTrip(req) @@ -122,6 +121,28 @@ func newSearchHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.P }) } +func newCombiner(req *tempopb.SearchRequest, cfg SearchSharderConfig) (combiner.GRPCCombiner[*tempopb.SearchResponse], error) { + limit, err := adjustLimit(req.Limit, cfg.DefaultLimit, cfg.MaxLimit) + if err != nil { + return nil, err + } + + mostRecent := false + if len(req.Query) > 0 { + query, err := traceql.Parse(req.Query) + if err != nil { + return nil, fmt.Errorf("invalid TraceQL query: %s", err) + } + + ok := false + if mostRecent, ok = query.Hints.GetBool(traceql.HintMostRecent, false); !ok { + mostRecent = false + } + } + + return combiner.NewTypedSearch(int(limit), mostRecent), nil +} + // adjusts the limit based on provided config func adjustLimit(limit, defaultLimit, maxLimit uint32) (uint32, error) { if limit == 0 { diff --git a/modules/frontend/search_handlers_test.go b/modules/frontend/search_handlers_test.go index 05766eb78ba..7e808ba7a65 100644 --- a/modules/frontend/search_handlers_test.go +++ b/modules/frontend/search_handlers_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "math" "math/rand" "net/http" "net/http/httptest" @@ -31,7 +32,6 @@ import ( "github.com/grafana/tempo/pkg/cache" "github.com/grafana/tempo/pkg/search" "github.com/grafana/tempo/pkg/tempopb" - "github.com/grafana/tempo/pkg/traceql" "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/pkg/util/test" "github.com/grafana/tempo/tempodb" @@ -139,14 +139,14 @@ func TestFrontendSearch(t *testing.T) { func runnerBadRequestOnOrgID(t *testing.T, f *QueryFrontend) { // http - httpReq := httptest.NewRequest("GET", "/api/search", nil) + httpReq := httptest.NewRequest("GET", "/api/search?q={}", nil) httpResp := httptest.NewRecorder() f.SearchHandler.ServeHTTP(httpResp, httpReq) require.Equal(t, "no org id", httpResp.Body.String()) require.Equal(t, http.StatusBadRequest, httpResp.Code) // grpc - grpcReq := &tempopb.SearchRequest{} + grpcReq := &tempopb.SearchRequest{Query: "{}"} err := f.streamingSearch(grpcReq, newMockStreamingServer[*tempopb.SearchResponse]("", nil)) require.Equal(t, status.Error(codes.InvalidArgument, "no org id"), err) } @@ -175,8 +175,9 @@ func runnerRequests(t *testing.T, f *QueryFrontend) { expectedStatusCode: 200, expectedResponse: &tempopb.SearchResponse{ Traces: []*tempopb.TraceSearchMetadata{{ - TraceID: "1", - RootServiceName: search.RootSpanNotYetReceivedText, + TraceID: "1", + RootServiceName: search.RootSpanNotYetReceivedText, + StartTimeUnixNano: math.MaxUint64, }}, Metrics: &tempopb.SearchMetrics{ 
InspectedTraces: 4, @@ -212,8 +213,9 @@ func runnerRequests(t *testing.T, f *QueryFrontend) { expectedStatusCode: 200, expectedResponse: &tempopb.SearchResponse{ Traces: []*tempopb.TraceSearchMetadata{{ - TraceID: "1", - RootServiceName: search.RootSpanNotYetReceivedText, + TraceID: "1", + RootServiceName: search.RootSpanNotYetReceivedText, + StartTimeUnixNano: math.MaxUint64, }}, Metrics: &tempopb.SearchMetrics{ InspectedTraces: 8, @@ -270,7 +272,7 @@ func runnerRequests(t *testing.T, f *QueryFrontend) { func runnerClientCancelContext(t *testing.T, f *QueryFrontend) { // http - httpReq := httptest.NewRequest("GET", "/api/search", nil) + httpReq := httptest.NewRequest("GET", "/api/search?q={}", nil) httpResp := httptest.NewRecorder() ctx, cancel := context.WithCancel(httpReq.Context()) @@ -293,7 +295,7 @@ func runnerClientCancelContext(t *testing.T, f *QueryFrontend) { time.Sleep(50 * time.Millisecond) cancel() }() - grpcReq := &tempopb.SearchRequest{} + grpcReq := &tempopb.SearchRequest{Query: "{}"} err := f.streamingSearch(grpcReq, srv) require.Equal(t, status.Error(codes.Internal, "context canceled"), err) } @@ -304,7 +306,8 @@ func TestSearchLimitHonored(t *testing.T) { return &tempopb.SearchResponse{ Traces: []*tempopb.TraceSearchMetadata{ { - TraceID: util.TraceIDToHexString(test.ValidTraceID(nil)), + TraceID: util.TraceIDToHexString(test.ValidTraceID(nil)), + StartTimeUnixNano: math.MaxUint64, // forces GRPCDiff in the search combiner to return this trace b/c it's always after CompletedThroughSeconds }, }, Metrics: &tempopb.SearchMetrics{ @@ -323,6 +326,7 @@ func TestSearchLimitHonored(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, DefaultLimit: 10, MaxLimit: 15, }, @@ -381,13 +385,12 @@ func TestSearchLimitHonored(t *testing.T) { tenant := "1|2|3|4|5|6" // due to the blocks we will have 4 trace ids normally - httpReq := httptest.NewRequest("GET", "/api/search", nil) + httpReq := httptest.NewRequest("GET", "/api/search?q={}", nil) httpReq, err := api.BuildSearchRequest(httpReq, tc.request) require.NoError(t, err) ctx := user.InjectOrgID(httpReq.Context(), tenant) httpReq = httpReq.WithContext(ctx) - httpResp := httptest.NewRecorder() f.SearchHandler.ServeHTTP(httpResp, httpReq) @@ -403,18 +406,18 @@ func TestSearchLimitHonored(t *testing.T) { } // grpc - combiner := traceql.NewMetadataCombiner() + distinctTraces := map[string]struct{}{} err = f.streamingSearch(tc.request, newMockStreamingServer(tenant, func(i int, sr *tempopb.SearchResponse) { // combine for _, t := range sr.Traces { - combiner.AddMetadata(t) + distinctTraces[t.TraceID] = struct{}{} } })) if tc.badRequest { - require.Equal(t, status.Error(codes.InvalidArgument, "adjust limit: limit 20 exceeds max limit 15"), err) + require.Equal(t, status.Error(codes.InvalidArgument, "limit 20 exceeds max limit 15"), err) } else { require.NoError(t, err) - require.Equal(t, combiner.Count(), tc.expectedTraces) + require.Equal(t, tc.expectedTraces, len(distinctTraces)) } }) } @@ -487,6 +490,7 @@ func TestSearchFailurePropagatesFromQueriers(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -500,7 +504,7 @@ func TestSearchFailurePropagatesFromQueriers(t *testing.T) { }, }, nil) - httpReq := httptest.NewRequest("GET", 
"/api/search?start=1&end=10000", nil) + httpReq := httptest.NewRequest("GET", "/api/search?start=1&end=10000&q={}", nil) httpResp := httptest.NewRecorder() ctx := user.InjectOrgID(httpReq.Context(), "foo") @@ -532,6 +536,7 @@ func TestSearchFailurePropagatesFromQueriers(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -547,7 +552,7 @@ func TestSearchFailurePropagatesFromQueriers(t *testing.T) { // grpc srv := newMockStreamingServer[*tempopb.SearchResponse]("bar", nil) - grpcReq := &tempopb.SearchRequest{} + grpcReq := &tempopb.SearchRequest{Query: "{}"} err := f.streamingSearch(grpcReq, srv) require.Equal(t, tc.expectedErr, err) } @@ -718,7 +723,8 @@ func frontendWithSettings(t require.TestingT, next pipeline.RoundTripper, rdr te return &tempopb.SearchResponse{ Traces: []*tempopb.TraceSearchMetadata{ { - TraceID: "1", + TraceID: "1", + StartTimeUnixNano: math.MaxUint64, // forces GRPCDiff in the search combiner to return this trace b/c it's always after CompletedThroughSeconds }, }, Metrics: &tempopb.SearchMetrics{ @@ -777,6 +783,7 @@ func frontendWithSettings(t require.TestingT, next pipeline.RoundTripper, rdr te Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, diff --git a/modules/frontend/search_sharder.go b/modules/frontend/search_sharder.go index ddad508d30f..4f3d8cfff52 100644 --- a/modules/frontend/search_sharder.go +++ b/modules/frontend/search_sharder.go @@ -3,11 +3,12 @@ package frontend import ( "context" "fmt" + "math" "net/http" + "sort" "time" "github.com/go-kit/log" //nolint:all deprecated - "github.com/gogo/protobuf/jsonpb" "github.com/grafana/dskit/user" "github.com/segmentio/fasthash/fnv1a" @@ -24,6 +25,7 @@ import ( const ( defaultTargetBytesPerRequest = 100 * 1024 * 1024 defaultConcurrentRequests = 1000 + defaultMostRecentShards = 200 ) type SearchSharderConfig struct { @@ -35,6 +37,7 @@ type SearchSharderConfig struct { QueryBackendAfter time.Duration `yaml:"query_backend_after,omitempty"` QueryIngestersUntil time.Duration `yaml:"query_ingesters_until,omitempty"` IngesterShards int `yaml:"ingester_shards,omitempty"` + MostRecentShards int `yaml:"most_recent_shards,omitempty"` } type asyncSearchSharder struct { @@ -91,72 +94,49 @@ func (s asyncSearchSharder) RoundTrip(pipelineRequest pipeline.Request) (pipelin return pipeline.NewBadRequest(fmt.Errorf("range specified by start and end exceeds %s. received start=%d end=%d", maxDuration, searchReq.Start, searchReq.End)), nil } - // buffer of shards+1 allows us to insert ingestReq and metrics + // buffer allows us to insert any ingester reqs non-blocking reqCh := make(chan pipeline.Request, s.cfg.IngesterShards+1) // build request to search ingesters based on query_ingesters_until config and time range // pass subCtx in requests so we can cancel and exit early - err = s.ingesterRequests(tenantID, pipelineRequest, *searchReq, reqCh) + jobMetrics, err := s.ingesterRequests(tenantID, pipelineRequest, *searchReq, reqCh) if err != nil { return nil, err } - // Check the number of requests that were were written to the request channel - // before we start reading them. 
- ingesterJobs := len(reqCh) - // pass subCtx in requests so we can cancel and exit early - totalJobs, totalBlocks, totalBlockBytes := s.backendRequests(ctx, tenantID, pipelineRequest, searchReq, reqCh, func(err error) { + s.backendRequests(ctx, tenantID, pipelineRequest, searchReq, jobMetrics, reqCh, func(err error) { // todo: actually find a way to return this error to the user s.logger.Log("msg", "search: failed to build backend requests", "err", err) }) - totalJobs += ingesterJobs - - // send a job to communicate the search metrics. this is consumed by the combiner to calculate totalblocks/bytes/jobs - var jobMetricsResponse pipeline.Responses[combiner.PipelineResponse] - if totalJobs > 0 { - resp := &tempopb.SearchResponse{ - Metrics: &tempopb.SearchMetrics{ - TotalBlocks: uint32(totalBlocks), - TotalBlockBytes: totalBlockBytes, - TotalJobs: uint32(totalJobs), - }, - } - - m := jsonpb.Marshaler{} - body, err := m.MarshalToString(resp) - if err != nil { - return nil, fmt.Errorf("failed to marshal search metrics: %w", err) - } - - jobMetricsResponse = pipeline.NewSuccessfulResponse(body) - } // execute requests - return pipeline.NewAsyncSharderChan(ctx, s.cfg.ConcurrentRequests, reqCh, jobMetricsResponse, s.next), nil + return pipeline.NewAsyncSharderChan(ctx, s.cfg.ConcurrentRequests, reqCh, pipeline.NewAsyncResponse(jobMetrics), s.next), nil } -// blockMetas returns all relevant blockMetas given a start/end -func (s *asyncSearchSharder) blockMetas(start, end int64, tenantID string) []*backend.BlockMeta { +func (s *asyncSearchSharder) blockMetas(start, end uint32, tenantID string) []*backend.BlockMeta { // reduce metas to those in the requested range - allMetas := s.reader.BlockMetas(tenantID) - metas := make([]*backend.BlockMeta, 0, len(allMetas)/50) // divide by 50 for luck - for _, m := range allMetas { - if m.StartTime.Unix() <= end && - m.EndTime.Unix() >= start && + allBlocks := s.reader.BlockMetas(tenantID) + blocks := make([]*backend.BlockMeta, 0, len(allBlocks)/50) // divide by 50 for luck + for _, m := range allBlocks { + if m.StartTime.Unix() <= int64(end) && + m.EndTime.Unix() >= int64(start) && m.ReplicationFactor == backend.DefaultReplicationFactor { // This check skips generator blocks (RF=1) - metas = append(metas, m) + blocks = append(blocks, m) } } - return metas + // search backwards in time + sort.Slice(blocks, func(i, j int) bool { + return blocks[i].EndTime.After(blocks[j].EndTime) + }) + + return blocks } // backendRequest builds backend requests to search backend blocks. backendRequest takes ownership of reqCh and closes it. 
// it returns 3 int values: totalBlocks, totalBlockBytes, and estimated jobs -func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, reqCh chan<- pipeline.Request, errFn func(error)) (totalJobs, totalBlocks int, totalBlockBytes uint64) { - var blocks []*backend.BlockMeta - +func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, resp *combiner.SearchJobResponse, reqCh chan<- pipeline.Request, errFn func(error)) { // request without start or end, search only in ingester if searchReq.Start == 0 || searchReq.End == 0 { close(reqCh) @@ -172,45 +152,53 @@ func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID strin return } - // get block metadata of blocks in start, end duration - blocks = s.blockMetas(int64(start), int64(end), tenantID) - - targetBytesPerRequest := s.cfg.TargetBytesPerRequest + blocks := s.blockMetas(start, end, tenantID) // calculate metrics to return to the caller - totalBlocks = len(blocks) - for _, b := range blocks { - p := pagesPerRequest(b, targetBytesPerRequest) + resp.TotalBlocks = len(blocks) - totalJobs += int(b.TotalRecords) / p - if int(b.TotalRecords)%p != 0 { - totalJobs++ - } - totalBlockBytes += b.Size_ - } + blockIter := backendJobsFunc(blocks, s.cfg.TargetBytesPerRequest, s.cfg.MostRecentShards, searchReq.End) + blockIter(func(jobs int, sz uint64, completedThroughTime uint32) { + resp.TotalJobs += jobs + resp.TotalBytes += sz + + resp.Shards = append(resp.Shards, combiner.SearchShards{ + TotalJobs: uint32(jobs), + CompletedThroughSeconds: completedThroughTime, + }) + }, nil) go func() { - buildBackendRequests(ctx, tenantID, parent, searchReq, blocks, targetBytesPerRequest, reqCh, errFn) + buildBackendRequests(ctx, tenantID, parent, searchReq, blockIter, reqCh, errFn) }() - - return } // ingesterRequest returns a new start and end time range for the backend as well as an http request // that covers the ingesters. If nil is returned for the http.Request then there is no ingesters query. // since this function modifies searchReq.Start and End we are taking a value instead of a pointer to prevent it from // unexpectedly changing the passed searchReq. 
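The rewritten `backendRequests` above hinges on a single iterator used twice, which is easy to miss in diff form. A sketch of that contract, assuming `blockIter` came from `backendJobsFunc` (defined further down) and that the free variables (`blocks`, `resp`, `s.cfg`, `searchReq`) mirror the ones in `backendRequests`; pass one only tallies shard metrics, pass two emits the per-page jobs:

```go
blockIter := backendJobsFunc(blocks, s.cfg.TargetBytesPerRequest, s.cfg.MostRecentShards, searchReq.End)

// pass 1: shard callback only. jobIterFn == nil lets the iterator compute
// job counts arithmetically instead of walking every page range.
blockIter(func(jobs int, sz uint64, completedThrough uint32) {
	resp.TotalJobs += jobs
	resp.TotalBytes += sz
	resp.Shards = append(resp.Shards, combiner.SearchShards{
		TotalJobs:               uint32(jobs),
		CompletedThroughSeconds: completedThrough,
	})
}, nil)

// pass 2: job callback only, run async so reqCh fills as the pipeline drains
go blockIter(nil, func(m *backend.BlockMeta, shard, startPage, pages int) {
	// clone the parent request for pages [startPage, startPage+pages),
	// tag it with its shard index via SetResponseData, and enqueue on reqCh
})
```

Deriving both the metrics and the jobs from one closure guarantees they agree on shard boundaries, which the combiner later relies on to decide when a shard's time window has been fully searched.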
-func (s *asyncSearchSharder) ingesterRequests(tenantID string, parent pipeline.Request, searchReq tempopb.SearchRequest, reqCh chan pipeline.Request) error { +func (s *asyncSearchSharder) ingesterRequests(tenantID string, parent pipeline.Request, searchReq tempopb.SearchRequest, reqCh chan pipeline.Request) (*combiner.SearchJobResponse, error) { + resp := &combiner.SearchJobResponse{ + Shards: make([]combiner.SearchShards, 0, s.cfg.MostRecentShards+1), // +1 for the ingester shard + } + // request without start or end, search only in ingester if searchReq.Start == 0 || searchReq.End == 0 { - return buildIngesterRequest(tenantID, parent, &searchReq, reqCh) + // one shard that covers all time + resp.TotalJobs = 1 + resp.Shards = append(resp.Shards, combiner.SearchShards{ + TotalJobs: 1, + CompletedThroughSeconds: 1, + }) + + return resp, buildIngesterRequest(tenantID, parent, &searchReq, reqCh) } ingesterUntil := uint32(time.Now().Add(-s.cfg.QueryIngestersUntil).Unix()) // if there's no overlap between the query and ingester range just return nil if searchReq.End < ingesterUntil { - return nil + return resp, nil } ingesterStart := searchReq.Start @@ -223,7 +211,7 @@ func (s *asyncSearchSharder) ingesterRequests(tenantID string, parent pipeline.R // if ingester start == ingester end then we don't need to query it if ingesterStart == ingesterEnd { - return nil + return resp, nil } searchReq.Start = ingesterStart @@ -260,11 +248,20 @@ func (s *asyncSearchSharder) ingesterRequests(tenantID string, parent pipeline.R err := buildIngesterRequest(tenantID, parent, &subReq, reqCh) if err != nil { - return err + return nil, err } } - return nil + // add one shard that covers no time at all. this will force the combiner to wait + // for ingester requests to complete before moving on to the backend requests + ingesterJobs := len(reqCh) + resp.TotalJobs = ingesterJobs + resp.Shards = append(resp.Shards, combiner.SearchShards{ + TotalJobs: uint32(ingesterJobs), + CompletedThroughSeconds: math.MaxUint32, + }) + + return resp, nil } // maxDuration returns the max search duration allowed for this tenant. @@ -298,60 +295,54 @@ func backendRange(start, end uint32, queryBackendAfter time.Duration) (uint32, u // buildBackendRequests returns a slice of requests that cover all blocks in the store // that are covered by start/end. -func buildBackendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, metas []*backend.BlockMeta, bytesPerRequest int, reqCh chan<- pipeline.Request, errFn func(error)) { +func buildBackendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, blockIter func(shardIterFn, jobIterFn), reqCh chan<- pipeline.Request, errFn func(error)) { defer close(reqCh) queryHash := hashForSearchRequest(searchReq) colsToJSON := api.NewDedicatedColumnsToJSON() - for _, m := range metas { - pages := pagesPerRequest(m, bytesPerRequest) - if pages == 0 { - continue - } - + blockIter(nil, func(m *backend.BlockMeta, shard, startPage, pages int) { blockID := m.BlockID.String() dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns) if err != nil { errFn(fmt.Errorf("failed to convert dedicated columns. 
block: %s tempopb: %w", blockID, err)) - continue + return } - for startPage := 0; startPage < int(m.TotalRecords); startPage += pages { - pipelineR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) { - r, err = api.BuildSearchBlockRequest(r, &tempopb.SearchBlockRequest{ - BlockID: blockID, - StartPage: uint32(startPage), - PagesToSearch: uint32(pages), - Encoding: m.Encoding.String(), - IndexPageSize: m.IndexPageSize, - TotalRecords: m.TotalRecords, - DataEncoding: m.DataEncoding, - Version: m.Version, - Size_: m.Size_, - FooterSize: m.FooterSize, - // DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json - }, dedColsJSON) - - return r, err - }) - if err != nil { - errFn(fmt.Errorf("failed to build search block request. block: %s tempopb: %w", blockID, err)) - continue - } + pipelineR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) { + r, err = api.BuildSearchBlockRequest(r, &tempopb.SearchBlockRequest{ + BlockID: blockID, + StartPage: uint32(startPage), + PagesToSearch: uint32(pages), + Encoding: m.Encoding.String(), + IndexPageSize: m.IndexPageSize, + TotalRecords: m.TotalRecords, + DataEncoding: m.DataEncoding, + Version: m.Version, + Size_: m.Size_, + FooterSize: m.FooterSize, + // DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json + }, dedColsJSON) + + return r, err + }) + if err != nil { + errFn(fmt.Errorf("failed to build search block request. block: %s tempopb: %w", blockID, err)) + return + } - key := searchJobCacheKey(tenantID, queryHash, int64(searchReq.Start), int64(searchReq.End), m, startPage, pages) - pipelineR.SetCacheKey(key) + key := searchJobCacheKey(tenantID, queryHash, int64(searchReq.Start), int64(searchReq.End), m, startPage, pages) + pipelineR.SetCacheKey(key) + pipelineR.SetResponseData(shard) - select { - case reqCh <- pipelineR: - case <-ctx.Done(): - // ignore the error if there is one. it will be handled elsewhere - return - } + select { + case reqCh <- pipelineR: + case <-ctx.Done(): + // ignore the error if there is one. it will be handled elsewhere + return } - } + }) } // hashForSearchRequest returns a uint64 hash of the query. if the query is invalid it returns a 0 hash. @@ -406,6 +397,78 @@ func buildIngesterRequest(tenantID string, parent pipeline.Request, searchReq *t return err } + subR.SetResponseData(0) // ingester requests are always shard 0 reqCh <- subR + return nil } + +type ( + shardIterFn func(jobs int, sz uint64, completedThroughTime uint32) + jobIterFn func(m *backend.BlockMeta, shard, startPage, pages int) +) + +// backendJobsFunc provides an iter func with 2 callbacks designed to be used once to calculate job and shard metrics and a second time +// to generate actual jobs. 
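Before the implementation, the shard-rolling arithmetic in miniature. `shardEnds` is a hypothetical model, not part of this PR (real job counts come from `TotalRecords` and `pagesPerRequest`); its outputs match two cases of `TestBackendShards` further down:

```go
package main

import "fmt"

// shardEnds models how backendJobsFunc rolls blocks into shards, under the
// simplifying assumption that every block contributes jobsPerBlock jobs.
// blockEnds must be sorted newest-first, matching blockMetas' sort order.
func shardEnds(blockEnds []int64, jobsPerBlock, maxShards int, searchEnd int64) [][2]int64 {
	blocksPerShard := len(blockEnds) / maxShards
	if blocksPerShard == 0 { // fewer blocks than shards: one block per shard
		blocksPerShard = 1
	}

	out := make([][2]int64, 0, maxShards) // pairs of {jobs, completedThroughSeconds}
	jobs, blocksInShard := 0, 0
	for _, end := range blockEnds {
		jobs += jobsPerBlock
		blocksInShard++
		// close a shard only if it can release results (end < searchEnd)
		// and at least one more shard remains for the leftovers
		if blocksInShard >= blocksPerShard && len(out) < maxShards-1 && end < searchEnd {
			out = append(out, [2]int64{int64(jobs), end})
			jobs, blocksInShard = 0, 0
		}
	}
	if jobs > 0 {
		out = append(out, [2]int64{int64(jobs), 1}) // final shard covers all remaining time
	}
	return out
}

func main() {
	// 4 blocks ending at 40/30/20/10s with 2 jobs each, as in TestBackendShards:
	fmt.Println(shardEnds([]int64{40, 30, 20, 10}, 2, 2, 50)) // [[4 30] [4 1]]
	fmt.Println(shardEnds([]int64{40, 30, 20, 10}, 2, 4, 35)) // [[4 30] [2 20] [2 10]]
}
```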
+func backendJobsFunc(blocks []*backend.BlockMeta, targetBytesPerRequest int, maxShards int, end uint32) func(shardIterFn, jobIterFn) {
+	blocksPerShard := len(blocks) / maxShards
+
+	// if we have fewer blocks than shards then every shard is one block
+	if blocksPerShard == 0 {
+		blocksPerShard = 1
+	}
+
+	return func(shardIterCallback shardIterFn, jobIterCallback jobIterFn) {
+		currentShard := 0
+		jobsInShard := 0
+		bytesInShard := uint64(0)
+		blocksInShard := 0
+
+		for _, b := range blocks {
+			pages := pagesPerRequest(b, targetBytesPerRequest)
+			jobsInBlock := 0
+
+			if pages == 0 {
+				continue
+			}
+
+			// if jobIterCallback is nil we can skip the loop and directly calc the jobsInBlock
+			if jobIterCallback == nil {
+				jobsInBlock = int(b.TotalRecords) / pages
+				if int(b.TotalRecords)%pages != 0 {
+					jobsInBlock++
+				}
+			} else {
+				for startPage := 0; startPage < int(b.TotalRecords); startPage += pages {
+					jobIterCallback(b, currentShard, startPage, pages)
+					jobsInBlock++
+				}
+			}
+
+			// do we need to roll to a new shard?
+			jobsInShard += jobsInBlock
+			bytesInShard += b.Size_
+			blocksInShard++
+
+			// -1 b/c we will likely add a final shard below
+			// end comparison b/c there's no point in ending a shard that can't release any results
+			if blocksInShard >= blocksPerShard && currentShard < maxShards-1 && b.EndTime.Unix() < int64(end) {
+				if shardIterCallback != nil {
+					shardIterCallback(jobsInShard, bytesInShard, uint32(b.EndTime.Unix()))
+				}
+				currentShard++
+
+				jobsInShard = 0
+				bytesInShard = 0
+				blocksInShard = 0
+			}
+		}
+
+		// final shard - note that we are overpacking the final shard due to the integer math as well as the limit of 200 shards total.
+		// this is the least impactful shard to place extra jobs in as it is searched last, and if we make it here the chances of this
+		// being an exhaustive search are higher
+		if shardIterCallback != nil && jobsInShard > 0 {
+			shardIterCallback(jobsInShard, bytesInShard, 1) // final shard can cover all time.
we don't need to be precise + } + } +} diff --git a/modules/frontend/search_sharder_test.go b/modules/frontend/search_sharder_test.go index 3a8730d083c..a0951ed7852 100644 --- a/modules/frontend/search_sharder_test.go +++ b/modules/frontend/search_sharder_test.go @@ -1,13 +1,14 @@ package frontend import ( - "bytes" "context" "fmt" "io" + "math" "net/http" "net/http/httptest" "net/url" + "sort" "strconv" "strings" "testing" @@ -224,9 +225,10 @@ func TestBuildBackendRequests(t *testing.T) { ctx, cancelCause := context.WithCancelCause(context.Background()) reqCh := make(chan pipeline.Request) + iterFn := backendJobsFunc(tc.metas, tc.targetBytesPerRequest, defaultMostRecentShards, math.MaxUint32) go func() { - buildBackendRequests(ctx, "test", pipeline.NewHTTPRequest(req), searchReq, tc.metas, tc.targetBytesPerRequest, reqCh, cancelCause) + buildBackendRequests(ctx, "test", pipeline.NewHTTPRequest(req), searchReq, iterFn, reqCh, cancelCause) }() actualURIs := []string{} @@ -250,7 +252,9 @@ func TestBackendRequests(t *testing.T) { bm.TotalRecords = 2 s := &asyncSearchSharder{ - cfg: SearchSharderConfig{}, + cfg: SearchSharderConfig{ + MostRecentShards: defaultMostRecentShards, + }, reader: &mockReader{metas: []*backend.BlockMeta{bm}}, } @@ -317,10 +321,12 @@ func TestBackendRequests(t *testing.T) { ctx, cancelCause := context.WithCancelCause(context.Background()) pipelineRequest := pipeline.NewHTTPRequest(r) - jobs, blocks, blockBytes := s.backendRequests(ctx, "test", pipelineRequest, searchReq, reqCh, cancelCause) - require.Equal(t, tc.expectedJobs, jobs) - require.Equal(t, tc.expectedBlocks, blocks) - require.Equal(t, tc.expectedBlockBytes, blockBytes) + + searchJobResponse := &combiner.SearchJobResponse{} + s.backendRequests(ctx, "test", pipelineRequest, searchReq, searchJobResponse, reqCh, cancelCause) + require.Equal(t, tc.expectedJobs, searchJobResponse.TotalJobs) + require.Equal(t, tc.expectedBlocks, searchJobResponse.TotalBlocks) + require.Equal(t, tc.expectedBlockBytes, searchJobResponse.TotalBytes) actualReqURIs := []string{} for r := range reqCh { @@ -494,13 +500,24 @@ func TestIngesterRequests(t *testing.T) { pr := pipeline.NewHTTPRequest(req) pr.SetWeight(2) - err = s.ingesterRequests("test", pr, *searchReq, reqChan) + actualSearchResponse, err := s.ingesterRequests("test", pr, *searchReq, reqChan) if tc.expectedError != nil { - assert.Equal(t, tc.expectedError, err) + require.Equal(t, tc.expectedError, err) continue } - assert.NoError(t, err) - assert.Equal(t, len(tc.expectedURI), len(reqChan)) + require.NoError(t, err) + require.Equal(t, len(tc.expectedURI), len(reqChan)) + require.Equal(t, len(tc.expectedURI), actualSearchResponse.TotalJobs) + if len(tc.expectedURI) > 0 { + require.Equal(t, len(tc.expectedURI), int(actualSearchResponse.Shards[0].TotalJobs)) + expectedCompletedThrough := math.MaxUint32 // normal ingester shard completes no time on purpose + if searchReq.Start == 0 && searchReq.End == 0 { // ingester only search completes all time on purpose + expectedCompletedThrough = 1 + } + require.Equal(t, expectedCompletedThrough, int(actualSearchResponse.Shards[0].CompletedThroughSeconds)) + } else { + require.Equal(t, 0, len(actualSearchResponse.Shards)) + } // drain the channel and check the URIs for _, expectedURI := range tc.expectedURI { @@ -667,6 +684,7 @@ func TestTotalJobsIncludesIngester(t *testing.T) { QueryIngestersUntil: 15 * time.Minute, ConcurrentRequests: 1, // 1 concurrent request to force order TargetBytesPerRequest: defaultTargetBytesPerRequest, + 
MostRecentShards: defaultMostRecentShards, IngesterShards: 1, }, log.NewNopLogger()) testRT := sharder.Wrap(next) @@ -680,29 +698,24 @@ func TestTotalJobsIncludesIngester(t *testing.T) { resps, err := testRT.RoundTrip(pipeline.NewHTTPRequest(req)) require.NoError(t, err) // find a response with total jobs > . this is the metadata response - var resp *tempopb.SearchResponse + + totalJobs := 0 for { res, done, err := resps.Next(context.Background()) - r := res.HTTPResponse() - require.NoError(t, err) - require.Equal(t, 200, r.StatusCode) - actualResp := &tempopb.SearchResponse{} - bytesResp, err := io.ReadAll(r.Body) - require.NoError(t, err) - err = jsonpb.Unmarshal(bytes.NewReader(bytesResp), actualResp) - require.NoError(t, err) + if res.IsMetadata() { + searchJobResponse := res.(*combiner.SearchJobResponse) + totalJobs += searchJobResponse.TotalJobs - if actualResp.Metrics.TotalJobs > 0 { - resp = actualResp break } + require.NoError(t, err) require.False(t, done) } - // 2 jobs for the meta + 1 for th ingester - assert.Equal(t, uint32(3), resp.Metrics.TotalJobs) + // 2 jobs for the meta + 1 for the ingester + assert.Equal(t, 3, totalJobs) } func TestSearchSharderRoundTripBadRequest(t *testing.T) { @@ -716,6 +729,7 @@ func TestSearchSharderRoundTripBadRequest(t *testing.T) { sharder := newAsyncSearchSharder(&mockReader{}, o, SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, MaxDuration: 5 * time.Minute, }, log.NewNopLogger()) testRT := sharder.Wrap(next) @@ -749,6 +763,7 @@ func TestSearchSharderRoundTripBadRequest(t *testing.T) { sharder = newAsyncSearchSharder(&mockReader{}, o, SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, MaxDuration: 5 * time.Minute, }, log.NewNopLogger()) testRT = sharder.Wrap(next) @@ -863,6 +878,126 @@ func TestHashTraceQLQuery(t *testing.T) { require.NotEqual(t, h1, h2) } +func TestBackendShards(t *testing.T) { + tcs := []struct { + name string + maxShards int + searchEnd uint32 + expected []combiner.SearchShards + }{ + { + name: "1 shard, puts all jobs in one shard", + maxShards: 1, + searchEnd: 50, + expected: []combiner.SearchShards{ + {TotalJobs: 8, CompletedThroughSeconds: 1}, + }, + }, + { + name: "2 shards, split evenly between", + maxShards: 2, + searchEnd: 50, + expected: []combiner.SearchShards{ + {TotalJobs: 4, CompletedThroughSeconds: 30}, + {TotalJobs: 4, CompletedThroughSeconds: 1}, + }, + }, + { + name: "3 shards, one for each block", + maxShards: 3, + searchEnd: 50, + expected: []combiner.SearchShards{ + {TotalJobs: 2, CompletedThroughSeconds: 40}, + {TotalJobs: 2, CompletedThroughSeconds: 30}, + {TotalJobs: 4, CompletedThroughSeconds: 1}, + }, + }, + { + name: "4 shards, one for each block", + maxShards: 4, + searchEnd: 50, + expected: []combiner.SearchShards{ + {TotalJobs: 2, CompletedThroughSeconds: 40}, + {TotalJobs: 2, CompletedThroughSeconds: 30}, + {TotalJobs: 2, CompletedThroughSeconds: 20}, + {TotalJobs: 2, CompletedThroughSeconds: 1}, + }, + }, + { + name: "5 shards, one for each block", + maxShards: 5, + searchEnd: 50, + expected: []combiner.SearchShards{ + {TotalJobs: 2, CompletedThroughSeconds: 40}, + {TotalJobs: 2, CompletedThroughSeconds: 30}, + {TotalJobs: 2, CompletedThroughSeconds: 20}, + {TotalJobs: 2, CompletedThroughSeconds: 10}, + }, + }, + { + name: "4 shards, search end forces 2 blocks in the 
first shard", + maxShards: 4, + searchEnd: 35, + expected: []combiner.SearchShards{ + {TotalJobs: 4, CompletedThroughSeconds: 30}, + {TotalJobs: 2, CompletedThroughSeconds: 20}, + {TotalJobs: 2, CompletedThroughSeconds: 10}, + }, + }, + { + name: "4 shards, search end forces 3 blocks in the first shard", + maxShards: 4, + searchEnd: 25, + expected: []combiner.SearchShards{ + {TotalJobs: 6, CompletedThroughSeconds: 20}, + {TotalJobs: 2, CompletedThroughSeconds: 10}, + }, + }, + { + name: "2 shards, search end forces 2 blocks in the first shard", + maxShards: 2, + searchEnd: 35, + expected: []combiner.SearchShards{ + {TotalJobs: 4, CompletedThroughSeconds: 30}, + {TotalJobs: 4, CompletedThroughSeconds: 1}, + }, + }, + } + + // create 4 metas with 2 records each for all the above test cases to use. 8 jobs total + metas := make([]*backend.BlockMeta, 0, 4) + for i := 0; i < 4; i++ { + metas = append(metas, &backend.BlockMeta{ + StartTime: time.Unix(int64(i*10), 0), // block 0 starts at 0 + EndTime: time.Unix(int64(i*10)+10, 0), // block 0 ends a 10 + Size_: defaultTargetBytesPerRequest * 2, // 2 jobs per block + TotalRecords: 2, + BlockID: backend.MustParse("00000000-0000-0000-0000-000000000000"), + }) + } + + // sort + sort.Slice(metas, func(i, j int) bool { + return metas[i].EndTime.After(metas[j].EndTime) + }) + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + fn := backendJobsFunc(metas, defaultTargetBytesPerRequest, tc.maxShards, tc.searchEnd) + actualShards := []combiner.SearchShards{} + + fn(func(jobs int, _ uint64, completedThroughTime uint32) { + actualShards = append(actualShards, combiner.SearchShards{ + TotalJobs: uint32(jobs), + CompletedThroughSeconds: completedThroughTime, + }) + }, nil) + + assert.Equal(t, tc.expected, actualShards) + }) + } +} + func urisEqual(t *testing.T, expectedURIs, actualURIs []string) { require.Equal(t, len(expectedURIs), len(actualURIs)) diff --git a/modules/frontend/tag_handlers_test.go b/modules/frontend/tag_handlers_test.go index 23f46147f83..4054c0619f1 100644 --- a/modules/frontend/tag_handlers_test.go +++ b/modules/frontend/tag_handlers_test.go @@ -16,6 +16,7 @@ import ( "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" "github.com/gogo/status" + "github.com/gorilla/mux" "github.com/grafana/dskit/user" "github.com/grafana/tempo/pkg/cache" "github.com/grafana/tempo/pkg/tempopb" @@ -49,7 +50,7 @@ func runnerTagsBadRequestOnOrgID(t *testing.T, f *QueryFrontend) { // http httpReq := httptest.NewRequest("GET", "/api/search/tags", nil) httpResp := httptest.NewRecorder() - f.SearchHandler.ServeHTTP(httpResp, httpReq) + f.SearchTagsHandler.ServeHTTP(httpResp, httpReq) require.Equal(t, "no org id", httpResp.Body.String()) require.Equal(t, http.StatusBadRequest, httpResp.Code) @@ -63,7 +64,7 @@ func runnerTagsV2BadRequestOnOrgID(t *testing.T, f *QueryFrontend) { // http httpReq := httptest.NewRequest("GET", "/api/v2/search/tags", nil) httpResp := httptest.NewRecorder() - f.SearchHandler.ServeHTTP(httpResp, httpReq) + f.SearchTagsV2Handler.ServeHTTP(httpResp, httpReq) require.Equal(t, "no org id", httpResp.Body.String()) require.Equal(t, http.StatusBadRequest, httpResp.Code) @@ -76,8 +77,9 @@ func runnerTagsV2BadRequestOnOrgID(t *testing.T, f *QueryFrontend) { func runnerTagValuesBadRequestOnOrgID(t *testing.T, f *QueryFrontend) { // http httpReq := httptest.NewRequest("GET", "/api/search/tag/foo/values", nil) + httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "foo"}) httpResp := httptest.NewRecorder() - 
f.SearchHandler.ServeHTTP(httpResp, httpReq) + f.SearchTagsValuesHandler.ServeHTTP(httpResp, httpReq) require.Equal(t, "no org id", httpResp.Body.String()) require.Equal(t, http.StatusBadRequest, httpResp.Code) @@ -90,8 +92,9 @@ func runnerTagValuesBadRequestOnOrgID(t *testing.T, f *QueryFrontend) { func runnerTagValuesV2BadRequestOnOrgID(t *testing.T, f *QueryFrontend) { // http httpReq := httptest.NewRequest("GET", "/api/v2/search/tag/foo/values", nil) + httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "foo"}) httpResp := httptest.NewRecorder() - f.SearchHandler.ServeHTTP(httpResp, httpReq) + f.SearchTagsValuesV2Handler.ServeHTTP(httpResp, httpReq) require.Equal(t, "no org id", httpResp.Body.String()) require.Equal(t, http.StatusBadRequest, httpResp.Code) @@ -115,7 +118,7 @@ func runnerTagsV2ClientCancelContext(t *testing.T, f *QueryFrontend) { cancel() }() - f.SearchHandler.ServeHTTP(httpResp, httpReq) + f.SearchTagsV2Handler.ServeHTTP(httpResp, httpReq) require.Equal(t, "context canceled", httpResp.Body.String()) require.Equal(t, 499, httpResp.Code) // todo: is this 499 valid? @@ -134,6 +137,7 @@ func runnerTagsV2ClientCancelContext(t *testing.T, f *QueryFrontend) { func runnerTagValuesV2ClientCancelContext(t *testing.T, f *QueryFrontend) { // http httpReq := httptest.NewRequest("GET", "/api/v2/search/tag/foo/values", nil) + httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "foo"}) httpResp := httptest.NewRecorder() ctx, cancel := context.WithCancel(httpReq.Context()) @@ -145,7 +149,7 @@ func runnerTagValuesV2ClientCancelContext(t *testing.T, f *QueryFrontend) { cancel() }() - f.SearchHandler.ServeHTTP(httpResp, httpReq) + f.SearchTagsValuesV2Handler.ServeHTTP(httpResp, httpReq) require.Equal(t, "context canceled", httpResp.Body.String()) require.Equal(t, 499, httpResp.Code) // todo: is this 499 valid? 
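A note on the handler fixes in this test file: these endpoints sit behind gorilla/mux path variables, and calling `ServeHTTP` directly bypasses the router, so `{tagName}` must be injected by hand. The isolated pattern (a sketch; `f` and `rec` stand in for the test fixtures built elsewhere in these tests):

```go
// mux stores path variables on the request context; httptest alone leaves
// them empty, hence the explicit SetURLVars calls in the tests above.
req := httptest.NewRequest("GET", "/api/v2/search/tag/foo/values", nil)
req = mux.SetURLVars(req, map[string]string{"tagName": "foo"})
f.SearchTagsValuesV2Handler.ServeHTTP(rec, req)
```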
@@ -229,6 +233,7 @@ func TestSearchTagsV2FailurePropagatesFromQueriers(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -248,7 +253,7 @@ func TestSearchTagsV2FailurePropagatesFromQueriers(t *testing.T) { ctx := user.InjectOrgID(httpReq.Context(), "foo") httpReq = httpReq.WithContext(ctx) - f.SearchHandler.ServeHTTP(httpResp, httpReq) + f.SearchTagsV2Handler.ServeHTTP(httpResp, httpReq) require.Equal(t, tc.expectedMessage, httpResp.Body.String()) require.Equal(t, tc.expectedCode, httpResp.Code) @@ -274,6 +279,7 @@ func TestSearchTagsV2FailurePropagatesFromQueriers(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -360,6 +366,7 @@ func TestSearchTagValuesV2FailurePropagatesFromQueriers(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -373,13 +380,14 @@ func TestSearchTagValuesV2FailurePropagatesFromQueriers(t *testing.T) { }, }, nil) - httpReq := httptest.NewRequest("GET", "/api/v2/search/tags?start=1&end=10000", nil) + httpReq := httptest.NewRequest("GET", "/api/v2/search/tag/foo/values?start=1&end=10000", nil) + httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "foo"}) httpResp := httptest.NewRecorder() ctx := user.InjectOrgID(httpReq.Context(), "foo") httpReq = httpReq.WithContext(ctx) - f.SearchHandler.ServeHTTP(httpResp, httpReq) + f.SearchTagsValuesV2Handler.ServeHTTP(httpResp, httpReq) require.Equal(t, tc.expectedMessage, httpResp.Body.String()) require.Equal(t, tc.expectedCode, httpResp.Code) @@ -405,6 +413,7 @@ func TestSearchTagValuesV2FailurePropagatesFromQueriers(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, diff --git a/modules/frontend/tag_sharder_test.go b/modules/frontend/tag_sharder_test.go index e47fa87f35c..251a137f7fd 100644 --- a/modules/frontend/tag_sharder_test.go +++ b/modules/frontend/tag_sharder_test.go @@ -294,6 +294,7 @@ func TestTagsSearchSharderRoundTripBadRequest(t *testing.T) { sharder := newAsyncTagSharder(&mockReader{}, o, SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, MaxDuration: 5 * time.Minute, }, parseTagsRequest, log.NewNopLogger()) testRT := sharder.Wrap(next) @@ -328,6 +329,7 @@ func TestTagsSearchSharderRoundTripBadRequest(t *testing.T) { sharder = newAsyncTagSharder(&mockReader{}, o, SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, MaxDuration: 5 * time.Minute, }, parseTagsRequest, log.NewNopLogger()) testRT = sharder.Wrap(next) diff --git a/modules/frontend/traceid_handlers_test.go b/modules/frontend/traceid_handlers_test.go index 69e16abe036..a1d773769de 100644 --- a/modules/frontend/traceid_handlers_test.go +++ b/modules/frontend/traceid_handlers_test.go @@ -33,6 +33,7 @@ var config = &Config{ Sharder: SearchSharderConfig{ ConcurrentRequests: 
defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go index 6aafa5c5c9c..ff986f55499 100644 --- a/modules/ingester/ingester_test.go +++ b/modules/ingester/ingester_test.go @@ -235,12 +235,10 @@ func TestSearchWAL(t *testing.T) { // search WAL ctx := user.InjectOrgID(context.Background(), "test") - searchReq := &tempopb.SearchRequest{Tags: map[string]string{ - "foo": "bar", - }} + searchReq := &tempopb.SearchRequest{Query: "{ }"} results, err := inst.Search(ctx, searchReq) require.NoError(t, err) - require.Equal(t, uint32(1), results.Metrics.InspectedTraces) + require.Equal(t, 1, len(results.Traces)) // Shutdown require.NoError(t, i.stopping(nil)) @@ -256,7 +254,7 @@ func TestSearchWAL(t *testing.T) { results, err = inst.Search(ctx, searchReq) require.NoError(t, err) - require.Equal(t, uint32(1), results.Metrics.InspectedTraces) + require.Equal(t, 1, len(results.Traces)) } // TODO - This test is flaky and commented out until it's fixed diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go index 2de7b25f4a6..a870ead5a30 100644 --- a/modules/ingester/instance_search.go +++ b/modules/ingester/instance_search.go @@ -7,7 +7,6 @@ import ( "sync" "github.com/gogo/protobuf/proto" - "github.com/google/uuid" "github.com/segmentio/fasthash/fnv1a" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -41,24 +40,42 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem span.AddEvent("SearchRequest", trace.WithAttributes(attribute.String("request", req.String()))) + mostRecent := false + if len(req.Query) > 0 { + rootExpr, err := traceql.Parse(req.Query) + if err != nil { + return nil, fmt.Errorf("error parsing query: %w", err) + } + + ok := false + if mostRecent, ok = rootExpr.Hints.GetBool(traceql.HintMostRecent, false); !ok { + mostRecent = false + } + } + var ( resultsMtx = sync.Mutex{} - combiner = traceql.NewMetadataCombiner() + combiner = traceql.NewMetadataCombiner(maxResults, mostRecent) metrics = &tempopb.SearchMetrics{} opts = common.DefaultSearchOptions() anyErr atomic.Error ) - search := func(blockID uuid.UUID, block common.Searcher, spanName string) { + search := func(blockMeta *backend.BlockMeta, block common.Searcher, spanName string) { ctx, span := tracer.Start(ctx, "instance.searchBlock."+spanName) defer span.End() span.AddEvent("block entry mtx acquired") - span.SetAttributes(attribute.String("blockID", blockID.String())) + span.SetAttributes(attribute.String("blockID", blockMeta.BlockID.String())) var resp *tempopb.SearchResponse var err error + // if the combiner is complete for the block's end time, we can skip searching it + if combiner.IsCompleteFor(uint32(blockMeta.EndTime.Unix())) { + return + } + if api.IsTraceQLQuery(req) { // note: we are creating new engine for each wal block, // and engine.ExecuteSearch is parsing the query for each block @@ -70,7 +87,7 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem } if errors.Is(err, common.ErrUnsupported) { - level.Warn(log.Logger).Log("msg", "block does not support search", "blockID", blockID) + level.Warn(log.Logger).Log("msg", "block does not support search", "blockID", blockMeta.BlockID) return } if errors.Is(err, context.Canceled) { @@ -78,7 +95,7 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem return } if 
err != nil { - level.Error(log.Logger).Log("msg", "error searching block", "blockID", blockID, "err", err) + level.Error(log.Logger).Log("msg", "error searching block", "blockID", blockMeta.BlockID, "err", err) anyErr.Store(err) return } @@ -95,14 +112,9 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem metrics.InspectedBytes += resp.Metrics.InspectedBytes } - if combiner.Count() >= maxResults { - return - } - for _, tr := range resp.Traces { combiner.AddMetadata(tr) - if combiner.Count() >= maxResults { - // Cancel all other tasks + if combiner.IsCompleteFor(traceql.TimestampNever) { cancel() return } @@ -119,18 +131,12 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem i.headBlockMtx.RLock() span.AddEvent("acquired headblock mtx") if includeBlock(i.headBlock.BlockMeta(), req) { - search((uuid.UUID)(i.headBlock.BlockMeta().BlockID), i.headBlock, "headBlock") + search(i.headBlock.BlockMeta(), i.headBlock, "headBlock") } i.headBlockMtx.RUnlock() if err := anyErr.Load(); err != nil { return nil, err } - if combiner.Count() >= maxResults { - return &tempopb.SearchResponse{ - Traces: combiner.Metadata(), - Metrics: metrics, - }, nil - } // Search all other blocks (concurrently) // Lock blocks mutex until all search tasks are finished and this function exits. This avoids @@ -150,7 +156,7 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem wg.Add(1) go func(b common.WALBlock) { defer wg.Done() - search((uuid.UUID)(b.BlockMeta().BlockID), b, "completingBlock") + search(b.BlockMeta(), b, "completingBlock") }(b) } @@ -161,7 +167,7 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem wg.Add(1) go func(b *LocalBlock) { defer wg.Done() - search((uuid.UUID)(b.BlockMeta().BlockID), b, "completeBlock") + search(b.BlockMeta(), b, "completeBlock") }(b) } diff --git a/modules/ingester/instance_search_test.go b/modules/ingester/instance_search_test.go index 84a20a920d2..7d5dfe18578 100644 --- a/modules/ingester/instance_search_test.go +++ b/modules/ingester/instance_search_test.go @@ -43,9 +43,8 @@ func TestInstanceSearch(t *testing.T) { ids, _, _, _ := writeTracesForSearch(t, i, "", tagKey, tagValue, false, false) req := &tempopb.SearchRequest{ - Tags: map[string]string{}, + Query: fmt.Sprintf(`{ span.%s = "%s" }`, tagKey, tagValue), } - req.Tags[tagKey] = tagValue req.Limit = uint32(len(ids)) + 1 // Test after appending to WAL. 
writeTracesforSearch() makes sure all traces are in the wal @@ -188,27 +187,23 @@ func TestInstanceSearchWithStartAndEnd(t *testing.T) { searchAndAssert := func(req *tempopb.SearchRequest, inspectedTraces uint32) { sr := search(req, 0, 0) assert.Len(t, sr.Traces, len(ids)) - assert.Equal(t, sr.Metrics.InspectedTraces, inspectedTraces) checkEqual(t, ids, sr) // writeTracesForSearch will build spans that end 1 second from now // query 2 min range to have extra slack and always be within range sr = search(req, uint32(time.Now().Add(-5*time.Minute).Unix()), uint32(time.Now().Add(5*time.Minute).Unix())) assert.Len(t, sr.Traces, len(ids)) - assert.Equal(t, sr.Metrics.InspectedTraces, inspectedTraces) checkEqual(t, ids, sr) // search with start=5m from now, end=10m from now sr = search(req, uint32(time.Now().Add(5*time.Minute).Unix()), uint32(time.Now().Add(10*time.Minute).Unix())) // no results and should inspect 100 traces in wal assert.Len(t, sr.Traces, 0) - assert.Equal(t, uint32(0), sr.Metrics.InspectedTraces) } req := &tempopb.SearchRequest{ - Tags: map[string]string{}, + Query: fmt.Sprintf(`{ span.%s = "%s" }`, tagKey, tagValue), } - req.Tags[tagKey] = tagValue req.Limit = uint32(len(ids)) + 1 // Test after appending to WAL. @@ -612,7 +607,7 @@ func TestInstanceSearchNoData(t *testing.T) { i, _ := defaultInstance(t) req := &tempopb.SearchRequest{ - Tags: map[string]string{}, + Query: "{}", } sr, err := i.Search(context.Background(), req) @@ -634,7 +629,7 @@ func TestInstanceSearchDoesNotRace(t *testing.T) { tagValue := "bar" req := &tempopb.SearchRequest{ - Tags: map[string]string{tagKey: tagValue}, + Query: fmt.Sprintf(`{ span.%s = "%s" }`, tagKey, tagValue), } end := make(chan struct{}) @@ -764,11 +759,7 @@ func TestWALBlockDeletedDuringSearch(t *testing.T) { go concurrent(func() { _, err := i.Search(context.Background(), &tempopb.SearchRequest{ - Tags: map[string]string{ - // Not present in the data, so it will be an exhaustive - // search - "wuv": "xyz", - }, + Query: `{ span.wuv = "xyz" }`, }) require.NoError(t, err) }) @@ -811,7 +802,7 @@ func TestInstanceSearchMetrics(t *testing.T) { search := func() *tempopb.SearchMetrics { sr, err := i.Search(context.Background(), &tempopb.SearchRequest{ - Tags: map[string]string{"foo": "bar"}, + Query: fmt.Sprintf(`{ span.%s = "%s" }`, "foo", "bar"), }) require.NoError(t, err) return sr.Metrics @@ -826,14 +817,12 @@ func TestInstanceSearchMetrics(t *testing.T) { err := i.CutCompleteTraces(0, true) require.NoError(t, err) m = search() - require.Equal(t, numTraces, m.InspectedTraces) require.Less(t, numBytes, m.InspectedBytes) // Test after cutting new headblock blockID, err := i.CutBlockIfReady(0, 0, true) require.NoError(t, err) m = search() - require.Equal(t, numTraces, m.InspectedTraces) require.Less(t, numBytes, m.InspectedBytes) // Test after completing a block @@ -842,7 +831,7 @@ func TestInstanceSearchMetrics(t *testing.T) { err = i.ClearCompletingBlock(blockID) require.NoError(t, err) m = search() - require.Equal(t, numTraces, m.InspectedTraces) + require.Less(t, numBytes, m.InspectedBytes) } func BenchmarkInstanceSearchUnderLoad(b *testing.B) { diff --git a/pkg/api/dedicated_columns_to_json.go b/pkg/api/dedicated_columns_to_json.go index 457ab161566..13d9b4ac257 100644 --- a/pkg/api/dedicated_columns_to_json.go +++ b/pkg/api/dedicated_columns_to_json.go @@ -23,6 +23,7 @@ func (d *DedicatedColumnsToJSON) JSONForDedicatedColumns(cols backend.DedicatedC } hash := cols.Hash() + if jsonString, ok := d.columnsToJSON[hash]; ok { return 
jsonString, nil } diff --git a/pkg/traceql/combine.go b/pkg/traceql/combine.go index 6976d0e7868..eda545df297 100644 --- a/pkg/traceql/combine.go +++ b/pkg/traceql/combine.go @@ -1,43 +1,96 @@ package traceql import ( + "math" + "slices" "sort" "strings" + "time" "github.com/grafana/tempo/pkg/tempopb" + "github.com/grafana/tempo/pkg/util" ) -type MetadataCombiner struct { - trs map[string]*tempopb.TraceSearchMetadata +type MetadataCombiner interface { + AddMetadata(new *tempopb.TraceSearchMetadata) bool + IsCompleteFor(ts uint32) bool + + Metadata() []*tempopb.TraceSearchMetadata + MetadataAfter(ts uint32) []*tempopb.TraceSearchMetadata + + addSpanset(new *Spanset) +} + +const TimestampNever = uint32(math.MaxUint32) + +func NewMetadataCombiner(limit int, keepMostRecent bool) MetadataCombiner { + if keepMostRecent { + return newMostRecentCombiner(limit) + } + + return newAnyCombiner(limit) +} + +type anyCombiner struct { + trs map[string]*tempopb.TraceSearchMetadata + limit int +} + +func newAnyCombiner(limit int) *anyCombiner { + return &anyCombiner{ + trs: make(map[string]*tempopb.TraceSearchMetadata, limit), + limit: limit, + } } -func NewMetadataCombiner() *MetadataCombiner { - return &MetadataCombiner{ - trs: make(map[string]*tempopb.TraceSearchMetadata), +// addSpanset adds a new spanset to the combiner. It only performs the asTraceSearchMetadata +// conversion if the spanset will be added +func (c *anyCombiner) addSpanset(new *Spanset) { + // if it's already in the list, then we should add it + if _, ok := c.trs[util.TraceIDToHexString(new.TraceID)]; ok { + c.AddMetadata(asTraceSearchMetadata(new)) + return + } + + // if we don't have too many + if c.IsCompleteFor(0) { + return } + + c.AddMetadata(asTraceSearchMetadata(new)) } // AddMetadata adds the new metadata to the map. if it already exists // use CombineSearchResults to combine the two -func (c *MetadataCombiner) AddMetadata(new *tempopb.TraceSearchMetadata) { +func (c *anyCombiner) AddMetadata(new *tempopb.TraceSearchMetadata) bool { if existing, ok := c.trs[new.TraceID]; ok { combineSearchResults(existing, new) - return + return true + } + + // if we don't have too many + if c.IsCompleteFor(0) { + return false } c.trs[new.TraceID] = new + return true } -func (c *MetadataCombiner) Count() int { +func (c *anyCombiner) Count() int { return len(c.trs) } -func (c *MetadataCombiner) Exists(id string) bool { +func (c *anyCombiner) Exists(id string) bool { _, ok := c.trs[id] return ok } -func (c *MetadataCombiner) Metadata() []*tempopb.TraceSearchMetadata { +func (c *anyCombiner) IsCompleteFor(_ uint32) bool { + return c.Count() >= c.limit && c.limit > 0 +} + +func (c *anyCombiner) Metadata() []*tempopb.TraceSearchMetadata { m := make([]*tempopb.TraceSearchMetadata, 0, len(c.trs)) for _, tr := range c.trs { m = append(m, tr) @@ -48,6 +101,130 @@ func (c *MetadataCombiner) Metadata() []*tempopb.TraceSearchMetadata { return m } +// MetadataAfter returns all traces that started after the given time. 
anyCombiner has no concept of time so it just returns all traces +func (c *anyCombiner) MetadataAfter(_ uint32) []*tempopb.TraceSearchMetadata { + return c.Metadata() +} + +type mostRecentCombiner struct { + trs map[string]*tempopb.TraceSearchMetadata + trsSorted []*tempopb.TraceSearchMetadata + keepMostRecent int +} + +func newMostRecentCombiner(limit int) *mostRecentCombiner { + return &mostRecentCombiner{ + trs: make(map[string]*tempopb.TraceSearchMetadata, limit), + trsSorted: make([]*tempopb.TraceSearchMetadata, 0, limit), + keepMostRecent: limit, + } +} + +// addSpanset adds a new spanset to the combiner. It only performs the asTraceSearchMetadata +// conversion if the spanset will be added +func (c *mostRecentCombiner) addSpanset(new *Spanset) { + // if we're not configured to keep most recent then just add it + if c.keepMostRecent == 0 || c.Count() < c.keepMostRecent { + c.AddMetadata(asTraceSearchMetadata(new)) + return + } + + // else let's see if it's worth converting this to a metadata and adding it + // if it's already in the list, then we should add it + if _, ok := c.trs[util.TraceIDToHexString(new.TraceID)]; ok { + c.AddMetadata(asTraceSearchMetadata(new)) + return + } + + // if it's within range + if c.OldestTimestampNanos() <= new.StartTimeUnixNanos { + c.AddMetadata(asTraceSearchMetadata(new)) + return + } + + // this spanset is too old to bother converting and adding it +} + +// AddMetadata adds the new metadata to the map. if it already exists +// use CombineSearchResults to combine the two +func (c *mostRecentCombiner) AddMetadata(new *tempopb.TraceSearchMetadata) bool { + if existing, ok := c.trs[new.TraceID]; ok { + combineSearchResults(existing, new) + return true + } + + if c.Count() == c.keepMostRecent && c.keepMostRecent > 0 { + // if this is older than the oldest element, bail + if c.OldestTimestampNanos() > new.StartTimeUnixNano { + return false + } + + // otherwise remove the oldest element and we'll add the new one below + oldest := c.trsSorted[c.Count()-1] + delete(c.trs, oldest.TraceID) + c.trsSorted = c.trsSorted[:len(c.trsSorted)-1] + } + + // insert new in the right spot + c.trs[new.TraceID] = new + idx, _ := slices.BinarySearchFunc(c.trsSorted, new, func(a, b *tempopb.TraceSearchMetadata) int { + if a.StartTimeUnixNano > b.StartTimeUnixNano { + return -1 + } + return 1 + }) + c.trsSorted = slices.Insert(c.trsSorted, idx, new) + return true +} + +func (c *mostRecentCombiner) Count() int { + return len(c.trs) +} + +func (c *mostRecentCombiner) Exists(id string) bool { + _, ok := c.trs[id] + return ok +} + +// IsCompleteFor returns true if the combiner has reached the limit and all traces are after the given time +func (c *mostRecentCombiner) IsCompleteFor(ts uint32) bool { + if ts == TimestampNever { + return false + } + + if c.Count() < c.keepMostRecent { + return false + } + + return c.OldestTimestampNanos() > uint64(ts)*uint64(time.Second) +} + +func (c *mostRecentCombiner) Metadata() []*tempopb.TraceSearchMetadata { + return c.trsSorted +} + +// MetadataAfter returns all traces that started after the given time +func (c *mostRecentCombiner) MetadataAfter(afterSeconds uint32) []*tempopb.TraceSearchMetadata { + afterNanos := uint64(afterSeconds) * uint64(time.Second) + afterTraces := make([]*tempopb.TraceSearchMetadata, 0, len(c.trsSorted)) + + for _, tr := range c.trsSorted { + if tr.StartTimeUnixNano > afterNanos { + afterTraces = append(afterTraces, tr) + } + } + + return afterTraces +} + +func (c *mostRecentCombiner) OldestTimestampNanos() uint64 { 
+ if len(c.trsSorted) == 0 { + return 0 + } + + return c.trsSorted[len(c.trsSorted)-1].StartTimeUnixNano +} + // combineSearchResults overlays the incoming search result with the existing result. This is required // for the following reason: a trace may be present in multiple blocks, or in partial segments // in live traces. The results should reflect elements of all segments. diff --git a/pkg/traceql/combine_test.go b/pkg/traceql/combine_test.go index a5503786024..809795b738d 100644 --- a/pkg/traceql/combine_test.go +++ b/pkg/traceql/combine_test.go @@ -2,6 +2,7 @@ package traceql import ( "fmt" + "math/rand/v2" "slices" "strings" "testing" @@ -9,6 +10,7 @@ import ( "github.com/grafana/tempo/pkg/tempopb" v1 "github.com/grafana/tempo/pkg/tempopb/common/v1" + "github.com/grafana/tempo/pkg/util" "github.com/stretchr/testify/require" ) @@ -268,6 +270,55 @@ func TestCombineResults(t *testing.T) { } } +func TestCombinerKeepsMostRecent(t *testing.T) { + totalTraces := 10 + keepMostRecent := 5 + combiner := NewMetadataCombiner(keepMostRecent, true).(*mostRecentCombiner) + + // make traces + traces := make([]*Spanset, totalTraces) + for i := 0; i < totalTraces; i++ { + traceID, err := util.HexStringToTraceID(fmt.Sprintf("%d", i)) + require.NoError(t, err) + + traces[i] = &Spanset{ + TraceID: traceID, + StartTimeUnixNanos: uint64(i) * uint64(time.Second), + } + } + + // save off the most recent and reverse b/c the combiner returns most recent first + expected := make([]*tempopb.TraceSearchMetadata, 0, keepMostRecent) + for i := totalTraces - keepMostRecent; i < totalTraces; i++ { + expected = append(expected, asTraceSearchMetadata(traces[i])) + } + slices.Reverse(expected) + + rand.Shuffle(totalTraces, func(i, j int) { + traces[i], traces[j] = traces[j], traces[i] + }) + + // add to combiner + for i := 0; i < totalTraces; i++ { + combiner.addSpanset(traces[i]) + } + + // test that the most recent are kept + actual := combiner.Metadata() + require.Equal(t, expected, actual) + require.Equal(t, keepMostRecent, combiner.Count()) + require.Equal(t, expected[len(expected)-1].StartTimeUnixNano, combiner.OldestTimestampNanos()) + for _, tr := range expected { + require.True(t, combiner.Exists(tr.TraceID)) + } + + // test MetadataAfter. 10 traces are added with start times 0-9. 
We want to get all traces that started after 7 + afterSeconds := uint32(7) + expectedTracesCount := totalTraces - int(afterSeconds+1) + actualTraces := combiner.MetadataAfter(afterSeconds) + require.Equal(t, expectedTracesCount, len(actualTraces)) +} + // nolint:govet func TestQueryRangeCombinerDiffs(t *testing.T) { start := uint64(100 * time.Millisecond) diff --git a/pkg/traceql/engine.go b/pkg/traceql/engine.go index e3288d7e64b..a6570d92400 100644 --- a/pkg/traceql/engine.go +++ b/pkg/traceql/engine.go @@ -55,6 +55,11 @@ func (e *Engine) ExecuteSearch(ctx context.Context, searchReq *tempopb.SearchReq return nil, err } + var mostRecent, ok bool + if mostRecent, ok = rootExpr.Hints.GetBool(HintMostRecent, false); !ok { + mostRecent = false + } + fetchSpansRequest.StartTimeUnixNanos = unixSecToNano(searchReq.Start) fetchSpansRequest.EndTimeUnixNanos = unixSecToNano(searchReq.End) @@ -111,7 +116,7 @@ func (e *Engine) ExecuteSearch(ctx context.Context, searchReq *tempopb.SearchReq Traces: nil, Metrics: &tempopb.SearchMetrics{}, } - combiner := NewMetadataCombiner() + combiner := NewMetadataCombiner(int(searchReq.Limit), mostRecent) for { spanset, err := iterator.Next(ctx) if err != nil && !errors.Is(err, io.EOF) { @@ -121,9 +126,9 @@ func (e *Engine) ExecuteSearch(ctx context.Context, searchReq *tempopb.SearchReq if spanset == nil { break } - combiner.AddMetadata(e.asTraceSearchMetadata(spanset)) - if combiner.Count() >= int(searchReq.Limit) && searchReq.Limit > 0 { + combiner.addSpanset(spanset) + if combiner.IsCompleteFor(TimestampNever) { break } } @@ -242,7 +247,7 @@ func (e *Engine) createAutocompleteRequest(tag Attribute, pipeline Pipeline) Fet return autocompleteReq } -func (e *Engine) asTraceSearchMetadata(spanset *Spanset) *tempopb.TraceSearchMetadata { +func asTraceSearchMetadata(spanset *Spanset) *tempopb.TraceSearchMetadata { metadata := &tempopb.TraceSearchMetadata{ TraceID: util.TraceIDToHexString(spanset.TraceID), RootServiceName: spanset.RootServiceName, diff --git a/pkg/traceql/engine_test.go b/pkg/traceql/engine_test.go index f7f0599b4e4..467e87fb292 100644 --- a/pkg/traceql/engine_test.go +++ b/pkg/traceql/engine_test.go @@ -253,9 +253,7 @@ func TestEngine_asTraceSearchMetadata(t *testing.T) { }, } - e := NewEngine() - - traceSearchMetadata := e.asTraceSearchMetadata(spanSet) + traceSearchMetadata := asTraceSearchMetadata(spanSet) expectedSpanset := &tempopb.SpanSet{ Matched: 2, diff --git a/pkg/traceql/enum_hints.go b/pkg/traceql/enum_hints.go index e95de1fda93..5e09ffd3b4b 100644 --- a/pkg/traceql/enum_hints.go +++ b/pkg/traceql/enum_hints.go @@ -12,11 +12,12 @@ const ( HintTimeOverlapCutoff = "time_overlap_cutoff" HintConcurrentBlocks = "concurrent_blocks" HintExemplars = "exemplars" + HintMostRecent = "most_recent" // traceql search hint to return most recent results ordered by time ) func isUnsafe(h string) bool { switch h { - case HintSample, HintExemplars: + case HintSample, HintExemplars, HintMostRecent: return false default: return true diff --git a/tempodb/blocklist/poller.go b/tempodb/blocklist/poller.go index e8571497510..03b62e28a45 100644 --- a/tempodb/blocklist/poller.go +++ b/tempodb/blocklist/poller.go @@ -6,7 +6,6 @@ import ( "fmt" "math/rand" "path" - "sort" "strconv" "sync" "time" @@ -372,14 +371,6 @@ func (p *Poller) pollTenantBlocks( newBlockList = append(newBlockList, newM...) newCompactedBlocklist = append(newCompactedBlocklist, newCm...) 
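Stepping back to the engine loop above: `IsCompleteFor(TimestampNever)` is where the two combiner behaviors diverge, and it is the crux of the hint. A compact contrast, as a sketch that exercises only APIs introduced in this PR:

```go
package main

import (
	"fmt"

	"github.com/grafana/tempo/pkg/tempopb"
	"github.com/grafana/tempo/pkg/traceql"
)

func main() {
	anyC := traceql.NewMetadataCombiner(1, false)  // plain limit semantics
	recent := traceql.NewMetadataCombiner(1, true) // most_recent=true semantics

	tr := &tempopb.TraceSearchMetadata{TraceID: "1", StartTimeUnixNano: 1e9}
	anyC.AddMetadata(tr)
	recent.AddMetadata(tr)

	// anyCombiner: hitting the limit means done, so the engine exits early
	fmt.Println(anyC.IsCompleteFor(traceql.TimestampNever)) // true
	// mostRecentCombiner: never complete for TimestampNever, so the engine
	// scans every spanset and retains only the newest `limit` traces
	fmt.Println(recent.IsCompleteFor(traceql.TimestampNever)) // false
}
```

This also appears to be why the poller below no longer keeps blocklists sorted: ordering is now imposed where it matters, in the sharder's `blockMetas`.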
- sort.Slice(newBlockList, func(i, j int) bool { - return newBlockList[i].StartTime.Before(newBlockList[j].StartTime) - }) - - sort.Slice(newCompactedBlocklist, func(i, j int) bool { - return newCompactedBlocklist[i].StartTime.Before(newCompactedBlocklist[j].StartTime) - }) - return newBlockList, newCompactedBlocklist, nil } diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index d47374f9047..a11bf3c694e 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -240,24 +240,11 @@ func checkBlocklists(t *testing.T, expectedID uuid.UUID, expectedB int, expected assert.Equal(t, expectedID, (uuid.UUID)(blocklist[0].BlockID)) } - // confirm blocklists are in starttime ascending order - lastTime := time.Time{} - for _, b := range blocklist { - assert.True(t, lastTime.Before(b.StartTime) || lastTime.Equal(b.StartTime)) - lastTime = b.StartTime - } - compactedBlocklist := rw.blocklist.CompactedMetas(testTenantID) assert.Len(t, compactedBlocklist, expectedCB) if expectedCB > 0 && expectedID != uuid.Nil { assert.Equal(t, expectedID, (uuid.UUID)(compactedBlocklist[0].BlockID)) } - - lastTime = time.Time{} - for _, b := range compactedBlocklist { - assert.True(t, lastTime.Before(b.StartTime) || lastTime.Equal(b.StartTime)) - lastTime = b.StartTime - } } func TestIncludeBlock(t *testing.T) {
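Finally, tying the shard bookkeeping back to streaming: `MetadataAfter` is what makes incremental delivery safe under `most_recent=true`. The wiring below is assumed for illustration; the real plumbing lives in the search combiner's diff handling:

```go
// Shards complete newest-first. Once every shard covering times >= T is done,
// no outstanding job can produce a trace starting after T, so those traces
// are final and can be streamed immediately.
c := traceql.NewMetadataCombiner(100, true)
// ... c.AddMetadata(...) as partial responses arrive ...

completedThrough := uint32(1715000000) // from SearchShards/completion tracking
for _, tr := range c.MetadataAfter(completedThrough) {
	_ = tr // safe to send to the client; it can no longer be displaced
}
```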