diff --git a/CHANGELOG.md b/CHANGELOG.md index ef662a6269..c5ffd1ff78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * [FEATURE] Ingester: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986 * [FEATURE] Query Frontend: Added a query rejection mechanism to block resource-intensive queries. #6005 * [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071 +* [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081 * [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987 * [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892 * [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 24240c36bf..06e34d991f 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2961,6 +2961,11 @@ instance_limits: # CLI flag: -ingester.instance-limits.max-inflight-push-requests [max_inflight_push_requests: | default = 0] + # Max inflight query requests that this ingester can handle (across all + # tenants). Additional requests will be rejected. 0 = unlimited. + # CLI flag: -ingester.instance-limits.max-inflight-query-requests + [max_inflight_query_requests: | default = 0] + # Comma-separated list of metric names, for which # -ingester.max-series-per-metric and -ingester.max-global-series-per-metric # limits will be ignored. Does not affect max-series-per-user or diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 350c9bee85..edcff187f7 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -155,6 +155,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.Int64Var(&cfg.DefaultLimits.MaxInMemoryTenants, "ingester.instance-limits.max-tenants", 0, "Max users that this ingester can hold. Requests from additional users will be rejected. This limit only works when using blocks engine. 0 = unlimited.") f.Int64Var(&cfg.DefaultLimits.MaxInMemorySeries, "ingester.instance-limits.max-series", 0, "Max series that this ingester can hold (across all tenants). Requests to create additional series will be rejected. This limit only works when using blocks engine. 0 = unlimited.") f.Int64Var(&cfg.DefaultLimits.MaxInflightPushRequests, "ingester.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this ingester can handle (across all tenants). Additional requests will be rejected. 0 = unlimited.") + f.Int64Var(&cfg.DefaultLimits.MaxInflightQueryRequests, "ingester.instance-limits.max-inflight-query-requests", 0, "Max inflight query requests that this ingester can handle (across all tenants). Additional requests will be rejected. 0 = unlimited.") f.StringVar(&cfg.IgnoreSeriesLimitForMetricNames, "ingester.ignore-series-limit-for-metric-names", "", "Comma-separated list of metric names, for which -ingester.max-series-per-metric and -ingester.max-global-series-per-metric limits will be ignored. Does not affect max-series-per-user or max-global-series-per-metric limits.") @@ -1401,9 +1402,6 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery return nil, err } - c := i.trackInflightQueryRequest() - defer c() - userID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -1426,8 +1424,15 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery return nil, err } + // We will report *this* request in the error too. + c, err := i.trackInflightQueryRequest() + if err != nil { + return nil, err + } + // It's not required to sort series from a single ingester because series are sorted by the Exemplar Storage before returning from Select. res, err := q.Select(from, through, matchers...) + c() if err != nil { return nil, err } @@ -1452,8 +1457,6 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery // LabelValues returns all label values that are associated with a given label name. func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) { - c := i.trackInflightQueryRequest() - defer c() resp, cleanup, err := i.labelsValuesCommon(ctx, req) defer cleanup() return resp, err @@ -1461,8 +1464,6 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque // LabelValuesStream returns all label values that are associated with a given label name. func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) error { - c := i.trackInflightQueryRequest() - defer c() resp, cleanup, err := i.labelsValuesCommon(stream.Context(), req) defer cleanup() @@ -1525,6 +1526,11 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu q.Close() } + c, err := i.trackInflightQueryRequest() + if err != nil { + return nil, cleanup, err + } + defer c() vals, _, err := q.LabelValues(ctx, labelName, matchers...) if err != nil { return nil, cleanup, err @@ -1537,8 +1543,6 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu // LabelNames return all the label names. func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) { - c := i.trackInflightQueryRequest() - defer c() resp, cleanup, err := i.labelNamesCommon(ctx, req) defer cleanup() return resp, err @@ -1546,8 +1550,6 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest // LabelNamesStream return all the label names. func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) error { - c := i.trackInflightQueryRequest() - defer c() resp, cleanup, err := i.labelNamesCommon(stream.Context(), req) defer cleanup() @@ -1605,6 +1607,11 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR q.Close() } + c, err := i.trackInflightQueryRequest() + if err != nil { + return nil, cleanup, err + } + defer c() names, _, err := q.LabelNames(ctx) if err != nil { return nil, cleanup, err @@ -1831,9 +1838,6 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ return err } - c := i.trackInflightQueryRequest() - defer c() - spanlog, ctx := spanlogger.New(stream.Context(), "QueryStream") defer spanlog.Finish() @@ -1879,11 +1883,18 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ return nil } -func (i *Ingester) trackInflightQueryRequest() func() { +func (i *Ingester) trackInflightQueryRequest() (func(), error) { + gl := i.getInstanceLimits() + if gl != nil && gl.MaxInflightQueryRequests > 0 { + if i.inflightQueryRequests.Load() >= gl.MaxInflightQueryRequests { + return nil, errTooManyInflightQueryRequests + } + } + i.maxInflightQueryRequests.Track(i.inflightQueryRequests.Inc()) return func() { i.inflightQueryRequests.Dec() - } + }, nil } // queryStreamChunks streams metrics from a TSDB. This implements the client.IngesterServer interface @@ -1894,8 +1905,13 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th } defer q.Close() + c, err := i.trackInflightQueryRequest() + if err != nil { + return 0, 0, 0, err + } // It's not required to return sorted series because series are sorted by the Cortex querier. ss := q.Select(ctx, false, nil, matchers...) + c() if ss.Err() != nil { return 0, 0, 0, ss.Err() } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index f50fda1807..436757486a 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -2279,6 +2279,34 @@ func Test_Ingester_LabelValues(t *testing.T) { } } +func Test_Ingester_LabelValue_MaxInflightQueryRequest(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.DefaultLimits.MaxInflightQueryRequests = 1 + i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry()) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until it's ACTIVE + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + i.inflightQueryRequests.Add(1) + + // Mock request + ctx := user.InjectOrgID(context.Background(), "test") + + wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000) + _, err = i.Push(ctx, wreq) + require.NoError(t, err) + + rreq := &client.LabelValuesRequest{} + _, err = i.LabelValues(ctx, rreq) + require.Error(t, err) + require.Equal(t, err, errTooManyInflightQueryRequests) +} + func Test_Ingester_Query(t *testing.T) { series := []struct { lbls labels.Labels @@ -2409,6 +2437,36 @@ func Test_Ingester_Query(t *testing.T) { }) } } + +func Test_Ingester_Query_MaxInflightQueryRequest(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.DefaultLimits.MaxInflightQueryRequests = 1 + i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry()) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until it's ACTIVE + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + i.inflightQueryRequests.Add(1) + + // Mock request + ctx := user.InjectOrgID(context.Background(), "test") + + wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000) + _, err = i.Push(ctx, wreq) + require.NoError(t, err) + + rreq := &client.QueryRequest{} + s := &mockQueryStreamServer{ctx: ctx} + err = i.QueryStream(rreq, s) + require.Error(t, err) + require.Equal(t, err, errTooManyInflightQueryRequests) +} + func TestIngester_Query_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) { i, err := prepareIngesterWithBlocksStorage(t, defaultIngesterTestConfig(t), prometheus.NewRegistry()) require.NoError(t, err) @@ -4949,6 +5007,34 @@ func TestIngester_MaxExemplarsFallBack(t *testing.T) { require.Equal(t, maxExemplars, int64(5)) } +func Test_Ingester_QueryExemplar_MaxInflightQueryRequest(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.DefaultLimits.MaxInflightQueryRequests = 1 + i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry()) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until it's ACTIVE + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + i.inflightQueryRequests.Add(1) + + // Mock request + ctx := user.InjectOrgID(context.Background(), "test") + + wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000) + _, err = i.Push(ctx, wreq) + require.NoError(t, err) + + rreq := &client.ExemplarQueryRequest{} + _, err = i.QueryExemplars(ctx, rreq) + require.Error(t, err) + require.Equal(t, err, errTooManyInflightQueryRequests) +} + func generateSamplesForLabel(l labels.Labels, count int) *cortexpb.WriteRequest { var lbls = make([]labels.Labels, 0, count) var samples = make([]cortexpb.Sample, 0, count) diff --git a/pkg/ingester/instance_limits.go b/pkg/ingester/instance_limits.go index 30cebb77a0..d3b4671152 100644 --- a/pkg/ingester/instance_limits.go +++ b/pkg/ingester/instance_limits.go @@ -8,15 +8,17 @@ var ( errMaxUsersLimitReached = errors.New("cannot create TSDB: ingesters's max tenants limit reached") errMaxSeriesLimitReached = errors.New("cannot add series: ingesters's max series limit reached") errTooManyInflightPushRequests = errors.New("cannot push: too many inflight push requests in ingester") + errTooManyInflightQueryRequests = errors.New("cannot push: too many inflight query requests in ingester") ) // InstanceLimits describes limits used by ingester. Reaching any of these will result in Push method to return // (internal) error. type InstanceLimits struct { - MaxIngestionRate float64 `yaml:"max_ingestion_rate"` - MaxInMemoryTenants int64 `yaml:"max_tenants"` - MaxInMemorySeries int64 `yaml:"max_series"` - MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"` + MaxIngestionRate float64 `yaml:"max_ingestion_rate"` + MaxInMemoryTenants int64 `yaml:"max_tenants"` + MaxInMemorySeries int64 `yaml:"max_series"` + MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"` + MaxInflightQueryRequests int64 `yaml:"max_inflight_query_requests"` } // Sets default limit values for unmarshalling. diff --git a/pkg/ingester/instance_limits_test.go b/pkg/ingester/instance_limits_test.go index ece468511b..273bd6c7fa 100644 --- a/pkg/ingester/instance_limits_test.go +++ b/pkg/ingester/instance_limits_test.go @@ -9,10 +9,11 @@ import ( func TestInstanceLimitsUnmarshal(t *testing.T) { defaultInstanceLimits = &InstanceLimits{ - MaxIngestionRate: 10, - MaxInMemoryTenants: 20, - MaxInMemorySeries: 30, - MaxInflightPushRequests: 40, + MaxIngestionRate: 10, + MaxInMemoryTenants: 20, + MaxInMemorySeries: 30, + MaxInflightPushRequests: 40, + MaxInflightQueryRequests: 50, } l := InstanceLimits{} @@ -24,6 +25,7 @@ max_tenants: 50000 require.NoError(t, yaml.UnmarshalStrict([]byte(input), &l)) require.Equal(t, float64(125.678), l.MaxIngestionRate) require.Equal(t, int64(50000), l.MaxInMemoryTenants) - require.Equal(t, int64(30), l.MaxInMemorySeries) // default value - require.Equal(t, int64(40), l.MaxInflightPushRequests) // default value + require.Equal(t, int64(30), l.MaxInMemorySeries) // default value + require.Equal(t, int64(40), l.MaxInflightPushRequests) // default value + require.Equal(t, int64(50), l.MaxInflightQueryRequests) // default value }