diff --git a/internal/wrphandlers/qos/internal_options.go b/internal/wrphandlers/qos/internal_options.go index b986793..6801f67 100644 --- a/internal/wrphandlers/qos/internal_options.go +++ b/internal/wrphandlers/qos/internal_options.go @@ -33,8 +33,8 @@ func validatePriority() Option { func validateTieBreaker() Option { return optionFunc( func(h *Handler) error { - if h.tieBreaker == nil { - return errors.Join(fmt.Errorf("%w: nil tiebreak", ErrPriorityTypeInvalid), ErrMisconfiguredQOS) + if h.tieBreaker == nil || h.trimTieBreaker == nil { + return errors.Join(fmt.Errorf("%w: nil tiebreak/trimTieBreaker", ErrPriorityTypeInvalid), ErrMisconfiguredQOS) } return nil diff --git a/internal/wrphandlers/qos/options.go b/internal/wrphandlers/qos/options.go index 4904f5b..1ba2ba7 100644 --- a/internal/wrphandlers/qos/options.go +++ b/internal/wrphandlers/qos/options.go @@ -51,21 +51,31 @@ func MaxMessageBytes(s int) Option { // with the default being to prioritize the newest messages. func Priority(p PriorityType) Option { return optionFunc( - func(h *Handler) error { - // Determine what will be used as a QualityOfService tie breaker. - switch p { - case NewestType: - // Prioritize the newest messages. - h.tieBreaker = PriorityNewestMsg - case OldestType: - // Prioritize the oldest messages. - h.tieBreaker = PriorityOldestMsg - default: - return errors.Join(fmt.Errorf("%w: %s", ErrPriorityTypeInvalid, h.priority), ErrMisconfiguredQOS) - } - + func(h *Handler) (err error) { + h.tieBreaker, h.trimTieBreaker, err = priority(p) h.priority = p - return nil + return err }) } + +// priority determines which tie breakers are used during normal enqueueing and queue trimming. +func priority(p PriorityType) (enqueueTieBreaker tieBreaker, trimTieBreaker tieBreaker, err error) { + // Determine what will be used as a QualityOfService tie breaker during normal enqueueing and queue trimming. + switch p { + case NewestType: + // Prioritize the newest messages. + enqueueTieBreaker = PriorityNewestMsg + // Remove the oldest messages during trimming. + trimTieBreaker = PriorityOldestMsg + case OldestType: + // Prioritize the oldest messages. + enqueueTieBreaker = PriorityOldestMsg + // Remove the newest messages during trimming. + trimTieBreaker = PriorityNewestMsg + default: + return nil, nil, errors.Join(fmt.Errorf("%w: %s", ErrPriorityTypeInvalid, p), ErrMisconfiguredQOS) + } + + return enqueueTieBreaker, trimTieBreaker, nil +} diff --git a/internal/wrphandlers/qos/priority_queue.go b/internal/wrphandlers/qos/priority_queue.go index 39a59cb..de2515e 100644 --- a/internal/wrphandlers/qos/priority_queue.go +++ b/internal/wrphandlers/qos/priority_queue.go @@ -21,6 +21,12 @@ type priorityQueue struct { queue []item // tieBreaker breaks any QualityOfService ties. tieBreaker tieBreaker + // queueInTrimming indicates the queue is actively being trimmed, where messages with the lowest + // QualityOfService are prioritize and removed. + // Only used during `priorityQueue.trim()`. + queueInTrimming bool + // trimTieBreaker breaks any QualityOfService ties during queue trimming. + trimTieBreaker tieBreaker // maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads maxQueueBytes int64 // MaxMessageBytes is the largest allowable wrp message payload. @@ -61,19 +67,33 @@ func (pq *priorityQueue) Enqueue(msg wrp.Message) error { return nil } +// trim removes messages with the lowest QualityOfService (taking `prioritizeOldest` into account) +// until the queue no longer violates `maxQueueSize“. func (pq *priorityQueue) trim() { + if pq.sizeBytes <= pq.maxQueueBytes { + return + } + + // Prioritize messages with the lowest QualityOfService such that `pq.Pop()` will return the message with. + pq.queueInTrimming = true + defer func() { + // Re-prioritize messages with the highest QualityOfService. + pq.queueInTrimming = false + // heap.Init() is required since the prioritization was switch to messages with the highest QualityOfService. + // The complexity of heap.Init() is O(n) where n = h.Len(). + heap.Init(pq) + }() + + // heap.Init() is required since the prioritization was switch to messages with the lowest QualityOfService. + // The complexity of heap.Init() is O(n) where n = h.Len(). + heap.Init(pq) // trim until the queue no longer violates maxQueueBytes. for pq.sizeBytes > pq.maxQueueBytes { - // Note, priorityQueue.drop does not drop the least prioritized queued message. - // i.e.: a high priority queued message may be dropped instead of a lesser queued message. - pq.drop() + // Dequeue messages with the lowest QualityOfService. + pq.Dequeue() } } -func (pq *priorityQueue) drop() { - _ = heap.Remove(pq, pq.Len()-1).(wrp.Message) -} - // heap.Interface related implementations https://pkg.go.dev/container/heap#Interface func (pq *priorityQueue) Len() int { return len(pq.queue) } @@ -84,9 +104,19 @@ func (pq *priorityQueue) Less(i, j int) bool { // Determine whether a tie breaker is required. if iQOS != jQOS { + if pq.queueInTrimming { + // Prioritize messages with the lowest QualityOfService. + return iQOS < jQOS + } + return iQOS > jQOS } + if pq.queueInTrimming { + // Tie breaker during queue trimming. + return pq.trimTieBreaker(iItem, jItem) + } + return pq.tieBreaker(iItem, jItem) } diff --git a/internal/wrphandlers/qos/priority_queue_test.go b/internal/wrphandlers/qos/priority_queue_test.go index e6ca034..e0bd1c9 100644 --- a/internal/wrphandlers/qos/priority_queue_test.go +++ b/internal/wrphandlers/qos/priority_queue_test.go @@ -52,17 +52,17 @@ func testEnqueueDequeueAgePriority(t *testing.T) { } tests := []struct { description string - tieBreaker tieBreaker + priority PriorityType expectedMsg wrp.Message }{ { description: "drop incoming low priority messages while prioritizing older messages", - tieBreaker: PriorityOldestMsg, + priority: OldestType, expectedMsg: smallLowQOSMsgOldest, }, { description: "drop incoming low priority messages while prioritizing newer messages", - tieBreaker: PriorityNewestMsg, + priority: NewestType, expectedMsg: smallLowQOSMsgNewest, }, } @@ -74,8 +74,12 @@ func testEnqueueDequeueAgePriority(t *testing.T) { pq := priorityQueue{ maxQueueBytes: int64(len(smallLowQOSMsgOldest.Payload)), maxMessageBytes: len(smallLowQOSMsgOldest.Payload), - tieBreaker: tc.tieBreaker, } + + var err error + pq.tieBreaker, pq.trimTieBreaker, err = priority(tc.priority) + require.NoError(err) + for _, msg := range messages { pq.Enqueue(msg) } @@ -173,6 +177,14 @@ func testEnqueueDequeue(t *testing.T) { maxMessageBytes: len(largeCriticalQOSMsg.Payload), expectedQueueSize: 1, }, + { + description: "drop incoming low priority messages", + messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg, smallLowQOSMsg, mediumMediumQosMsg}, + maxQueueBytes: len(largeCriticalQOSMsg.Payload) * 2, + maxMessageBytes: len(largeCriticalQOSMsg.Payload), + expectedQueueSize: 2, + expectedDequeueSequence: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg}, + }, { description: "remove some low priority messages to fit a higher priority message", messages: []wrp.Message{mediumMediumQosMsg, mediumMediumQosMsg, mediumMediumQosMsg, largeCriticalQOSMsg}, @@ -203,8 +215,12 @@ func testEnqueueDequeue(t *testing.T) { pq := priorityQueue{ maxQueueBytes: int64(tc.maxQueueBytes), maxMessageBytes: tc.maxMessageBytes, - tieBreaker: PriorityNewestMsg, } + + var err error + pq.tieBreaker, pq.trimTieBreaker, err = priority(NewestType) + require.NoError(err) + for _, msg := range tc.messages { pq.Enqueue(msg) } @@ -227,19 +243,26 @@ func testEnqueueDequeue(t *testing.T) { func testSize(t *testing.T) { assert := assert.New(t) + require := require.New(t) msg := wrp.Message{ Destination: "mac:00deadbeef00/config", Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameter\"]}"), } - pq := priorityQueue{tieBreaker: PriorityNewestMsg} + pq := priorityQueue{} + + var err error + pq.tieBreaker, pq.trimTieBreaker, err = priority(NewestType) + require.NoError(err) assert.Equal(int64(0), pq.sizeBytes) pq.Push(msg) pq.Push(msg) assert.Equal(int64(len(msg.Payload)*2), pq.sizeBytes) } + func testLen(t *testing.T) { assert := assert.New(t) + require := require.New(t) pq := priorityQueue{queue: []item{ { msg: wrp.Message{ @@ -253,10 +276,11 @@ func testLen(t *testing.T) { }, timestamp: time.Now(), }, - }, - tieBreaker: PriorityNewestMsg, - } + }} + var err error + pq.tieBreaker, pq.trimTieBreaker, err = priority(NewestType) + require.NoError(err) assert.Equal(len(pq.queue), pq.Len()) } @@ -285,33 +309,32 @@ func testLess(t *testing.T) { tests := []struct { description string priority PriorityType - tieBreaker tieBreaker }{ { description: "less", priority: NewestType, - tieBreaker: PriorityNewestMsg, }, { description: "tie breaker prioritizing newer messages", priority: NewestType, - tieBreaker: PriorityNewestMsg, }, { description: "tie breaker prioritizing older messages", priority: OldestType, - tieBreaker: PriorityOldestMsg, }, } for _, tc := range tests { t.Run(tc.description, func(t *testing.T) { assert := assert.New(t) - + require := require.New(t) pq := priorityQueue{ - queue: []item{oldestMsg, newestMsg, tieBreakerMsg}, - tieBreaker: tc.tieBreaker, + queue: []item{oldestMsg, newestMsg, tieBreakerMsg}, } + var err error + pq.tieBreaker, pq.trimTieBreaker, err = priority(tc.priority) + require.NoError(err) + // wrp.QOSCriticalValue > wrp.QOSLowValue assert.True(pq.Less(0, 1)) // wrp.QOSLowValue > wrp.QOSCriticalValue @@ -334,6 +357,7 @@ func testLess(t *testing.T) { func testSwap(t *testing.T) { assert := assert.New(t) + require := require.New(t) msg0 := wrp.Message{ Destination: "mac:00deadbeef00/config", } @@ -355,10 +379,11 @@ func testSwap(t *testing.T) { msg: msg2, timestamp: time.Now(), }, - }, - tieBreaker: PriorityNewestMsg, - } + }} + var err error + pq.tieBreaker, pq.trimTieBreaker, err = priority(NewestType) + require.NoError(err) pq.Swap(0, 2) // pq.queue[0] should contain msg2 assert.Equal(msg2, pq.queue[0].msg) @@ -368,6 +393,7 @@ func testSwap(t *testing.T) { func testPush(t *testing.T) { assert := assert.New(t) + require := require.New(t) messages := []wrp.Message{ { Destination: "mac:00deadbeef00/config", @@ -379,7 +405,12 @@ func testPush(t *testing.T) { Destination: "mac:00deadbeef02/config", }, } - pq := priorityQueue{tieBreaker: PriorityNewestMsg} + pq := priorityQueue{} + + var err error + pq.tieBreaker, pq.trimTieBreaker, err = priority(NewestType) + require.NoError(err) + for _, msg := range messages { pq.Push(msg) assert.Equal(msg, pq.queue[pq.Len()-1].msg) @@ -435,7 +466,12 @@ func testPop(t *testing.T) { assert := assert.New(t) require := require.New(t) - pq := priorityQueue{queue: tc.items, tieBreaker: PriorityNewestMsg} + pq := priorityQueue{queue: tc.items} + + var err error + pq.tieBreaker, pq.trimTieBreaker, err = priority(NewestType) + require.NoError(err) + // no sorting is applied, Pop will pop the last message from priorityQueue's queue switch msg := pq.Pop().(type) { case nil: diff --git a/internal/wrphandlers/qos/qos.go b/internal/wrphandlers/qos/qos.go index ff94e96..4039457 100644 --- a/internal/wrphandlers/qos/qos.go +++ b/internal/wrphandlers/qos/qos.go @@ -38,6 +38,8 @@ type Handler struct { priority PriorityType // tieBreaker breaks any QualityOfService ties. tieBreaker tieBreaker + // trimTieBreaker breaks any QualityOfService ties during queue trimming. + trimTieBreaker tieBreaker // maxQueueBytes is the allowable max size of the qos' priority queue, based on the sum of all queued wrp message's payload. maxQueueBytes int64 // MaxMessageBytes is the largest allowable wrp message payload. @@ -130,6 +132,7 @@ func (h *Handler) serviceQOS(queue <-chan wrp.Message) { maxQueueBytes: h.maxQueueBytes, maxMessageBytes: h.maxMessageBytes, tieBreaker: h.tieBreaker, + trimTieBreaker: h.trimTieBreaker, } for { select {