From 8e11614357ab70d1f48c62a323d78b469c1e6c83 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Mon, 27 Nov 2023 10:56:49 +0100 Subject: [PATCH] Don't apply stream offset by default --- cmd/root.go | 4 +++- pkg/amqp10_client/consumer.go | 7 +++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index b12aff3..c12709e 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -159,6 +159,8 @@ func RootCmd() *cobra.Command { // parse stream offset switch streamOffset { + case "": + cfg.StreamOffset = nil case "next", "first", "last": cfg.StreamOffset = streamOffset default: @@ -199,7 +201,7 @@ func RootCmd() *cobra.Command { rootCmd.PersistentFlags().StringVar(&cfg.Amqp.Subject, "amqp-subject", "", "AMQP 1.0 message subject") rootCmd.PersistentFlags(). BoolVarP(&cfg.MessageDurability, "message-durability", "d", true, "Mark messages as durable (default=true)") - rootCmd.PersistentFlags().StringVar(&streamOffset, "stream-offset", "next", "Stream consumer offset specification (default=next)") + rootCmd.PersistentFlags().StringVar(&streamOffset, "stream-offset", "", "Stream consumer offset specification (default=next)") rootCmd.AddCommand(amqp_amqp) rootCmd.AddCommand(amqp_stomp) diff --git a/pkg/amqp10_client/consumer.go b/pkg/amqp10_client/consumer.go index 6d0a65c..45e9996 100644 --- a/pkg/amqp10_client/consumer.go +++ b/pkg/amqp10_client/consumer.go @@ -59,8 +59,11 @@ func (c Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) { case config.UnsettledState: durability = amqp.DurabilityUnsettledState } - filters := amqp.NewLinkFilter("rabbitmq:stream-offset-spec", 0, c.Config.StreamOffset) - receiver, err := c.Session.NewReceiver(context.TODO(), c.Topic, &amqp.ReceiverOptions{SourceDurability: durability, Credit: int32(c.Config.Amqp.ConsumerCredits), Filters: []amqp.LinkFilter{filters}}) + var filter amqp.LinkFilter + if c.Config.StreamOffset != nil { + filter = amqp.NewLinkFilter("rabbitmq:stream-offset-spec", 0, c.Config.StreamOffset) + } + receiver, err := c.Session.NewReceiver(context.TODO(), c.Topic, &amqp.ReceiverOptions{SourceDurability: durability, Credit: int32(c.Config.Amqp.ConsumerCredits), Filters: []amqp.LinkFilter{filter}}) if err != nil { log.Error("consumer failed to create a receiver", "protocol", "amqp-1.0", "consumerId", c.Id, "error", err.Error()) return