diff --git a/CHANGELOG.md b/CHANGELOG.md index 7da08bf6ea..a0021d4694 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ * [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249 * [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256 * [FEATURE] Ruler: Add support for per-user external labels #6340 +* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449 * [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423 * [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388 * [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index f48bfeddbc..5479bb4d00 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -157,6 +157,10 @@ tenant_federation: # CLI flag: -tenant-federation.enabled [enabled: | default = false] + # The number of workers used to process each federated query. + # CLI flag: -tenant-federation.max-concurrent + [max_concurrent: | default = 16] + # The ruler_config configures the Cortex ruler. [ruler: ] diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 8823922a3c..a771c22116 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -273,7 +273,7 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) { // single tenant. This allows for a less impactful enabling of tenant // federation. byPassForSingleQuerier := true - t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, byPassForSingleQuerier)) + t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer)) } return nil, nil } diff --git a/pkg/querier/tenantfederation/merge_queryable.go b/pkg/querier/tenantfederation/merge_queryable.go index 92f4f47b6f..9364e01510 100644 --- a/pkg/querier/tenantfederation/merge_queryable.go +++ b/pkg/querier/tenantfederation/merge_queryable.go @@ -7,6 +7,8 @@ import ( "strings" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -19,9 +21,9 @@ import ( ) const ( - defaultTenantLabel = "__tenant_id__" - retainExistingPrefix = "original_" - maxConcurrency = 16 + defaultTenantLabel = "__tenant_id__" + retainExistingPrefix = "original_" + defaultMaxConcurrency = 16 ) // NewQueryable returns a queryable that iterates through all the tenant IDs @@ -36,8 +38,8 @@ const ( // If the label "__tenant_id__" is already existing, its value is overwritten // by the tenant ID and the previous value is exposed through a new label // prefixed with "original_". This behaviour is not implemented recursively. -func NewQueryable(upstream storage.Queryable, byPassWithSingleQuerier bool) storage.Queryable { - return NewMergeQueryable(defaultTenantLabel, tenantQuerierCallback(upstream), byPassWithSingleQuerier) +func NewQueryable(upstream storage.Queryable, maxConcurrent int, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable { + return NewMergeQueryable(defaultTenantLabel, maxConcurrent, tenantQuerierCallback(upstream), byPassWithSingleQuerier, reg) } func tenantQuerierCallback(queryable storage.Queryable) MergeQuerierCallback { @@ -79,18 +81,28 @@ type MergeQuerierCallback func(ctx context.Context, mint int64, maxt int64) (ids // If the label `idLabelName` is already existing, its value is overwritten and // the previous value is exposed through a new label prefixed with "original_". // This behaviour is not implemented recursively. -func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPassWithSingleQuerier bool) storage.Queryable { +func NewMergeQueryable(idLabelName string, maxConcurrent int, callback MergeQuerierCallback, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable { return &mergeQueryable{ idLabelName: idLabelName, + maxConcurrent: maxConcurrent, callback: callback, byPassWithSingleQuerier: byPassWithSingleQuerier, + + tenantsPerQuery: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "querier_federated_tenants_per_query", + Help: "Number of tenants per query.", + Buckets: []float64{1, 2, 4, 8, 16, 32, 64}, + }), } } type mergeQueryable struct { idLabelName string + maxConcurrent int byPassWithSingleQuerier bool callback MergeQuerierCallback + tenantsPerQuery prometheus.Histogram } // Querier returns a new mergeQuerier, which aggregates results from multiple @@ -98,10 +110,12 @@ type mergeQueryable struct { func (m *mergeQueryable) Querier(mint int64, maxt int64) (storage.Querier, error) { return &mergeQuerier{ idLabelName: m.idLabelName, + maxConcurrent: m.maxConcurrent, mint: mint, maxt: maxt, byPassWithSingleQuerier: m.byPassWithSingleQuerier, callback: m.callback, + tenantsPerQuery: m.tenantsPerQuery, }, nil } @@ -112,11 +126,13 @@ func (m *mergeQueryable) Querier(mint int64, maxt int64) (storage.Querier, error // the previous value is exposed through a new label prefixed with "original_". // This behaviour is not implemented recursively type mergeQuerier struct { - idLabelName string - mint, maxt int64 - callback MergeQuerierCallback + idLabelName string + mint, maxt int64 + callback MergeQuerierCallback + maxConcurrent int byPassWithSingleQuerier bool + tenantsPerQuery prometheus.Histogram } // LabelValues returns all potential values for a label name. It is not safe @@ -130,6 +146,8 @@ func (m *mergeQuerier) LabelValues(ctx context.Context, name string, hints *stor return nil, nil, err } + m.tenantsPerQuery.Observe(float64(len(ids))) + // by pass when only single querier is returned if m.byPassWithSingleQuerier && len(queriers) == 1 { return queriers[0].LabelValues(ctx, name, hints, matchers...) @@ -169,6 +187,8 @@ func (m *mergeQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints return nil, nil, err } + m.tenantsPerQuery.Observe(float64(len(ids))) + // by pass when only single querier is returned if m.byPassWithSingleQuerier && len(queriers) == 1 { return queriers[0].LabelNames(ctx, hints, matchers...) @@ -257,7 +277,7 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(ctx context.Context, return nil } - err := concurrency.ForEach(ctx, jobs, maxConcurrency, run) + err := concurrency.ForEach(ctx, jobs, m.maxConcurrent, run) if err != nil { return nil, nil, err } @@ -309,6 +329,8 @@ func (m *mergeQuerier) Select(ctx context.Context, sortSeries bool, hints *stora return storage.ErrSeriesSet(err) } + m.tenantsPerQuery.Observe(float64(len(ids))) + // by pass when only single querier is returned if m.byPassWithSingleQuerier && len(queriers) == 1 { return queriers[0].Select(ctx, sortSeries, hints, matchers...) @@ -352,7 +374,7 @@ func (m *mergeQuerier) Select(ctx context.Context, sortSeries bool, hints *stora return nil } - if err := concurrency.ForEach(ctx, jobs, maxConcurrency, run); err != nil { + if err := concurrency.ForEach(ctx, jobs, m.maxConcurrent, run); err != nil { return storage.ErrSeriesSet(err) } diff --git a/pkg/querier/tenantfederation/merge_queryable_test.go b/pkg/querier/tenantfederation/merge_queryable_test.go index 2c296673a9..595ef572fa 100644 --- a/pkg/querier/tenantfederation/merge_queryable_test.go +++ b/pkg/querier/tenantfederation/merge_queryable_test.go @@ -12,6 +12,8 @@ import ( "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/mocktracer" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -297,12 +299,15 @@ type mergeQueryableScenario struct { doNotByPassSingleQuerier bool } -func (s *mergeQueryableScenario) init() (storage.Querier, error) { +func (s *mergeQueryableScenario) init() (storage.Querier, prometheus.Gatherer, error) { // initialize with default tenant label - q := NewQueryable(&s.queryable, !s.doNotByPassSingleQuerier) + reg := prometheus.NewPedanticRegistry() + q := NewQueryable(&s.queryable, defaultMaxConcurrency, !s.doNotByPassSingleQuerier, reg) // retrieve querier - return q.Querier(mint, maxt) + querier, err := q.Querier(mint, maxt) + + return querier, reg, err } // selectTestCase is the inputs and expected outputs of a call to Select. @@ -319,6 +324,8 @@ type selectTestCase struct { expectedWarnings []string // expectedQueryErr is the error expected when querying. expectedQueryErr error + // expectedMetrics is the expected metrics. + expectedMetrics string } // selectScenario tests a call to Select over a range of test cases in a specific scenario. @@ -339,6 +346,8 @@ type labelNamesTestCase struct { expectedWarnings []string // expectedQueryErr is the error expected when querying. expectedQueryErr error + // expectedMetrics is the expected metrics. + expectedMetrics string } // labelNamesScenario tests a call to LabelNames in a specific scenario. @@ -361,6 +370,8 @@ type labelValuesTestCase struct { expectedWarnings []string // expectedQueryErr is the error expected when querying. expectedQueryErr error + // expectedMetrics is the expected metrics. + expectedMetrics string } // labelValuesScenario tests a call to LabelValues over a range of test cases in a specific scenario. @@ -373,7 +384,7 @@ func TestMergeQueryable_Querier(t *testing.T) { t.Run("querying without a tenant specified should error", func(t *testing.T) { t.Parallel() queryable := &mockTenantQueryableWithFilter{} - q := NewQueryable(queryable, false /* byPassWithSingleQuerier */) + q := NewQueryable(queryable, defaultMaxConcurrency, false /* byPassWithSingleQuerier */, nil) querier, err := q.Querier(mint, maxt) require.NoError(t, err) @@ -428,6 +439,36 @@ var ( }, }, } + + expectedSingleTenantsMetrics = ` +# HELP cortex_querier_federated_tenants_per_query Number of tenants per query. +# TYPE cortex_querier_federated_tenants_per_query histogram +cortex_querier_federated_tenants_per_query_bucket{le="1"} 1 +cortex_querier_federated_tenants_per_query_bucket{le="2"} 1 +cortex_querier_federated_tenants_per_query_bucket{le="4"} 1 +cortex_querier_federated_tenants_per_query_bucket{le="8"} 1 +cortex_querier_federated_tenants_per_query_bucket{le="16"} 1 +cortex_querier_federated_tenants_per_query_bucket{le="32"} 1 +cortex_querier_federated_tenants_per_query_bucket{le="64"} 1 +cortex_querier_federated_tenants_per_query_bucket{le="+Inf"} 1 +cortex_querier_federated_tenants_per_query_sum 1 +cortex_querier_federated_tenants_per_query_count 1 +` + + expectedThreeTenantsMetrics = ` +# HELP cortex_querier_federated_tenants_per_query Number of tenants per query. +# TYPE cortex_querier_federated_tenants_per_query histogram +cortex_querier_federated_tenants_per_query_bucket{le="1"} 0 +cortex_querier_federated_tenants_per_query_bucket{le="2"} 0 +cortex_querier_federated_tenants_per_query_bucket{le="4"} 1 +cortex_querier_federated_tenants_per_query_bucket{le="8"} 1 +cortex_querier_federated_tenants_per_query_bucket{le="16"} 1 +cortex_querier_federated_tenants_per_query_bucket{le="32"} 1 +cortex_querier_federated_tenants_per_query_bucket{le="64"} 1 +cortex_querier_federated_tenants_per_query_bucket{le="+Inf"} 1 +cortex_querier_federated_tenants_per_query_sum 3 +cortex_querier_federated_tenants_per_query_count 1 +` ) func TestMergeQueryable_Select(t *testing.T) { @@ -441,6 +482,7 @@ func TestMergeQueryable_Select(t *testing.T) { { name: "should return all series when no matchers are provided", expectedSeriesCount: 6, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should return only series for team-a and team-c tenants when there is a not-equals matcher for the team-b tenant", @@ -466,6 +508,7 @@ func TestMergeQueryable_Select(t *testing.T) { {Name: "instance", Value: "host2.team-c"}, }, }, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should return only series for team-b when there is an equals matcher for the team-b tenant", @@ -482,11 +525,13 @@ func TestMergeQueryable_Select(t *testing.T) { {Name: "instance", Value: "host2.team-b"}, }, }, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should return one series for each tenant when there is an equals matcher for the host1 instance", matchers: []*labels.Matcher{{Name: "instance", Value: "host1", Type: labels.MatchEqual}}, expectedSeriesCount: 3, + expectedMetrics: expectedThreeTenantsMetrics, }, }, }, @@ -531,17 +576,20 @@ func TestMergeQueryable_Select(t *testing.T) { {Name: "original___tenant_id__", Value: "original-value"}, }, }, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should return only series for team-a and team-c tenants when there is with not-equals matcher for the team-b tenant", matchers: []*labels.Matcher{{Name: defaultTenantLabel, Value: "team-b", Type: labels.MatchNotEqual}}, expectedSeriesCount: 4, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should return no series where there are conflicting tenant matchers", matchers: []*labels.Matcher{ {Name: defaultTenantLabel, Value: "team-a", Type: labels.MatchEqual}, {Name: defaultTenantLabel, Value: "team-c", Type: labels.MatchEqual}}, expectedSeriesCount: 0, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should return only series for team-b when there is an equals matcher for team-b tenant", @@ -560,21 +608,25 @@ func TestMergeQueryable_Select(t *testing.T) { {Name: "original___tenant_id__", Value: "original-value"}, }, }, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should return all series when there is an equals matcher for the original value of __tenant_id__ using the revised tenant label", matchers: []*labels.Matcher{{Name: originalDefaultTenantLabel, Value: "original-value", Type: labels.MatchEqual}}, expectedSeriesCount: 6, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should return all series when there is a regexp matcher for the original value of __tenant_id__ using the revised tenant label", matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, originalDefaultTenantLabel, "original-value")}, expectedSeriesCount: 6, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should return no series when there is a not-equals matcher for the original value of __tenant_id__ using the revised tenant label", matchers: []*labels.Matcher{{Name: originalDefaultTenantLabel, Value: "original-value", Type: labels.MatchNotEqual}}, expectedSeriesCount: 0, + expectedMetrics: expectedThreeTenantsMetrics, }, }, }, @@ -587,6 +639,7 @@ func TestMergeQueryable_Select(t *testing.T) { `warning querying tenant_id team-c: out of office`, }, expectedSeriesCount: 6, + expectedMetrics: expectedThreeTenantsMetrics, }, }}, { @@ -594,31 +647,32 @@ func TestMergeQueryable_Select(t *testing.T) { selectTestCases: []selectTestCase{{ name: "should return any error encountered with any tenant", expectedQueryErr: errors.New("error querying tenant_id team-b: failure xyz"), + expectedMetrics: expectedThreeTenantsMetrics, }}, }, } { scenario := scenario t.Run(scenario.name, func(t *testing.T) { - t.Parallel() - querier, err := scenario.init() - require.NoError(t, err) - - // inject tenants into context - ctx := context.Background() - if len(scenario.tenants) > 0 { - ctx = user.InjectOrgID(ctx, strings.Join(scenario.tenants, "|")) - } - for _, tc := range scenario.selectTestCases { tc := tc t.Run(tc.name, func(t *testing.T) { t.Parallel() + querier, reg, err := scenario.init() + require.NoError(t, err) + + // inject tenants into context + ctx := context.Background() + if len(scenario.tenants) > 0 { + ctx = user.InjectOrgID(ctx, strings.Join(scenario.tenants, "|")) + } + seriesSet := querier.Select(ctx, true, &storage.SelectHints{Start: mint, End: maxt}, tc.matchers...) if tc.expectedQueryErr != nil { require.EqualError(t, seriesSet.Err(), tc.expectedQueryErr.Error()) } else { require.NoError(t, seriesSet.Err()) + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(tc.expectedMetrics), "cortex_querier_federated_tenants_per_query")) assertEqualWarnings(t, tc.expectedWarnings, seriesSet.Warnings()) } @@ -650,6 +704,7 @@ func TestMergeQueryable_LabelNames(t *testing.T) { labelNamesTestCase: labelNamesTestCase{ name: "should not return the __tenant_id__ label as the MergeQueryable has been bypassed", expectedLabelNames: []string{"instance", "tenant-team-a"}, + expectedMetrics: expectedSingleTenantsMetrics, }, }, { @@ -658,6 +713,7 @@ func TestMergeQueryable_LabelNames(t *testing.T) { name: "should not return the __tenant_id__ label as the MergeQueryable has been bypassed with matchers", matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, seriesWithLabelNames, "bar|foo")}, expectedLabelNames: []string{"bar", "foo", "instance", "tenant-team-a"}, + expectedMetrics: expectedSingleTenantsMetrics, }, }, { @@ -665,6 +721,7 @@ func TestMergeQueryable_LabelNames(t *testing.T) { labelNamesTestCase: labelNamesTestCase{ name: "should return the __tenant_id__ label as the MergeQueryable has not been bypassed", expectedLabelNames: []string{defaultTenantLabel, "instance", "tenant-team-a"}, + expectedMetrics: expectedSingleTenantsMetrics, }, }, { @@ -672,6 +729,7 @@ func TestMergeQueryable_LabelNames(t *testing.T) { labelNamesTestCase: labelNamesTestCase{ name: "should return the __tenant_id__ label and all tenant team labels", expectedLabelNames: []string{defaultTenantLabel, "instance", "tenant-team-a", "tenant-team-b", "tenant-team-c"}, + expectedMetrics: expectedThreeTenantsMetrics, }, }, { @@ -679,6 +737,7 @@ func TestMergeQueryable_LabelNames(t *testing.T) { labelNamesTestCase: labelNamesTestCase{ name: "should return the __tenant_id__ label and all tenant team labels, and the __original_tenant_id__ label", expectedLabelNames: []string{defaultTenantLabel, "instance", originalDefaultTenantLabel, "tenant-team-a", "tenant-team-b", "tenant-team-c"}, + expectedMetrics: expectedThreeTenantsMetrics, }, }, { @@ -690,6 +749,7 @@ func TestMergeQueryable_LabelNames(t *testing.T) { `warning querying tenant_id team-b: don't like them`, `warning querying tenant_id team-c: out of office`, }, + expectedMetrics: expectedThreeTenantsMetrics, }, }, { @@ -697,6 +757,7 @@ func TestMergeQueryable_LabelNames(t *testing.T) { labelNamesTestCase: labelNamesTestCase{ name: "should return any error encountered with any tenant", expectedQueryErr: errors.New("error querying tenant_id team-b: failure xyz"), + expectedMetrics: expectedThreeTenantsMetrics, }, }, { @@ -709,6 +770,7 @@ func TestMergeQueryable_LabelNames(t *testing.T) { `warning querying tenant_id team-b: don't like them`, `warning querying tenant_id team-c: out of office`, }, + expectedMetrics: expectedThreeTenantsMetrics, }, }, { @@ -724,6 +786,7 @@ func TestMergeQueryable_LabelNames(t *testing.T) { expectedWarnings: []string{ `warning querying tenant_id team-b: don't like them`, }, + expectedMetrics: expectedThreeTenantsMetrics, }, }, { @@ -739,6 +802,7 @@ func TestMergeQueryable_LabelNames(t *testing.T) { `warning querying tenant_id team-b: don't like them`, `warning querying tenant_id team-c: out of office`, }, + expectedMetrics: expectedThreeTenantsMetrics, }, }, { @@ -753,13 +817,14 @@ func TestMergeQueryable_LabelNames(t *testing.T) { expectedWarnings: []string{ `warning querying tenant_id team-b: don't like them`, }, + expectedMetrics: expectedThreeTenantsMetrics, }, }, } { scenario := scenario t.Run(scenario.mergeQueryableScenario.name, func(t *testing.T) { t.Parallel() - querier, err := scenario.init() + querier, reg, err := scenario.init() require.NoError(t, err) // inject tenants into context @@ -775,6 +840,7 @@ func TestMergeQueryable_LabelNames(t *testing.T) { require.EqualError(t, err, scenario.labelNamesTestCase.expectedQueryErr.Error()) } else { require.NoError(t, err) + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(scenario.expectedMetrics), "cortex_querier_federated_tenants_per_query")) assert.Equal(t, scenario.labelNamesTestCase.expectedLabelNames, labelNames) assertEqualWarnings(t, scenario.labelNamesTestCase.expectedWarnings, warnings) } @@ -796,11 +862,13 @@ func TestMergeQueryable_LabelValues(t *testing.T) { name: "should return all label values for instance when no matchers are provided", labelName: "instance", expectedLabelValues: []string{"host1", "host2.team-a"}, + expectedMetrics: expectedSingleTenantsMetrics, }, { name: "should return no tenant values for the __tenant_id__ label as the MergeQueryable has been bypassed", labelName: defaultTenantLabel, expectedLabelValues: nil, + expectedMetrics: expectedSingleTenantsMetrics, }, }, }, @@ -811,11 +879,13 @@ func TestMergeQueryable_LabelValues(t *testing.T) { name: "should return all label values for instance when no matchers are provided", labelName: "instance", expectedLabelValues: []string{"host1", "host2.team-a"}, + expectedMetrics: expectedSingleTenantsMetrics, }, { name: "should return a tenant team value for the __tenant_id__ label as the MergeQueryable has not been bypassed", labelName: defaultTenantLabel, expectedLabelValues: []string{"team-a"}, + expectedMetrics: expectedSingleTenantsMetrics, }, }, }, @@ -826,6 +896,7 @@ func TestMergeQueryable_LabelValues(t *testing.T) { name: "should return all label values for instance when no matchers are provided", labelName: "instance", expectedLabelValues: []string{"host1", "host2.team-a", "host2.team-b", "host2.team-c"}, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should propagate non-tenant matchers to downstream queriers", @@ -838,6 +909,7 @@ func TestMergeQueryable_LabelValues(t *testing.T) { "warning querying tenant_id team-b: " + mockMatchersNotImplemented, "warning querying tenant_id team-c: " + mockMatchersNotImplemented, }, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should return no values for the instance label when there are conflicting tenant matchers", @@ -847,29 +919,34 @@ func TestMergeQueryable_LabelValues(t *testing.T) { }, labelName: "instance", expectedLabelValues: []string{}, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should only query tenant-b when there is an equals matcher for team-b tenant", matchers: []*labels.Matcher{{Name: defaultTenantLabel, Value: "team-b", Type: labels.MatchEqual}}, labelName: "instance", expectedLabelValues: []string{"host1", "host2.team-b"}, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should return all tenant team values for the __tenant_id__ label when no matchers are provided", labelName: defaultTenantLabel, expectedLabelValues: []string{"team-a", "team-b", "team-c"}, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should return only label values for team-a and team-c tenants when there is a not-equals matcher for team-b tenant", labelName: defaultTenantLabel, matchers: []*labels.Matcher{{Name: defaultTenantLabel, Value: "team-b", Type: labels.MatchNotEqual}}, expectedLabelValues: []string{"team-a", "team-c"}, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should return only label values for team-b tenant when there is an equals matcher for team-b tenant", labelName: defaultTenantLabel, matchers: []*labels.Matcher{{Name: defaultTenantLabel, Value: "team-b", Type: labels.MatchEqual}}, expectedLabelValues: []string{"team-b"}, + expectedMetrics: expectedThreeTenantsMetrics, }, }, }, @@ -880,16 +957,19 @@ func TestMergeQueryable_LabelValues(t *testing.T) { name: "should return all label values for instance when no matchers are provided", labelName: "instance", expectedLabelValues: []string{"host1", "host2.team-a", "host2.team-b", "host2.team-c"}, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should return all tenant values for __tenant_id__ label name", labelName: defaultTenantLabel, expectedLabelValues: []string{"team-a", "team-b", "team-c"}, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should return the original value for the revised tenant label name when no matchers are provided", labelName: originalDefaultTenantLabel, expectedLabelValues: []string{"original-value"}, + expectedMetrics: expectedThreeTenantsMetrics, }, { name: "should return the original value for the revised tenant label name with matchers", @@ -901,6 +981,7 @@ func TestMergeQueryable_LabelValues(t *testing.T) { "warning querying tenant_id team-b: " + mockMatchersNotImplemented, "warning querying tenant_id team-c: " + mockMatchersNotImplemented, }, + expectedMetrics: expectedThreeTenantsMetrics, }, }, }, @@ -914,6 +995,7 @@ func TestMergeQueryable_LabelValues(t *testing.T) { `warning querying tenant_id team-b: don't like them`, `warning querying tenant_id team-c: out of office`, }, + expectedMetrics: expectedThreeTenantsMetrics, }}, }, { @@ -922,6 +1004,7 @@ func TestMergeQueryable_LabelValues(t *testing.T) { name: "should not return warnings as the underlying queryables are not queried in requests for the __tenant_id__ label", labelName: defaultTenantLabel, expectedLabelValues: []string{"team-a", "team-b", "team-c"}, + expectedMetrics: expectedThreeTenantsMetrics, }}, }, { @@ -930,6 +1013,7 @@ func TestMergeQueryable_LabelValues(t *testing.T) { name: "should return any error encountered with any tenant", labelName: "instance", expectedQueryErr: errors.New("error querying tenant_id team-b: failure xyz"), + expectedMetrics: expectedThreeTenantsMetrics, }}, }, { @@ -938,30 +1022,31 @@ func TestMergeQueryable_LabelValues(t *testing.T) { name: "should not return errors as the underlying queryables are not queried in requests for the __tenant_id__ label", labelName: defaultTenantLabel, expectedLabelValues: []string{"team-a", "team-b", "team-c"}, + expectedMetrics: expectedThreeTenantsMetrics, }}, }, } { scenario := scenario t.Run(scenario.name, func(t *testing.T) { - t.Parallel() - querier, err := scenario.init() - require.NoError(t, err) - - // inject tenants into context - ctx := context.Background() - if len(scenario.tenants) > 0 { - ctx = user.InjectOrgID(ctx, strings.Join(scenario.tenants, "|")) - } - for _, tc := range scenario.labelValuesTestCases { tc := tc t.Run(tc.name, func(t *testing.T) { t.Parallel() + querier, reg, err := scenario.init() + require.NoError(t, err) + + // inject tenants into context + ctx := context.Background() + if len(scenario.tenants) > 0 { + ctx = user.InjectOrgID(ctx, strings.Join(scenario.tenants, "|")) + } + actLabelValues, warnings, err := querier.LabelValues(ctx, tc.labelName, nil, tc.matchers...) if tc.expectedQueryErr != nil { require.EqualError(t, err, tc.expectedQueryErr.Error()) } else { require.NoError(t, err) + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(tc.expectedMetrics), "cortex_querier_federated_tenants_per_query")) assert.Equal(t, tc.expectedLabelValues, actLabelValues, fmt.Sprintf("unexpected values for label '%s'", tc.labelName)) assertEqualWarnings(t, tc.expectedWarnings, warnings) } @@ -1030,7 +1115,7 @@ func TestTracingMergeQueryable(t *testing.T) { // set a multi tenant resolver tenant.WithDefaultResolver(tenant.NewMultiResolver()) filter := mockTenantQueryableWithFilter{} - q := NewQueryable(&filter, false) + q := NewQueryable(&filter, defaultMaxConcurrency, false, nil) // retrieve querier if set querier, err := q.Querier(mint, maxt) require.NoError(t, err) diff --git a/pkg/querier/tenantfederation/tenant_federation.go b/pkg/querier/tenantfederation/tenant_federation.go index af5bd7b929..56e5fb59db 100644 --- a/pkg/querier/tenantfederation/tenant_federation.go +++ b/pkg/querier/tenantfederation/tenant_federation.go @@ -7,8 +7,11 @@ import ( type Config struct { // Enabled switches on support for multi tenant query federation Enabled bool `yaml:"enabled"` + // MaxConcurrent The number of workers used for processing federated query. + MaxConcurrent int `yaml:"max_concurrent"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.Enabled, "tenant-federation.enabled", false, "If enabled on all Cortex services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` header (experimental).") + f.IntVar(&cfg.MaxConcurrent, "tenant-federation.max-concurrent", defaultMaxConcurrency, "The number of workers used to process each federated query.") }