Skip to content

Commit

Permalink
Fix setting deliver policy in Fetch() for OrderedConsumer
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Aug 5, 2024
1 parent e3df53d commit 4125b3b
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 1 deletion.
3 changes: 2 additions & 1 deletion jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,8 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
cfg.InactiveThreshold = c.cfg.InactiveThreshold
}

if c.serial != 1 {
// if the cursor is not yet set, use the provided deliver policy
if c.cursor.streamSeq != 0 {
return cfg
}

Expand Down
65 changes: 65 additions & 0 deletions jetstream/test/ordered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1119,6 +1119,71 @@ func TestOrderedConsumerFetch(t *testing.T) {
}
})

t.Run("with custom deliver policy", func(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
msgs := make([]jetstream.Msg, 0)

for i := 0; i < 5; i++ {
if _, err := js.Publish(context.Background(), "FOO.A", []byte("msg")); err != nil {
t.Fatalf("Unexpected error during publish: %s", err)
}
}
for i := 0; i < 5; i++ {
if _, err := js.Publish(context.Background(), "FOO.B", []byte("msg")); err != nil {
t.Fatalf("Unexpected error during publish: %s", err)
}
}

c, err := s.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{
DeliverPolicy: jetstream.DeliverLastPerSubjectPolicy,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

res, err := c.Fetch(int(c.CachedInfo().NumPending), jetstream.FetchMaxWait(1*time.Second))
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}

for msg := range res.Messages() {
msgs = append(msgs, msg)
}

if res.Error() != nil {
t.Fatalf("Unexpected error: %s", err)
}

if len(msgs) != 2 {
t.Fatalf("Expected %d messages; got: %d", 2, len(msgs))
}
expectedSubjects := []string{"FOO.A", "FOO.B"}

for i := range msgs {
if msgs[i].Subject() != expectedSubjects[i] {
t.Fatalf("Expected subject: %s; got: %s", expectedSubjects[i], msgs[i].Subject())
}
}
})

t.Run("consumer used as consume", func(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
Expand Down

0 comments on commit 4125b3b

Please sign in to comment.