From 4125b3bd2bbf14e75f4db1598ed77a40cd67a535 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Mon, 5 Aug 2024 15:46:33 +0200 Subject: [PATCH] Fix setting deliver policy in Fetch() for OrderedConsumer Signed-off-by: Piotr Piotrowski --- jetstream/ordered.go | 3 +- jetstream/test/ordered_test.go | 65 ++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/jetstream/ordered.go b/jetstream/ordered.go index 85b7ea9e9..9f556bbca 100644 --- a/jetstream/ordered.go +++ b/jetstream/ordered.go @@ -583,7 +583,8 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig { cfg.InactiveThreshold = c.cfg.InactiveThreshold } - if c.serial != 1 { + // if the cursor is not yet set, use the provided deliver policy + if c.cursor.streamSeq != 0 { return cfg } diff --git a/jetstream/test/ordered_test.go b/jetstream/test/ordered_test.go index 522c92196..2a7960d0a 100644 --- a/jetstream/test/ordered_test.go +++ b/jetstream/test/ordered_test.go @@ -1119,6 +1119,71 @@ func TestOrderedConsumerFetch(t *testing.T) { } }) + t.Run("with custom deliver policy", func(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + msgs := make([]jetstream.Msg, 0) + + for i := 0; i < 5; i++ { + if _, err := js.Publish(context.Background(), "FOO.A", []byte("msg")); err != nil { + t.Fatalf("Unexpected error during publish: %s", err) + } + } + for i := 0; i < 5; i++ { + if _, err := js.Publish(context.Background(), "FOO.B", []byte("msg")); err != nil { + t.Fatalf("Unexpected error during publish: %s", err) + } + } + + c, err := s.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{ + DeliverPolicy: jetstream.DeliverLastPerSubjectPolicy, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + res, err := c.Fetch(int(c.CachedInfo().NumPending), jetstream.FetchMaxWait(1*time.Second)) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + + for msg := range res.Messages() { + msgs = append(msgs, msg) + } + + if res.Error() != nil { + t.Fatalf("Unexpected error: %s", err) + } + + if len(msgs) != 2 { + t.Fatalf("Expected %d messages; got: %d", 2, len(msgs)) + } + expectedSubjects := []string{"FOO.A", "FOO.B"} + + for i := range msgs { + if msgs[i].Subject() != expectedSubjects[i] { + t.Fatalf("Expected subject: %s; got: %s", expectedSubjects[i], msgs[i].Subject()) + } + } + }) + t.Run("consumer used as consume", func(t *testing.T) { srv := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, srv)