diff --git a/docs/sources/tempo/configuration/_index.md b/docs/sources/tempo/configuration/_index.md index 3e48575f5e6..57d3a5e34fd 100644 --- a/docs/sources/tempo/configuration/_index.md +++ b/docs/sources/tempo/configuration/_index.md @@ -639,8 +639,12 @@ query_frontend: [throughput_bytes_slo: | default = 0 ] # The number of shards to break ingester queries into. - [ingester_shards]: | default = 1] - + [ingester_shards: | default = 1] + + # The number of time windows to break a search up into when doing a most recent TraceQL search. This only impacts TraceQL + # searches with (most_recent=true) + [most_recent_shards: | default = 200] + # SLO configuration for Metadata (tags and tag values) endpoints. metadata_slo: # If set to a non-zero value, it's value will be used to decide if metadata query is within SLO or not. diff --git a/modules/frontend/config.go b/modules/frontend/config.go index 0ba194df4b2..35e7c2b7a0a 100644 --- a/modules/frontend/config.go +++ b/modules/frontend/config.go @@ -79,6 +79,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) { ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, IngesterShards: 1, + MostRecentShards: defaultMostRecentShards, }, SLO: slo, } diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go index 9b6a472f6b7..7315849f264 100644 --- a/modules/frontend/frontend.go +++ b/modules/frontend/frontend.go @@ -78,6 +78,10 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t return nil, fmt.Errorf("query backend after should be less than or equal to query ingester until") } + if cfg.Search.Sharder.MostRecentShards <= 0 { + return nil, fmt.Errorf("most recent shards must be greater than 0") + } + if cfg.Metrics.Sharder.ConcurrentRequests <= 0 { return nil, fmt.Errorf("frontend metrics concurrent requests should be greater than 0") } diff --git a/modules/frontend/frontend_test.go b/modules/frontend/frontend_test.go index d21214af70e..7d0f23bf199 100644 --- a/modules/frontend/frontend_test.go +++ b/modules/frontend/frontend_test.go @@ -26,6 +26,7 @@ func TestFrontendTagSearchRequiresOrgID(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -69,12 +70,12 @@ func TestFrontendBadConfigFails(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, }, nil, nil, nil, nil, "", log.NewNopLogger(), nil) assert.EqualError(t, err, "frontend query shards should be between 2 and 100000 (both inclusive)") - assert.Nil(t, f) f, err = New(Config{ @@ -86,6 +87,7 @@ func TestFrontendBadConfigFails(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -102,6 +104,7 @@ func TestFrontendBadConfigFails(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: 0, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -118,6 +121,7 @@ func TestFrontendBadConfigFails(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: 0, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -134,6 +138,7 @@ func TestFrontendBadConfigFails(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, QueryIngestersUntil: time.Minute, QueryBackendAfter: time.Hour, }, @@ -151,6 +156,7 @@ func TestFrontendBadConfigFails(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -166,6 +172,7 @@ func TestFrontendBadConfigFails(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -188,6 +195,7 @@ func TestFrontendBadConfigFails(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -202,4 +210,29 @@ func TestFrontendBadConfigFails(t *testing.T) { }, nil, nil, nil, nil, "", log.NewNopLogger(), nil) assert.EqualError(t, err, "frontend metrics interval should be greater than 0") assert.Nil(t, f) + + f, err = New(Config{ + TraceByID: TraceByIDConfig{ + QueryShards: maxQueryShards, + }, + Search: SearchConfig{ + Sharder: SearchSharderConfig{ + ConcurrentRequests: defaultConcurrentRequests, + TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: 0, + }, + SLO: testSLOcfg, + }, + Metrics: MetricsConfig{ + Sharder: QueryRangeSharderConfig{ + ConcurrentRequests: defaultConcurrentRequests, + TargetBytesPerRequest: defaultTargetBytesPerRequest, + Interval: 5 * time.Minute, + }, + SLO: testSLOcfg, + }, + }, nil, nil, nil, nil, "", log.NewNopLogger(), nil) + assert.EqualError(t, err, "most recent shards must be greater than 0") + assert.Nil(t, f) + } diff --git a/modules/frontend/search_handlers_test.go b/modules/frontend/search_handlers_test.go index 81e0e0c82ce..7e808ba7a65 100644 --- a/modules/frontend/search_handlers_test.go +++ b/modules/frontend/search_handlers_test.go @@ -326,6 +326,7 @@ func TestSearchLimitHonored(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, DefaultLimit: 10, MaxLimit: 15, }, @@ -489,6 +490,7 @@ func TestSearchFailurePropagatesFromQueriers(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -534,6 +536,7 @@ func TestSearchFailurePropagatesFromQueriers(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -780,6 +783,7 @@ func frontendWithSettings(t require.TestingT, next pipeline.RoundTripper, rdr te Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, diff --git a/modules/frontend/search_sharder.go b/modules/frontend/search_sharder.go index 1bdb5c36e9a..4f3d8cfff52 100644 --- a/modules/frontend/search_sharder.go +++ b/modules/frontend/search_sharder.go @@ -25,8 +25,7 @@ import ( const ( defaultTargetBytesPerRequest = 100 * 1024 * 1024 defaultConcurrentRequests = 1000 - // every search is broken into the same number of search shards. these shards are used to stream partial results back that are guaranteed to be latest results - maxSearchShards = 200 + defaultMostRecentShards = 200 ) type SearchSharderConfig struct { @@ -38,6 +37,7 @@ type SearchSharderConfig struct { QueryBackendAfter time.Duration `yaml:"query_backend_after,omitempty"` QueryIngestersUntil time.Duration `yaml:"query_ingesters_until,omitempty"` IngesterShards int `yaml:"ingester_shards,omitempty"` + MostRecentShards int `yaml:"most_recent_shards,omitempty"` } type asyncSearchSharder struct { @@ -157,7 +157,7 @@ func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID strin // calculate metrics to return to the caller resp.TotalBlocks = len(blocks) - blockIter := backendJobsFunc(blocks, s.cfg.TargetBytesPerRequest, maxSearchShards, searchReq.End) + blockIter := backendJobsFunc(blocks, s.cfg.TargetBytesPerRequest, s.cfg.MostRecentShards, searchReq.End) blockIter(func(jobs int, sz uint64, completedThroughTime uint32) { resp.TotalJobs += jobs resp.TotalBytes += sz @@ -179,7 +179,7 @@ func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID strin // unexpectedly changing the passed searchReq. func (s *asyncSearchSharder) ingesterRequests(tenantID string, parent pipeline.Request, searchReq tempopb.SearchRequest, reqCh chan pipeline.Request) (*combiner.SearchJobResponse, error) { resp := &combiner.SearchJobResponse{ - Shards: make([]combiner.SearchShards, 0, maxSearchShards+1), // +1 for the ingester shard + Shards: make([]combiner.SearchShards, 0, s.cfg.MostRecentShards+1), // +1 for the ingester shard } // request without start or end, search only in ingester diff --git a/modules/frontend/search_sharder_test.go b/modules/frontend/search_sharder_test.go index 4f85ffe733e..a0951ed7852 100644 --- a/modules/frontend/search_sharder_test.go +++ b/modules/frontend/search_sharder_test.go @@ -225,7 +225,7 @@ func TestBuildBackendRequests(t *testing.T) { ctx, cancelCause := context.WithCancelCause(context.Background()) reqCh := make(chan pipeline.Request) - iterFn := backendJobsFunc(tc.metas, tc.targetBytesPerRequest, maxSearchShards, math.MaxUint32) + iterFn := backendJobsFunc(tc.metas, tc.targetBytesPerRequest, defaultMostRecentShards, math.MaxUint32) go func() { buildBackendRequests(ctx, "test", pipeline.NewHTTPRequest(req), searchReq, iterFn, reqCh, cancelCause) @@ -252,7 +252,9 @@ func TestBackendRequests(t *testing.T) { bm.TotalRecords = 2 s := &asyncSearchSharder{ - cfg: SearchSharderConfig{}, + cfg: SearchSharderConfig{ + MostRecentShards: defaultMostRecentShards, + }, reader: &mockReader{metas: []*backend.BlockMeta{bm}}, } @@ -682,6 +684,7 @@ func TestTotalJobsIncludesIngester(t *testing.T) { QueryIngestersUntil: 15 * time.Minute, ConcurrentRequests: 1, // 1 concurrent request to force order TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, IngesterShards: 1, }, log.NewNopLogger()) testRT := sharder.Wrap(next) @@ -726,6 +729,7 @@ func TestSearchSharderRoundTripBadRequest(t *testing.T) { sharder := newAsyncSearchSharder(&mockReader{}, o, SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, MaxDuration: 5 * time.Minute, }, log.NewNopLogger()) testRT := sharder.Wrap(next) @@ -759,6 +763,7 @@ func TestSearchSharderRoundTripBadRequest(t *testing.T) { sharder = newAsyncSearchSharder(&mockReader{}, o, SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, MaxDuration: 5 * time.Minute, }, log.NewNopLogger()) testRT = sharder.Wrap(next) diff --git a/modules/frontend/tag_handlers_test.go b/modules/frontend/tag_handlers_test.go index 84b67b3e54c..4054c0619f1 100644 --- a/modules/frontend/tag_handlers_test.go +++ b/modules/frontend/tag_handlers_test.go @@ -233,6 +233,7 @@ func TestSearchTagsV2FailurePropagatesFromQueriers(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -278,6 +279,7 @@ func TestSearchTagsV2FailurePropagatesFromQueriers(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -364,6 +366,7 @@ func TestSearchTagValuesV2FailurePropagatesFromQueriers(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, @@ -410,6 +413,7 @@ func TestSearchTagValuesV2FailurePropagatesFromQueriers(t *testing.T) { Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, }, diff --git a/modules/frontend/tag_sharder_test.go b/modules/frontend/tag_sharder_test.go index e47fa87f35c..251a137f7fd 100644 --- a/modules/frontend/tag_sharder_test.go +++ b/modules/frontend/tag_sharder_test.go @@ -294,6 +294,7 @@ func TestTagsSearchSharderRoundTripBadRequest(t *testing.T) { sharder := newAsyncTagSharder(&mockReader{}, o, SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, MaxDuration: 5 * time.Minute, }, parseTagsRequest, log.NewNopLogger()) testRT := sharder.Wrap(next) @@ -328,6 +329,7 @@ func TestTagsSearchSharderRoundTripBadRequest(t *testing.T) { sharder = newAsyncTagSharder(&mockReader{}, o, SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, MaxDuration: 5 * time.Minute, }, parseTagsRequest, log.NewNopLogger()) testRT = sharder.Wrap(next) diff --git a/modules/frontend/traceid_handlers_test.go b/modules/frontend/traceid_handlers_test.go index 69e16abe036..a1d773769de 100644 --- a/modules/frontend/traceid_handlers_test.go +++ b/modules/frontend/traceid_handlers_test.go @@ -33,6 +33,7 @@ var config = &Config{ Sharder: SearchSharderConfig{ ConcurrentRequests: defaultConcurrentRequests, TargetBytesPerRequest: defaultTargetBytesPerRequest, + MostRecentShards: defaultMostRecentShards, }, SLO: testSLOcfg, },