From 9532dd8c99f0ad4f64400e88e416b67d4eaba33c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Sat, 24 Aug 2024 14:16:19 -0700 Subject: [PATCH] Rename the `FetchGroup` and `PullGroup` options to `WithPriorityGroup` and `PriorityGroup` respectively. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Validate in `Consume()` that the priority group option matches the consumer group(s) configuration. Signed-off-by: Jean-Noël Moyne --- jetstream/jetstream_options.go | 8 ++++---- jetstream/pull.go | 14 +++++++++++++ jetstream/test/consumer_test.go | 36 ++++++++++++++++++++++++--------- 3 files changed, 44 insertions(+), 14 deletions(-) diff --git a/jetstream/jetstream_options.go b/jetstream/jetstream_options.go index f13896627..fecb83e30 100644 --- a/jetstream/jetstream_options.go +++ b/jetstream/jetstream_options.go @@ -230,14 +230,14 @@ func (min PullMinAckPending) configureMessages(opts *consumeOpts) error { return nil } -type PullGroup string +type PriorityGroup string -func (group PullGroup) configureConsume(opts *consumeOpts) error { +func (group PriorityGroup) configureConsume(opts *consumeOpts) error { opts.Group = string(group) return nil } -func (group PullGroup) configureMessages(opts *consumeOpts) error { +func (group PriorityGroup) configureMessages(opts *consumeOpts) error { opts.Group = string(group) return nil } @@ -327,7 +327,7 @@ func FetchMinAckPending(min int64) FetchOpt { } } -func FetchGroup(group string) FetchOpt { +func WithPriorityGroup(group string) FetchOpt { return func(req *pullRequest) error { req.Group = group return nil diff --git a/jetstream/pull.go b/jetstream/pull.go index c166dc575..954aa17ac 100644 --- a/jetstream/pull.go +++ b/jetstream/pull.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "math" + "slices" "sync" "sync/atomic" "time" @@ -185,6 +186,19 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( if err != nil { return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err) } + + if len(p.info.Config.PriorityGroups) != 0 { + if consumeOpts.Group == "" { + return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "priority group is required for priority consumer") + } + + if !slices.Contains(p.info.Config.PriorityGroups, consumeOpts.Group) { + return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "invalid priority group") + } + } else if consumeOpts.Group != "" { + return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "priority group is not supported for this consumer") + } + p.Lock() subject := apiSubj(p.jetStream.apiPrefix, fmt.Sprintf(apiRequestNextT, p.stream, p.name)) diff --git a/jetstream/test/consumer_test.go b/jetstream/test/consumer_test.go index cb476caf0..f2429cc8c 100644 --- a/jetstream/test/consumer_test.go +++ b/jetstream/test/consumer_test.go @@ -166,7 +166,7 @@ func TestConsumerOverflow(t *testing.T) { } // We are below overflow, so we should not get any moessages. - msgs, err := c.Fetch(10, jetstream.FetchMinPending(110), jetstream.FetchMaxWait(1*time.Second), jetstream.FetchGroup("A")) + msgs, err := c.Fetch(10, jetstream.FetchMinPending(110), jetstream.FetchMaxWait(1*time.Second), jetstream.WithPriorityGroup("A")) count := 0 for msg := range msgs.Messages() { msg.Ack() @@ -181,7 +181,7 @@ func TestConsumerOverflow(t *testing.T) { _, err = js.Publish(ctx, "FOO.bar", []byte("hello")) } - msgs, err = c.Fetch(10, jetstream.FetchMinPending(110), jetstream.FetchGroup("A")) + msgs, err = c.Fetch(10, jetstream.FetchMinPending(110), jetstream.WithPriorityGroup("A")) count = 0 for msg := range msgs.Messages() { msg.Ack() @@ -232,7 +232,7 @@ func TestConsumerPinned(t *testing.T) { _, err = js.Publish(ctx, "FOO.bar", []byte("hello")) } - msgs, err := c.Messages(jetstream.PullGroup("A")) + msgs, err := c.Messages(jetstream.PriorityGroup("A")) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -251,7 +251,7 @@ func TestConsumerPinned(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - noMsgs, err := second.Messages(jetstream.PullGroup("A")) + noMsgs, err := second.Messages(jetstream.PriorityGroup("A")) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -326,12 +326,28 @@ func TestConsumerPinned(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + + // test priority group validation + // invalid priority group + _, err = initialyPinned.Consume(func(m jetstream.Msg) { + }, jetstream.PriorityGroup("BAD")) + if err == nil || err.Error() != "nats: invalid jetstream option: invalid priority group" { + t.Fatalf("Expected invalid priority group error") + } + + // no priority group + _, err = initialyPinned.Consume(func(m jetstream.Msg) { + }) + if err == nil || err.Error() != "nats: invalid jetstream option: priority group is required for priority consumer" { + t.Fatalf("Expected invalid priority group error") + } + count := 0 ip, err := initialyPinned.Consume(func(m jetstream.Msg) { m.Ack() count++ gcount <- struct{}{} - }, jetstream.PullThresholdMessages(10), jetstream.PullGroup("A")) + }, jetstream.PullThresholdMessages(10), jetstream.PriorityGroup("A")) defer ip.Stop() // Second consume instance that should remain passive. @@ -340,7 +356,7 @@ func TestConsumerPinned(t *testing.T) { m.Ack() notPinnedC++ gcount <- struct{}{} - }, jetstream.PullGroup("A")) + }, jetstream.PriorityGroup("A")) defer np.Stop() outer: @@ -413,7 +429,7 @@ func TestConsumerPinned(t *testing.T) { // Initial fetch. // Should get all messages and get a Pin ID. - msgs, err := c.Fetch(10, jetstream.FetchGroup("A")) + msgs, err := c.Fetch(10, jetstream.WithPriorityGroup("A")) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -438,7 +454,7 @@ func TestConsumerPinned(t *testing.T) { // Different cdiff, err := js.Consumer(ctx, "foo", "cons") - msgs2, err := cdiff.Fetch(10, jetstream.FetchMaxWait(1*time.Second), jetstream.FetchGroup("A")) + msgs2, err := cdiff.Fetch(10, jetstream.FetchMaxWait(1*time.Second), jetstream.WithPriorityGroup("A")) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -457,7 +473,7 @@ func TestConsumerPinned(t *testing.T) { count = 0 // the same again, should be fine - msgs3, err := c.Fetch(10, jetstream.FetchMaxWait(3*time.Second), jetstream.FetchGroup("A")) + msgs3, err := c.Fetch(10, jetstream.FetchMaxWait(3*time.Second), jetstream.WithPriorityGroup("A")) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -479,7 +495,7 @@ func TestConsumerPinned(t *testing.T) { count = 0 time.Sleep(10 * time.Second) // The same instance, should work fine. - msgs4, err := c.Fetch(10, jetstream.FetchMaxWait(3*time.Second), jetstream.FetchGroup("A")) + msgs4, err := c.Fetch(10, jetstream.FetchMaxWait(3*time.Second), jetstream.WithPriorityGroup("A")) if err != nil { t.Fatalf("Unexpected error: %v", err) }