Skip to content

Commit

Permalink
Handle context cancellation in AMQP handleErrors
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Dec 10, 2024
1 parent 21615d0 commit 97cf3ec
Showing 1 changed file with 24 additions and 14 deletions.
38 changes: 24 additions & 14 deletions pkg/amqp10/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (p *Amqp10Publisher) SendAsync(ctx context.Context) error {
case <-ctx.Done():
return nil
default:
err = p.handleSendErrors(err)
err = p.handleSendErrors(ctx, err)
if err != nil {
return err
}
Expand All @@ -249,7 +249,7 @@ func (p *Amqp10Publisher) SendSync(ctx context.Context) error {
err := p.Sender.Send(context.TODO(), msg, nil)
latency := time.Since(startTime)
log.Debug("message sent", "id", p.Id, "destination", p.Terminus, "latency", latency, "appProps", msg.ApplicationProperties)
err = p.handleSendErrors(err)
err = p.handleSendErrors(ctx, err)
if err != nil {
return err
}
Expand All @@ -262,19 +262,29 @@ func (p *Amqp10Publisher) SendSync(ctx context.Context) error {
// handleSendErrors returns an error if the error suggests we should reconnect
// (this is native, but amqp-go-client should handle this better in the future)
// otherwise we log an error but return nil to keep publishing
func (p *Amqp10Publisher) handleSendErrors(err error) error {
var connErr *amqp.ConnError
var linkErr *amqp.LinkError
if errors.As(err, &connErr) {
log.Error("publisher connection failure; reconnecting...", "id", p.Id, "error", connErr.Error())
return err
} else if errors.As(err, &linkErr) {
log.Error("publisher link failure; reconnecting...", "id", p.Id, "error", connErr.Error())
return err
} else if err != nil {
log.Error("message sending failure", "id", p.Id, "error", err)
func (p *Amqp10Publisher) handleSendErrors(ctx context.Context, err error) error {
select {
case <-ctx.Done():
return nil
default:
var connErr *amqp.ConnError
var linkErr *amqp.LinkError
if errors.As(err, &connErr) {
log.Error("publisher connection failure; reconnecting...", "id", p.Id, "error", connErr.Error())
return err
}

if errors.As(err, &linkErr) {
log.Error("publisher link failure; reconnecting...", "id", p.Id, "error", connErr.Error())
return err
}

if err != nil {
log.Error("message sending failure", "id", p.Id, "error", err)
}

return nil
}
return nil
}

func (p *Amqp10Publisher) handleSent(receipt *amqp.SendReceipt, published time.Time) {
Expand Down

0 comments on commit 97cf3ec

Please sign in to comment.