Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support disableable qos message size validation #209

Merged
merged 1 commit into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions internal/wrphandlers/qos/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ func MaxMessageBytes(s int) Option {
func(h *Handler) error {
if s < 0 {
return fmt.Errorf("%w: negative MaxMessageBytes", ErrMisconfiguredQOS)
} else if s == 0 {
s = DefaultMaxMessageBytes
}

h.maxMessageBytes = s
Expand Down
6 changes: 4 additions & 2 deletions internal/wrphandlers/qos/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type priorityQueue struct {
queue []item
// tieBreaker breaks any QualityOfService ties.
tieBreaker tieBreaker
// maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads
// maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads.
// Zero value will disable individual message size validation.
maxQueueBytes int64
// MaxMessageBytes is the largest allowable wrp message payload.
maxMessageBytes int
Expand Down Expand Up @@ -52,7 +53,8 @@ func (pq *priorityQueue) Dequeue() (wrp.Message, bool) {
// Enqueue queues the given message.
func (pq *priorityQueue) Enqueue(msg wrp.Message) error {
// Check whether msg violates maxMessageBytes.
if len(msg.Payload) > pq.maxMessageBytes {
// The zero value of `pq.maxMessageBytes` will disable individual message size validation.
if pq.maxMessageBytes != 0 && len(msg.Payload) > pq.maxMessageBytes {
return fmt.Errorf("%w: %v", ErrMaxMessageBytes, pq.maxMessageBytes)
}

Expand Down
6 changes: 6 additions & 0 deletions internal/wrphandlers/qos/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ func testEnqueueDequeue(t *testing.T) {
maxMessageBytes: len(largeCriticalQOSMsg.Payload) - 1,
expectedQueueSize: 0,
},
{
description: "allow any message size",
messages: []wrp.Message{largeCriticalQOSMsg},
maxQueueBytes: len(largeCriticalQOSMsg.Payload),
expectedQueueSize: 1,
},
{
description: "message too large with a nonempty queue",
messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg},
Expand Down
13 changes: 12 additions & 1 deletion internal/wrphandlers/qos/qos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ func TestHandler_HandleWrp(t *testing.T) {
expectedHandleWRPErr error
}{
// success cases
{
description: "enqueued and delivered message prioritizing newer messages with no message size restriction",
maxQueueBytes: 100,
priority: qos.NewestType,
nextCallCount: 1,
next: wrpkit.HandlerFunc(func(wrp.Message) error {
nextCallCount.Add(1)

return nil
}),
},
{
description: "enqueued and delivered message prioritizing newer messages",
maxQueueBytes: 100,
Expand Down Expand Up @@ -103,7 +114,7 @@ func TestHandler_HandleWrp(t *testing.T) {
{
description: "zero MaxQueueBytes option value",
maxQueueBytes: 0,
maxMessageBytes: 50,
maxMessageBytes: qos.DefaultMaxMessageBytes,
priority: qos.NewestType,
nextCallCount: 1,
next: wrpkit.HandlerFunc(func(wrp.Message) error {
Expand Down
Loading