Skip to content

Commit

Permalink
Make reserved querier to match exact priority + change query length g…
Browse files Browse the repository at this point in the history
…auge vector in user request queue

Signed-off-by: Justin Jung <[email protected]>
  • Loading branch information
justinjung04 committed Nov 15, 2023
1 parent cd1c357 commit 5d0746f
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 55 deletions.
32 changes: 5 additions & 27 deletions pkg/scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,14 @@ type RequestQueue struct {
queues *queues
stopped bool

queueLength *prometheus.GaugeVec // Per user, priority and type of the queue (fifo or priority).
totalRequests *prometheus.CounterVec // Per user and priority.
discardedRequests *prometheus.CounterVec // Per user and priority.
}

func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec, limits Limits, registerer prometheus.Registerer) *RequestQueue {
q := &RequestQueue{
queues: newUserQueues(maxOutstandingPerTenant, forgetDelay, limits),
queues: newUserQueues(maxOutstandingPerTenant, forgetDelay, limits, queueLength),
connectedQuerierWorkers: atomic.NewInt32(0),
queueLength: queueLength,
totalRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_request_queue_requests_total",
Help: "Total number of query requests going to the request queue.",
Expand Down Expand Up @@ -108,10 +106,6 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers fl
return errors.New("no queue found")
}

queueType := "fifo"
if q.queues.limits.QueryPriority(userID).Enabled {
queueType = "priority"
}
metricLabels := prometheus.Labels{
"user": userID,
"priority": priority,
Expand All @@ -124,11 +118,6 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers fl
}

queue.enqueueRequest(req)
q.queueLength.With(prometheus.Labels{
"user": userID,
"priority": priority,
"type": queueType,
}).Inc()
q.cond.Broadcast()
// Call this function while holding a lock. This guarantees that no querier can fetch the request before function returns.
if successFn != nil {
Expand Down Expand Up @@ -170,10 +159,10 @@ FindQueue:

// Pick next request from the queue.
for {
minPriority, checkMinPriority := q.getMinPriorityForQuerier(userID, querierID)
request := queue.dequeueRequest(minPriority, checkMinPriority)
priority, matchPriority := q.getPriorityForQuerier(userID, querierID)
request := queue.dequeueRequest(priority, matchPriority)
if request == nil {
// the queue does not contain request with the min priority, break to wait for more requests
// the queue does not contain request with the min priority, wait for more requests
querierWait = true
goto FindQueue
}
Expand All @@ -182,17 +171,6 @@ FindQueue:
q.queues.deleteQueue(userID)
}

queueType := "fifo"
if q.queues.limits.QueryPriority(userID).Enabled {
queueType = "priority"
}
metricLabels := prometheus.Labels{
"user": userID,
"priority": strconv.FormatInt(request.Priority(), 10),
"type": queueType,
}
q.queueLength.With(metricLabels).Dec()

// Tell close() we've processed a request.
q.cond.Broadcast()

Expand All @@ -206,7 +184,7 @@ FindQueue:
goto FindQueue
}

func (q *RequestQueue) getMinPriorityForQuerier(userID string, querierID string) (int64, bool) {
func (q *RequestQueue) getPriorityForQuerier(userID string, querierID string) (int64, bool) {
if priority, ok := q.queues.userQueues[userID].reservedQueriers[querierID]; ok {
return priority, true
}
Expand Down
16 changes: 14 additions & 2 deletions pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,10 @@ func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) {

assert.NoError(t, queue.EnqueueRequest("userID", normalRequest, 1, func() {}))
assert.NoError(t, queue.EnqueueRequest("userID", priority1Request, 1, func() {}))
assert.NoError(t, queue.EnqueueRequest("userID", priority2Request, 1, func() {}))
assert.NoError(t, queue.EnqueueRequest("userID", priority1Request, 1, func() {}))

nextRequest, _, _ := queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-1")
assert.Equal(t, priority2Request, nextRequest)
assert.Equal(t, priority1Request, nextRequest)

nextRequest, _, _ = queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-1")
assert.Equal(t, priority1Request, nextRequest)
Expand All @@ -243,6 +243,18 @@ func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) {
nextRequest, _, _ = queue.GetNextRequestForQuerier(ctxTimeout, FirstUser(), "querier-1")
assert.Nil(t, nextRequest)
assert.Equal(t, 1, queue.queues.userQueues["userID"].queue.length())

assert.NoError(t, queue.EnqueueRequest("userID", priority2Request, 1, func() {}))

ctxTimeout, cancel = context.WithTimeout(ctx, 1*time.Second)
defer cancel()

time.AfterFunc(2*time.Second, func() {
queue.cond.Broadcast()
})
nextRequest, _, _ = queue.GetNextRequestForQuerier(ctxTimeout, FirstUser(), "querier-1")
assert.Nil(t, nextRequest)
assert.Equal(t, 2, queue.queues.userQueues["userID"].queue.length())
}

func TestExitingRequestsShouldPersistEvenIfTheConfigHasChanged(t *testing.T) {
Expand Down
12 changes: 8 additions & 4 deletions pkg/scheduler/queue/user_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"sort"
"time"

"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
)

// Limits needed for the Query Scheduler - interface used for decoupling.
Expand Down Expand Up @@ -57,6 +58,8 @@ type queues struct {
sortedQueriers []string

limits Limits

queueLength *prometheus.GaugeVec // Per user, type and priority.
}

type userQueue struct {
Expand All @@ -79,7 +82,7 @@ type userQueue struct {
index int
}

func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration, limits Limits) *queues {
func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration, limits Limits, queueLength *prometheus.GaugeVec) *queues {
return &queues{
userQueues: map[string]*userQueue{},
users: nil,
Expand All @@ -88,6 +91,7 @@ func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration, limits Limit
queriers: map[string]*querier{},
sortedQueriers: nil,
limits: limits,
queueLength: queueLength,
}
}

Expand Down Expand Up @@ -195,7 +199,7 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue

func (q *queues) createUserRequestQueue(userID string) userRequestQueue {
if q.limits.QueryPriority(userID).Enabled {
return NewPriorityRequestQueue(util.NewPriorityQueue(nil))
return NewPriorityRequestQueue(util.NewPriorityQueue(nil), userID, q.queueLength)
}

queueSize := q.limits.MaxOutstandingPerTenant(userID)
Expand All @@ -206,7 +210,7 @@ func (q *queues) createUserRequestQueue(userID string) userRequestQueue {
queueSize = q.maxUserQueueSize
}

return NewFIFORequestQueue(make(chan Request, queueSize))
return NewFIFORequestQueue(make(chan Request, queueSize), userID, q.queueLength)
}

// Finds next queue for the querier. To support fair scheduling between users, client is expected
Expand Down
12 changes: 6 additions & 6 deletions pkg/scheduler/queue/user_queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

func TestQueues(t *testing.T) {
uq := newUserQueues(0, 0, MockLimits{})
uq := newUserQueues(0, 0, MockLimits{}, nil)
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))

Expand Down Expand Up @@ -70,7 +70,7 @@ func TestQueues(t *testing.T) {
}

func TestQueuesWithQueriers(t *testing.T) {
uq := newUserQueues(0, 0, MockLimits{})
uq := newUserQueues(0, 0, MockLimits{}, nil)
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))

Expand Down Expand Up @@ -147,7 +147,7 @@ func TestQueuesConsistency(t *testing.T) {

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
uq := newUserQueues(0, testData.forgetDelay, MockLimits{})
uq := newUserQueues(0, testData.forgetDelay, MockLimits{}, nil)
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))

Expand Down Expand Up @@ -196,7 +196,7 @@ func TestQueues_ForgetDelay(t *testing.T) {
)

now := time.Now()
uq := newUserQueues(0, forgetDelay, MockLimits{})
uq := newUserQueues(0, forgetDelay, MockLimits{}, nil)
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))

Expand Down Expand Up @@ -288,7 +288,7 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget
)

now := time.Now()
uq := newUserQueues(0, forgetDelay, MockLimits{})
uq := newUserQueues(0, forgetDelay, MockLimits{}, nil)
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))

Expand Down Expand Up @@ -357,7 +357,7 @@ func TestGetOrAddQueueShouldUpdateProperties(t *testing.T) {
limits := MockLimits{
MaxOutstanding: 3,
}
q := newUserQueues(0, 0, limits)
q := newUserQueues(0, 0, limits, nil)
q.addQuerierConnection("q-1")
q.addQuerierConnection("q-2")
q.addQuerierConnection("q-3")
Expand Down
64 changes: 52 additions & 12 deletions pkg/scheduler/queue/user_request_queue.go
Original file line number Diff line number Diff line change
@@ -1,50 +1,90 @@
package queue

import "github.com/cortexproject/cortex/pkg/util"
import (
"strconv"

"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/util"
)

type userRequestQueue interface {
enqueueRequest(Request)
dequeueRequest(minPriority int64, checkMinPriority bool) Request
dequeueRequest(int64, bool) Request
length() int
}

type FIFORequestQueue struct {
queue chan Request
queue chan Request
userID string
queueLength *prometheus.GaugeVec
}

func NewFIFORequestQueue(queue chan Request) *FIFORequestQueue {
return &FIFORequestQueue{queue: queue}
func NewFIFORequestQueue(queue chan Request, userID string, queueLength *prometheus.GaugeVec) *FIFORequestQueue {
return &FIFORequestQueue{queue: queue, userID: userID, queueLength: queueLength}
}

func (f *FIFORequestQueue) enqueueRequest(r Request) {
f.queue <- r
if f.queueLength != nil {
f.queueLength.With(prometheus.Labels{
"user": f.userID,
"priority": strconv.FormatInt(r.Priority(), 10),
"type": "fifo",
}).Inc()
}
}

func (f *FIFORequestQueue) dequeueRequest(_ int64, _ bool) Request {
return <-f.queue
r := <-f.queue
if f.queueLength != nil {
f.queueLength.With(prometheus.Labels{
"user": f.userID,
"priority": strconv.FormatInt(r.Priority(), 10),
"type": "fifo",
}).Dec()
}
return r
}

func (f *FIFORequestQueue) length() int {
return len(f.queue)
}

type PriorityRequestQueue struct {
queue *util.PriorityQueue
queue *util.PriorityQueue
userID string
queueLength *prometheus.GaugeVec
}

func NewPriorityRequestQueue(queue *util.PriorityQueue) *PriorityRequestQueue {
return &PriorityRequestQueue{queue: queue}
func NewPriorityRequestQueue(queue *util.PriorityQueue, userID string, queueLength *prometheus.GaugeVec) *PriorityRequestQueue {
return &PriorityRequestQueue{queue: queue, userID: userID, queueLength: queueLength}
}

func (f *PriorityRequestQueue) enqueueRequest(r Request) {
f.queue.Enqueue(r)
if f.queueLength != nil {
f.queueLength.With(prometheus.Labels{
"user": f.userID,
"priority": strconv.FormatInt(r.Priority(), 10),
"type": "priority",
}).Inc()
}
}

func (f *PriorityRequestQueue) dequeueRequest(minPriority int64, checkMinPriority bool) Request {
if checkMinPriority && f.queue.Peek().Priority() < minPriority {
func (f *PriorityRequestQueue) dequeueRequest(priority int64, matchPriority bool) Request {
if matchPriority && f.queue.Peek().Priority() != priority {
return nil
}
return f.queue.Dequeue()
r := f.queue.Dequeue()
if f.queueLength != nil {
f.queueLength.With(prometheus.Labels{
"user": f.userID,
"priority": strconv.FormatInt(r.Priority(), 10),
"type": "priority",
}).Dec()
}
return r
}

func (f *PriorityRequestQueue) length() int {
Expand Down
Loading

0 comments on commit 5d0746f

Please sign in to comment.