Skip to content

Commit

Permalink
feat: optimize qos queue trim
Browse files Browse the repository at this point in the history
- 100% test cover for `internal/wrphandlers/qos`
- optimize `priorityQueue.trim()` to remove messages with the lowest qos value while using `trimTieBreaker` breaks any QualityOfService ties during queue trimming
- the compute cost of optimizing trim will be O(n)

More details on the compute cost of optimizing trim:

We're not trimming it's a O(n*log(n)) sort.

But when we trim and we're looking to trim the least prioritized messages, then (right before removing messages) we need to call heap.Init() since the prioritization was changed. https://pkg.go.dev/container/heap#Init

The complexity of heap.Init() is O(n). Every time we need to trim, heap.Init() will be called twice (before and after trimming).

So the tax for trimming is: O(n) `[two heap.Init()  calls]` + O(n*log(n)) `[X calls to sort and remove messages from the queue]`
  • Loading branch information
denopink committed Jun 4, 2024
1 parent 50caded commit 150f0a6
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 44 deletions.
4 changes: 2 additions & 2 deletions internal/wrphandlers/qos/internal_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func validatePriority() Option {
func validateTieBreaker() Option {
return optionFunc(
func(h *Handler) error {
if h.tieBreaker == nil {
return errors.Join(fmt.Errorf("%w: nil tiebreak", ErrPriorityTypeInvalid), ErrMisconfiguredQOS)
if h.tieBreaker == nil || h.trimTieBreaker == nil {
return errors.Join(fmt.Errorf("%w: nil tiebreak/trimTieBreaker", ErrPriorityTypeInvalid), ErrMisconfiguredQOS)
}

return nil
Expand Down
38 changes: 24 additions & 14 deletions internal/wrphandlers/qos/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,31 @@ func MaxMessageBytes(s int) Option {
// with the default being to prioritize the newest messages.
func Priority(p PriorityType) Option {
return optionFunc(
func(h *Handler) error {
// Determine what will be used as a QualityOfService tie breaker.
switch p {
case NewestType:
// Prioritize the newest messages.
h.tieBreaker = PriorityNewestMsg
case OldestType:
// Prioritize the oldest messages.
h.tieBreaker = PriorityOldestMsg
default:
return errors.Join(fmt.Errorf("%w: %s", ErrPriorityTypeInvalid, h.priority), ErrMisconfiguredQOS)
}

func(h *Handler) (err error) {
h.tieBreaker, h.trimTieBreaker, err = priority(p)
h.priority = p

return nil
return err
})
}

// priority determines which tie breakers are used during normal enqueueing and queue trimming.
func priority(p PriorityType) (enqueueTieBreaker tieBreaker, trimTieBreaker tieBreaker, err error) {
// Determine what will be used as a QualityOfService tie breaker during normal enqueueing and queue trimming.
switch p {
case NewestType:
// Prioritize the newest messages.
enqueueTieBreaker = PriorityNewestMsg
// Remove the oldest messages during trimming.
trimTieBreaker = PriorityOldestMsg
case OldestType:
// Prioritize the oldest messages.
enqueueTieBreaker = PriorityOldestMsg
// Remove the newest messages during trimming.
trimTieBreaker = PriorityNewestMsg
default:
return nil, nil, errors.Join(fmt.Errorf("%w: %s", ErrPriorityTypeInvalid, p), ErrMisconfiguredQOS)
}

return enqueueTieBreaker, trimTieBreaker, nil
}
44 changes: 37 additions & 7 deletions internal/wrphandlers/qos/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ type priorityQueue struct {
queue []item
// tieBreaker breaks any QualityOfService ties.
tieBreaker tieBreaker
// queueInTrimming indicates the queue is actively being trimmed, where messages with the lowest
// QualityOfService are prioritize and removed.
// Only used during `priorityQueue.trim()`.
queueInTrimming bool
// trimTieBreaker breaks any QualityOfService ties during queue trimming.
trimTieBreaker tieBreaker
// maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads
maxQueueBytes int64
// MaxMessageBytes is the largest allowable wrp message payload.
Expand Down Expand Up @@ -61,19 +67,33 @@ func (pq *priorityQueue) Enqueue(msg wrp.Message) error {
return nil
}

// trim removes messages with the lowest QualityOfService (taking `prioritizeOldest` into account)
// until the queue no longer violates `maxQueueSize“.
func (pq *priorityQueue) trim() {
if pq.sizeBytes <= pq.maxQueueBytes {
return
}

// Prioritize messages with the lowest QualityOfService such that `pq.Pop()` will return the message with.
pq.queueInTrimming = true
defer func() {
// Re-prioritize messages with the highest QualityOfService.
pq.queueInTrimming = false
// heap.Init() is required since the prioritization was switch to messages with the highest QualityOfService.
// The complexity of heap.Init() is O(n) where n = h.Len().
heap.Init(pq)
}()

// heap.Init() is required since the prioritization was switch to messages with the lowest QualityOfService.
// The complexity of heap.Init() is O(n) where n = h.Len().
heap.Init(pq)
// trim until the queue no longer violates maxQueueBytes.
for pq.sizeBytes > pq.maxQueueBytes {
// Note, priorityQueue.drop does not drop the least prioritized queued message.
// i.e.: a high priority queued message may be dropped instead of a lesser queued message.
pq.drop()
// Dequeue messages with the lowest QualityOfService.
pq.Dequeue()
}
}

func (pq *priorityQueue) drop() {
_ = heap.Remove(pq, pq.Len()-1).(wrp.Message)
}

// heap.Interface related implementations https://pkg.go.dev/container/heap#Interface

func (pq *priorityQueue) Len() int { return len(pq.queue) }
Expand All @@ -84,9 +104,19 @@ func (pq *priorityQueue) Less(i, j int) bool {

// Determine whether a tie breaker is required.
if iQOS != jQOS {
if pq.queueInTrimming {
// Prioritize messages with the lowest QualityOfService.
return iQOS < jQOS
}

return iQOS > jQOS
}

if pq.queueInTrimming {
// Tie breaker during queue trimming.
return pq.trimTieBreaker(iItem, jItem)
}

return pq.tieBreaker(iItem, jItem)
}

Expand Down
78 changes: 57 additions & 21 deletions internal/wrphandlers/qos/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,17 @@ func testEnqueueDequeueAgePriority(t *testing.T) {
}
tests := []struct {
description string
tieBreaker tieBreaker
priority PriorityType
expectedMsg wrp.Message
}{
{
description: "drop incoming low priority messages while prioritizing older messages",
tieBreaker: PriorityOldestMsg,
priority: OldestType,
expectedMsg: smallLowQOSMsgOldest,
},
{
description: "drop incoming low priority messages while prioritizing newer messages",
tieBreaker: PriorityNewestMsg,
priority: NewestType,
expectedMsg: smallLowQOSMsgNewest,
},
}
Expand All @@ -74,8 +74,12 @@ func testEnqueueDequeueAgePriority(t *testing.T) {
pq := priorityQueue{
maxQueueBytes: int64(len(smallLowQOSMsgOldest.Payload)),
maxMessageBytes: len(smallLowQOSMsgOldest.Payload),
tieBreaker: tc.tieBreaker,
}

var err error
pq.tieBreaker, pq.trimTieBreaker, err = priority(tc.priority)
require.NoError(err)

for _, msg := range messages {
pq.Enqueue(msg)
}
Expand Down Expand Up @@ -173,6 +177,14 @@ func testEnqueueDequeue(t *testing.T) {
maxMessageBytes: len(largeCriticalQOSMsg.Payload),
expectedQueueSize: 1,
},
{
description: "drop incoming low priority messages",
messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg, smallLowQOSMsg, mediumMediumQosMsg},
maxQueueBytes: len(largeCriticalQOSMsg.Payload) * 2,
maxMessageBytes: len(largeCriticalQOSMsg.Payload),
expectedQueueSize: 2,
expectedDequeueSequence: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg},
},
{
description: "remove some low priority messages to fit a higher priority message",
messages: []wrp.Message{mediumMediumQosMsg, mediumMediumQosMsg, mediumMediumQosMsg, largeCriticalQOSMsg},
Expand Down Expand Up @@ -203,8 +215,12 @@ func testEnqueueDequeue(t *testing.T) {
pq := priorityQueue{
maxQueueBytes: int64(tc.maxQueueBytes),
maxMessageBytes: tc.maxMessageBytes,
tieBreaker: PriorityNewestMsg,
}

var err error
pq.tieBreaker, pq.trimTieBreaker, err = priority(NewestType)
require.NoError(err)

for _, msg := range tc.messages {
pq.Enqueue(msg)
}
Expand All @@ -227,19 +243,26 @@ func testEnqueueDequeue(t *testing.T) {

func testSize(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
msg := wrp.Message{
Destination: "mac:00deadbeef00/config",
Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameter\"]}"),
}
pq := priorityQueue{tieBreaker: PriorityNewestMsg}
pq := priorityQueue{}

var err error
pq.tieBreaker, pq.trimTieBreaker, err = priority(NewestType)
require.NoError(err)

assert.Equal(int64(0), pq.sizeBytes)
pq.Push(msg)
pq.Push(msg)
assert.Equal(int64(len(msg.Payload)*2), pq.sizeBytes)
}

func testLen(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
pq := priorityQueue{queue: []item{
{
msg: wrp.Message{
Expand All @@ -253,10 +276,11 @@ func testLen(t *testing.T) {
},
timestamp: time.Now(),
},
},
tieBreaker: PriorityNewestMsg,
}
}}

var err error
pq.tieBreaker, pq.trimTieBreaker, err = priority(NewestType)
require.NoError(err)
assert.Equal(len(pq.queue), pq.Len())
}

Expand Down Expand Up @@ -285,33 +309,32 @@ func testLess(t *testing.T) {
tests := []struct {
description string
priority PriorityType
tieBreaker tieBreaker
}{
{
description: "less",
priority: NewestType,
tieBreaker: PriorityNewestMsg,
},
{
description: "tie breaker prioritizing newer messages",
priority: NewestType,
tieBreaker: PriorityNewestMsg,
},
{
description: "tie breaker prioritizing older messages",
priority: OldestType,
tieBreaker: PriorityOldestMsg,
},
}
for _, tc := range tests {
t.Run(tc.description, func(t *testing.T) {
assert := assert.New(t)

require := require.New(t)
pq := priorityQueue{
queue: []item{oldestMsg, newestMsg, tieBreakerMsg},
tieBreaker: tc.tieBreaker,
queue: []item{oldestMsg, newestMsg, tieBreakerMsg},
}

var err error
pq.tieBreaker, pq.trimTieBreaker, err = priority(tc.priority)
require.NoError(err)

// wrp.QOSCriticalValue > wrp.QOSLowValue
assert.True(pq.Less(0, 1))
// wrp.QOSLowValue > wrp.QOSCriticalValue
Expand All @@ -334,6 +357,7 @@ func testLess(t *testing.T) {

func testSwap(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
msg0 := wrp.Message{
Destination: "mac:00deadbeef00/config",
}
Expand All @@ -355,10 +379,11 @@ func testSwap(t *testing.T) {
msg: msg2,
timestamp: time.Now(),
},
},
tieBreaker: PriorityNewestMsg,
}
}}

var err error
pq.tieBreaker, pq.trimTieBreaker, err = priority(NewestType)
require.NoError(err)
pq.Swap(0, 2)
// pq.queue[0] should contain msg2
assert.Equal(msg2, pq.queue[0].msg)
Expand All @@ -368,6 +393,7 @@ func testSwap(t *testing.T) {

func testPush(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
messages := []wrp.Message{
{
Destination: "mac:00deadbeef00/config",
Expand All @@ -379,7 +405,12 @@ func testPush(t *testing.T) {
Destination: "mac:00deadbeef02/config",
},
}
pq := priorityQueue{tieBreaker: PriorityNewestMsg}
pq := priorityQueue{}

var err error
pq.tieBreaker, pq.trimTieBreaker, err = priority(NewestType)
require.NoError(err)

for _, msg := range messages {
pq.Push(msg)
assert.Equal(msg, pq.queue[pq.Len()-1].msg)
Expand Down Expand Up @@ -435,7 +466,12 @@ func testPop(t *testing.T) {
assert := assert.New(t)
require := require.New(t)

pq := priorityQueue{queue: tc.items, tieBreaker: PriorityNewestMsg}
pq := priorityQueue{queue: tc.items}

var err error
pq.tieBreaker, pq.trimTieBreaker, err = priority(NewestType)
require.NoError(err)

// no sorting is applied, Pop will pop the last message from priorityQueue's queue
switch msg := pq.Pop().(type) {
case nil:
Expand Down
3 changes: 3 additions & 0 deletions internal/wrphandlers/qos/qos.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Handler struct {
priority PriorityType
// tieBreaker breaks any QualityOfService ties.
tieBreaker tieBreaker
// trimTieBreaker breaks any QualityOfService ties during queue trimming.
trimTieBreaker tieBreaker
// maxQueueBytes is the allowable max size of the qos' priority queue, based on the sum of all queued wrp message's payload.
maxQueueBytes int64
// MaxMessageBytes is the largest allowable wrp message payload.
Expand Down Expand Up @@ -130,6 +132,7 @@ func (h *Handler) serviceQOS(queue <-chan wrp.Message) {
maxQueueBytes: h.maxQueueBytes,
maxMessageBytes: h.maxMessageBytes,
tieBreaker: h.tieBreaker,
trimTieBreaker: h.trimTieBreaker,
}
for {
select {
Expand Down

0 comments on commit 150f0a6

Please sign in to comment.