Skip to content

Commit

Permalink
feat: support disableable qos message size validation
Browse files Browse the repository at this point in the history
- disableable qos message size validation
- this will match parodus's default behavior
  - no individual message size validation
  • Loading branch information
denopink committed Aug 1, 2024
1 parent 31387eb commit f48fafb
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 5 deletions.
2 changes: 0 additions & 2 deletions internal/wrphandlers/qos/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ func MaxQueueBytes(s int64) Option {
func(h *Handler) error {
if s < 0 {
return fmt.Errorf("%w: negative MaxQueueBytes", ErrMisconfiguredQOS)
} else if s == 0 {
s = DefaultMaxQueueBytes
}

h.maxQueueBytes = s
Expand Down
6 changes: 4 additions & 2 deletions internal/wrphandlers/qos/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type priorityQueue struct {
queue []item
// tieBreaker breaks any QualityOfService ties.
tieBreaker tieBreaker
// maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads
// maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads.
// Zero value will disable individual message size validation.
maxQueueBytes int64
// MaxMessageBytes is the largest allowable wrp message payload.
maxMessageBytes int
Expand Down Expand Up @@ -52,7 +53,8 @@ func (pq *priorityQueue) Dequeue() (wrp.Message, bool) {
// Enqueue queues the given message.
func (pq *priorityQueue) Enqueue(msg wrp.Message) error {
// Check whether msg violates maxMessageBytes.
if len(msg.Payload) > pq.maxMessageBytes {
// The zero value of `pq.maxMessageBytes` will disable individual message size validation.
if pq.maxMessageBytes != 0 && len(msg.Payload) > pq.maxMessageBytes {
return fmt.Errorf("%w: %v", ErrMaxMessageBytes, pq.maxMessageBytes)
}

Expand Down
6 changes: 6 additions & 0 deletions internal/wrphandlers/qos/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ func testEnqueueDequeue(t *testing.T) {
maxMessageBytes: len(largeCriticalQOSMsg.Payload) - 1,
expectedQueueSize: 0,
},
{
description: "allow any message size",
messages: []wrp.Message{largeCriticalQOSMsg},
maxQueueBytes: len(largeCriticalQOSMsg.Payload),
expectedQueueSize: 1,
},
{
description: "message too large with a nonempty queue",
messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg},
Expand Down
13 changes: 12 additions & 1 deletion internal/wrphandlers/qos/qos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ func TestHandler_HandleWrp(t *testing.T) {
expectedHandleWRPErr error
}{
// success cases
{
description: "enqueued and delivered message prioritizing newer messages with no message size restriction",
maxQueueBytes: 100,
priority: qos.NewestType,
nextCallCount: 1,
next: wrpkit.HandlerFunc(func(wrp.Message) error {
nextCallCount.Add(1)

return nil
}),
},
{
description: "enqueued and delivered message prioritizing newer messages",
maxQueueBytes: 100,
Expand Down Expand Up @@ -103,7 +114,7 @@ func TestHandler_HandleWrp(t *testing.T) {
{
description: "zero MaxQueueBytes option value",
maxQueueBytes: 0,
maxMessageBytes: 50,
maxMessageBytes: qos.DefaultMaxMessageBytes,
priority: qos.NewestType,
nextCallCount: 1,
next: wrpkit.HandlerFunc(func(wrp.Message) error {
Expand Down

0 comments on commit f48fafb

Please sign in to comment.