Skip to content

Commit

Permalink
AMQP consumer shutdown improvements
Browse files Browse the repository at this point in the history
Without an exlicit session.Close(), RMQ would
sometimes attempt to delete the queue used by the consumer
before the session process was stopped, leading to an
error in the logs.
  • Loading branch information
mkuratczyk committed Nov 28, 2024
1 parent dc79fc9 commit 15a3ce4
Showing 1 changed file with 2 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/amqp10_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func (c *Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) {
outcome, err := c.outcome(ctx, msg)
if err != nil {
if err == context.Canceled {
c.Stop("context canceled")
return
}
log.Error("failed to "+outcome+" message", "id", c.Id, "terminus", c.Terminus, "error", err)
Expand Down Expand Up @@ -231,6 +232,7 @@ func pastTense(outcome string) string {
}

func (c *Amqp10Consumer) Stop(reason string) {
_ = c.Session.Close(context.Background())
err := c.Connection.Close()
if err != nil {
log.Info("consumer stopped with an error", "id", c.Id, "error", err.Error())
Expand Down

0 comments on commit 15a3ce4

Please sign in to comment.