From 5eed84a6ef3ab6706a12251da8589c097d1b256e Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Thu, 1 Aug 2024 16:16:47 -0400 Subject: [PATCH] feat: support disableable qos message size validation - disableable qos message size validation - this will match parodus's default behavior - no individual message size validation --- internal/wrphandlers/qos/options.go | 2 -- internal/wrphandlers/qos/priority_queue.go | 6 ++++-- internal/wrphandlers/qos/priority_queue_test.go | 6 ++++++ internal/wrphandlers/qos/qos_test.go | 13 ++++++++++++- 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/internal/wrphandlers/qos/options.go b/internal/wrphandlers/qos/options.go index 4904f5b..bc3ce2a 100644 --- a/internal/wrphandlers/qos/options.go +++ b/internal/wrphandlers/qos/options.go @@ -37,8 +37,6 @@ func MaxMessageBytes(s int) Option { func(h *Handler) error { if s < 0 { return fmt.Errorf("%w: negative MaxMessageBytes", ErrMisconfiguredQOS) - } else if s == 0 { - s = DefaultMaxMessageBytes } h.maxMessageBytes = s diff --git a/internal/wrphandlers/qos/priority_queue.go b/internal/wrphandlers/qos/priority_queue.go index 39a59cb..9397306 100644 --- a/internal/wrphandlers/qos/priority_queue.go +++ b/internal/wrphandlers/qos/priority_queue.go @@ -21,7 +21,8 @@ type priorityQueue struct { queue []item // tieBreaker breaks any QualityOfService ties. tieBreaker tieBreaker - // maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads + // maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads. + // Zero value will disable individual message size validation. maxQueueBytes int64 // MaxMessageBytes is the largest allowable wrp message payload. maxMessageBytes int @@ -52,7 +53,8 @@ func (pq *priorityQueue) Dequeue() (wrp.Message, bool) { // Enqueue queues the given message. func (pq *priorityQueue) Enqueue(msg wrp.Message) error { // Check whether msg violates maxMessageBytes. - if len(msg.Payload) > pq.maxMessageBytes { + // The zero value of `pq.maxMessageBytes` will disable individual message size validation. + if pq.maxMessageBytes != 0 && len(msg.Payload) > pq.maxMessageBytes { return fmt.Errorf("%w: %v", ErrMaxMessageBytes, pq.maxMessageBytes) } diff --git a/internal/wrphandlers/qos/priority_queue_test.go b/internal/wrphandlers/qos/priority_queue_test.go index e6ca034..817b149 100644 --- a/internal/wrphandlers/qos/priority_queue_test.go +++ b/internal/wrphandlers/qos/priority_queue_test.go @@ -166,6 +166,12 @@ func testEnqueueDequeue(t *testing.T) { maxMessageBytes: len(largeCriticalQOSMsg.Payload) - 1, expectedQueueSize: 0, }, + { + description: "allow any message size", + messages: []wrp.Message{largeCriticalQOSMsg}, + maxQueueBytes: len(largeCriticalQOSMsg.Payload), + expectedQueueSize: 1, + }, { description: "message too large with a nonempty queue", messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg}, diff --git a/internal/wrphandlers/qos/qos_test.go b/internal/wrphandlers/qos/qos_test.go index c643718..38a0069 100644 --- a/internal/wrphandlers/qos/qos_test.go +++ b/internal/wrphandlers/qos/qos_test.go @@ -45,6 +45,17 @@ func TestHandler_HandleWrp(t *testing.T) { expectedHandleWRPErr error }{ // success cases + { + description: "enqueued and delivered message prioritizing newer messages with no message size restriction", + maxQueueBytes: 100, + priority: qos.NewestType, + nextCallCount: 1, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + }, { description: "enqueued and delivered message prioritizing newer messages", maxQueueBytes: 100, @@ -103,7 +114,7 @@ func TestHandler_HandleWrp(t *testing.T) { { description: "zero MaxQueueBytes option value", maxQueueBytes: 0, - maxMessageBytes: 50, + maxMessageBytes: qos.DefaultMaxMessageBytes, priority: qos.NewestType, nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error {