Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a timestamp component to qos in order to prioritize older/newer messages #148

Merged
merged 5 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/xmidt-agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type QOS struct {
MaxQueueBytes int64
// MaxMessageBytes is the largest allowable wrp message payload.
MaxMessageBytes int
// PrioritizeOldest determines whether to prioritize the oldest message during a QualityOfService tie breaker,
// with the default being to prioritize the newest messages.
PrioritizeOldest bool
denopink marked this conversation as resolved.
Show resolved Hide resolved
}

type Pubsub struct {
Expand Down
1 change: 1 addition & 0 deletions cmd/xmidt-agent/wrphandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func provideQOSHandler(in qosIn) (*qos.Handler, error) {
in.WS,
qos.MaxQueueBytes(in.QOS.MaxQueueBytes),
qos.MaxMessageBytes(in.QOS.MaxMessageBytes),
qos.PrioritizeOldest(in.QOS.PrioritizeOldest),
)
}

Expand Down
11 changes: 11 additions & 0 deletions internal/wrphandlers/qos/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,14 @@ func MaxMessageBytes(s int) Option {
return nil
})
}

// PrioritizeOldest determines whether to prioritize the oldest message during a QualityOfService tie breaker,
// with the default being to prioritize the newest messages.
func PrioritizeOldest(b bool) Option {
return optionFunc(
func(h *Handler) error {
h.prioritizeOldest = b
denopink marked this conversation as resolved.
Show resolved Hide resolved

return nil
})
}
78 changes: 63 additions & 15 deletions internal/wrphandlers/qos/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"container/heap"
"errors"
"fmt"
"time"

"github.com/xmidt-org/wrp-go/v3"
)
Expand All @@ -17,7 +18,14 @@ var ErrMaxMessageBytes = errors.New("wrp message payload exceeds maxMessageBytes
// https://xmidt.io/docs/wrp/basics/#qos-description-qos
type priorityQueue struct {
// queue for wrp messages, ingested by serviceQOS
queue []wrp.Message
queue []item
// prioritizeOldest determines whether to prioritize the oldest message during a QualityOfService tie breaker,
// with the default being to prioritize the newest messages.
prioritizeOldest bool
// prioritizeLowestQOS determines whether to prioritize messages with the lowest QualityOfService,
denopink marked this conversation as resolved.
Show resolved Hide resolved
// with the default being to prioritize the highest QualityOfService messages.
// Only used during queue trimming.
prioritizeLowestQOS bool
// maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads
maxQueueBytes int64
// MaxMessageBytes is the largest allowable wrp message payload.
Expand All @@ -27,6 +35,11 @@ type priorityQueue struct {
sizeBytes int64
}

type item struct {
msg wrp.Message
timestamp time.Time
}

// Dequeue returns the next highest priority message.
func (pq *priorityQueue) Dequeue() (wrp.Message, bool) {
// Required, otherwise heap.Pop will panic during an internal Swap call.
Expand All @@ -51,34 +64,69 @@ func (pq *priorityQueue) Enqueue(msg wrp.Message) error {
return nil
}

// trim removes messages with the lowest QualityOfService (taking `prioritizeOldest` into account)
// until the queue no longer violates `maxQueueSize“.
func (pq *priorityQueue) trim() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introduced this since we described a desire to keep messages with higher qos value taking the age of the message into account as well

if pq.sizeBytes <= pq.maxQueueBytes {
return
}

// Prioritize messages with the lowest QualityOfService such that `pq.Pop()` will return said messages.
pq.prioritizeLowestQOS = true
defer func() {
// Re-prioritize messages with the highest QualityOfService.
pq.prioritizeLowestQOS = false
// heap.Init() is required since the prioritization was switch to messages with the highest QualityOfService.
// The complexity of heap.Init() is O(n) where n = h.Len().
heap.Init(pq)
}()

// heap.Init() is required since the prioritization was switch to messages with the lowest QualityOfService.
// The complexity of heap.Init() is O(n) where n = h.Len().
heap.Init(pq)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we end up needing to trim often, then the O(n) cost of heap.Init() may start to sting...

I wanted to point that out in case we can't afford the O(n) cost of heap.Init() and we would rather trim() random messages instead.

// trim until the queue no longer violates maxQueueBytes.
for pq.sizeBytes > pq.maxQueueBytes {
// Note, priorityQueue.drop does not drop the least prioritized queued message.
// i.e.: a high priority queued message may be dropped instead of a lesser queued message.
pq.drop()
// Dequeue messages with the lowest QualityOfService.
pq.Dequeue()
}
}

func (pq *priorityQueue) drop() {
_ = heap.Remove(pq, pq.Len()-1).(wrp.Message)
}

// heap.Interface related implementations https://pkg.go.dev/container/heap#Interface

func (pq *priorityQueue) Len() int { return len(pq.queue) }

func (pq *priorityQueue) Less(i, j int) bool {
return pq.queue[i].QualityOfService > pq.queue[j].QualityOfService
iQOS := pq.queue[i].msg.QualityOfService
jQOS := pq.queue[j].msg.QualityOfService
iTimestamp := pq.queue[i].timestamp
jTimestamp := pq.queue[j].timestamp

denopink marked this conversation as resolved.
Show resolved Hide resolved
// Determine whether a tie breaker is required.
if iQOS == jQOS {
if pq.prioritizeOldest {
// Prioritize the oldest messages.
return iTimestamp.Before(jTimestamp)
}

// Prioritize the newest messages.
return iTimestamp.After(jTimestamp)
} else if pq.prioritizeLowestQOS {
// Prioritize messages with the lowest QualityOfService.
return iQOS < jQOS
}

// Prioritize messages with the highest QualityOfService.
return iQOS > jQOS

}

func (pq *priorityQueue) Swap(i, j int) {
pq.queue[i], pq.queue[j] = pq.queue[j], pq.queue[i]
}

func (pq *priorityQueue) Push(x any) {
item := x.(wrp.Message)
pq.sizeBytes += int64(len(item.Payload))
item := item{msg: x.(wrp.Message), timestamp: time.Now()}
pq.sizeBytes += int64(len(item.msg.Payload))
pq.queue = append(pq.queue, item)
}

Expand All @@ -88,11 +136,11 @@ func (pq *priorityQueue) Pop() any {
return nil
}

item := pq.queue[last]
pq.sizeBytes -= int64(len(item.Payload))
msg := pq.queue[last].msg
pq.sizeBytes -= int64(len(msg.Payload))
// avoid memory leak
pq.queue[last] = wrp.Message{}
pq.queue[last] = item{}
pq.queue = pq.queue[0:last]

return item
return msg
}
Loading
Loading