From c2c40f8a27abe71e35eedf440ab25ae44bf8a9ff Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Mon, 13 Nov 2023 10:52:25 +0000 Subject: [PATCH] Make worker pool more resilient to slow tasks (#5751) * Make worker pool more resilient to slow tasks * CHANGELOG.md * fix invalid pointer --- CHANGELOG.md | 2 + pkg/flow/flow_updates_test.go | 4 +- pkg/flow/internal/worker/worker_pool.go | 201 ++++++++++++------- pkg/flow/internal/worker/worker_pool_test.go | 118 +++++++++-- pkg/flow/module_test.go | 2 +- 5 files changed, 236 insertions(+), 91 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a12e820ddf37..92b37f905438 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -99,6 +99,8 @@ Main (unreleased) - Added support for unicode strings in `pyroscope.ebpf` python profiles. (@korniltsev) +- Improved resilience of graph evaluation in presence of slow components. (@thampiotr) + ### Bugfixes - Set exit code 1 on grafana-agentctl non-runnable command. (@fgouteroux) diff --git a/pkg/flow/flow_updates_test.go b/pkg/flow/flow_updates_test.go index 90960f6eef8e..c2349928f06a 100644 --- a/pkg/flow/flow_updates_test.go +++ b/pkg/flow/flow_updates_test.go @@ -114,7 +114,7 @@ func TestController_Updates_WithQueueFull(t *testing.T) { ModuleRegistry: newModuleRegistry(), IsModule: false, // The small number of workers and small queue means that a lot of updates will need to be retried. - WorkerPool: worker.NewShardedWorkerPool(1, 1), + WorkerPool: worker.NewFixedWorkerPool(1, 1), }) // Use testUpdatesFile from graph_builder_test.go. @@ -376,6 +376,6 @@ func newTestController(t *testing.T) *Flow { ModuleRegistry: newModuleRegistry(), IsModule: false, // Make sure that we have consistent number of workers for tests to make them deterministic. - WorkerPool: worker.NewShardedWorkerPool(4, 100), + WorkerPool: worker.NewFixedWorkerPool(4, 100), }) } diff --git a/pkg/flow/internal/worker/worker_pool.go b/pkg/flow/internal/worker/worker_pool.go index b5ff904b2024..25a8eeebe536 100644 --- a/pkg/flow/internal/worker/worker_pool.go +++ b/pkg/flow/internal/worker/worker_pool.go @@ -2,8 +2,6 @@ package worker import ( "fmt" - "hash/fnv" - "math/rand" "runtime" "sync" ) @@ -12,102 +10,72 @@ type Pool interface { // Stop stops the worker pool. It does not wait to drain any internal queues, but it does wait for the currently // running tasks to complete. It must only be called once. Stop() - // Submit submits a function to be executed by the worker pool on a random worker. Error is returned if the pool - // is unable to accept extra work. - Submit(func()) error - // SubmitWithKey submits a function to be executed by the worker pool, ensuring that only one - // job with given key can be queued at the time. Adding a job with a key that is already queued is a no-op (even if - // the submitted function is different). Error is returned if the pool is unable to accept extra work - the caller - // can decide how to handle this situation. + // SubmitWithKey submits a function to be executed by the worker pool, ensuring that: + // * Only one job with given key can be waiting to be executed at the time. This is desired if we don't want to + // run the same task multiple times, e.g. if it's a component update that we only need to run once. + // * Only one job with given key can be running at the time. This is desired when we don't want to duplicate work, + // and we want to protect the pool from a slow task hogging all the workers. + // + // Note that it is possible to have two tasks with the same key in the pool at the same time: one waiting to be + // executed and another one running. This ensures that a request to re-run a task with the same key is not lost. + // + // Adding a job with a key that is already queued is a no-op (even if the submitted function is different). + // Error is returned if the pool is unable to accept extra work - the caller can decide how to handle this situation. SubmitWithKey(string, func()) error - // QueueSize returns the number of tasks currently queued. + // QueueSize returns the number of tasks currently queued or running. QueueSize() int } -// shardedWorkerPool is a Pool that distributes work across a fixed number of workers, using a hash of the key to -// determine which worker to use. This, to an extent, defends the pool from a slow task hogging all the workers. -type shardedWorkerPool struct { +// fixedWorkerPool is a Pool that distributes work across a fixed number of workers. It uses workQueue to ensure +// that SubmitWithKey guarantees are met. +type fixedWorkerPool struct { workersCount int - workQueues []chan func() + workQueue *workQueue quit chan struct{} allStopped sync.WaitGroup - - lock sync.Mutex - set map[string]struct{} } -var _ Pool = (*shardedWorkerPool)(nil) +var _ Pool = (*fixedWorkerPool)(nil) func NewDefaultWorkerPool() Pool { - return NewShardedWorkerPool(runtime.NumCPU(), 1024) + return NewFixedWorkerPool(runtime.NumCPU(), 1024) } -// NewShardedWorkerPool creates a new worker pool with the given number of workers and queue size for each worker. -// The queued tasks are sharded across the workers using a hash of the key. The pool is automatically started and -// ready to accept work. To prevent resource leak, Stop() must be called when the pool is no longer needed. -func NewShardedWorkerPool(workersCount int, queueSize int) Pool { +// NewFixedWorkerPool creates a new Pool with the given number of workers and given max queue size. +// The max queue size is the maximum number of tasks that can be queued OR running at the same time. +// The tasks can run on a random worker, but workQueue ensures only one task with given key is running at a time. +// The pool is automatically started and ready to accept work. To prevent resource leak, Stop() must be called when the +// pool is no longer needed. +func NewFixedWorkerPool(workersCount int, maxQueueSize int) Pool { if workersCount <= 0 { panic(fmt.Sprintf("workersCount must be positive, got %d", workersCount)) } - queues := make([]chan func(), workersCount) - for i := 0; i < workersCount; i++ { - queues[i] = make(chan func(), queueSize) - } - pool := &shardedWorkerPool{ + pool := &fixedWorkerPool{ workersCount: workersCount, - workQueues: queues, + workQueue: newWorkQueue(maxQueueSize), quit: make(chan struct{}), - set: make(map[string]struct{}), } pool.start() return pool } -func (w *shardedWorkerPool) Submit(f func()) error { - return w.SubmitWithKey(fmt.Sprintf("%d", rand.Int()), f) -} - -func (w *shardedWorkerPool) SubmitWithKey(key string, f func()) error { - wrapped := func() { - // NOTE: we intentionally remove from the queue before executing the function. This means that while a task is - // executing, another task with the same key can be added to the queue, potentially even by the task itself. - w.lock.Lock() - delete(w.set, key) - w.lock.Unlock() - - f() - } - queue := w.workQueues[w.workerFor(key)] - - w.lock.Lock() - defer w.lock.Unlock() - if _, exists := w.set[key]; exists { - return nil // only queue one job for given key - } - - select { - case queue <- wrapped: - w.set[key] = struct{}{} - return nil - default: - return fmt.Errorf("worker queue is full") - } +func (w *fixedWorkerPool) SubmitWithKey(key string, f func()) error { + _, err := w.workQueue.tryEnqueue(key, f) + return err } -func (w *shardedWorkerPool) QueueSize() int { - w.lock.Lock() - defer w.lock.Unlock() - return len(w.set) +// QueueSize returns the number of tasks in the queue - waiting or currently running. +func (w *fixedWorkerPool) QueueSize() int { + return w.workQueue.queueSize() } -func (w *shardedWorkerPool) Stop() { +func (w *fixedWorkerPool) Stop() { close(w.quit) w.allStopped.Wait() } -func (w *shardedWorkerPool) start() { +func (w *fixedWorkerPool) start() { for i := 0; i < w.workersCount; i++ { - queue := w.workQueues[i] w.allStopped.Add(1) go func() { defer w.allStopped.Done() @@ -115,7 +83,7 @@ func (w *shardedWorkerPool) start() { select { case <-w.quit: return - case f := <-queue: + case f := <-w.workQueue.tasksToRun: f() } } @@ -123,8 +91,99 @@ func (w *shardedWorkerPool) start() { } } -func (w *shardedWorkerPool) workerFor(s string) int { - h := fnv.New32a() - _, _ = h.Write([]byte(s)) - return int(h.Sum32()) % w.workersCount +type workQueue struct { + maxSize int + tasksToRun chan func() + + lock sync.Mutex + waitingOrder []string + waiting map[string]func() + running map[string]struct{} +} + +func newWorkQueue(maxSize int) *workQueue { + return &workQueue{ + maxSize: maxSize, + tasksToRun: make(chan func(), maxSize), + waiting: make(map[string]func()), + running: make(map[string]struct{}), + } +} + +func (w *workQueue) tryEnqueue(key string, f func()) (bool, error) { + w.lock.Lock() + defer w.lock.Unlock() + + // Don't enqueue if same task already waiting + if _, exists := w.waiting[key]; exists { + return false, nil + } + + // Don't exceed queue size + queueSize := len(w.waitingOrder) + len(w.running) + if queueSize >= w.maxSize { + return false, fmt.Errorf("worker queue is full") + } + + // Else enqueue + w.waitingOrder = append(w.waitingOrder, key) + w.waiting[key] = f + + // A task may have become runnable now, emit it + w.emitNextTask() + + return true, nil +} + +func (w *workQueue) taskDone(key string) { + w.lock.Lock() + defer w.lock.Unlock() + delete(w.running, key) + // A task may have become runnable now, emit it + w.emitNextTask() +} + +// emitNextTask emits the next eligible task to be run if there is one. It must be called whenever the queue state +// changes (e.g. a task is added or a task finishes). The lock must be held when calling this function. +func (w *workQueue) emitNextTask() { + var ( + task func() + key string + index int + found = false + ) + + // Find the first key in waitingOrder that is not yet running + for i, k := range w.waitingOrder { + if _, alreadyRunning := w.running[k]; !alreadyRunning { + found, key, index = true, k, i + break + } + } + + // Return if we didn't find any task ready to run + if !found { + return + } + + // Remove the task from waiting and add it to running set + w.waitingOrder = append(w.waitingOrder[:index], w.waitingOrder[index+1:]...) + task = w.waiting[key] + delete(w.waiting, key) + w.running[key] = struct{}{} + + // Wrap the actual task to make sure we mark it as done when it finishes + wrapped := func() { + defer w.taskDone(key) + task() + } + + // Emit the task to be run. There will always be space in this buffered channel, because we limit queue size. + w.tasksToRun <- wrapped +} + +func (w *workQueue) queueSize() int { + w.lock.Lock() + defer w.lock.Unlock() + return len(w.waitingOrder) + len(w.running) } diff --git a/pkg/flow/internal/worker/worker_pool_test.go b/pkg/flow/internal/worker/worker_pool_test.go index 27b5c24b7ebc..3a594cba2622 100644 --- a/pkg/flow/internal/worker/worker_pool_test.go +++ b/pkg/flow/internal/worker/worker_pool_test.go @@ -1,6 +1,7 @@ package worker import ( + "fmt" "testing" "time" @@ -13,7 +14,7 @@ func TestWorkerPool(t *testing.T) { t.Run("worker pool", func(t *testing.T) { t.Run("should start and stop cleanly", func(t *testing.T) { defer goleak.VerifyNone(t) - pool := NewShardedWorkerPool(4, 1) + pool := NewFixedWorkerPool(4, 1) require.Equal(t, 0, pool.QueueSize()) defer pool.Stop() }) @@ -22,28 +23,28 @@ func TestWorkerPool(t *testing.T) { defer goleak.VerifyNone(t) require.Panics(t, func() { - NewShardedWorkerPool(0, 0) + NewFixedWorkerPool(0, 0) }) require.Panics(t, func() { - NewShardedWorkerPool(-1, 0) + NewFixedWorkerPool(-1, 0) }) }) t.Run("should reject invalid buffer size", func(t *testing.T) { defer goleak.VerifyNone(t) require.Panics(t, func() { - NewShardedWorkerPool(1, -1) + NewFixedWorkerPool(1, -1) }) }) t.Run("should process a single task", func(t *testing.T) { defer goleak.VerifyNone(t) done := make(chan struct{}) - pool := NewShardedWorkerPool(4, 1) + pool := NewFixedWorkerPool(4, 1) defer pool.Stop() - err := pool.Submit(func() { + err := pool.SubmitWithKey("123", func() { done <- struct{}{} }) require.NoError(t, err) @@ -59,7 +60,7 @@ func TestWorkerPool(t *testing.T) { t.Run("should process a single task with key", func(t *testing.T) { defer goleak.VerifyNone(t) done := make(chan struct{}) - pool := NewShardedWorkerPool(4, 1) + pool := NewFixedWorkerPool(4, 1) defer pool.Stop() err := pool.SubmitWithKey("testKey", func() { @@ -77,7 +78,7 @@ func TestWorkerPool(t *testing.T) { t.Run("should not queue duplicated keys", func(t *testing.T) { defer goleak.VerifyNone(t) - pool := NewShardedWorkerPool(4, 10) + pool := NewFixedWorkerPool(4, 10) defer pool.Stop() tasksDone := atomic.Int32{} @@ -90,17 +91,18 @@ func TestWorkerPool(t *testing.T) { tasksDone.Inc() }) require.NoError(t, err) + defer func() { close(blockFirstTask) }() // Wait for the first task to be running already and blocking the worker <-firstTaskRunning - require.Equal(t, 0, pool.QueueSize()) + require.Equal(t, 1, pool.QueueSize()) // Second task will be queued err = pool.SubmitWithKey("k1", func() { tasksDone.Inc() }) require.NoError(t, err) - require.Equal(t, 1, pool.QueueSize()) + require.Equal(t, 2, pool.QueueSize()) // Third task will be skipped, as we already have k1 in the queue err = pool.SubmitWithKey("k1", func() { @@ -115,17 +117,17 @@ func TestWorkerPool(t *testing.T) { blockFirstTask <- struct{}{} require.Eventually(t, func() bool { return tasksDone.Load() == 2 - }, 3*time.Second, 5*time.Millisecond) + }, 3*time.Second, 1*time.Millisecond) require.Equal(t, 0, pool.QueueSize()) // No more tasks should be done, verify again with some delay - time.Sleep(100 * time.Millisecond) + time.Sleep(10 * time.Millisecond) require.Equal(t, int32(2), tasksDone.Load()) }) t.Run("should concurrently process for different keys", func(t *testing.T) { defer goleak.VerifyNone(t) - pool := NewShardedWorkerPool(4, 10) + pool := NewFixedWorkerPool(4, 10) defer pool.Stop() tasksDone := atomic.Int32{} @@ -138,6 +140,7 @@ func TestWorkerPool(t *testing.T) { tasksDone.Inc() }) require.NoError(t, err) + defer func() { close(blockFirstTask) }() // Wait for the first task to be running already and blocking the worker <-firstTaskRunning @@ -164,7 +167,7 @@ func TestWorkerPool(t *testing.T) { t.Run("should reject when queue is full", func(t *testing.T) { defer goleak.VerifyNone(t) // Pool with one worker and queue size of 1 - all work goes to one queue - pool := NewShardedWorkerPool(1, 1) + pool := NewFixedWorkerPool(1, 2) defer pool.Stop() tasksDone := atomic.Int32{} @@ -177,21 +180,102 @@ func TestWorkerPool(t *testing.T) { tasksDone.Inc() }) require.NoError(t, err) - defer func() { blockFirstTask <- struct{}{} }() + defer func() { close(blockFirstTask) }() // Wait for the first task to be running already and blocking the worker <-firstTaskRunning - require.Equal(t, 0, pool.QueueSize()) + require.Equal(t, 1, pool.QueueSize()) // Second task will be queued err = pool.SubmitWithKey("k2", func() { tasksDone.Inc() }) require.NoError(t, err) - require.Equal(t, 1, pool.QueueSize()) + require.Equal(t, 2, pool.QueueSize()) // Third task cannot be accepted, because the queue is full err = pool.SubmitWithKey("k3", func() { tasksDone.Inc() }) require.ErrorContains(t, err, "queue is full") + require.Equal(t, 2, pool.QueueSize()) + }) + + t.Run("should not block when one task is stuck", func(t *testing.T) { + defer goleak.VerifyNone(t) + tasksCount := 1000 + + // Queue size is sufficient to queue all tasks + pool := NewFixedWorkerPool(3, tasksCount+1) + defer pool.Stop() + tasksDone := atomic.Int32{} + + // First task will block + blockFirstTask := make(chan struct{}) + firstTaskRunning := make(chan struct{}) + err := pool.SubmitWithKey("k-blocking", func() { + firstTaskRunning <- struct{}{} + <-blockFirstTask + tasksDone.Inc() + }) + require.NoError(t, err) + defer func() { close(blockFirstTask) }() + + // Wait for the first task to be running already and blocking the worker + <-firstTaskRunning + require.Equal(t, 1, pool.QueueSize()) + + // Submit a lot of tasks with random keys - no task should be blocked by the above one. + for i := 0; i < tasksCount; i++ { + err = pool.SubmitWithKey(fmt.Sprintf("t%d", i), func() { tasksDone.Inc() }) + require.NoError(t, err) + } + + // Ensure all tasks are done + require.Eventually(t, func() bool { + return tasksDone.Load() == int32(tasksCount) + }, 3*time.Second, 1*time.Millisecond) + }) + + t.Run("should NOT run concurrently tasks with the same key", func(t *testing.T) { + defer goleak.VerifyNone(t) + tasksCount := 1000 + + // Queue size is sufficient to queue all tasks + pool := NewFixedWorkerPool(10, 10) + defer pool.Stop() + tasksDone := atomic.Int32{} + + // First task will block + blockFirstTask := make(chan struct{}) + firstTaskRunning := make(chan struct{}) + err := pool.SubmitWithKey("k1", func() { + firstTaskRunning <- struct{}{} + <-blockFirstTask + tasksDone.Inc() + }) + require.NoError(t, err) + defer func() { close(blockFirstTask) }() + + // Wait for the first task to be running already and blocking the worker + <-firstTaskRunning require.Equal(t, 1, pool.QueueSize()) + + // Enqueue one more task with the same key - it should be allowed + err = pool.SubmitWithKey("k1", func() { tasksDone.Inc() }) + require.NoError(t, err) + + // Submit a lot of tasks with same key - all should be a no-op, since this task is already in queue + for i := 0; i < tasksCount; i++ { + err = pool.SubmitWithKey("k1", func() { tasksDone.Inc() }) + require.NoError(t, err) + } + + require.Equal(t, int32(0), tasksDone.Load()) + + // Unblock the first task + blockFirstTask <- struct{}{} + + // The first task and the second one should be the only ones that complete + require.Eventually(t, func() bool { + return tasksDone.Load() == 2 + }, 3*time.Second, 1*time.Millisecond) }) }) } diff --git a/pkg/flow/module_test.go b/pkg/flow/module_test.go index 0a371bc99a63..4e4ddb9faaa8 100644 --- a/pkg/flow/module_test.go +++ b/pkg/flow/module_test.go @@ -250,7 +250,7 @@ func testModuleControllerOptions(t *testing.T) *moduleControllerOptions { DataPath: t.TempDir(), Reg: prometheus.NewRegistry(), ModuleRegistry: newModuleRegistry(), - WorkerPool: worker.NewShardedWorkerPool(1, 100), + WorkerPool: worker.NewFixedWorkerPool(1, 100), } }