Skip to content

Commit

Permalink
Don't apply stream offset by default
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Nov 27, 2023
1 parent f4eb0d6 commit 8e11614
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
4 changes: 3 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ func RootCmd() *cobra.Command {

// parse stream offset
switch streamOffset {
case "":
cfg.StreamOffset = nil
case "next", "first", "last":
cfg.StreamOffset = streamOffset
default:
Expand Down Expand Up @@ -199,7 +201,7 @@ func RootCmd() *cobra.Command {
rootCmd.PersistentFlags().StringVar(&cfg.Amqp.Subject, "amqp-subject", "", "AMQP 1.0 message subject")
rootCmd.PersistentFlags().
BoolVarP(&cfg.MessageDurability, "message-durability", "d", true, "Mark messages as durable (default=true)")
rootCmd.PersistentFlags().StringVar(&streamOffset, "stream-offset", "next", "Stream consumer offset specification (default=next)")
rootCmd.PersistentFlags().StringVar(&streamOffset, "stream-offset", "", "Stream consumer offset specification (default=next)")

rootCmd.AddCommand(amqp_amqp)
rootCmd.AddCommand(amqp_stomp)
Expand Down
7 changes: 5 additions & 2 deletions pkg/amqp10_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ func (c Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) {
case config.UnsettledState:
durability = amqp.DurabilityUnsettledState
}
filters := amqp.NewLinkFilter("rabbitmq:stream-offset-spec", 0, c.Config.StreamOffset)
receiver, err := c.Session.NewReceiver(context.TODO(), c.Topic, &amqp.ReceiverOptions{SourceDurability: durability, Credit: int32(c.Config.Amqp.ConsumerCredits), Filters: []amqp.LinkFilter{filters}})
var filter amqp.LinkFilter
if c.Config.StreamOffset != nil {
filter = amqp.NewLinkFilter("rabbitmq:stream-offset-spec", 0, c.Config.StreamOffset)
}
receiver, err := c.Session.NewReceiver(context.TODO(), c.Topic, &amqp.ReceiverOptions{SourceDurability: durability, Credit: int32(c.Config.Amqp.ConsumerCredits), Filters: []amqp.LinkFilter{filter}})
if err != nil {
log.Error("consumer failed to create a receiver", "protocol", "amqp-1.0", "consumerId", c.Id, "error", err.Error())
return
Expand Down

0 comments on commit 8e11614

Please sign in to comment.