-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: optimize qos queue trimming with heap.Init()
(O(n)
)
#152
Conversation
}) | ||
} | ||
|
||
// priority determines which tie breakers are used during normal enqueueing and queue trimming. | ||
func priority(p PriorityType) (enqueueTieBreaker tieBreaker, trimTieBreaker tieBreaker, err error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Making this its own function helps with testing.
@@ -61,19 +67,33 @@ func (pq *priorityQueue) Enqueue(msg wrp.Message) error { | |||
return nil | |||
} | |||
|
|||
// trim removes messages with the lowest QualityOfService (taking `prioritizeOldest` into account) | |||
// until the queue no longer violates `maxQueueSize“. | |||
func (pq *priorityQueue) trim() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when we trim and we're looking to trim the least prioritized messages, then (right before removing messages) we need to call heap.Init() since the prioritization was changed. https://pkg.go.dev/container/heap#Init
The complexity of heap.Init() is O(n). Every time we need to trim, heap.Init() will be called twice (before and after trimming).
So the tax for trimming is: O(n) [two heap.Init() calls]
+ O(n*log(n)) [X calls to sort and remove messages from the queue]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is the simplest way we can achieve this while minimizing code changes.
I think the cost of this new trimming code will be small overall because I think the situation where we are trimming often is a very unlikely edge case, it’ll require the xmidt-agent to
- consistently or completely be unable to send messages to xmidt (e.g outage or connectivity issues)
- while also receiving a steady flow of new messages to a already full queued
One way i think we can mitigate this edge case of constantly trimming may be (during a single pass of trimming) to remove stale messages (if any) and if there aren't any stale messages, then the lowest qos message can be removed.
that way we're potentially removing bulks of stale messages rather than removing 1 message at a time during a single pass of trimming
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but if we want to optimize trimming further in the future, I think we can achieve n*log(n) trimming (I have a working version locally but it introduces a bit of code)
- 100% test cover for `internal/wrphandlers/qos` - optimize `priorityQueue.trim()` to remove messages with the lowest qos value while using `trimTieBreaker` breaks any QualityOfService ties during queue trimming - the compute cost of optimizing trim will be O(n) More details on the compute cost of optimizing trim: When we trim and we're looking to trim the least prioritized messages, then (right before removing messages) we need to call heap.Init() since the prioritization was changed. https://pkg.go.dev/container/heap#Init The complexity of heap.Init() is O(n). Every time we need to trim, heap.Init() will be called twice (before and after trimming). So the tax for trimming is: O(n) `[two heap.Init() calls]` + O(n*log(n)) `[X calls to sort and remove messages from the queue]`
150f0a6
to
0b366a7
Compare
heap.Init()
(O(n)
)
internal/wrphandlers/qos
priorityQueue.trim()
to remove messages with the lowest qos value while usingtrimTieBreaker
breaks any QualityOfService ties during queue trimmingpriorityQueue.trim()
will be O(n)More details on the compute cost of optimizing trim:
When we trim and we're looking to trim the least prioritized messages, then (right before removing messages) we need to call heap.Init() since the prioritization was changed. https://pkg.go.dev/container/heap#Init
The complexity of heap.Init() is O(n). Every time we need to trim, heap.Init() will be called twice (before and after trimming).
So the tax for trimming is: O(n)
[two heap.Init() calls]
+ O(n*log(n))[X calls to sort and remove messages from the queue]