Make worker pool more resilient to slow tasks #5751
Conversation
require.Equal(t, 2, pool.QueueSize())
})

t.Run("should not block when one task is stuck", func(t *testing.T) {
In the previous implementation, this test would fail.
}, 3*time.Second, 1*time.Millisecond)
})

t.Run("should NOT run concurrently tasks with the same key", func(t *testing.T) {
This is an important invariant that was true in the previous implementation, but was not explicitly tested (although some other tests would become flaky if it were broken).
// Wait for the first task to be running already and blocking the worker
<-firstTaskRunning
- require.Equal(t, 0, pool.QueueSize())
+ require.Equal(t, 1, pool.QueueSize())
The definition of QueueSize changed to also include the tasks that are currently executing. This makes the implementation a bit simpler, while QueueSize can still perform its function: telling us when the queue is building up.
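For illustration, here is a minimal sketch of what the new semantics could look like (the names waitingOrder and running follow this PR's diff; the rest is an assumption rather than the actual code):

```go
package worker

import "sync"

// workerPool sketches only the fields relevant to QueueSize; it is
// illustrative, not the actual implementation.
type workerPool struct {
	lock         sync.Mutex
	waitingOrder []string            // keys of tasks waiting to be picked up
	running      map[string]struct{} // keys of tasks currently executing
}

// QueueSize counts both waiting and currently executing tasks, so a growing
// value still signals that the queue is building up.
func (w *workerPool) QueueSize() int {
	w.lock.Lock()
	defer w.lock.Unlock()
	return len(w.waitingOrder) + len(w.running)
}
```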
LGTM, nice improvement! I have one question about performance implications, but I think this is good to go.
}

// Remove the task from waiting and add it to running set
w.waitingOrder = append(w.waitingOrder[:index], w.waitingOrder[index+1:]...)
What's the behavior of append when removing an element in this way; is it copying the whole slice? I'm curious if this has any performance implications for allocations long term, and if a traditional linked list might be more appropriate.
It's a good point, as a linked list would be a natural choice for this algorithm.
The above code will copy the second argument element by element, as per https://go.dev/blog/slices-intro, but it will NOT allocate, since we're reducing the size of the slice. This is slower than a linked-list approach, where removal is O(1) instead of O(N). However, this code is not a bottleneck for two reasons:
1) the slice copying is dominated by other, slower processes, such as evaluating components, which can take multiple milliseconds, compared to a slice copy that takes a few nanoseconds;
2) the size of the queue is usually fewer than 10 elements, even in our largest clusters.
If we had a linked list in the standard Go library, I'd use it, but we don't, and it's not a bottleneck, so I'm proposing we stick to this more idiomatic implementation.
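To illustrate the allocation point, here is a standalone sketch (not the pool code) showing that this style of removal reuses the backing array:

```go
package main

import "fmt"

// removeAt deletes the element at index i by shifting the tail one position
// to the left within the existing backing array. No allocation happens; the
// cost is the element-by-element copy of the tail, i.e. O(N) in the worst case.
func removeAt(s []string, i int) []string {
	return append(s[:i], s[i+1:]...)
}

func main() {
	keys := []string{"a", "b", "c", "d"}
	fmt.Println(keys, cap(keys)) // [a b c d] 4
	keys = removeAt(keys, 1)
	fmt.Println(keys, cap(keys)) // [a c d] 4 -- same backing array, just shorter
}
```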
I’m fine with that, we can always return to this later if it turns into a bottleneck :)
> If we had a linked list in the standard Go library, I'd use it, but we don't, and it's not a bottleneck, so I'm proposing we stick to this more idiomatic implementation.
There is a linked list in the Go standard library (thanks!), but it allocates memory, so it would not be more performant here... and it's still not a bottleneck :) I have some benchmarks here: #5765, which we can merge if we want to keep them for posterity.
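For a rough idea of the comparison, benchmarks along these lines could be used (an illustrative sketch, not the actual benchmarks from #5765):

```go
package worker

import (
	"container/list"
	"testing"
)

// Illustrative benchmarks only: both drain a small queue of keys from the
// front, which is the dominant pattern in the pool.

func BenchmarkSliceRemove(b *testing.B) {
	for i := 0; i < b.N; i++ {
		q := []string{"a", "b", "c", "d", "e"}
		for len(q) > 0 {
			q = append(q[:0], q[1:]...) // shift the tail left; reuses the backing array
		}
	}
}

func BenchmarkListRemove(b *testing.B) {
	for i := 0; i < b.N; i++ {
		q := list.New()
		for _, k := range []string{"a", "b", "c", "d", "e"} {
			q.PushBack(k) // each element is a separate heap allocation
		}
		for q.Len() > 0 {
			q.Remove(q.Front()) // O(1) removal, but the list itself costs allocations
		}
	}
}
```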
Reviewing after the fact, but LGTM; I like the new semantics and the readability 🙂
Adding a small nit below about the tryEnqueue semantics and whether they matter in the context of Flow graph updates.
* Make worker pool more resilient to slow tasks
* CHANGELOG.md
* fix invalid pointer
PR Description
The current worker pool distributes tasks among goroutines using a hash of the task's key. In the presence of a very slow task, e.g. due to a bug, one goroutine can become effectively stuck, and all other tasks that are unfortunate enough to hash to the same goroutine are also affected.
We could prevent this by not relying on the hash and instead assigning tasks randomly to any goroutine. The problem with this approach, however, is that a slow task may come in again and again, and soon every goroutine in the pool would be trying to execute the very slow task. So, to prevent this, we must only allow one task with a given key to be executing at any given time.
This PR changes the worker pool implementation so that tasks are no longer pinned to a goroutine by their key's hash, and at most one task with a given key executes at any given time.
As a result, a very slow task a) will not be able to take over all the goroutines, and b) will not block other tasks from executing.
As discussed in previous PRs, the worker pool the Agent needs has rather unique behaviour, as described above, and we were not able to use an existing library to fulfil these requirements.
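To make the described behaviour concrete, here is a minimal sketch of such a pool (illustrative only, not the actual Agent implementation; tryEnqueue shows just one plausible de-duplication behaviour, and shutdown handling is omitted):

```go
package worker

import "sync"

// pool lets any idle goroutine pick up any runnable task, but never runs two
// tasks with the same key concurrently, so a stuck key can occupy at most one
// goroutine and cannot block unrelated keys.
type pool struct {
	mu           sync.Mutex
	cond         *sync.Cond
	waitingOrder []string            // FIFO of keys waiting to run
	waiting      map[string]func()   // task payload per waiting key
	running      map[string]struct{} // keys currently executing
}

func newPool(workers int) *pool {
	p := &pool{
		waiting: map[string]func(){},
		running: map[string]struct{}{},
	}
	p.cond = sync.NewCond(&p.mu)
	for i := 0; i < workers; i++ {
		go p.run()
	}
	return p
}

// tryEnqueue queues a task unless one with the same key is already waiting.
func (p *pool) tryEnqueue(key string, task func()) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, queued := p.waiting[key]; queued {
		return false
	}
	p.waiting[key] = task
	p.waitingOrder = append(p.waitingOrder, key)
	p.cond.Signal()
	return true
}

// nextRunnable returns the index of the first waiting key that is not
// currently executing, or -1 if there is none. Call with mu held.
func (p *pool) nextRunnable() int {
	for i, key := range p.waitingOrder {
		if _, busy := p.running[key]; !busy {
			return i
		}
	}
	return -1
}

func (p *pool) run() {
	for {
		p.mu.Lock()
		idx := p.nextRunnable()
		for idx == -1 {
			p.cond.Wait() // releases mu while blocked, re-acquires on wake-up
			idx = p.nextRunnable()
		}
		key := p.waitingOrder[idx]
		task := p.waiting[key]
		// Move the key from the waiting queue to the running set.
		p.waitingOrder = append(p.waitingOrder[:idx], p.waitingOrder[idx+1:]...)
		delete(p.waiting, key)
		p.running[key] = struct{}{}
		p.mu.Unlock()

		task()

		p.mu.Lock()
		delete(p.running, key)
		p.cond.Signal() // a queued task with this key may now be runnable
		p.mu.Unlock()
	}
}
```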
Notes to the Reviewer
Tested this in a dev cluster, where, as expected, I can see a drop in the time tasks had to wait for a goroutine to pick them up:
(left half: before the change, right half: after the change)
The size of this effect is expected to be larger in other large clusters that have slower components.
PR Checklist