diff --git a/Makefile b/Makefile index 3c9399c..e6f439d 100644 --- a/Makefile +++ b/Makefile @@ -51,11 +51,11 @@ update-proxy-cache: @GOPROXY=https://proxy.golang.org go get github.com/danlock/rmq release: + @$(MAKE) deps ifeq ($(findstring dirty,$(SHORTBUILDTAG)),dirty) @echo "Version $(SHORTBUILDTAG) is filthy, commit to clean it" && exit 1 endif @read -t 5 -p "$(SHORTBUILDTAG) will be the new released version. Hit enter to proceed, CTRL-C to cancel." - @$(MAKE) deps @$(MAKE) test @$(MAKE) bench @git tag $(SHORTBUILDTAG) diff --git a/README.md b/README.md index bcb8a7c..3a27474 100644 --- a/README.md +++ b/README.md @@ -49,16 +49,14 @@ cfg := rmq.Args{Log: slog.Log} rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmq.ConnectArgs{Args: cfg}, os.Getenv("AMQP_URL"), amqp.Config{}) consCfg := rmq.ConsumerArgs{ - Args: cfg, - Queue: rmq.Queue{Name: "q2d2", AutoDelete: true}, - Qos: rmq.Qos{PrefetchCount: 1000}, + Args: cfg, + Queue: rmq.Queue{Name: "q2d2", AutoDelete: true}, + Qos: rmq.Qos{PrefetchCount: 1000}, } rmq.NewConsumer(rmqConn, consCfg).ConsumeConcurrently(ctx, 100, func(ctx context.Context, msg amqp.Delivery) { process(msg) - if err := msg.Ack(false); err != nil { - handleErr(err) - } + handleAckErr(msg.Ack(false)) }) ``` diff --git a/connection.go b/connection.go index 2d308cd..5eb2077 100644 --- a/connection.go +++ b/connection.go @@ -246,15 +246,19 @@ func (c *Connection) safeChannel(ctx context.Context, amqpConn AMQPConnection) ( go func() { var resp internal.ChanResp[*amqp.Channel] resp.Val, resp.Err = amqpConn.Channel() - respChan <- resp + // If our contexts timed out, close successfully created channels within this goroutine. + if resp.Err == nil && resp.Val != nil && (c.ctx.Err() != nil || ctx.Err() != nil) { + if err := resp.Val.Close(); err != nil && !errors.Is(err, amqp.ErrClosed) { + c.config.Log(ctx, slog.LevelError, logPrefix+" failed to close channel due to err: %+v", err) + } + } else { + respChan <- resp + } }() select { - case <-c.ctx.Done(): // Close the current amqp.Connection when rmq.Connection is shutting down - if err := amqpConn.CloseDeadline(time.Now().Add(c.config.AMQPTimeout)); err != nil && !errors.Is(err, amqp.ErrClosed) { - c.config.Log(c.ctx, slog.LevelError, logPrefix+" failed to close connection due to err: %+v", err) - } - return nil, fmt.Errorf(logPrefix+" unable to complete before %w", context.Cause(c.ctx)) + case <-c.ctx.Done(): + return nil, fmt.Errorf(logPrefix+" unable to complete before Connection %w", context.Cause(c.ctx)) case <-ctx.Done(): return nil, fmt.Errorf(logPrefix+" unable to complete before %w", context.Cause(ctx)) case resp := <-respChan: diff --git a/consumer.go b/consumer.go index a2b7c55..25689fa 100644 --- a/consumer.go +++ b/consumer.go @@ -87,30 +87,28 @@ func (c *Consumer) safeDeclareAndConsume(ctx context.Context) (_ *amqp.Channel, // Call them in a goroutine so we can timeout if necessary respChan := make(chan internal.ChanResp[<-chan amqp.Delivery], 1) - shouldLog := make(chan struct{}) start := time.Now() go func() { var r internal.ChanResp[<-chan amqp.Delivery] r.Val, r.Err = c.declareAndConsume(ctx, mqChan) - if r.Err != nil { + ctxDone := ctx.Err() != nil + // Close the channel on errors or if the context times out, so the amqp channel isn't leaked + if r.Err != nil || ctxDone { mqChanErr := mqChan.Close() if mqChanErr != nil && !errors.Is(mqChanErr, amqp.ErrClosed) { r.Err = errors.Join(r.Err, mqChanErr) } } - - select { - case <-shouldLog: + if ctxDone { + // Log our leaked goroutine's response whenever it finally finishes in case it has useful information. c.config.Log(ctx, slog.LevelWarn, logPrefix+" completed after it's context finished. It took %s. Err: %+v", time.Since(start), r.Err) - default: + } else { respChan <- r } }() select { case <-ctx.Done(): - // Log our leaked goroutine's response whenever it finally finishes in case it has useful information. - close(shouldLog) return nil, nil, fmt.Errorf(logPrefix+" unable to complete before context did due to %w", context.Cause(ctx)) case r := <-respChan: return mqChan, r.Val, r.Err diff --git a/healthcheck_int_test.go b/healthcheck_int_test.go index cb8a867..2eb1753 100644 --- a/healthcheck_int_test.go +++ b/healthcheck_int_test.go @@ -23,6 +23,7 @@ func TestHealthcheck(t *testing.T) { // Example shows how to write an unsophisticated healthcheck for a service intending to ensure it's rmq.Connection is capable of processing messages. // Even though rmq.Connection reconnects on errors, there can always be unforeseen networking/DNS/RNGesus issues // that necessitate a docker/kubernetes healthcheck restarting the service when unhealthy. +// While this is useful as an example, it wouldn't be used on production for several reasons, only one of which is the lack of reuse of AMQP connections and AMQP channels. func Example() { // Real applications should use a real context. If this healthcheck was called via HTTP request for example, // that HTTP request's context would be a good candidate. diff --git a/publisher.go b/publisher.go index c7b63a2..c2cdb07 100644 --- a/publisher.go +++ b/publisher.go @@ -92,6 +92,7 @@ func (p *Publisher) handleReturns(mqChan *amqp.Channel) { notifyReturns := mqChan.NotifyReturn(make(chan amqp.Return)) go func() { dropTimer := time.NewTimer(0) + <-dropTimer.C for r := range notifyReturns { if p.config.LogReturns { // A Body can be arbitrarily large and/or contain sensitve info. Don't log it by default. @@ -103,17 +104,17 @@ func (p *Publisher) handleReturns(mqChan *amqp.Channel) { if p.config.NotifyReturn == nil { continue } - // Why is reusing a timer so bloody complicated... It's almost worth the timer leak just to reduce complexity - if !dropTimer.Stop() { - <-dropTimer.C - } + dropTimer.Reset(dropReturnsAfter) // Try not to repeat streadway/amqp's mistake of deadlocking if a client isn't listening to their Notify* channel. // (https://github.com/rabbitmq/amqp091-go/issues/18) // If they aren't listening to p.config.NotifyReturn, just drop the amqp.Return instead of deadlocking and leaking goroutines select { case p.config.NotifyReturn <- r: - dropTimer.Stop() + // Why is reusing a timer so bloody complicated... It's almost worth the timer leak just to reduce complexity + if !dropTimer.Stop() { + <-dropTimer.C + } case <-dropTimer.C: } } diff --git a/topology.go b/topology.go index 07ac1c0..d4ddb5c 100644 --- a/topology.go +++ b/topology.go @@ -14,7 +14,7 @@ import ( // Exchange contains args for amqp.Channel.ExchangeDeclare type Exchange struct { - Name string + Name string // Name is required by ExchangeDeclare. Kind string // Kind is required by ExchangeDeclare. amqp091-go exports valid values like amqp.ExchangeDirect, etc Durable bool AutoDelete bool