From d2c95c6ba3593708f5d9a9df37181389ef179a72 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Sat, 21 Dec 2024 06:24:02 +0900 Subject: [PATCH] Make the number of worker processing federnated query configurable Signed-off-by: SungJin1212 --- CHANGELOG.md | 2 +- docs/configuration/config-file-reference.md | 4 +++ pkg/cortex/modules.go | 2 +- .../tenantfederation/merge_queryable.go | 26 +++++++++++-------- .../tenantfederation/merge_queryable_test.go | 6 ++--- .../tenantfederation/tenant_federation.go | 3 +++ 6 files changed, 27 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a3f5ece9a..a0021d4694 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,7 @@ * [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249 * [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256 * [FEATURE] Ruler: Add support for per-user external labels #6340 -* [ENHANCEMENT] Querier: Add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query when the `-tenant-federation.enabled=true`. #6449 +* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449 * [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423 * [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388 * [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index f48bfeddbc..669bb377b8 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -157,6 +157,10 @@ tenant_federation: # CLI flag: -tenant-federation.enabled [enabled: | default = false] + # The number of workers used for processing federated query. + # CLI flag: -tenant-federation.max-concurrent + [max_concurrent: | default = 16] + # The ruler_config configures the Cortex ruler. [ruler: ] diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 8d00a6b872..1fe95d4287 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -274,7 +274,7 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) { // single tenant. This allows for a less impactful enabling of tenant // federation. byPassForSingleQuerier := true - t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, byPassForSingleQuerier, prometheus.DefaultRegisterer)) + t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer)) } return nil, nil } diff --git a/pkg/querier/tenantfederation/merge_queryable.go b/pkg/querier/tenantfederation/merge_queryable.go index 40313b2fcb..9364e01510 100644 --- a/pkg/querier/tenantfederation/merge_queryable.go +++ b/pkg/querier/tenantfederation/merge_queryable.go @@ -21,9 +21,9 @@ import ( ) const ( - defaultTenantLabel = "__tenant_id__" - retainExistingPrefix = "original_" - maxConcurrency = 16 + defaultTenantLabel = "__tenant_id__" + retainExistingPrefix = "original_" + defaultMaxConcurrency = 16 ) // NewQueryable returns a queryable that iterates through all the tenant IDs @@ -38,8 +38,8 @@ const ( // If the label "__tenant_id__" is already existing, its value is overwritten // by the tenant ID and the previous value is exposed through a new label // prefixed with "original_". This behaviour is not implemented recursively. -func NewQueryable(upstream storage.Queryable, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable { - return NewMergeQueryable(defaultTenantLabel, tenantQuerierCallback(upstream), byPassWithSingleQuerier, reg) +func NewQueryable(upstream storage.Queryable, maxConcurrent int, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable { + return NewMergeQueryable(defaultTenantLabel, maxConcurrent, tenantQuerierCallback(upstream), byPassWithSingleQuerier, reg) } func tenantQuerierCallback(queryable storage.Queryable) MergeQuerierCallback { @@ -81,9 +81,10 @@ type MergeQuerierCallback func(ctx context.Context, mint int64, maxt int64) (ids // If the label `idLabelName` is already existing, its value is overwritten and // the previous value is exposed through a new label prefixed with "original_". // This behaviour is not implemented recursively. -func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable { +func NewMergeQueryable(idLabelName string, maxConcurrent int, callback MergeQuerierCallback, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable { return &mergeQueryable{ idLabelName: idLabelName, + maxConcurrent: maxConcurrent, callback: callback, byPassWithSingleQuerier: byPassWithSingleQuerier, @@ -98,6 +99,7 @@ func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPass type mergeQueryable struct { idLabelName string + maxConcurrent int byPassWithSingleQuerier bool callback MergeQuerierCallback tenantsPerQuery prometheus.Histogram @@ -108,6 +110,7 @@ type mergeQueryable struct { func (m *mergeQueryable) Querier(mint int64, maxt int64) (storage.Querier, error) { return &mergeQuerier{ idLabelName: m.idLabelName, + maxConcurrent: m.maxConcurrent, mint: mint, maxt: maxt, byPassWithSingleQuerier: m.byPassWithSingleQuerier, @@ -123,9 +126,10 @@ func (m *mergeQueryable) Querier(mint int64, maxt int64) (storage.Querier, error // the previous value is exposed through a new label prefixed with "original_". // This behaviour is not implemented recursively type mergeQuerier struct { - idLabelName string - mint, maxt int64 - callback MergeQuerierCallback + idLabelName string + mint, maxt int64 + callback MergeQuerierCallback + maxConcurrent int byPassWithSingleQuerier bool tenantsPerQuery prometheus.Histogram @@ -273,7 +277,7 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(ctx context.Context, return nil } - err := concurrency.ForEach(ctx, jobs, maxConcurrency, run) + err := concurrency.ForEach(ctx, jobs, m.maxConcurrent, run) if err != nil { return nil, nil, err } @@ -370,7 +374,7 @@ func (m *mergeQuerier) Select(ctx context.Context, sortSeries bool, hints *stora return nil } - if err := concurrency.ForEach(ctx, jobs, maxConcurrency, run); err != nil { + if err := concurrency.ForEach(ctx, jobs, m.maxConcurrent, run); err != nil { return storage.ErrSeriesSet(err) } diff --git a/pkg/querier/tenantfederation/merge_queryable_test.go b/pkg/querier/tenantfederation/merge_queryable_test.go index 9658d3f2a0..595ef572fa 100644 --- a/pkg/querier/tenantfederation/merge_queryable_test.go +++ b/pkg/querier/tenantfederation/merge_queryable_test.go @@ -302,7 +302,7 @@ type mergeQueryableScenario struct { func (s *mergeQueryableScenario) init() (storage.Querier, prometheus.Gatherer, error) { // initialize with default tenant label reg := prometheus.NewPedanticRegistry() - q := NewQueryable(&s.queryable, !s.doNotByPassSingleQuerier, reg) + q := NewQueryable(&s.queryable, defaultMaxConcurrency, !s.doNotByPassSingleQuerier, reg) // retrieve querier querier, err := q.Querier(mint, maxt) @@ -384,7 +384,7 @@ func TestMergeQueryable_Querier(t *testing.T) { t.Run("querying without a tenant specified should error", func(t *testing.T) { t.Parallel() queryable := &mockTenantQueryableWithFilter{} - q := NewQueryable(queryable, false /* byPassWithSingleQuerier */, nil) + q := NewQueryable(queryable, defaultMaxConcurrency, false /* byPassWithSingleQuerier */, nil) querier, err := q.Querier(mint, maxt) require.NoError(t, err) @@ -1115,7 +1115,7 @@ func TestTracingMergeQueryable(t *testing.T) { // set a multi tenant resolver tenant.WithDefaultResolver(tenant.NewMultiResolver()) filter := mockTenantQueryableWithFilter{} - q := NewQueryable(&filter, false, nil) + q := NewQueryable(&filter, defaultMaxConcurrency, false, nil) // retrieve querier if set querier, err := q.Querier(mint, maxt) require.NoError(t, err) diff --git a/pkg/querier/tenantfederation/tenant_federation.go b/pkg/querier/tenantfederation/tenant_federation.go index af5bd7b929..eb6d230fde 100644 --- a/pkg/querier/tenantfederation/tenant_federation.go +++ b/pkg/querier/tenantfederation/tenant_federation.go @@ -7,8 +7,11 @@ import ( type Config struct { // Enabled switches on support for multi tenant query federation Enabled bool `yaml:"enabled"` + // MaxConcurrent The number of workers used for processing federated query. + MaxConcurrent int `yaml:"max_concurrent"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.Enabled, "tenant-federation.enabled", false, "If enabled on all Cortex services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` header (experimental).") + f.IntVar(&cfg.MaxConcurrent, "tenant-federation.max-concurrent", defaultMaxConcurrency, "The number of workers used for processing federated query.") }