Skip to content

Commit

Permalink
feat: add a timestamp component to qos in order to prioritize older/n…
Browse files Browse the repository at this point in the history
…ewer messages

- 100% test cover for `internal/wrphandlers/qos`
- added `PrioritizeOldest` options for qos
- if `PrioritizeOldest` is false, then qos will favor the newest message during a qos value tie breaker (vice versa)
- Updated `priorityQueue.trim()` to remove messages with the lowest qos value (taking `PrioritizeOldest` into account as well)
  • Loading branch information
denopink committed May 29, 2024
1 parent e4dfaff commit e525819
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 53 deletions.
3 changes: 3 additions & 0 deletions cmd/xmidt-agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type QOS struct {
MaxQueueBytes int64
// MaxMessageBytes is the largest allowable wrp message payload.
MaxMessageBytes int
// PrioritizeOldest determines whether to prioritize the oldest message during a QualityOfService tie breaker,
// with the default being to prioritize the newest messages.
PrioritizeOldest bool
}

type Pubsub struct {
Expand Down
1 change: 1 addition & 0 deletions cmd/xmidt-agent/wrphandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func provideQOSHandler(in qosIn) (*qos.Handler, error) {
in.WS,
qos.MaxQueueBytes(in.QOS.MaxQueueBytes),
qos.MaxMessageBytes(in.QOS.MaxMessageBytes),
qos.PrioritizeOldest(in.QOS.PrioritizeOldest),
)
}

Expand Down
11 changes: 11 additions & 0 deletions internal/wrphandlers/qos/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,14 @@ func MaxMessageBytes(s int) Option {
return nil
})
}

// PrioritizeOldest determines whether to prioritize the oldest message during a QualityOfService tie breaker,
// with the default being to prioritize the newest messages.
func PrioritizeOldest(b bool) Option {
return optionFunc(
func(h *Handler) error {
h.prioritizeOldest = b

return nil
})
}
78 changes: 63 additions & 15 deletions internal/wrphandlers/qos/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"container/heap"
"errors"
"fmt"
"time"

"github.com/xmidt-org/wrp-go/v3"
)
Expand All @@ -17,7 +18,14 @@ var ErrMaxMessageBytes = errors.New("wrp message payload exceeds maxMessageBytes
// https://xmidt.io/docs/wrp/basics/#qos-description-qos
type priorityQueue struct {
// queue for wrp messages, ingested by serviceQOS
queue []wrp.Message
queue []item
// prioritizeOldest determines whether to prioritize the oldest message during a QualityOfService tie breaker,
// with the default being to prioritize the newest messages.
prioritizeOldest bool
// prioritizeLowestQOS determines whether to prioritize messages with the lowest QualityOfService,
// with the default being to prioritize the highest QualityOfService messages.
// Only used during queue trimming.
prioritizeLowestQOS bool
// maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads
maxQueueBytes int64
// MaxMessageBytes is the largest allowable wrp message payload.
Expand All @@ -27,6 +35,11 @@ type priorityQueue struct {
sizeBytes int64
}

type item struct {
msg wrp.Message
timestamp time.Time
}

// Dequeue returns the next highest priority message.
func (pq *priorityQueue) Dequeue() (wrp.Message, bool) {
// Required, otherwise heap.Pop will panic during an internal Swap call.
Expand All @@ -51,34 +64,69 @@ func (pq *priorityQueue) Enqueue(msg wrp.Message) error {
return nil
}

// trim removes messages with the lowest QualityOfService (taking `prioritizeOldest` into account)
// until the queue no longer violates `maxQueueSize“.
func (pq *priorityQueue) trim() {
if pq.sizeBytes <= pq.maxQueueBytes {
return
}

// Prioritize messages with the lowest QualityOfService such that `pq.Pop()` will return said messages.
pq.prioritizeLowestQOS = true
defer func() {
// Re-prioritize messages with the highest QualityOfService.
pq.prioritizeLowestQOS = false
// heap.Init() is required since the prioritization was switch to messages with the highest QualityOfService.
// The complexity of heap.Init() is O(n) where n = h.Len().
heap.Init(pq)
}()

// heap.Init() is required since the prioritization was switch to messages with the lowest QualityOfService.
// The complexity of heap.Init() is O(n) where n = h.Len().
heap.Init(pq)
// trim until the queue no longer violates maxQueueBytes.
for pq.sizeBytes > pq.maxQueueBytes {
// Note, priorityQueue.drop does not drop the least prioritized queued message.
// i.e.: a high priority queued message may be dropped instead of a lesser queued message.
pq.drop()
// Dequeue messages with the lowest QualityOfService.
pq.Dequeue()
}
}

func (pq *priorityQueue) drop() {
_ = heap.Remove(pq, pq.Len()-1).(wrp.Message)
}

// heap.Interface related implementations https://pkg.go.dev/container/heap#Interface

func (pq *priorityQueue) Len() int { return len(pq.queue) }

func (pq *priorityQueue) Less(i, j int) bool {
return pq.queue[i].QualityOfService > pq.queue[j].QualityOfService
iQOS := pq.queue[i].msg.QualityOfService
jQOS := pq.queue[j].msg.QualityOfService
iTimestamp := pq.queue[i].timestamp
jTimestamp := pq.queue[j].timestamp

// Determine whether a tie breaker is required.
if iQOS == jQOS {
if pq.prioritizeOldest {
// Prioritize the oldest messages.
return iTimestamp.Before(jTimestamp)
}

// Prioritize the newest messages.
return iTimestamp.After(jTimestamp)
} else if pq.prioritizeLowestQOS {
// Prioritize messages with the lowest QualityOfService.
return iQOS < jQOS
}

// Prioritize messages with the highest QualityOfService.
return iQOS > jQOS

}

func (pq *priorityQueue) Swap(i, j int) {
pq.queue[i], pq.queue[j] = pq.queue[j], pq.queue[i]
}

func (pq *priorityQueue) Push(x any) {
item := x.(wrp.Message)
pq.sizeBytes += int64(len(item.Payload))
item := item{msg: x.(wrp.Message), timestamp: time.Now()}
pq.sizeBytes += int64(len(item.msg.Payload))
pq.queue = append(pq.queue, item)
}

Expand All @@ -88,11 +136,11 @@ func (pq *priorityQueue) Pop() any {
return nil
}

item := pq.queue[last]
pq.sizeBytes -= int64(len(item.Payload))
msg := pq.queue[last].msg
pq.sizeBytes -= int64(len(msg.Payload))
// avoid memory leak
pq.queue[last] = wrp.Message{}
pq.queue[last] = item{}
pq.queue = pq.queue[0:last]

return item
return msg
}
Loading

0 comments on commit e525819

Please sign in to comment.