Skip to content

Commit

Permalink
Rename the FetchGroup and PullGroup options to `WithPriorityGroup…
Browse files Browse the repository at this point in the history
…` and `PriorityGroup` respectively.

Validate in `Consume()` that the priority group option matches the consumer group(s) configuration.

Signed-off-by: Jean-Noël Moyne <[email protected]>
  • Loading branch information
jnmoyne committed Aug 24, 2024
1 parent ad081d9 commit 9532dd8
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 14 deletions.
8 changes: 4 additions & 4 deletions jetstream/jetstream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,14 @@ func (min PullMinAckPending) configureMessages(opts *consumeOpts) error {
return nil
}

type PullGroup string
type PriorityGroup string

func (group PullGroup) configureConsume(opts *consumeOpts) error {
func (group PriorityGroup) configureConsume(opts *consumeOpts) error {
opts.Group = string(group)
return nil
}

func (group PullGroup) configureMessages(opts *consumeOpts) error {
func (group PriorityGroup) configureMessages(opts *consumeOpts) error {
opts.Group = string(group)
return nil
}
Expand Down Expand Up @@ -327,7 +327,7 @@ func FetchMinAckPending(min int64) FetchOpt {
}
}

func FetchGroup(group string) FetchOpt {
func WithPriorityGroup(group string) FetchOpt {
return func(req *pullRequest) error {
req.Group = group
return nil
Expand Down
14 changes: 14 additions & 0 deletions jetstream/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"math"
"slices"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -185,6 +186,19 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
if err != nil {
return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err)
}

if len(p.info.Config.PriorityGroups) != 0 {
if consumeOpts.Group == "" {
return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "priority group is required for priority consumer")
}

if !slices.Contains(p.info.Config.PriorityGroups, consumeOpts.Group) {
return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "invalid priority group")
}
} else if consumeOpts.Group != "" {
return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "priority group is not supported for this consumer")
}

p.Lock()

subject := apiSubj(p.jetStream.apiPrefix, fmt.Sprintf(apiRequestNextT, p.stream, p.name))
Expand Down
36 changes: 26 additions & 10 deletions jetstream/test/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestConsumerOverflow(t *testing.T) {
}

// We are below overflow, so we should not get any moessages.
msgs, err := c.Fetch(10, jetstream.FetchMinPending(110), jetstream.FetchMaxWait(1*time.Second), jetstream.FetchGroup("A"))
msgs, err := c.Fetch(10, jetstream.FetchMinPending(110), jetstream.FetchMaxWait(1*time.Second), jetstream.WithPriorityGroup("A"))
count := 0
for msg := range msgs.Messages() {
msg.Ack()
Expand All @@ -181,7 +181,7 @@ func TestConsumerOverflow(t *testing.T) {
_, err = js.Publish(ctx, "FOO.bar", []byte("hello"))
}

msgs, err = c.Fetch(10, jetstream.FetchMinPending(110), jetstream.FetchGroup("A"))
msgs, err = c.Fetch(10, jetstream.FetchMinPending(110), jetstream.WithPriorityGroup("A"))
count = 0
for msg := range msgs.Messages() {
msg.Ack()
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestConsumerPinned(t *testing.T) {
_, err = js.Publish(ctx, "FOO.bar", []byte("hello"))
}

msgs, err := c.Messages(jetstream.PullGroup("A"))
msgs, err := c.Messages(jetstream.PriorityGroup("A"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand All @@ -251,7 +251,7 @@ func TestConsumerPinned(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}

noMsgs, err := second.Messages(jetstream.PullGroup("A"))
noMsgs, err := second.Messages(jetstream.PriorityGroup("A"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -326,12 +326,28 @@ func TestConsumerPinned(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// test priority group validation
// invalid priority group
_, err = initialyPinned.Consume(func(m jetstream.Msg) {
}, jetstream.PriorityGroup("BAD"))
if err == nil || err.Error() != "nats: invalid jetstream option: invalid priority group" {
t.Fatalf("Expected invalid priority group error")
}

// no priority group
_, err = initialyPinned.Consume(func(m jetstream.Msg) {
})
if err == nil || err.Error() != "nats: invalid jetstream option: priority group is required for priority consumer" {
t.Fatalf("Expected invalid priority group error")
}

count := 0
ip, err := initialyPinned.Consume(func(m jetstream.Msg) {
m.Ack()
count++
gcount <- struct{}{}
}, jetstream.PullThresholdMessages(10), jetstream.PullGroup("A"))
}, jetstream.PullThresholdMessages(10), jetstream.PriorityGroup("A"))
defer ip.Stop()

// Second consume instance that should remain passive.
Expand All @@ -340,7 +356,7 @@ func TestConsumerPinned(t *testing.T) {
m.Ack()
notPinnedC++
gcount <- struct{}{}
}, jetstream.PullGroup("A"))
}, jetstream.PriorityGroup("A"))
defer np.Stop()

outer:
Expand Down Expand Up @@ -413,7 +429,7 @@ func TestConsumerPinned(t *testing.T) {

// Initial fetch.
// Should get all messages and get a Pin ID.
msgs, err := c.Fetch(10, jetstream.FetchGroup("A"))
msgs, err := c.Fetch(10, jetstream.WithPriorityGroup("A"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand All @@ -438,7 +454,7 @@ func TestConsumerPinned(t *testing.T) {

// Different
cdiff, err := js.Consumer(ctx, "foo", "cons")
msgs2, err := cdiff.Fetch(10, jetstream.FetchMaxWait(1*time.Second), jetstream.FetchGroup("A"))
msgs2, err := cdiff.Fetch(10, jetstream.FetchMaxWait(1*time.Second), jetstream.WithPriorityGroup("A"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand All @@ -457,7 +473,7 @@ func TestConsumerPinned(t *testing.T) {
count = 0

// the same again, should be fine
msgs3, err := c.Fetch(10, jetstream.FetchMaxWait(3*time.Second), jetstream.FetchGroup("A"))
msgs3, err := c.Fetch(10, jetstream.FetchMaxWait(3*time.Second), jetstream.WithPriorityGroup("A"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand All @@ -479,7 +495,7 @@ func TestConsumerPinned(t *testing.T) {
count = 0
time.Sleep(10 * time.Second)
// The same instance, should work fine.
msgs4, err := c.Fetch(10, jetstream.FetchMaxWait(3*time.Second), jetstream.FetchGroup("A"))
msgs4, err := c.Fetch(10, jetstream.FetchMaxWait(3*time.Second), jetstream.WithPriorityGroup("A"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down

0 comments on commit 9532dd8

Please sign in to comment.