Skip to content

Commit

Permalink
Make the number of worker processing federated query configurable
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Dec 22, 2024
1 parent c124391 commit 7b8783b
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 17 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
* [FEATURE] Ruler: Add support for per-user external labels #6340
* [ENHANCEMENT] Querier: Add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query when the `-tenant-federation.enabled=true`. #6449
* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
* [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ tenant_federation:
# CLI flag: -tenant-federation.enabled
[enabled: <boolean> | default = false]

# The number of workers used to process each federated query.
# CLI flag: -tenant-federation.max-concurrent
[max_concurrent: <int> | default = 16]

# The ruler_config configures the Cortex ruler.
[ruler: <ruler_config>]

Expand Down
3 changes: 1 addition & 2 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,12 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) {

// Enable merge querier if multi tenant query federation is enabled
func (t *Cortex) initTenantFederation() (serv services.Service, err error) {

if t.Cfg.TenantFederation.Enabled {
// Make sure the mergeQuerier is only used for request with more than a
// single tenant. This allows for a less impactful enabling of tenant
// federation.
byPassForSingleQuerier := true
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, byPassForSingleQuerier, prometheus.DefaultRegisterer))
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer))
}
return nil, nil
}
Expand Down
26 changes: 15 additions & 11 deletions pkg/querier/tenantfederation/merge_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
)

const (
defaultTenantLabel = "__tenant_id__"
retainExistingPrefix = "original_"
maxConcurrency = 16
defaultTenantLabel = "__tenant_id__"
retainExistingPrefix = "original_"
defaultMaxConcurrency = 16
)

// NewQueryable returns a queryable that iterates through all the tenant IDs
Expand All @@ -38,8 +38,8 @@ const (
// If the label "__tenant_id__" is already existing, its value is overwritten
// by the tenant ID and the previous value is exposed through a new label
// prefixed with "original_". This behaviour is not implemented recursively.
func NewQueryable(upstream storage.Queryable, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable {
return NewMergeQueryable(defaultTenantLabel, tenantQuerierCallback(upstream), byPassWithSingleQuerier, reg)
func NewQueryable(upstream storage.Queryable, maxConcurrent int, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable {
return NewMergeQueryable(defaultTenantLabel, maxConcurrent, tenantQuerierCallback(upstream), byPassWithSingleQuerier, reg)
}

func tenantQuerierCallback(queryable storage.Queryable) MergeQuerierCallback {
Expand Down Expand Up @@ -81,9 +81,10 @@ type MergeQuerierCallback func(ctx context.Context, mint int64, maxt int64) (ids
// If the label `idLabelName` is already existing, its value is overwritten and
// the previous value is exposed through a new label prefixed with "original_".
// This behaviour is not implemented recursively.
func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable {
func NewMergeQueryable(idLabelName string, maxConcurrent int, callback MergeQuerierCallback, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable {
return &mergeQueryable{
idLabelName: idLabelName,
maxConcurrent: maxConcurrent,
callback: callback,
byPassWithSingleQuerier: byPassWithSingleQuerier,

Expand All @@ -98,6 +99,7 @@ func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPass

type mergeQueryable struct {
idLabelName string
maxConcurrent int
byPassWithSingleQuerier bool
callback MergeQuerierCallback
tenantsPerQuery prometheus.Histogram
Expand All @@ -108,6 +110,7 @@ type mergeQueryable struct {
func (m *mergeQueryable) Querier(mint int64, maxt int64) (storage.Querier, error) {
return &mergeQuerier{
idLabelName: m.idLabelName,
maxConcurrent: m.maxConcurrent,
mint: mint,
maxt: maxt,
byPassWithSingleQuerier: m.byPassWithSingleQuerier,
Expand All @@ -123,9 +126,10 @@ func (m *mergeQueryable) Querier(mint int64, maxt int64) (storage.Querier, error
// the previous value is exposed through a new label prefixed with "original_".
// This behaviour is not implemented recursively
type mergeQuerier struct {
idLabelName string
mint, maxt int64
callback MergeQuerierCallback
idLabelName string
mint, maxt int64
callback MergeQuerierCallback
maxConcurrent int

byPassWithSingleQuerier bool
tenantsPerQuery prometheus.Histogram
Expand Down Expand Up @@ -273,7 +277,7 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(ctx context.Context,
return nil
}

err := concurrency.ForEach(ctx, jobs, maxConcurrency, run)
err := concurrency.ForEach(ctx, jobs, m.maxConcurrent, run)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -370,7 +374,7 @@ func (m *mergeQuerier) Select(ctx context.Context, sortSeries bool, hints *stora
return nil
}

if err := concurrency.ForEach(ctx, jobs, maxConcurrency, run); err != nil {
if err := concurrency.ForEach(ctx, jobs, m.maxConcurrent, run); err != nil {
return storage.ErrSeriesSet(err)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/querier/tenantfederation/merge_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ type mergeQueryableScenario struct {
func (s *mergeQueryableScenario) init() (storage.Querier, prometheus.Gatherer, error) {
// initialize with default tenant label
reg := prometheus.NewPedanticRegistry()
q := NewQueryable(&s.queryable, !s.doNotByPassSingleQuerier, reg)
q := NewQueryable(&s.queryable, defaultMaxConcurrency, !s.doNotByPassSingleQuerier, reg)

// retrieve querier
querier, err := q.Querier(mint, maxt)
Expand Down Expand Up @@ -384,7 +384,7 @@ func TestMergeQueryable_Querier(t *testing.T) {
t.Run("querying without a tenant specified should error", func(t *testing.T) {
t.Parallel()
queryable := &mockTenantQueryableWithFilter{}
q := NewQueryable(queryable, false /* byPassWithSingleQuerier */, nil)
q := NewQueryable(queryable, defaultMaxConcurrency, false /* byPassWithSingleQuerier */, nil)

querier, err := q.Querier(mint, maxt)
require.NoError(t, err)
Expand Down Expand Up @@ -1115,7 +1115,7 @@ func TestTracingMergeQueryable(t *testing.T) {
// set a multi tenant resolver
tenant.WithDefaultResolver(tenant.NewMultiResolver())
filter := mockTenantQueryableWithFilter{}
q := NewQueryable(&filter, false, nil)
q := NewQueryable(&filter, defaultMaxConcurrency, false, nil)
// retrieve querier if set
querier, err := q.Querier(mint, maxt)
require.NoError(t, err)
Expand Down
3 changes: 3 additions & 0 deletions pkg/querier/tenantfederation/tenant_federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import (
type Config struct {
// Enabled switches on support for multi tenant query federation
Enabled bool `yaml:"enabled"`
// MaxConcurrent The number of workers used for processing federated query.
MaxConcurrent int `yaml:"max_concurrent"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, "tenant-federation.enabled", false, "If enabled on all Cortex services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` header (experimental).")
f.IntVar(&cfg.MaxConcurrent, "tenant-federation.max-concurrent", defaultMaxConcurrency, "The number of workers used to process each federated query.")
}

0 comments on commit 7b8783b

Please sign in to comment.