diff --git a/broker/nats.go b/broker/nats.go index 85b39623..aa0e2dd0 100644 --- a/broker/nats.go +++ b/broker/nats.go @@ -364,7 +364,7 @@ func (n *NATS) addStreamConsumer(stream string) { createConsumer: prefixedStream := streamPrefix + stream - _, err = n.jconsumers.fetch(stream, func() (jetstream.Consumer, error) { // nolint:errcheck + _, cerr := n.jconsumers.fetch(stream, func() (jetstream.Consumer, error) { // nolint:errcheck cons, err := n.js.CreateConsumer(context.Background(), prefixedStream, jetstream.ConsumerConfig{ AckPolicy: jetstream.AckNonePolicy, }) @@ -411,8 +411,8 @@ createConsumer: n.js.DeleteConsumer(context.Background(), prefixedStream, name) // nolint:errcheck }) - if err != nil { - if context.DeadlineExceeded == err { + if cerr != nil { + if context.DeadlineExceeded == cerr { if attempts > 0 { attempts-- n.log.Warnf("failed to create consumer for stream %s, retrying in 500ms...", stream)