Skip to content

Commit

Permalink
make shards configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott committed Nov 7, 2024
1 parent f09ea56 commit d5cc9f4
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 9 deletions.
8 changes: 6 additions & 2 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -639,8 +639,12 @@ query_frontend:
[throughput_bytes_slo: <float> | default = 0 ]

# The number of shards to break ingester queries into.
[ingester_shards]: <int> | default = 1]

[ingester_shards: <int> | default = 1]

# The number of time windows to break a search up into when doing a most recent TraceQL search. This only impacts TraceQL
# searches with (most_recent=true)
[most_recent_shards: <int> | default = 200]

# SLO configuration for Metadata (tags and tag values) endpoints.
metadata_slo:
# If set to a non-zero value, it's value will be used to decide if metadata query is within SLO or not.
Expand Down
1 change: 1 addition & 0 deletions modules/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
IngesterShards: 1,
MostRecentShards: defaultMostRecentShards,
},
SLO: slo,
}
Expand Down
4 changes: 4 additions & 0 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
return nil, fmt.Errorf("query backend after should be less than or equal to query ingester until")
}

if cfg.Search.Sharder.MostRecentShards <= 0 {
return nil, fmt.Errorf("most recent shards must be greater than 0")
}

if cfg.Metrics.Sharder.ConcurrentRequests <= 0 {
return nil, fmt.Errorf("frontend metrics concurrent requests should be greater than 0")
}
Expand Down
35 changes: 34 additions & 1 deletion modules/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestFrontendTagSearchRequiresOrgID(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
Expand Down Expand Up @@ -69,12 +70,12 @@ func TestFrontendBadConfigFails(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, "", log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 100000 (both inclusive)")

assert.Nil(t, f)

f, err = New(Config{
Expand All @@ -86,6 +87,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
Expand All @@ -102,6 +104,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: 0,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
Expand All @@ -118,6 +121,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: 0,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
Expand All @@ -134,6 +138,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
QueryIngestersUntil: time.Minute,
QueryBackendAfter: time.Hour,
},
Expand All @@ -151,6 +156,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
Expand All @@ -166,6 +172,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
Expand All @@ -188,6 +195,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
Expand All @@ -202,4 +210,29 @@ func TestFrontendBadConfigFails(t *testing.T) {
}, nil, nil, nil, nil, "", log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend metrics interval should be greater than 0")
assert.Nil(t, f)

f, err = New(Config{
TraceByID: TraceByIDConfig{
QueryShards: maxQueryShards,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: 0,
},
SLO: testSLOcfg,
},
Metrics: MetricsConfig{
Sharder: QueryRangeSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
Interval: 5 * time.Minute,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, "", log.NewNopLogger(), nil)
assert.EqualError(t, err, "most recent shards must be greater than 0")
assert.Nil(t, f)

}
4 changes: 4 additions & 0 deletions modules/frontend/search_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ func TestSearchLimitHonored(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
DefaultLimit: 10,
MaxLimit: 15,
},
Expand Down Expand Up @@ -489,6 +490,7 @@ func TestSearchFailurePropagatesFromQueriers(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
Expand Down Expand Up @@ -534,6 +536,7 @@ func TestSearchFailurePropagatesFromQueriers(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
Expand Down Expand Up @@ -780,6 +783,7 @@ func frontendWithSettings(t require.TestingT, next pipeline.RoundTripper, rdr te
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
Expand Down
8 changes: 4 additions & 4 deletions modules/frontend/search_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import (
const (
defaultTargetBytesPerRequest = 100 * 1024 * 1024
defaultConcurrentRequests = 1000
// every search is broken into the same number of search shards. these shards are used to stream partial results back that are guaranteed to be latest results
maxSearchShards = 200
defaultMostRecentShards = 200
)

type SearchSharderConfig struct {
Expand All @@ -38,6 +37,7 @@ type SearchSharderConfig struct {
QueryBackendAfter time.Duration `yaml:"query_backend_after,omitempty"`
QueryIngestersUntil time.Duration `yaml:"query_ingesters_until,omitempty"`
IngesterShards int `yaml:"ingester_shards,omitempty"`
MostRecentShards int `yaml:"most_recent_shards,omitempty"`
}

type asyncSearchSharder struct {
Expand Down Expand Up @@ -157,7 +157,7 @@ func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID strin
// calculate metrics to return to the caller
resp.TotalBlocks = len(blocks)

blockIter := backendJobsFunc(blocks, s.cfg.TargetBytesPerRequest, maxSearchShards, searchReq.End)
blockIter := backendJobsFunc(blocks, s.cfg.TargetBytesPerRequest, s.cfg.MostRecentShards, searchReq.End)
blockIter(func(jobs int, sz uint64, completedThroughTime uint32) {
resp.TotalJobs += jobs
resp.TotalBytes += sz
Expand All @@ -179,7 +179,7 @@ func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID strin
// unexpectedly changing the passed searchReq.
func (s *asyncSearchSharder) ingesterRequests(tenantID string, parent pipeline.Request, searchReq tempopb.SearchRequest, reqCh chan pipeline.Request) (*combiner.SearchJobResponse, error) {
resp := &combiner.SearchJobResponse{
Shards: make([]combiner.SearchShards, 0, maxSearchShards+1), // +1 for the ingester shard
Shards: make([]combiner.SearchShards, 0, s.cfg.MostRecentShards+1), // +1 for the ingester shard
}

// request without start or end, search only in ingester
Expand Down
9 changes: 7 additions & 2 deletions modules/frontend/search_sharder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func TestBuildBackendRequests(t *testing.T) {

ctx, cancelCause := context.WithCancelCause(context.Background())
reqCh := make(chan pipeline.Request)
iterFn := backendJobsFunc(tc.metas, tc.targetBytesPerRequest, maxSearchShards, math.MaxUint32)
iterFn := backendJobsFunc(tc.metas, tc.targetBytesPerRequest, defaultMostRecentShards, math.MaxUint32)

go func() {
buildBackendRequests(ctx, "test", pipeline.NewHTTPRequest(req), searchReq, iterFn, reqCh, cancelCause)
Expand All @@ -252,7 +252,9 @@ func TestBackendRequests(t *testing.T) {
bm.TotalRecords = 2

s := &asyncSearchSharder{
cfg: SearchSharderConfig{},
cfg: SearchSharderConfig{
MostRecentShards: defaultMostRecentShards,
},
reader: &mockReader{metas: []*backend.BlockMeta{bm}},
}

Expand Down Expand Up @@ -682,6 +684,7 @@ func TestTotalJobsIncludesIngester(t *testing.T) {
QueryIngestersUntil: 15 * time.Minute,
ConcurrentRequests: 1, // 1 concurrent request to force order
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
IngesterShards: 1,
}, log.NewNopLogger())
testRT := sharder.Wrap(next)
Expand Down Expand Up @@ -726,6 +729,7 @@ func TestSearchSharderRoundTripBadRequest(t *testing.T) {
sharder := newAsyncSearchSharder(&mockReader{}, o, SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
MaxDuration: 5 * time.Minute,
}, log.NewNopLogger())
testRT := sharder.Wrap(next)
Expand Down Expand Up @@ -759,6 +763,7 @@ func TestSearchSharderRoundTripBadRequest(t *testing.T) {
sharder = newAsyncSearchSharder(&mockReader{}, o, SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
MaxDuration: 5 * time.Minute,
}, log.NewNopLogger())
testRT = sharder.Wrap(next)
Expand Down
4 changes: 4 additions & 0 deletions modules/frontend/tag_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func TestSearchTagsV2FailurePropagatesFromQueriers(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
Expand Down Expand Up @@ -278,6 +279,7 @@ func TestSearchTagsV2FailurePropagatesFromQueriers(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
Expand Down Expand Up @@ -364,6 +366,7 @@ func TestSearchTagValuesV2FailurePropagatesFromQueriers(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
Expand Down Expand Up @@ -410,6 +413,7 @@ func TestSearchTagValuesV2FailurePropagatesFromQueriers(t *testing.T) {
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
Expand Down
2 changes: 2 additions & 0 deletions modules/frontend/tag_sharder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func TestTagsSearchSharderRoundTripBadRequest(t *testing.T) {
sharder := newAsyncTagSharder(&mockReader{}, o, SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
MaxDuration: 5 * time.Minute,
}, parseTagsRequest, log.NewNopLogger())
testRT := sharder.Wrap(next)
Expand Down Expand Up @@ -328,6 +329,7 @@ func TestTagsSearchSharderRoundTripBadRequest(t *testing.T) {
sharder = newAsyncTagSharder(&mockReader{}, o, SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
MaxDuration: 5 * time.Minute,
}, parseTagsRequest, log.NewNopLogger())
testRT = sharder.Wrap(next)
Expand Down
1 change: 1 addition & 0 deletions modules/frontend/traceid_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var config = &Config{
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
},
SLO: testSLOcfg,
},
Expand Down

0 comments on commit d5cc9f4

Please sign in to comment.