Make worker pool more resilient to slow tasks (#5751)
* Make worker pool more resilient to slow tasks

* CHANGELOG.md

* fix invalid pointer
thampiotr authored Nov 13, 2023
1 parent 867910b commit c2c40f8
Showing 5 changed files with 236 additions and 91 deletions.
CHANGELOG.md: 2 additions & 0 deletions
@@ -99,6 +99,8 @@ Main (unreleased)
 
 - Added support for unicode strings in `pyroscope.ebpf` python profiles. (@korniltsev)
 
+- Improved resilience of graph evaluation in the presence of slow components. (@thampiotr)
+
 ### Bugfixes
 
 - Set exit code 1 on grafana-agentctl non-runnable command. (@fgouteroux)
pkg/flow/flow_updates_test.go: 2 additions & 2 deletions
@@ -114,7 +114,7 @@ func TestController_Updates_WithQueueFull(t *testing.T) {
 		ModuleRegistry: newModuleRegistry(),
 		IsModule:       false,
 		// The small number of workers and small queue means that a lot of updates will need to be retried.
-		WorkerPool:     worker.NewShardedWorkerPool(1, 1),
+		WorkerPool:     worker.NewFixedWorkerPool(1, 1),
 	})
 
 	// Use testUpdatesFile from graph_builder_test.go.
@@ -376,6 +376,6 @@ func newTestController(t *testing.T) *Flow {
 		ModuleRegistry: newModuleRegistry(),
 		IsModule:       false,
 		// Make sure that we have consistent number of workers for tests to make them deterministic.
-		WorkerPool:     worker.NewShardedWorkerPool(4, 100),
+		WorkerPool:     worker.NewFixedWorkerPool(4, 100),
 	})
 }
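The retry pattern these tests lean on can be sketched as follows. This is a hypothetical main package for illustration only: the worker package sits under pkg/flow/internal, so it is not importable outside this repository, and the keys "first" and "second" are made up.

package main

import (
	"fmt"
	"time"

	"github.com/grafana/agent/pkg/flow/internal/worker"
)

func main() {
	// One worker, max queue size of one: a second task is rejected
	// until the first one finishes, so the caller has to retry.
	pool := worker.NewFixedWorkerPool(1, 1)
	defer pool.Stop()

	_ = pool.SubmitWithKey("first", func() { time.Sleep(100 * time.Millisecond) })

	for {
		err := pool.SubmitWithKey("second", func() { fmt.Println("ran") })
		if err == nil {
			break
		}
		time.Sleep(10 * time.Millisecond) // queue is full - back off and retry
	}
	time.Sleep(100 * time.Millisecond) // crude wait so "ran" prints before exit
}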
pkg/flow/internal/worker/worker_pool.go: 130 additions & 71 deletions
@@ -2,8 +2,6 @@ package worker
 
 import (
 	"fmt"
-	"hash/fnv"
-	"math/rand"
 	"runtime"
 	"sync"
 )
@@ -12,119 +10,180 @@ type Pool interface {
 	// Stop stops the worker pool. It does not wait to drain any internal queues, but it does wait for the currently
 	// running tasks to complete. It must only be called once.
 	Stop()
-	// Submit submits a function to be executed by the worker pool on a random worker. Error is returned if the pool
-	// is unable to accept extra work.
-	Submit(func()) error
-	// SubmitWithKey submits a function to be executed by the worker pool, ensuring that only one
-	// job with given key can be queued at the time. Adding a job with a key that is already queued is a no-op (even if
-	// the submitted function is different). Error is returned if the pool is unable to accept extra work - the caller
-	// can decide how to handle this situation.
+	// SubmitWithKey submits a function to be executed by the worker pool, ensuring that:
+	//   * Only one job with a given key can be waiting to be executed at a time. This is desired if we don't want to
+	//     run the same task multiple times, e.g. if it's a component update that we only need to run once.
+	//   * Only one job with a given key can be running at a time. This is desired when we don't want to duplicate work,
+	//     and we want to protect the pool from a slow task hogging all the workers.
+	//
+	// Note that it is possible to have two tasks with the same key in the pool at the same time: one waiting to be
+	// executed and another one running. This ensures that a request to re-run a task with the same key is not lost.
+	//
+	// Adding a job with a key that is already queued is a no-op (even if the submitted function is different).
+	// An error is returned if the pool is unable to accept extra work - the caller can decide how to handle this situation.
 	SubmitWithKey(string, func()) error
-	// QueueSize returns the number of tasks currently queued.
+	// QueueSize returns the number of tasks currently queued or running.
 	QueueSize() int
 }
 
-// shardedWorkerPool is a Pool that distributes work across a fixed number of workers, using a hash of the key to
-// determine which worker to use. This, to an extent, defends the pool from a slow task hogging all the workers.
-type shardedWorkerPool struct {
+// fixedWorkerPool is a Pool that distributes work across a fixed number of workers. It uses a workQueue to ensure
+// that the SubmitWithKey guarantees are met.
+type fixedWorkerPool struct {
 	workersCount int
-	workQueues   []chan func()
+	workQueue    *workQueue
 	quit         chan struct{}
 	allStopped   sync.WaitGroup
-
-	lock sync.Mutex
-	set  map[string]struct{}
 }
 
-var _ Pool = (*shardedWorkerPool)(nil)
+var _ Pool = (*fixedWorkerPool)(nil)
 
 func NewDefaultWorkerPool() Pool {
-	return NewShardedWorkerPool(runtime.NumCPU(), 1024)
+	return NewFixedWorkerPool(runtime.NumCPU(), 1024)
}
 
-// NewShardedWorkerPool creates a new worker pool with the given number of workers and queue size for each worker.
-// The queued tasks are sharded across the workers using a hash of the key. The pool is automatically started and
-// ready to accept work. To prevent resource leak, Stop() must be called when the pool is no longer needed.
-func NewShardedWorkerPool(workersCount int, queueSize int) Pool {
+// NewFixedWorkerPool creates a new Pool with the given number of workers and the given max queue size.
+// The max queue size is the maximum number of tasks that can be queued OR running at the same time.
+// Tasks can run on any worker, but the workQueue ensures that only one task with a given key runs at a time.
+// The pool is automatically started and ready to accept work. To prevent a resource leak, Stop() must be called
+// when the pool is no longer needed.
+func NewFixedWorkerPool(workersCount int, maxQueueSize int) Pool {
 	if workersCount <= 0 {
 		panic(fmt.Sprintf("workersCount must be positive, got %d", workersCount))
 	}
-	queues := make([]chan func(), workersCount)
-	for i := 0; i < workersCount; i++ {
-		queues[i] = make(chan func(), queueSize)
-	}
-	pool := &shardedWorkerPool{
+	pool := &fixedWorkerPool{
 		workersCount: workersCount,
-		workQueues:   queues,
+		workQueue:    newWorkQueue(maxQueueSize),
 		quit:         make(chan struct{}),
-		set:          make(map[string]struct{}),
 	}
 	pool.start()
 	return pool
 }
 
-func (w *shardedWorkerPool) Submit(f func()) error {
-	return w.SubmitWithKey(fmt.Sprintf("%d", rand.Int()), f)
-}
-
-func (w *shardedWorkerPool) SubmitWithKey(key string, f func()) error {
-	wrapped := func() {
-		// NOTE: we intentionally remove from the queue before executing the function. This means that while a task is
-		// executing, another task with the same key can be added to the queue, potentially even by the task itself.
-		w.lock.Lock()
-		delete(w.set, key)
-		w.lock.Unlock()
-
-		f()
-	}
-	queue := w.workQueues[w.workerFor(key)]
-
-	w.lock.Lock()
-	defer w.lock.Unlock()
-	if _, exists := w.set[key]; exists {
-		return nil // only queue one job for given key
-	}
-
-	select {
-	case queue <- wrapped:
-		w.set[key] = struct{}{}
-		return nil
-	default:
-		return fmt.Errorf("worker queue is full")
-	}
+func (w *fixedWorkerPool) SubmitWithKey(key string, f func()) error {
+	_, err := w.workQueue.tryEnqueue(key, f)
+	return err
 }
 
-func (w *shardedWorkerPool) QueueSize() int {
-	w.lock.Lock()
-	defer w.lock.Unlock()
-	return len(w.set)
+// QueueSize returns the number of tasks in the queue - waiting or currently running.
+func (w *fixedWorkerPool) QueueSize() int {
+	return w.workQueue.queueSize()
 }
 
-func (w *shardedWorkerPool) Stop() {
+func (w *fixedWorkerPool) Stop() {
 	close(w.quit)
 	w.allStopped.Wait()
 }
 
-func (w *shardedWorkerPool) start() {
+func (w *fixedWorkerPool) start() {
 	for i := 0; i < w.workersCount; i++ {
-		queue := w.workQueues[i]
 		w.allStopped.Add(1)
 		go func() {
 			defer w.allStopped.Done()
 			for {
 				select {
 				case <-w.quit:
 					return
-				case f := <-queue:
+				case f := <-w.workQueue.tasksToRun:
 					f()
 				}
 			}
 		}()
 	}
 }
 
-func (w *shardedWorkerPool) workerFor(s string) int {
-	h := fnv.New32a()
-	_, _ = h.Write([]byte(s))
-	return int(h.Sum32()) % w.workersCount
+type workQueue struct {
+	maxSize    int
+	tasksToRun chan func()
+
+	lock         sync.Mutex
+	waitingOrder []string
+	waiting      map[string]func()
+	running      map[string]struct{}
+}
+
+func newWorkQueue(maxSize int) *workQueue {
+	return &workQueue{
+		maxSize:    maxSize,
+		tasksToRun: make(chan func(), maxSize),
+		waiting:    make(map[string]func()),
+		running:    make(map[string]struct{}),
+	}
+}
+
+func (w *workQueue) tryEnqueue(key string, f func()) (bool, error) {
+	w.lock.Lock()
+	defer w.lock.Unlock()
+
+	// Don't enqueue if the same task is already waiting.
+	if _, exists := w.waiting[key]; exists {
+		return false, nil
+	}
+
+	// Don't exceed the queue size.
+	queueSize := len(w.waitingOrder) + len(w.running)
+	if queueSize >= w.maxSize {
+		return false, fmt.Errorf("worker queue is full")
+	}
+
+	// Otherwise, enqueue.
+	w.waitingOrder = append(w.waitingOrder, key)
+	w.waiting[key] = f
+
+	// A task may have become runnable now - emit it.
+	w.emitNextTask()
+
+	return true, nil
+}
+
+func (w *workQueue) taskDone(key string) {
+	w.lock.Lock()
+	defer w.lock.Unlock()
+	delete(w.running, key)
+	// A task may have become runnable now - emit it.
+	w.emitNextTask()
+}
+
+// emitNextTask emits the next eligible task to be run if there is one. It must be called whenever the queue state
+// changes (e.g. a task is added or a task finishes). The lock must be held when calling this function.
+func (w *workQueue) emitNextTask() {
+	var (
+		task  func()
+		key   string
+		index int
+		found = false
+	)
+
+	// Find the first key in waitingOrder that is not yet running.
+	for i, k := range w.waitingOrder {
+		if _, alreadyRunning := w.running[k]; !alreadyRunning {
+			found, key, index = true, k, i
+			break
+		}
+	}
+
+	// Return if we didn't find any task ready to run.
+	if !found {
+		return
+	}
+
+	// Remove the task from waiting and add it to the running set.
+	w.waitingOrder = append(w.waitingOrder[:index], w.waitingOrder[index+1:]...)
+	task = w.waiting[key]
+	delete(w.waiting, key)
+	w.running[key] = struct{}{}
+
+	// Wrap the actual task to make sure we mark it as done when it finishes.
+	wrapped := func() {
+		defer w.taskDone(key)
+		task()
+	}
+
+	// Emit the task to be run. There will always be space in this buffered channel, because we limit the queue size.
+	w.tasksToRun <- wrapped
+}
+
+func (w *workQueue) queueSize() int {
+	w.lock.Lock()
+	defer w.lock.Unlock()
+	return len(w.waitingOrder) + len(w.running)
 }
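A minimal sketch of the SubmitWithKey contract implemented above - at most one waiting plus one running task per key. Same caveat as before: a hypothetical main package against the internal worker package, with made-up keys and channels.

package main

import (
	"fmt"

	"github.com/grafana/agent/pkg/flow/internal/worker"
)

func main() {
	pool := worker.NewFixedWorkerPool(2, 10)
	defer pool.Stop()

	release := make(chan struct{})
	done := make(chan struct{})

	// First task with key "k" is emitted immediately and counts as running.
	_ = pool.SubmitWithKey("k", func() { <-release; done <- struct{}{} })

	// Second task with key "k" is accepted but parked as waiting.
	_ = pool.SubmitWithKey("k", func() { done <- struct{}{} })

	// Third task with key "k" is a no-op: one is already waiting.
	_ = pool.SubmitWithKey("k", func() { panic("never runs") })

	fmt.Println(pool.QueueSize()) // 2: one running + one waiting

	close(release) // let the first task finish; the waiting one runs next
	<-done
	<-done // exactly two tasks run; the third submission was dropped
}

The printed queue size of 2 is deterministic because tryEnqueue moves a key into the running set synchronously, before any worker has picked the task up.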
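And a sketch of the property the commit title promises - a slow task parks on a single worker while tasks with other keys keep flowing (again hypothetical, with assumed keys and timings):

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/grafana/agent/pkg/flow/internal/worker"
)

func main() {
	pool := worker.NewFixedWorkerPool(2, 10)
	defer pool.Stop()

	// A slow task occupies one of the two workers...
	_ = pool.SubmitWithKey("slow", func() { time.Sleep(time.Second) })

	// ...while tasks with other keys keep flowing through the other worker.
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		key := fmt.Sprintf("fast-%d", i)
		_ = pool.SubmitWithKey(key, func() { defer wg.Done(); fmt.Println(key) })
	}
	wg.Wait() // returns long before the slow task completes
}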
