From 621ca8724652638a56cb01fd0995f39407755822 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Thu, 23 Nov 2023 16:34:44 -0800 Subject: [PATCH] Assign priority before splitting the query Signed-off-by: Justin Jung --- docs/configuration/config-file-reference.md | 16 +- pkg/frontend/transport/handler.go | 9 +- pkg/frontend/transport/roundtripper.go | 20 +- pkg/frontend/v1/frontend.go | 27 +- pkg/frontend/v2/frontend.go | 13 +- pkg/frontend/v2/frontend_scheduler_worker.go | 1 - pkg/frontend/v2/frontend_test.go | 13 +- .../tripperware/instantquery/instant_query.go | 18 +- pkg/querier/tripperware/limits.go | 9 +- pkg/querier/tripperware/priority.go | 94 ++++++ pkg/querier/tripperware/priority_test.go | 277 ++++++++++++++++++ .../tripperware/queryrange/limits_test.go | 5 + pkg/querier/tripperware/roundtrip.go | 23 +- .../tripperware/test_shard_by_query_utils.go | 6 + pkg/scheduler/scheduler.go | 10 +- pkg/scheduler/schedulerpb/scheduler.pb.go | 127 +++----- pkg/scheduler/schedulerpb/scheduler.proto | 1 - pkg/util/http.go | 1 + pkg/util/httpgrpcutil/header.go | 22 ++ pkg/util/query/priority.go | 74 ----- pkg/util/query/priority_test.go | 224 -------------- pkg/util/time.go | 14 + pkg/util/validation/limits.go | 6 +- tools/doc-generator/parser.go | 4 + 24 files changed, 533 insertions(+), 481 deletions(-) create mode 100644 pkg/querier/tripperware/priority.go create mode 100644 pkg/querier/tripperware/priority_test.go create mode 100644 pkg/util/httpgrpcutil/header.go delete mode 100644 pkg/util/query/priority.go delete mode 100644 pkg/util/query/priority_test.go diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index e47805a1b9d..40ab1a05db5 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -5039,8 +5039,8 @@ otel: # Priority level. Must be a unique value. [priority: | default = 0] -# Number of reserved queriers to handle priorities higher or equal to this value -# only. Value between 0 and 1 will be used as a percentage. +# Number of reserved queriers to handle priorities higher or equal to the +# priority level. Value between 0 and 1 will be used as a percentage. [reserved_queriers: | default = 0] # List of query attributes to assign the priority. @@ -5050,14 +5050,14 @@ otel: ### `QueryAttribute` ```yaml -# Query string regex. -[regex: | default = ".*"] +# Query string regex. If set to empty string, it will not match anything. +[regex: | default = ""] -# Query start time. -[start_time: | default = 0s] +# Query start time. If set to 0, the start time won't be checked. +[start_time: | default = 0] -# Query end time. -[end_time: | default = 0s] +# Query end time. If set to 0, the end time won't be checked. +[end_time: | default = 0] ``` ### `DisabledRuleGroup` diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index a4faacac588..00eea588571 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -200,12 +200,8 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Body = io.NopCloser(&buf) } + r.Header.Get("test") startTime := time.Now() - // get config - // assign priority - // embed it to the http request, header? - // extract Decode to here, to make sure all requests pass here - // log the priority as well resp, err := f.roundTripper.RoundTrip(r) queryResponseTime := time.Since(startTime) @@ -348,6 +344,9 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u if ua := r.Header.Get("User-Agent"); len(ua) > 0 { logMessage = append(logMessage, "user_agent", ua) } + if queryPriority := r.Header.Get(util.QueryPriorityHeaderKey); len(queryPriority) > 0 { + logMessage = append(logMessage, "priority", queryPriority) + } if error != nil { s, ok := status.FromError(error) diff --git a/pkg/frontend/transport/roundtripper.go b/pkg/frontend/transport/roundtripper.go index a0529ba6a41..583fc22d04a 100644 --- a/pkg/frontend/transport/roundtripper.go +++ b/pkg/frontend/transport/roundtripper.go @@ -5,9 +5,6 @@ import ( "context" "io" "net/http" - "net/url" - "strings" - "time" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/httpgrpc/server" @@ -15,7 +12,7 @@ import ( // GrpcRoundTripper is similar to http.RoundTripper, but works with HTTP requests converted to protobuf messages. type GrpcRoundTripper interface { - RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest, url.Values, time.Time) (*httpgrpc.HTTPResponse, error) + RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) } func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper { @@ -42,20 +39,7 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er return nil, err } - var ( - resp *httpgrpc.HTTPResponse - reqValues url.Values - ts time.Time - ) - - if strings.HasSuffix(r.URL.Path, "/query") || strings.HasSuffix(r.URL.Path, "/query_range") { - if err = r.ParseForm(); err == nil { - reqValues = r.Form - ts = time.Now() - } - } - - resp, err = a.roundTripper.RoundTripGRPC(r.Context(), req, reqValues, ts) + resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req) if err != nil { return nil, err } diff --git a/pkg/frontend/v1/frontend.go b/pkg/frontend/v1/frontend.go index ef80fd07b16..024c1f961a5 100644 --- a/pkg/frontend/v1/frontend.go +++ b/pkg/frontend/v1/frontend.go @@ -5,7 +5,7 @@ import ( "flag" "fmt" "net/http" - "net/url" + "strconv" "time" "github.com/go-kit/log" @@ -23,7 +23,6 @@ import ( "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/httpgrpcutil" - util_query "github.com/cortexproject/cortex/pkg/util/query" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -98,11 +97,15 @@ type request struct { request *httpgrpc.HTTPRequest err chan error response chan *httpgrpc.HTTPResponse - priority int64 } func (r request) Priority() int64 { - return r.priority + priority, err := strconv.ParseInt(httpgrpcutil.GetHeader(*r.request, util.QueryPriorityHeaderKey), 10, 64) + if err != nil { + return 0 + } + + return priority } // New creates a new frontend. Frontend implements service, and must be started and stopped. @@ -181,7 +184,7 @@ func (f *Frontend) cleanupInactiveUserMetrics(user string) { } // RoundTripGRPC round trips a proto (instead of a HTTP request). -func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest, reqParams url.Values, ts time.Time) (*httpgrpc.HTTPResponse, error) { +func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { // Propagate trace context in gRPC too - this will be ignored if using HTTP. tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx) if tracer != nil && span != nil { @@ -192,12 +195,6 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest, } } - tenantIDs, err := tenant.TenantIDs(ctx) - if err != nil { - return nil, err - } - userID := tenant.JoinTenantIDs(tenantIDs) - return f.retry.Do(ctx, func() (*httpgrpc.HTTPResponse, error) { request := request{ request: req, @@ -210,14 +207,6 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest, response: make(chan *httpgrpc.HTTPResponse, 1), } - if reqParams != nil { - queryPriority := f.limits.QueryPriority(userID) - - if queryPriority.Enabled { - request.priority = util_query.GetPriority(reqParams, ts, queryPriority) - } - } - if err := f.queueRequest(ctx, &request); err != nil { return nil, err } diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index b11d8c04bc4..2df0f8f344b 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -6,7 +6,6 @@ import ( "fmt" "math/rand" "net/http" - "net/url" "sync" "time" @@ -28,7 +27,6 @@ import ( "github.com/cortexproject/cortex/pkg/util/grpcclient" "github.com/cortexproject/cortex/pkg/util/httpgrpcutil" util_log "github.com/cortexproject/cortex/pkg/util/log" - util_query "github.com/cortexproject/cortex/pkg/util/query" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -89,7 +87,6 @@ type frontendRequest struct { request *httpgrpc.HTTPRequest userID string statsEnabled bool - priority int64 cancel context.CancelFunc @@ -170,7 +167,7 @@ func (f *Frontend) stopping(_ error) error { } // RoundTripGRPC round trips a proto (instead of a HTTP request). -func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest, reqParams url.Values, ts time.Time) (*httpgrpc.HTTPResponse, error) { +func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { if s := f.State(); s != services.Running { return nil, fmt.Errorf("frontend not running: %v", s) } @@ -210,14 +207,6 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest, retryOnTooManyOutstandingRequests: f.cfg.RetryOnTooManyOutstandingRequests && f.schedulerWorkers.getWorkersCount() > 1, } - if reqParams != nil { - queryPriority := f.limits.QueryPriority(userID) - - if queryPriority.Enabled { - freq.priority = util_query.GetPriority(reqParams, ts, queryPriority) - } - } - f.requests.put(freq) defer f.requests.delete(freq.queryID) diff --git a/pkg/frontend/v2/frontend_scheduler_worker.go b/pkg/frontend/v2/frontend_scheduler_worker.go index 2abff24bf6d..bbe1e2ed746 100644 --- a/pkg/frontend/v2/frontend_scheduler_worker.go +++ b/pkg/frontend/v2/frontend_scheduler_worker.go @@ -263,7 +263,6 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro HttpRequest: req.request, FrontendAddress: w.frontendAddr, StatsEnabled: req.statsEnabled, - Priority: req.priority, }) if err != nil { diff --git a/pkg/frontend/v2/frontend_test.go b/pkg/frontend/v2/frontend_test.go index 4c40d88b663..6b20926e89a 100644 --- a/pkg/frontend/v2/frontend_test.go +++ b/pkg/frontend/v2/frontend_test.go @@ -3,7 +3,6 @@ package v2 import ( "context" "net" - "net/url" "strconv" "strings" "sync" @@ -112,7 +111,7 @@ func TestFrontendBasicWorkflow(t *testing.T) { return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} }, 0) - resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now()) + resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}) require.NoError(t, err) require.Equal(t, int32(200), resp.Code) require.Equal(t, []byte(body), resp.Body) @@ -142,7 +141,7 @@ func TestFrontendRetryRequest(t *testing.T) { return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} }, 3) - res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now()) + res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}) require.NoError(t, err) require.Equal(t, int32(200), res.Code) } @@ -169,7 +168,7 @@ func TestFrontendRetryEnqueue(t *testing.T) { return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK} }, 0) - _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now()) + _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}) require.NoError(t, err) } @@ -178,7 +177,7 @@ func TestFrontendEnqueueFailure(t *testing.T) { return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN} }, 0) - _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now()) + _, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), &httpgrpc.HTTPRequest{}) require.Error(t, err) require.True(t, strings.Contains(err.Error(), "failed to enqueue request")) } @@ -189,7 +188,7 @@ func TestFrontendCancellation(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() - resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now()) + resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{}) require.EqualError(t, err, context.DeadlineExceeded.Error()) require.Nil(t, resp) @@ -238,7 +237,7 @@ func TestFrontendFailedCancellation(t *testing.T) { }() // send request - resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now()) + resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{}) require.EqualError(t, err, context.Canceled.Error()) require.Nil(t, resp) diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index a7350e65d5a..60bea7a8d10 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -4,18 +4,17 @@ import ( "bytes" "context" "fmt" + "github.com/cortexproject/cortex/pkg/util" "io" "net/http" "net/url" "sort" - "strconv" "strings" "time" jsoniter "github.com/json-iterator/go" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" - "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" @@ -26,7 +25,6 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange" - "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/spanlogger" ) @@ -132,7 +130,7 @@ func (resp *PrometheusInstantQueryResponse) HTTPHeaders() map[string][]string { func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) { result := PrometheusRequest{Headers: map[string][]string{}} var err error - result.Time, err = parseTimeParam(r, "time", c.now().Unix()) + result.Time, err = util.ParseTimeParam(r, "time", c.now().Unix()) if err != nil { return nil, decorateWithParamName(err, "time") } @@ -630,15 +628,3 @@ func (s *PrometheusInstantQueryData) MarshalJSON() ([]byte, error) { return s.Result.GetRawBytes(), nil } } - -func parseTimeParam(r *http.Request, paramName string, defaultValue int64) (int64, error) { - val := r.FormValue(paramName) - if val == "" { - val = strconv.FormatInt(defaultValue, 10) - } - result, err := util.ParseTime(val) - if err != nil { - return 0, errors.Wrapf(err, "Invalid time value for '%s'", paramName) - } - return result, nil -} diff --git a/pkg/querier/tripperware/limits.go b/pkg/querier/tripperware/limits.go index 15ce78592fb..815693b3c1d 100644 --- a/pkg/querier/tripperware/limits.go +++ b/pkg/querier/tripperware/limits.go @@ -1,6 +1,10 @@ package tripperware -import "time" +import ( + "time" + + "github.com/cortexproject/cortex/pkg/util/validation" +) // Limits allows us to specify per-tenant runtime limits on the behavior of // the query handling code. @@ -21,4 +25,7 @@ type Limits interface { // QueryVerticalShardSize returns the maximum number of queriers that can handle requests for this user. QueryVerticalShardSize(userID string) int + + // QueryPriority returns the query priority config for the tenant, including different priorities and their attributes. + QueryPriority(userID string) validation.QueryPriority } diff --git a/pkg/querier/tripperware/priority.go b/pkg/querier/tripperware/priority.go new file mode 100644 index 00000000000..5ce4e0d3fa8 --- /dev/null +++ b/pkg/querier/tripperware/priority.go @@ -0,0 +1,94 @@ +package tripperware + +import ( + "net/http" + "strings" + "time" + + "github.com/prometheus/prometheus/promql/parser" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +func GetPriority(r *http.Request, userID string, limits Limits, now time.Time) (int64, error) { + isQuery := strings.HasSuffix(r.URL.Path, "/query") + isQueryRange := strings.HasSuffix(r.URL.Path, "/query_range") + queryPriority := limits.QueryPriority(userID) + query := r.FormValue("query") + + if (!isQuery && !isQueryRange) || !queryPriority.Enabled || query == "" { + return 0, nil + } + + expr, err := parser.ParseExpr(query) + if err != nil { + return 0, err + } + + var startTime, endTime int64 + if isQuery { + if t, err := util.ParseTimeParam(r, "time", now.Unix()); err == nil { + startTime = t + endTime = t + } + } else if isQueryRange { + if st, err := util.ParseTime(r.FormValue("start")); err == nil { + if et, err := util.ParseTime(r.FormValue("end")); err == nil { + startTime = st + endTime = et + } + } + } + + es := &parser.EvalStmt{ + Expr: expr, + Start: util.TimeFromMillis(startTime), + End: util.TimeFromMillis(endTime), + LookbackDelta: limits.MaxQueryLookback(userID), // this is available from querier flag. + } + + minTime, maxTime := FindMinMaxTime(es) + + for _, priority := range queryPriority.Priorities { + for _, attribute := range priority.QueryAttributes { + if attribute.Regex == "" || (attribute.CompiledRegex != nil && !attribute.CompiledRegex.MatchString(query)) { + continue + } + + if isWithinTimeAttributes(attribute, now, minTime, maxTime) { + return priority.Priority, nil + } + } + } + + return queryPriority.DefaultPriority, nil +} + +func isWithinTimeAttributes(attribute validation.QueryAttribute, now time.Time, startTime, endTime int64) bool { + if attribute.StartTime == 0 && attribute.EndTime == 0 { + return true + } + + if attribute.StartTime != 0 { + startTimeThreshold := now.Add(-1 * time.Duration(attribute.StartTime).Abs()).Truncate(time.Second).Unix() + if startTime < startTimeThreshold { + return false + } + } + + if attribute.EndTime != 0 { + endTimeThreshold := now.Add(-1 * time.Duration(attribute.EndTime).Abs()).Add(1 * time.Second).Truncate(time.Second).Unix() + if endTime > endTimeThreshold { + return false + } + } + + return true +} + +func FindMinMaxTime(s *parser.EvalStmt) (int64, int64) { + // Placeholder until Prometheus is updated to >=0.48.0 + // which includes https://github.com/prometheus/prometheus/commit/9e3df532d8294d4fe3284bde7bc96db336a33552 + return s.Start.Unix(), s.End.Unix() +} diff --git a/pkg/querier/tripperware/priority_test.go b/pkg/querier/tripperware/priority_test.go new file mode 100644 index 00000000000..29188423467 --- /dev/null +++ b/pkg/querier/tripperware/priority_test.go @@ -0,0 +1,277 @@ +package tripperware + +import ( + "bytes" + "net/http" + "regexp" + "strconv" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + + "github.com/cortexproject/cortex/pkg/util/validation" +) + +func Test_GetPriorityShouldReturnDefaultPriorityIfNotEnabledOrEmptyQueryString(t *testing.T) { + now := time.Now() + limits := mockLimits{queryPriority: validation.QueryPriority{ + Priorities: []validation.PriorityDef{ + { + Priority: 1, + QueryAttributes: []validation.QueryAttribute{ + { + Regex: ".*", + CompiledRegex: regexp.MustCompile(".*"), + }, + }, + }, + }, + }} + + type testCase struct { + url string + queryPriorityEnabled bool + } + + tests := map[string]testCase{ + "should miss if query priority not enabled": { + url: "/query?query=up", + }, + "should miss if query string empty": { + url: "/query?query=", + queryPriorityEnabled: true, + }, + "should miss if query string empty - range query": { + url: "/query_range?query=", + queryPriorityEnabled: true, + }, + "should miss if neither instant nor range query": { + url: "/series", + queryPriorityEnabled: true, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + limits.queryPriority.Enabled = testData.queryPriorityEnabled + req, _ := http.NewRequest(http.MethodPost, testData.url, bytes.NewReader([]byte{})) + priority, err := GetPriority(req, "", limits, now) + assert.NoError(t, err) + assert.Equal(t, int64(0), priority) + }) + } +} + +func Test_GetPriorityShouldConsiderRegex(t *testing.T) { + now := time.Now() + limits := mockLimits{queryPriority: validation.QueryPriority{ + Enabled: true, + Priorities: []validation.PriorityDef{ + { + Priority: 1, + QueryAttributes: []validation.QueryAttribute{ + {}, + }, + }, + }, + }} + + type testCase struct { + regex string + query string + expectedPriority int + } + + tests := map[string]testCase{ + "should hit if regex matches": { + regex: "(^sum|c(.+)t)", + query: "sum(up)", + expectedPriority: 1, + }, + "should miss if regex doesn't match": { + regex: "(^sum|c(.+)t)", + query: "min(up)", + expectedPriority: 0, + }, + "should hit if regex matches - .*": { + regex: ".*", + query: "count(sum(up))", + expectedPriority: 1, + }, + "should hit if regex matches - .+": { + regex: ".+", + query: "count(sum(up))", + expectedPriority: 1, + }, + "should miss if regex is an empty string": { + regex: "", + query: "sum(up)", + expectedPriority: 0, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + limits.queryPriority.Priorities[0].QueryAttributes[0].Regex = testData.regex + limits.queryPriority.Priorities[0].QueryAttributes[0].CompiledRegex = regexp.MustCompile(testData.regex) + req, _ := http.NewRequest(http.MethodPost, "/query?query="+testData.query, bytes.NewReader([]byte{})) + priority, err := GetPriority(req, "", limits, now) + assert.NoError(t, err) + assert.Equal(t, int64(testData.expectedPriority), priority) + }) + } +} + +func Test_GetPriorityShouldConsiderStartAndEndTime(t *testing.T) { + now := time.Now() + limits := mockLimits{queryPriority: validation.QueryPriority{ + Enabled: true, + Priorities: []validation.PriorityDef{ + { + Priority: 1, + QueryAttributes: []validation.QueryAttribute{ + { + Regex: ".*", + CompiledRegex: regexp.MustCompile(".*"), + StartTime: model.Duration(45 * time.Minute), + EndTime: model.Duration(15 * time.Minute), + }, + }, + }, + }, + }} + + type testCase struct { + time time.Time + start time.Time + end time.Time + expectedPriority int + } + + tests := map[string]testCase{ + "should hit instant query between start and end time": { + time: now.Add(-30 * time.Minute), + expectedPriority: 1, + }, + "should hit instant query equal to start time": { + time: now.Add(-45 * time.Minute), + expectedPriority: 1, + }, + "should hit instant query equal to end time": { + time: now.Add(-15 * time.Minute), + expectedPriority: 1, + }, + "should miss instant query outside of end time": { + expectedPriority: 0, + }, + "should miss instant query outside of start time": { + time: now.Add(-60 * time.Minute), + expectedPriority: 0, + }, + "should hit range query between start and end time": { + start: now.Add(-40 * time.Minute), + end: now.Add(-20 * time.Minute), + expectedPriority: 1, + }, + "should hit range query equal to start and end time": { + start: now.Add(-45 * time.Minute), + end: now.Add(-15 * time.Minute), + expectedPriority: 1, + }, + "should miss range query outside of start time": { + start: now.Add(-50 * time.Minute), + end: now.Add(-15 * time.Minute), + expectedPriority: 0, + }, + "should miss range query completely outside of start time": { + start: now.Add(-50 * time.Minute), + end: now.Add(-45 * time.Minute), + expectedPriority: 0, + }, + "should miss range query outside of end time": { + start: now.Add(-45 * time.Minute), + end: now.Add(-10 * time.Minute), + expectedPriority: 0, + }, + "should miss range query completely outside of end time": { + start: now.Add(-15 * time.Minute), + end: now.Add(-10 * time.Minute), + expectedPriority: 0, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + var url string + if !testData.time.IsZero() { + url = "/query?query=sum(up)&time=" + strconv.FormatInt(testData.time.Unix(), 10) + } else if !testData.start.IsZero() { + url = "/query_range?query=sum(up)&start=" + strconv.FormatInt(testData.start.Unix(), 10) + url += "&end=" + strconv.FormatInt(testData.end.Unix(), 10) + } else { + url = "/query?query=sum(up)" + } + req, _ := http.NewRequest(http.MethodPost, url, bytes.NewReader([]byte{})) + priority, err := GetPriority(req, "", limits, now) + assert.NoError(t, err) + assert.Equal(t, int64(testData.expectedPriority), priority) + }) + } +} + +func Test_GetPriorityShouldNotConsiderStartAndEndTimeIfEmpty(t *testing.T) { + now := time.Now() + limits := mockLimits{queryPriority: validation.QueryPriority{ + Enabled: true, + Priorities: []validation.PriorityDef{ + { + Priority: 1, + QueryAttributes: []validation.QueryAttribute{ + { + Regex: "^sum\\(up\\)$", + }, + }, + }, + }, + }} + + type testCase struct { + time time.Time + start time.Time + end time.Time + } + + tests := map[string]testCase{ + "should hit instant query with no time": {}, + "should hit instant query with future time": { + time: now.Add(1000000 * time.Hour), + }, + "should hit instant query with very old time": { + time: now.Add(-1000000 * time.Hour), + }, + "should hit range query with very wide time window": { + start: now.Add(-1000000 * time.Hour), + end: now.Add(1000000 * time.Hour), + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + var url string + if !testData.time.IsZero() { + url = "/query?query=sum(up)&time=" + strconv.FormatInt(testData.time.Unix(), 10) + } else if !testData.start.IsZero() { + url = "/query_range?query=sum(up)&start=" + strconv.FormatInt(testData.start.Unix(), 10) + url += "&end=" + strconv.FormatInt(testData.end.Unix(), 10) + } else { + url = "/query?query=sum(up)" + } + req, _ := http.NewRequest(http.MethodPost, url, bytes.NewReader([]byte{})) + priority, err := GetPriority(req, "", limits, now) + assert.NoError(t, err) + assert.Equal(t, int64(1), priority) + }) + } +} diff --git a/pkg/querier/tripperware/queryrange/limits_test.go b/pkg/querier/tripperware/queryrange/limits_test.go index 1569ea2e3af..c1e5954421c 100644 --- a/pkg/querier/tripperware/queryrange/limits_test.go +++ b/pkg/querier/tripperware/queryrange/limits_test.go @@ -2,6 +2,7 @@ package queryrange import ( "context" + "github.com/cortexproject/cortex/pkg/util/validation" "testing" "time" @@ -219,6 +220,10 @@ func (m mockLimits) QueryVerticalShardSize(userID string) int { return 0 } +func (m mockLimits) QueryPriority(userID string) validation.QueryPriority { + return validation.QueryPriority{} +} + type mockHandler struct { mock.Mock } diff --git a/pkg/querier/tripperware/roundtrip.go b/pkg/querier/tripperware/roundtrip.go index 6aefe4ccecd..6c7cd135086 100644 --- a/pkg/querier/tripperware/roundtrip.go +++ b/pkg/querier/tripperware/roundtrip.go @@ -19,6 +19,7 @@ import ( "context" "io" "net/http" + "strconv" "strings" "time" @@ -142,15 +143,27 @@ func NewQueryTripperware( if err != nil { return nil, err } + now := time.Now() userStr := tenant.JoinTenantIDs(tenantIDs) - activeUsers.UpdateUserTimestamp(userStr, time.Now()) + activeUsers.UpdateUserTimestamp(userStr, now) queriesPerTenant.WithLabelValues(op, userStr).Inc() - if maxSubQuerySteps > 0 && (isQuery || isQueryRange) { + if isQuery || isQueryRange { query := r.FormValue("query") - // Check subquery step size. - if err := SubQueryStepSizeCheck(query, defaultSubQueryInterval, maxSubQuerySteps); err != nil { - return nil, err + + if maxSubQuerySteps > 0 { + // Check subquery step size. + if err := SubQueryStepSizeCheck(query, defaultSubQueryInterval, maxSubQuerySteps); err != nil { + return nil, err + } + } + + if limits.QueryPriority(userStr).Enabled { + priority, err := GetPriority(r, userStr, limits, now) + if err != nil { + return nil, err + } + r.Header.Set(util.QueryPriorityHeaderKey, strconv.FormatInt(priority, 10)) } } diff --git a/pkg/querier/tripperware/test_shard_by_query_utils.go b/pkg/querier/tripperware/test_shard_by_query_utils.go index 657d7daa3a1..5cbad93ca81 100644 --- a/pkg/querier/tripperware/test_shard_by_query_utils.go +++ b/pkg/querier/tripperware/test_shard_by_query_utils.go @@ -21,6 +21,7 @@ import ( "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/querysharding" + "github.com/cortexproject/cortex/pkg/util/validation" ) func TestQueryShardQuery(t *testing.T, instantQueryCodec Codec, shardedPrometheusCodec Codec) { @@ -466,6 +467,7 @@ type mockLimits struct { maxQueryLength time.Duration maxCacheFreshness time.Duration shardSize int + queryPriority validation.QueryPriority } func (m mockLimits) MaxQueryLookback(string) time.Duration { @@ -488,6 +490,10 @@ func (m mockLimits) QueryVerticalShardSize(userID string) int { return m.shardSize } +func (m mockLimits) QueryPriority(userID string) validation.QueryPriority { + return m.queryPriority +} + type singleHostRoundTripper struct { host string next http.RoundTripper diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 65235540a35..fa28485298f 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -5,6 +5,7 @@ import ( "flag" "io" "net/http" + "strconv" "sync" "time" @@ -154,7 +155,6 @@ type schedulerRequest struct { queryID uint64 request *httpgrpc.HTTPRequest statsEnabled bool - priority int64 enqueueTime time.Time @@ -167,7 +167,12 @@ type schedulerRequest struct { } func (s schedulerRequest) Priority() int64 { - return s.priority + priority, err := strconv.ParseInt(httpgrpcutil.GetHeader(*s.request, util.QueryPriorityHeaderKey), 10, 64) + if err != nil { + return 0 + } + + return priority } // FrontendLoop handles connection from frontend. @@ -298,7 +303,6 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr queryID: msg.QueryID, request: msg.HttpRequest, statsEnabled: msg.StatsEnabled, - priority: msg.Priority, } now := time.Now() diff --git a/pkg/scheduler/schedulerpb/scheduler.pb.go b/pkg/scheduler/schedulerpb/scheduler.pb.go index 2f352a81383..d3288f95b39 100644 --- a/pkg/scheduler/schedulerpb/scheduler.pb.go +++ b/pkg/scheduler/schedulerpb/scheduler.pb.go @@ -219,7 +219,6 @@ type FrontendToScheduler struct { UserID string `protobuf:"bytes,4,opt,name=userID,proto3" json:"userID,omitempty"` HttpRequest *httpgrpc.HTTPRequest `protobuf:"bytes,5,opt,name=httpRequest,proto3" json:"httpRequest,omitempty"` StatsEnabled bool `protobuf:"varint,6,opt,name=statsEnabled,proto3" json:"statsEnabled,omitempty"` - Priority int64 `protobuf:"varint,7,opt,name=priority,proto3" json:"priority,omitempty"` } func (m *FrontendToScheduler) Reset() { *m = FrontendToScheduler{} } @@ -296,13 +295,6 @@ func (m *FrontendToScheduler) GetStatsEnabled() bool { return false } -func (m *FrontendToScheduler) GetPriority() int64 { - if m != nil { - return m.Priority - } - return 0 -} - type SchedulerToFrontend struct { Status SchedulerToFrontendStatus `protobuf:"varint,1,opt,name=status,proto3,enum=schedulerpb.SchedulerToFrontendStatus" json:"status,omitempty"` Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` @@ -446,49 +438,48 @@ func init() { func init() { proto.RegisterFile("scheduler.proto", fileDescriptor_2b3fc28395a6d9c5) } var fileDescriptor_2b3fc28395a6d9c5 = []byte{ - // 661 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0xcd, 0x4e, 0xdb, 0x40, - 0x10, 0xf6, 0xe6, 0x0f, 0x98, 0xd0, 0xe2, 0x2e, 0xd0, 0xa6, 0x11, 0x5d, 0x2c, 0xab, 0xaa, 0x52, - 0x0e, 0x49, 0x95, 0x56, 0x6a, 0x0f, 0xa8, 0x52, 0x0a, 0xa6, 0x44, 0xa5, 0x0e, 0x6c, 0x1c, 0xf5, - 0xe7, 0x12, 0x91, 0x64, 0x49, 0x22, 0xc0, 0x6b, 0xd6, 0x76, 0x51, 0x6e, 0x7d, 0x84, 0x3e, 0x44, - 0x0f, 0x7d, 0x94, 0x5e, 0x2a, 0x71, 0xe4, 0xd0, 0x43, 0x31, 0x97, 0x1e, 0x79, 0x84, 0x2a, 0x8e, - 0xe3, 0x3a, 0x90, 0x00, 0xb7, 0x99, 0xf1, 0xf7, 0x79, 0xe7, 0xfb, 0x66, 0x76, 0x61, 0xce, 0x6e, - 0x76, 0x58, 0xcb, 0x3d, 0x60, 0x22, 0x6f, 0x09, 0xee, 0x70, 0x9c, 0x0e, 0x0b, 0x56, 0x23, 0xbb, - 0xd0, 0xe6, 0x6d, 0xee, 0xd7, 0x0b, 0xfd, 0x68, 0x00, 0xc9, 0xbe, 0x68, 0x77, 0x9d, 0x8e, 0xdb, - 0xc8, 0x37, 0xf9, 0x61, 0xe1, 0x98, 0xed, 0x7e, 0x61, 0xc7, 0x5c, 0xec, 0xdb, 0x85, 0x26, 0x3f, - 0x3c, 0xe4, 0x66, 0xa1, 0xe3, 0x38, 0x56, 0x5b, 0x58, 0xcd, 0x30, 0x18, 0xb0, 0xd4, 0x22, 0xe0, - 0x1d, 0x97, 0x89, 0x2e, 0x13, 0x06, 0xaf, 0x0e, 0xcf, 0xc0, 0x4b, 0x30, 0x73, 0x34, 0xa8, 0x96, - 0xd7, 0x33, 0x48, 0x41, 0xb9, 0x19, 0xfa, 0xbf, 0xa0, 0xfe, 0x42, 0x80, 0x43, 0xac, 0xc1, 0x03, - 0x3e, 0xce, 0xc0, 0x54, 0x1f, 0xd3, 0x0b, 0x28, 0x09, 0x3a, 0x4c, 0xf1, 0x4b, 0x48, 0xf7, 0x8f, - 0xa5, 0xec, 0xc8, 0x65, 0xb6, 0x93, 0x89, 0x29, 0x28, 0x97, 0x2e, 0x2e, 0xe6, 0xc3, 0x56, 0x36, - 0x0d, 0x63, 0x3b, 0xf8, 0x48, 0xa3, 0x48, 0x9c, 0x83, 0xb9, 0x3d, 0xc1, 0x4d, 0x87, 0x99, 0xad, - 0x52, 0xab, 0x25, 0x98, 0x6d, 0x67, 0xe2, 0x7e, 0x37, 0x97, 0xcb, 0xf8, 0x3e, 0xa4, 0x5c, 0xdb, - 0x6f, 0x37, 0xe1, 0x03, 0x82, 0x0c, 0xab, 0x30, 0x6b, 0x3b, 0xbb, 0x8e, 0xad, 0x99, 0xbb, 0x8d, - 0x03, 0xd6, 0xca, 0x24, 0x15, 0x94, 0x9b, 0xa6, 0x23, 0x35, 0xf5, 0x7b, 0x0c, 0xe6, 0x37, 0x82, - 0xff, 0x45, 0x5d, 0x78, 0x05, 0x09, 0xa7, 0x67, 0x31, 0x5f, 0xcd, 0xdd, 0xe2, 0xe3, 0x7c, 0x64, - 0x06, 0xf9, 0x31, 0x78, 0xa3, 0x67, 0x31, 0xea, 0x33, 0xc6, 0xf5, 0x1d, 0x1b, 0xdf, 0x77, 0xc4, - 0xb4, 0xf8, 0xa8, 0x69, 0x93, 0x14, 0x5d, 0x32, 0x33, 0x79, 0x6b, 0x33, 0x2f, 0x5b, 0x91, 0xba, - 0x6a, 0x05, 0xce, 0xc2, 0xb4, 0x25, 0xba, 0x5c, 0x74, 0x9d, 0x5e, 0x66, 0x4a, 0x41, 0xb9, 0x38, - 0x0d, 0x73, 0x75, 0x1f, 0xe6, 0x23, 0x53, 0x1f, 0x1a, 0x80, 0x5f, 0x43, 0xaa, 0xff, 0x0b, 0xd7, - 0x0e, 0x7c, 0x7a, 0x32, 0xe2, 0xd3, 0x18, 0x46, 0xd5, 0x47, 0xd3, 0x80, 0x85, 0x17, 0x20, 0xc9, - 0x84, 0xe0, 0x22, 0x70, 0x68, 0x90, 0xa8, 0xab, 0xb0, 0xa4, 0x73, 0xa7, 0xbb, 0xd7, 0x0b, 0xb6, - 0xab, 0xda, 0x71, 0x9d, 0x16, 0x3f, 0x36, 0x87, 0x62, 0xae, 0xdf, 0xd0, 0x65, 0x78, 0x34, 0x81, - 0x6d, 0x5b, 0xdc, 0xb4, 0xd9, 0xca, 0x2a, 0x3c, 0x98, 0x30, 0x41, 0x3c, 0x0d, 0x89, 0xb2, 0x5e, - 0x36, 0x64, 0x09, 0xa7, 0x61, 0x4a, 0xd3, 0x77, 0x6a, 0x5a, 0x4d, 0x93, 0x11, 0x06, 0x48, 0xad, - 0x95, 0xf4, 0x35, 0x6d, 0x4b, 0x8e, 0xad, 0x34, 0xe1, 0xe1, 0x44, 0x5d, 0x38, 0x05, 0xb1, 0xca, - 0x3b, 0x59, 0xc2, 0x0a, 0x2c, 0x19, 0x95, 0x4a, 0xfd, 0x7d, 0x49, 0xff, 0x54, 0xa7, 0xda, 0x4e, - 0x4d, 0xab, 0x1a, 0xd5, 0xfa, 0xb6, 0x46, 0xeb, 0x86, 0xa6, 0x97, 0x74, 0x43, 0x46, 0x78, 0x06, - 0x92, 0x1a, 0xa5, 0x15, 0x2a, 0xc7, 0xf0, 0x3d, 0xb8, 0x53, 0xdd, 0xac, 0x19, 0x46, 0x59, 0x7f, - 0x5b, 0x5f, 0xaf, 0x7c, 0xd0, 0xe5, 0x78, 0xf1, 0x37, 0x8a, 0xf8, 0xbd, 0xc1, 0xc5, 0xf0, 0x9a, - 0xd5, 0x20, 0x1d, 0x84, 0x5b, 0x9c, 0x5b, 0x78, 0x79, 0xc4, 0xee, 0xab, 0x77, 0x39, 0xbb, 0x3c, - 0x69, 0x1e, 0x01, 0x56, 0x95, 0x72, 0xe8, 0x19, 0xc2, 0x26, 0x2c, 0x8e, 0xb5, 0x0c, 0x3f, 0x1d, - 0xe1, 0x5f, 0x37, 0x94, 0xec, 0xca, 0x6d, 0xa0, 0x83, 0x09, 0x14, 0x2d, 0x58, 0x88, 0xaa, 0x0b, - 0xd7, 0xe9, 0x23, 0xcc, 0x0e, 0x63, 0x5f, 0x9f, 0x72, 0xd3, 0xb5, 0xcb, 0x2a, 0x37, 0x2d, 0xdc, - 0x40, 0xe1, 0x9b, 0xd2, 0xc9, 0x19, 0x91, 0x4e, 0xcf, 0x88, 0x74, 0x71, 0x46, 0xd0, 0x57, 0x8f, - 0xa0, 0x1f, 0x1e, 0x41, 0x3f, 0x3d, 0x82, 0x4e, 0x3c, 0x82, 0xfe, 0x78, 0x04, 0xfd, 0xf5, 0x88, - 0x74, 0xe1, 0x11, 0xf4, 0xed, 0x9c, 0x48, 0x27, 0xe7, 0x44, 0x3a, 0x3d, 0x27, 0xd2, 0xe7, 0xe8, - 0xcb, 0xdb, 0x48, 0xf9, 0x8f, 0xe6, 0xf3, 0x7f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x83, 0xb8, 0xa1, - 0x26, 0xa0, 0x05, 0x00, 0x00, + // 644 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0x4f, 0x4f, 0xdb, 0x4e, + 0x10, 0xf5, 0x86, 0x24, 0xc0, 0x84, 0xdf, 0x0f, 0x77, 0x81, 0x36, 0x8d, 0xe8, 0x12, 0x45, 0x55, + 0x95, 0x72, 0x48, 0xaa, 0xb4, 0x52, 0x7b, 0x40, 0x95, 0x52, 0x30, 0x25, 0x2a, 0x75, 0x60, 0xb3, + 0x51, 0xff, 0x5c, 0x22, 0x92, 0x2c, 0x09, 0x02, 0xbc, 0x66, 0x6d, 0x17, 0xe5, 0xd6, 0x63, 0x8f, + 0xfd, 0x18, 0xfd, 0x28, 0xbd, 0x54, 0xe2, 0xc8, 0xa1, 0x87, 0x62, 0x2e, 0x3d, 0xf2, 0x11, 0xaa, + 0x38, 0x76, 0xea, 0xa4, 0x0e, 0x70, 0x9b, 0x1d, 0xbf, 0xe7, 0x9d, 0xf7, 0x66, 0x66, 0x61, 0xde, + 0x6a, 0x75, 0x79, 0xdb, 0x39, 0xe2, 0xb2, 0x60, 0x4a, 0x61, 0x0b, 0x9c, 0x1a, 0x26, 0xcc, 0x66, + 0x66, 0xb1, 0x23, 0x3a, 0xc2, 0xcb, 0x17, 0xfb, 0xd1, 0x00, 0x92, 0x79, 0xd6, 0x39, 0xb0, 0xbb, + 0x4e, 0xb3, 0xd0, 0x12, 0xc7, 0xc5, 0x53, 0xbe, 0xf7, 0x89, 0x9f, 0x0a, 0x79, 0x68, 0x15, 0x5b, + 0xe2, 0xf8, 0x58, 0x18, 0xc5, 0xae, 0x6d, 0x9b, 0x1d, 0x69, 0xb6, 0x86, 0xc1, 0x80, 0x95, 0x2b, + 0x01, 0xde, 0x75, 0xb8, 0x3c, 0xe0, 0x92, 0x89, 0x5a, 0x70, 0x07, 0x5e, 0x86, 0xd9, 0x93, 0x41, + 0xb6, 0xb2, 0x91, 0x46, 0x59, 0x94, 0x9f, 0xa5, 0x7f, 0x13, 0xb9, 0x1f, 0x08, 0xf0, 0x10, 0xcb, + 0x84, 0xcf, 0xc7, 0x69, 0x98, 0xee, 0x63, 0x7a, 0x3e, 0x25, 0x4e, 0x83, 0x23, 0x7e, 0x0e, 0xa9, + 0xfe, 0xb5, 0x94, 0x9f, 0x38, 0xdc, 0xb2, 0xd3, 0xb1, 0x2c, 0xca, 0xa7, 0x4a, 0x4b, 0x85, 0x61, + 0x29, 0x5b, 0x8c, 0xed, 0xf8, 0x1f, 0x69, 0x18, 0x89, 0xf3, 0x30, 0xbf, 0x2f, 0x85, 0x61, 0x73, + 0xa3, 0x5d, 0x6e, 0xb7, 0x25, 0xb7, 0xac, 0xf4, 0x94, 0x57, 0xcd, 0x78, 0x1a, 0xdf, 0x85, 0xa4, + 0x63, 0x79, 0xe5, 0xc6, 0x3d, 0x80, 0x7f, 0xc2, 0x39, 0x98, 0xb3, 0xec, 0x3d, 0xdb, 0xd2, 0x8c, + 0xbd, 0xe6, 0x11, 0x6f, 0xa7, 0x13, 0x59, 0x94, 0x9f, 0xa1, 0x23, 0xb9, 0xdc, 0x97, 0x18, 0x2c, + 0x6c, 0xfa, 0xff, 0x0b, 0xbb, 0xf0, 0x02, 0xe2, 0x76, 0xcf, 0xe4, 0x9e, 0x9a, 0xff, 0x4b, 0x0f, + 0x0b, 0xa1, 0x1e, 0x14, 0x22, 0xf0, 0xac, 0x67, 0x72, 0xea, 0x31, 0xa2, 0xea, 0x8e, 0x45, 0xd7, + 0x1d, 0x32, 0x6d, 0x6a, 0xd4, 0xb4, 0x49, 0x8a, 0xc6, 0xcc, 0x4c, 0xdc, 0xda, 0xcc, 0x71, 0x2b, + 0x92, 0x11, 0x56, 0x1c, 0xc2, 0x42, 0xa8, 0xb3, 0x81, 0x48, 0xfc, 0x12, 0x92, 0x7d, 0x98, 0x63, + 0xf9, 0x5e, 0x3c, 0x1a, 0xf1, 0x22, 0x82, 0x51, 0xf3, 0xd0, 0xd4, 0x67, 0xe1, 0x45, 0x48, 0x70, + 0x29, 0x85, 0xf4, 0x5d, 0x18, 0x1c, 0x72, 0x6b, 0xb0, 0xac, 0x0b, 0xfb, 0x60, 0xbf, 0xe7, 0x4f, + 0x50, 0xad, 0xeb, 0xd8, 0x6d, 0x71, 0x6a, 0x04, 0x05, 0x5f, 0x3f, 0x85, 0x2b, 0xf0, 0x60, 0x02, + 0xdb, 0x32, 0x85, 0x61, 0xf1, 0xd5, 0x35, 0xb8, 0x37, 0xa1, 0x4b, 0x78, 0x06, 0xe2, 0x15, 0xbd, + 0xc2, 0x54, 0x05, 0xa7, 0x60, 0x5a, 0xd3, 0x77, 0xeb, 0x5a, 0x5d, 0x53, 0x11, 0x06, 0x48, 0xae, + 0x97, 0xf5, 0x75, 0x6d, 0x5b, 0x8d, 0xad, 0xb6, 0xe0, 0xfe, 0x44, 0x5d, 0x38, 0x09, 0xb1, 0xea, + 0x1b, 0x55, 0xc1, 0x59, 0x58, 0x66, 0xd5, 0x6a, 0xe3, 0x6d, 0x59, 0xff, 0xd0, 0xa0, 0xda, 0x6e, + 0x5d, 0xab, 0xb1, 0x5a, 0x63, 0x47, 0xa3, 0x0d, 0xa6, 0xe9, 0x65, 0x9d, 0xa9, 0x08, 0xcf, 0x42, + 0x42, 0xa3, 0xb4, 0x4a, 0xd5, 0x18, 0xbe, 0x03, 0xff, 0xd5, 0xb6, 0xea, 0x8c, 0x55, 0xf4, 0xd7, + 0x8d, 0x8d, 0xea, 0x3b, 0x5d, 0x9d, 0x2a, 0xfd, 0x44, 0x21, 0xbf, 0x37, 0x85, 0x0c, 0x56, 0xa9, + 0x0e, 0x29, 0x3f, 0xdc, 0x16, 0xc2, 0xc4, 0x2b, 0x23, 0x76, 0xff, 0xbb, 0xaf, 0x99, 0x95, 0x49, + 0xfd, 0xf0, 0xb1, 0x39, 0x25, 0x8f, 0x9e, 0x20, 0x6c, 0xc0, 0x52, 0xa4, 0x65, 0xf8, 0xf1, 0x08, + 0xff, 0xba, 0xa6, 0x64, 0x56, 0x6f, 0x03, 0x1d, 0x74, 0xa0, 0x64, 0xc2, 0x62, 0x58, 0xdd, 0x70, + 0x9c, 0xde, 0xc3, 0x5c, 0x10, 0x7b, 0xfa, 0xb2, 0x37, 0xad, 0x56, 0x26, 0x7b, 0xd3, 0xc0, 0x0d, + 0x14, 0xbe, 0x2a, 0x9f, 0x5d, 0x10, 0xe5, 0xfc, 0x82, 0x28, 0x57, 0x17, 0x04, 0x7d, 0x76, 0x09, + 0xfa, 0xe6, 0x12, 0xf4, 0xdd, 0x25, 0xe8, 0xcc, 0x25, 0xe8, 0x97, 0x4b, 0xd0, 0x6f, 0x97, 0x28, + 0x57, 0x2e, 0x41, 0x5f, 0x2f, 0x89, 0x72, 0x76, 0x49, 0x94, 0xf3, 0x4b, 0xa2, 0x7c, 0x0c, 0xbf, + 0xae, 0xcd, 0xa4, 0xf7, 0x30, 0x3e, 0xfd, 0x13, 0x00, 0x00, 0xff, 0xff, 0x88, 0x0c, 0xfe, 0x56, + 0x84, 0x05, 0x00, 0x00, } func (x FrontendToSchedulerType) String() string { @@ -602,9 +593,6 @@ func (this *FrontendToScheduler) Equal(that interface{}) bool { if this.StatsEnabled != that1.StatsEnabled { return false } - if this.Priority != that1.Priority { - return false - } return true } func (this *SchedulerToFrontend) Equal(that interface{}) bool { @@ -709,7 +697,7 @@ func (this *FrontendToScheduler) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 11) + s := make([]string, 0, 10) s = append(s, "&schedulerpb.FrontendToScheduler{") s = append(s, "Type: "+fmt.Sprintf("%#v", this.Type)+",\n") s = append(s, "FrontendAddress: "+fmt.Sprintf("%#v", this.FrontendAddress)+",\n") @@ -719,7 +707,6 @@ func (this *FrontendToScheduler) GoString() string { s = append(s, "HttpRequest: "+fmt.Sprintf("%#v", this.HttpRequest)+",\n") } s = append(s, "StatsEnabled: "+fmt.Sprintf("%#v", this.StatsEnabled)+",\n") - s = append(s, "Priority: "+fmt.Sprintf("%#v", this.Priority)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -1155,11 +1142,6 @@ func (m *FrontendToScheduler) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if m.Priority != 0 { - i = encodeVarintScheduler(dAtA, i, uint64(m.Priority)) - i-- - dAtA[i] = 0x38 - } if m.StatsEnabled { i-- if m.StatsEnabled { @@ -1375,9 +1357,6 @@ func (m *FrontendToScheduler) Size() (n int) { if m.StatsEnabled { n += 2 } - if m.Priority != 0 { - n += 1 + sovScheduler(uint64(m.Priority)) - } return n } @@ -1460,7 +1439,6 @@ func (this *FrontendToScheduler) String() string { `UserID:` + fmt.Sprintf("%v", this.UserID) + `,`, `HttpRequest:` + strings.Replace(fmt.Sprintf("%v", this.HttpRequest), "HTTPRequest", "httpgrpc.HTTPRequest", 1) + `,`, `StatsEnabled:` + fmt.Sprintf("%v", this.StatsEnabled) + `,`, - `Priority:` + fmt.Sprintf("%v", this.Priority) + `,`, `}`, }, "") return s @@ -1967,25 +1945,6 @@ func (m *FrontendToScheduler) Unmarshal(dAtA []byte) error { } } m.StatsEnabled = bool(v != 0) - case 7: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Priority", wireType) - } - m.Priority = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowScheduler - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Priority |= int64(b&0x7F) << shift - if b < 0x80 { - break - } - } default: iNdEx = preIndex skippy, err := skipScheduler(dAtA[iNdEx:]) diff --git a/pkg/scheduler/schedulerpb/scheduler.proto b/pkg/scheduler/schedulerpb/scheduler.proto index 706c34de4fd..eea28717b83 100644 --- a/pkg/scheduler/schedulerpb/scheduler.proto +++ b/pkg/scheduler/schedulerpb/scheduler.proto @@ -78,7 +78,6 @@ message FrontendToScheduler { string userID = 4; httpgrpc.HTTPRequest httpRequest = 5; bool statsEnabled = 6; - int64 priority = 7; } enum SchedulerToFrontendStatus { diff --git a/pkg/util/http.go b/pkg/util/http.go index 09fb3df38c1..41daae0fc65 100644 --- a/pkg/util/http.go +++ b/pkg/util/http.go @@ -21,6 +21,7 @@ import ( yamlv3 "gopkg.in/yaml.v3" ) +const QueryPriorityHeaderKey = "X-Cortex-Query-Priority" const messageSizeLargerErrFmt = "received message larger than max (%d vs %d)" // IsRequestBodyTooLarge returns true if the error is "http: request body too large". diff --git a/pkg/util/httpgrpcutil/header.go b/pkg/util/httpgrpcutil/header.go new file mode 100644 index 00000000000..b844e0c65f8 --- /dev/null +++ b/pkg/util/httpgrpcutil/header.go @@ -0,0 +1,22 @@ +package httpgrpcutil + +import ( + "github.com/weaveworks/common/httpgrpc" +) + +// GetHeader is similar to http.Header.Get, which gets the first value associated with the given key. +// If there are no values associated with the key, it returns "". +func GetHeader(r httpgrpc.HTTPRequest, key string) string { + return GetHeaderValues(r, key)[0] +} + +// GetHeaderValues is similar to http.Header.Values, which returns all values associated with the given key. +func GetHeaderValues(r httpgrpc.HTTPRequest, key string) []string { + for _, header := range r.Headers { + if header.GetKey() == key { + return header.GetValues() + } + } + + return []string{} +} diff --git a/pkg/util/query/priority.go b/pkg/util/query/priority.go deleted file mode 100644 index 2a7f61ba108..00000000000 --- a/pkg/util/query/priority.go +++ /dev/null @@ -1,74 +0,0 @@ -package query - -import ( - "net/url" - "time" - - "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/validation" -) - -func GetPriority(requestParams url.Values, now time.Time, queryPriority validation.QueryPriority) int64 { - queryParam := requestParams.Get("query") - timeParam := requestParams.Get("time") - startParam := requestParams.Get("start") - endParam := requestParams.Get("end") - - if queryParam == "" || !queryPriority.Enabled { - return queryPriority.DefaultPriority - } - - for _, priority := range queryPriority.Priorities { - for _, attribute := range priority.QueryAttributes { - if attribute.CompiledRegex != nil && !attribute.CompiledRegex.MatchString(queryParam) { - continue - } - - if startTime, err := util.ParseTime(startParam); err == nil { - if endTime, err := util.ParseTime(endParam); err == nil { - if isWithinTimeAttributes(attribute, now, startTime, endTime) { - return priority.Priority - } - } - } - - if instantTime, err := util.ParseTime(timeParam); err == nil { - if isWithinTimeAttributes(attribute, now, instantTime, instantTime) { - return priority.Priority - } - } - - if timeParam == "" { - if isWithinTimeAttributes(attribute, now, util.TimeToMillis(now), util.TimeToMillis(now)) { - return priority.Priority - } - } - } - } - - return queryPriority.DefaultPriority -} - -func isWithinTimeAttributes(attribute validation.QueryAttribute, now time.Time, startTime, endTime int64) bool { - if attribute.StartTime == 0 && attribute.EndTime == 0 { - return true - } - - if attribute.StartTime != 0 { - startTimeThreshold := now.Add(-1 * time.Duration(attribute.StartTime).Abs()).Truncate(time.Second) - - if util.TimeFromMillis(startTime).Before(startTimeThreshold) { - return false - } - } - - if attribute.EndTime != 0 { - endTimeThreshold := now.Add(-1 * time.Duration(attribute.EndTime).Abs()).Add(1 * time.Second).Truncate(time.Second) - - if util.TimeFromMillis(endTime).After(endTimeThreshold) { - return false - } - } - - return true -} diff --git a/pkg/util/query/priority_test.go b/pkg/util/query/priority_test.go deleted file mode 100644 index fea552cde49..00000000000 --- a/pkg/util/query/priority_test.go +++ /dev/null @@ -1,224 +0,0 @@ -package query - -import ( - "net/url" - "regexp" - "strconv" - "testing" - "time" - - "github.com/prometheus/common/model" - "github.com/stretchr/testify/assert" - - "github.com/cortexproject/cortex/pkg/util/validation" -) - -func Test_GetPriorityShouldReturnDefaultPriorityIfNotEnabledOrEmptyQueryString(t *testing.T) { - now := time.Now() - priorities := []validation.PriorityDef{ - { - Priority: 1, - QueryAttributes: []validation.QueryAttribute{ - { - Regex: ".*", - CompiledRegex: regexp.MustCompile(".*"), - }, - }, - }, - } - queryPriority := validation.QueryPriority{ - Priorities: priorities, - } - - assert.Equal(t, int64(0), GetPriority(url.Values{ - "query": []string{"sum(up)"}, - "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, queryPriority)) - - queryPriority.Enabled = true - assert.Equal(t, int64(0), GetPriority(url.Values{ - "query": []string{""}, - "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, queryPriority)) -} - -func Test_GetPriorityShouldConsiderRegex(t *testing.T) { - now := time.Now() - priorities := []validation.PriorityDef{ - { - Priority: 1, - QueryAttributes: []validation.QueryAttribute{ - { - Regex: "sum", - CompiledRegex: regexp.MustCompile("sum"), - }, - }, - }, - } - queryPriority := validation.QueryPriority{ - Enabled: true, - Priorities: priorities, - } - - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"sum(up)"}, - "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, queryPriority)) - assert.Equal(t, int64(0), GetPriority(url.Values{ - "query": []string{"count(up)"}, - "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, queryPriority)) - - queryPriority.Priorities[0].QueryAttributes[0].Regex = "(^sum$|c(.+)t)" - queryPriority.Priorities[0].QueryAttributes[0].CompiledRegex = regexp.MustCompile("(^sum$|c(.+)t)") - - assert.Equal(t, int64(0), GetPriority(url.Values{ - "query": []string{"sum(up)"}, - "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, queryPriority)) - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"count(up)"}, - "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, queryPriority)) - - queryPriority.Priorities[0].QueryAttributes[0].Regex = ".*" - queryPriority.Priorities[0].QueryAttributes[0].CompiledRegex = regexp.MustCompile(".*") - - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"sum(up)"}, - "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, queryPriority)) - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"count(up)"}, - "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, queryPriority)) - - queryPriority.Priorities[0].QueryAttributes[0].Regex = ".+" - queryPriority.Priorities[0].QueryAttributes[0].CompiledRegex = regexp.MustCompile(".+") - - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"sum(up)"}, - "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, queryPriority)) - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"count(up)"}, - "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, queryPriority)) - - queryPriority.Priorities[0].QueryAttributes[0].Regex = "" - queryPriority.Priorities[0].QueryAttributes[0].CompiledRegex = regexp.MustCompile("") - - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"sum(up)"}, - "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, queryPriority)) - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"count(up)"}, - "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, queryPriority)) -} - -func Test_GetPriorityShouldConsiderStartAndEndTime(t *testing.T) { - now := time.Now() - priorities := []validation.PriorityDef{ - { - Priority: 1, - QueryAttributes: []validation.QueryAttribute{ - { - StartTime: model.Duration(45 * time.Minute), - EndTime: model.Duration(15 * time.Minute), - }, - }, - }, - } - queryPriority := validation.QueryPriority{ - Enabled: true, - Priorities: priorities, - } - - assert.Equal(t, int64(0), GetPriority(url.Values{ - "query": []string{"sum(up)"}, - "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, queryPriority)) - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"sum(up)"}, - "time": []string{strconv.FormatInt(now.Add(-30*time.Minute).Unix(), 10)}, - }, now, queryPriority)) - assert.Equal(t, int64(0), GetPriority(url.Values{ - "query": []string{"sum(up)"}, - "time": []string{strconv.FormatInt(now.Add(-60*time.Minute).Unix(), 10)}, - }, now, queryPriority)) - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"sum(up)"}, - "time": []string{strconv.FormatInt(now.Add(-45*time.Minute).Unix(), 10)}, - }, now, queryPriority)) - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"sum(up)"}, - "time": []string{strconv.FormatInt(now.Add(-15*time.Minute).Unix(), 10)}, - }, now, queryPriority)) - - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"sum(up)"}, - "start": []string{strconv.FormatInt(now.Add(-45*time.Minute).Unix(), 10)}, - "end": []string{strconv.FormatInt(now.Add(-15*time.Minute).Unix(), 10)}, - }, now, queryPriority)) - assert.Equal(t, int64(0), GetPriority(url.Values{ - "query": []string{"sum(up)"}, - "start": []string{strconv.FormatInt(now.Add(-50*time.Minute).Unix(), 10)}, - "end": []string{strconv.FormatInt(now.Add(-15*time.Minute).Unix(), 10)}, - }, now, queryPriority)) - assert.Equal(t, int64(0), GetPriority(url.Values{ - "query": []string{"sum(up)"}, - "start": []string{strconv.FormatInt(now.Add(-45*time.Minute).Unix(), 10)}, - "end": []string{strconv.FormatInt(now.Add(-10*time.Minute).Unix(), 10)}, - }, now, queryPriority)) - assert.Equal(t, int64(0), GetPriority(url.Values{ - "query": []string{"sum(up)"}, - "start": []string{strconv.FormatInt(now.Add(-60*time.Minute).Unix(), 10)}, - "end": []string{strconv.FormatInt(now.Add(-45*time.Minute).Unix(), 10)}, - }, now, queryPriority)) - assert.Equal(t, int64(0), GetPriority(url.Values{ - "query": []string{"sum(up)"}, - "start": []string{strconv.FormatInt(now.Add(-15*time.Minute).Unix(), 10)}, - "end": []string{strconv.FormatInt(now.Add(-1*time.Minute).Unix(), 10)}, - }, now, queryPriority)) -} - -func Test_GetPriorityShouldSKipStartAndEndTimeIfEmpty(t *testing.T) { - now := time.Now() - priorities := []validation.PriorityDef{ - { - Priority: 1, - QueryAttributes: []validation.QueryAttribute{ - { - Regex: "^test$", - }, - }, - }, - } - queryPriority := validation.QueryPriority{ - Enabled: true, - Priorities: priorities, - } - - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"test"}, - }, now, queryPriority)) - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"test"}, - "time": []string{strconv.FormatInt(now.Add(8760*time.Hour).Unix(), 10)}, - }, now, queryPriority)) - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"test"}, - "time": []string{strconv.FormatInt(now.Unix(), 10)}, - }, now, queryPriority)) - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"test"}, - "time": []string{strconv.FormatInt(now.Add(-8760*time.Hour).Unix(), 10)}, - }, now, queryPriority)) - assert.Equal(t, int64(1), GetPriority(url.Values{ - "query": []string{"test"}, - "start": []string{strconv.FormatInt(now.Add(-100000*time.Minute).Unix(), 10)}, - "end": []string{strconv.FormatInt(now.Add(100000*time.Minute).Unix(), 10)}, - }, now, queryPriority)) -} diff --git a/pkg/util/time.go b/pkg/util/time.go index 8816b1d7d2e..a28c84b0462 100644 --- a/pkg/util/time.go +++ b/pkg/util/time.go @@ -7,6 +7,7 @@ import ( "strconv" "time" + "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/weaveworks/common/httpgrpc" ) @@ -48,6 +49,19 @@ func ParseTime(s string) (int64, error) { return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid timestamp", s) } +// ParseTimeParam parses the time request parameter into an int64, milliseconds since epoch. +func ParseTimeParam(r *http.Request, paramName string, defaultValue int64) (int64, error) { + val := r.FormValue(paramName) + if val == "" { + val = strconv.FormatInt(defaultValue, 10) + } + result, err := ParseTime(val) + if err != nil { + return 0, errors.Wrapf(err, "Invalid time value for '%s'", paramName) + } + return result, nil +} + // DurationWithJitter returns random duration from "input - input*variance" to "input + input*variance" interval. func DurationWithJitter(input time.Duration, variancePerc float64) time.Duration { // No duration? No jitter. diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index b9b005d0df6..cd65e875da4 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -57,13 +57,13 @@ type QueryPriority struct { type PriorityDef struct { Priority int64 `yaml:"priority" json:"priority" doc:"nocli|description=Priority level. Must be a unique value.|default=0"` - ReservedQueriers float64 `yaml:"reserved_queriers" json:"reserved_queriers" doc:"nocli|description=Number of reserved queriers to handle priorities higher or equal to this value only. Value between 0 and 1 will be used as a percentage.|default=0"` + ReservedQueriers float64 `yaml:"reserved_queriers" json:"reserved_queriers" doc:"nocli|description=Number of reserved queriers to handle priorities higher or equal to the priority level. Value between 0 and 1 will be used as a percentage.|default=0"` QueryAttributes []QueryAttribute `yaml:"query_attributes" json:"query_attributes" doc:"nocli|description=List of query attributes to assign the priority."` } type QueryAttribute struct { - Regex string `yaml:"regex" json:"regex" doc:"nocli|description=Query string regex. If set to empty string, it will not match anything.|default=\"\""` - StartTime model.Duration `yaml:"start_time" json:"start_time" doc:"nocli|description=Query start time. If set to 0, the start time won't be checked.'.|default=0"` + Regex string `yaml:"regex" json:"regex" doc:"nocli|description=Query string regex. If set to empty string, it will not match anything."` + StartTime model.Duration `yaml:"start_time" json:"start_time" doc:"nocli|description=Query start time. If set to 0, the start time won't be checked.|default=0"` EndTime model.Duration `yaml:"end_time" json:"end_time" doc:"nocli|description=Query end time. If set to 0, the end time won't be checked.|default=0"` CompiledRegex *regexp.Regexp } diff --git a/tools/doc-generator/parser.go b/tools/doc-generator/parser.go index 7a6dbfebfb7..28ee31ba4f8 100644 --- a/tools/doc-generator/parser.go +++ b/tools/doc-generator/parser.go @@ -409,6 +409,10 @@ func getCustomFieldEntry(parent reflect.Type, field reflect.StructField, fieldVa return nil, err } + if fieldFlag == nil { + return nil, nil + } + return &configEntry{ kind: "field", name: getFieldName(field),