From 0edfd7d95d0206031df26fed13ba107b8b91dcef Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Fri, 19 Jul 2024 17:23:18 -0700 Subject: [PATCH] Fix user queue in scheduler that was not thread-safe (#6077) --- CHANGELOG.md | 2 ++ pkg/scheduler/queue/user_queues.go | 13 +++++++++- pkg/scheduler/queue/user_queues_test.go | 34 +++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ba6acb3175..b992fbf7e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,10 +27,12 @@ * [ENHANCEMENT] Ruler: Add support for filtering by `match` field on Rules API. #6083 * [ENHANCEMENT] Distributor: Reduce memory usage when error volume is high. #6095 * [ENHANCEMENT] Compactor: Add unique execution ID for each compaction cycle in log for easy debugging. #6097 +* [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040 * [BUGFIX] Configsdb: Fix endline issue in db password. #5920 * [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952 * [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018 * [BUGFIX] Ingester: Fix issue with the minimize token generator where it was not taking in consideration the current ownerhip of an instance when generating extra tokens. #6062 +* [BUGFIX] Scheduler: Fix user queue in scheduler that was not thread-safe. #6077 ## 1.17.1 2024-05-20 diff --git a/pkg/scheduler/queue/user_queues.go b/pkg/scheduler/queue/user_queues.go index 25f562ee02..159df7810b 100644 --- a/pkg/scheduler/queue/user_queues.go +++ b/pkg/scheduler/queue/user_queues.go @@ -3,6 +3,7 @@ package queue import ( "math/rand" "sort" + "sync" "time" "github.com/prometheus/client_golang/prometheus" @@ -37,7 +38,8 @@ type querier struct { // This struct holds user queues for pending requests. It also keeps track of connected queriers, // and mapping between users and queriers. type queues struct { - userQueues map[string]*userQueue + userQueues map[string]*userQueue + userQueuesMx sync.RWMutex // List of all users with queues, used for iteration when searching for next queue to handle. // Users removed from the middle are replaced with "". To avoid skipping users during iteration, we only shrink @@ -103,6 +105,9 @@ func (q *queues) len() int { } func (q *queues) deleteQueue(userID string) { + q.userQueuesMx.Lock() + defer q.userQueuesMx.Unlock() + uq := q.userQueues[userID] if uq == nil { return @@ -132,6 +137,9 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue maxQueriers = 0 } + q.userQueuesMx.Lock() + defer q.userQueuesMx.Unlock() + uq := q.userQueues[userID] priorityEnabled := q.limits.QueryPriority(userID).Enabled maxOutstanding := q.limits.MaxOutstandingPerTenant(userID) @@ -237,6 +245,9 @@ func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (us continue } + q.userQueuesMx.RLock() + defer q.userQueuesMx.RUnlock() + uq := q.userQueues[u] if uq.queriers != nil { diff --git a/pkg/scheduler/queue/user_queues_test.go b/pkg/scheduler/queue/user_queues_test.go index ded597baa0..4e720de402 100644 --- a/pkg/scheduler/queue/user_queues_test.go +++ b/pkg/scheduler/queue/user_queues_test.go @@ -5,6 +5,7 @@ import ( "math" "math/rand" "sort" + "sync" "testing" "time" @@ -457,6 +458,39 @@ func TestGetOrAddQueueShouldUpdateProperties(t *testing.T) { } } +func TestQueueConcurrency(t *testing.T) { + const numGoRoutines = 30 + limits := MockLimits{ + MaxOutstanding: 50, + } + q := newUserQueues(0, 0, limits, nil) + q.addQuerierConnection("q-1") + q.addQuerierConnection("q-2") + q.addQuerierConnection("q-3") + q.addQuerierConnection("q-4") + q.addQuerierConnection("q-5") + + var wg sync.WaitGroup + wg.Add(numGoRoutines) + + for i := 0; i < numGoRoutines; i++ { + go func(cnt int) { + defer wg.Done() + queue := q.getOrAddQueue("userID", 2) + if cnt%2 == 0 { + queue.enqueueRequest(MockRequest{}) + q.getNextQueueForQuerier(0, "q-1") + } else if cnt%5 == 0 { + queue.dequeueRequest(0, false) + } else if cnt%7 == 0 { + q.deleteQueue("userID") + } + }(i) + } + + wg.Wait() +} + func generateTenant(r *rand.Rand) string { return fmt.Sprint("tenant-", r.Int()%5) }