Skip to content

Commit

Permalink
cleaned up amqp.Channel's in cases where a success happened after a t…
Browse files Browse the repository at this point in the history
…imeout
  • Loading branch information
Danlock committed Oct 14, 2023
1 parent 4b49628 commit 6e9b099
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ update-proxy-cache:
@GOPROXY=https://proxy.golang.org go get github.com/danlock/rmq

release:
@$(MAKE) deps
ifeq ($(findstring dirty,$(SHORTBUILDTAG)),dirty)
@echo "Version $(SHORTBUILDTAG) is filthy, commit to clean it" && exit 1
endif
@read -t 5 -p "$(SHORTBUILDTAG) will be the new released version. Hit enter to proceed, CTRL-C to cancel."
@$(MAKE) deps
@$(MAKE) test
@$(MAKE) bench
@git tag $(SHORTBUILDTAG)
Expand Down
10 changes: 4 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,14 @@ cfg := rmq.Args{Log: slog.Log}
rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmq.ConnectArgs{Args: cfg}, os.Getenv("AMQP_URL"), amqp.Config{})
consCfg := rmq.ConsumerArgs{
Args: cfg,
Queue: rmq.Queue{Name: "q2d2", AutoDelete: true},
Qos: rmq.Qos{PrefetchCount: 1000},
Args: cfg,
Queue: rmq.Queue{Name: "q2d2", AutoDelete: true},
Qos: rmq.Qos{PrefetchCount: 1000},
}
rmq.NewConsumer(rmqConn, consCfg).ConsumeConcurrently(ctx, 100, func(ctx context.Context, msg amqp.Delivery) {
process(msg)
if err := msg.Ack(false); err != nil {
handleErr(err)
}
handleAckErr(msg.Ack(false))
})
```

Expand Down
16 changes: 10 additions & 6 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,15 +246,19 @@ func (c *Connection) safeChannel(ctx context.Context, amqpConn AMQPConnection) (
go func() {
var resp internal.ChanResp[*amqp.Channel]
resp.Val, resp.Err = amqpConn.Channel()
respChan <- resp
// If our contexts timed out, close successfully created channels within this goroutine.
if resp.Err == nil && resp.Val != nil && (c.ctx.Err() != nil || ctx.Err() != nil) {
if err := resp.Val.Close(); err != nil && !errors.Is(err, amqp.ErrClosed) {
c.config.Log(ctx, slog.LevelError, logPrefix+" failed to close channel due to err: %+v", err)
}
} else {
respChan <- resp
}
}()

select {
case <-c.ctx.Done(): // Close the current amqp.Connection when rmq.Connection is shutting down
if err := amqpConn.CloseDeadline(time.Now().Add(c.config.AMQPTimeout)); err != nil && !errors.Is(err, amqp.ErrClosed) {
c.config.Log(c.ctx, slog.LevelError, logPrefix+" failed to close connection due to err: %+v", err)
}
return nil, fmt.Errorf(logPrefix+" unable to complete before %w", context.Cause(c.ctx))
case <-c.ctx.Done():
return nil, fmt.Errorf(logPrefix+" unable to complete before Connection %w", context.Cause(c.ctx))
case <-ctx.Done():
return nil, fmt.Errorf(logPrefix+" unable to complete before %w", context.Cause(ctx))
case resp := <-respChan:
Expand Down
14 changes: 6 additions & 8 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,30 +87,28 @@ func (c *Consumer) safeDeclareAndConsume(ctx context.Context) (_ *amqp.Channel,
// Call them in a goroutine so we can timeout if necessary

respChan := make(chan internal.ChanResp[<-chan amqp.Delivery], 1)
shouldLog := make(chan struct{})
start := time.Now()
go func() {
var r internal.ChanResp[<-chan amqp.Delivery]
r.Val, r.Err = c.declareAndConsume(ctx, mqChan)
if r.Err != nil {
ctxDone := ctx.Err() != nil
// Close the channel on errors or if the context times out, so the amqp channel isn't leaked
if r.Err != nil || ctxDone {
mqChanErr := mqChan.Close()
if mqChanErr != nil && !errors.Is(mqChanErr, amqp.ErrClosed) {
r.Err = errors.Join(r.Err, mqChanErr)
}
}

select {
case <-shouldLog:
if ctxDone {
// Log our leaked goroutine's response whenever it finally finishes in case it has useful information.
c.config.Log(ctx, slog.LevelWarn, logPrefix+" completed after it's context finished. It took %s. Err: %+v", time.Since(start), r.Err)
default:
} else {
respChan <- r
}
}()

select {
case <-ctx.Done():
// Log our leaked goroutine's response whenever it finally finishes in case it has useful information.
close(shouldLog)
return nil, nil, fmt.Errorf(logPrefix+" unable to complete before context did due to %w", context.Cause(ctx))
case r := <-respChan:
return mqChan, r.Val, r.Err
Expand Down
1 change: 1 addition & 0 deletions healthcheck_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func TestHealthcheck(t *testing.T) {
// Example shows how to write an unsophisticated healthcheck for a service intending to ensure it's rmq.Connection is capable of processing messages.
// Even though rmq.Connection reconnects on errors, there can always be unforeseen networking/DNS/RNGesus issues
// that necessitate a docker/kubernetes healthcheck restarting the service when unhealthy.
// While this is useful as an example, it wouldn't be used on production for several reasons, only one of which is the lack of reuse of AMQP connections and AMQP channels.
func Example() {
// Real applications should use a real context. If this healthcheck was called via HTTP request for example,
// that HTTP request's context would be a good candidate.
Expand Down
11 changes: 6 additions & 5 deletions publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func (p *Publisher) handleReturns(mqChan *amqp.Channel) {
notifyReturns := mqChan.NotifyReturn(make(chan amqp.Return))
go func() {
dropTimer := time.NewTimer(0)
<-dropTimer.C
for r := range notifyReturns {
if p.config.LogReturns {
// A Body can be arbitrarily large and/or contain sensitve info. Don't log it by default.
Expand All @@ -103,17 +104,17 @@ func (p *Publisher) handleReturns(mqChan *amqp.Channel) {
if p.config.NotifyReturn == nil {
continue
}
// Why is reusing a timer so bloody complicated... It's almost worth the timer leak just to reduce complexity
if !dropTimer.Stop() {
<-dropTimer.C
}

dropTimer.Reset(dropReturnsAfter)
// Try not to repeat streadway/amqp's mistake of deadlocking if a client isn't listening to their Notify* channel.
// (https://github.com/rabbitmq/amqp091-go/issues/18)
// If they aren't listening to p.config.NotifyReturn, just drop the amqp.Return instead of deadlocking and leaking goroutines
select {
case p.config.NotifyReturn <- r:
dropTimer.Stop()
// Why is reusing a timer so bloody complicated... It's almost worth the timer leak just to reduce complexity
if !dropTimer.Stop() {
<-dropTimer.C
}
case <-dropTimer.C:
}
}
Expand Down
2 changes: 1 addition & 1 deletion topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

// Exchange contains args for amqp.Channel.ExchangeDeclare
type Exchange struct {
Name string
Name string // Name is required by ExchangeDeclare.
Kind string // Kind is required by ExchangeDeclare. amqp091-go exports valid values like amqp.ExchangeDirect, etc
Durable bool
AutoDelete bool
Expand Down

0 comments on commit 6e9b099

Please sign in to comment.