Skip to content

Commit

Permalink
added make bench, more efficiently log leaked goroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
Danlock committed Sep 21, 2023
1 parent 7ef78d4 commit bf71af9
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 14 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ unit-test:
test:
@TEST_AMQP_URI=$(TEST_AMQP_URI) go test -v -race -count=2 -tags="rabbit" ./...

bench:
@TEST_AMQP_URI=$(TEST_AMQP_URI) go test -benchmem -run=^$ -v -count=2 -tags="rabbit" -bench . ./...

coverage:
@TEST_AMQP_URI=$(TEST_AMQP_URI) go test -covermode=count -tags="rabbit" -coverprofile=$(COVERAGE_PATH)

Expand Down
16 changes: 10 additions & 6 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (c *Consumer) safeDeclareAndConsume(ctx context.Context) (_ *amqp.Channel,
// Call them in a goroutine so we can timeout if necessary

respChan := make(chan internal.ChanResp[<-chan amqp.Delivery], 1)
shouldLog := make(chan struct{})
start := time.Now()
go func() {
var r internal.ChanResp[<-chan amqp.Delivery]
Expand All @@ -97,16 +98,19 @@ func (c *Consumer) safeDeclareAndConsume(ctx context.Context) (_ *amqp.Channel,
r.Err = errors.Join(r.Err, mqChanErr)
}
}
respChan <- r

select {
case <-shouldLog:
c.config.Log(ctx, slog.LevelWarn, logPrefix+" completed after it's context finished. It took %s. Err: %+v", time.Since(start), r.Err)
default:
respChan <- r
}
}()

select {
case <-ctx.Done():
go func() {
// Log our leaked goroutine's response whenever it finally finishes in case it has useful information.
r := <-respChan
c.config.Log(ctx, slog.LevelWarn, logPrefix+" completed after it's context finished. It took %s. Err: %+v", time.Since(start), r.Err)
}()
// Log our leaked goroutine's response whenever it finally finishes in case it has useful information.
close(shouldLog)
return nil, nil, fmt.Errorf(logPrefix+" unable to complete before context did due to %w", context.Cause(ctx))
case r := <-respChan:
return mqChan, r.Val, r.Err
Expand Down
4 changes: 0 additions & 4 deletions publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,6 @@ func (p *Publishing) publish(mqChan *amqp.Channel) {
p.req.RespChan <- resp
}

func (p *Publishing) empty() bool {
return p.Exchange == "" && p.RoutingKey == "" && len(p.Body) == 0
}

// Publish send a Publishing on rmq.Publisher's current amqp.Channel.
// Returns amqp.DefferedConfirmation's only if the rmq.Publisher has Confirm set.
// If an error is returned, rmq.Publisher will grab another amqp.Channel from rmq.Connection, which itself will redial AMQP if necessary.
Expand Down
14 changes: 10 additions & 4 deletions topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func DeclareTopology(ctx context.Context, amqpConn AMQPConnection, topology Topo
// Call them in a goroutine so we can bail if necessary
start := time.Now()
errChan := make(chan error, 1)
shouldLog := make(chan struct{})

go func() {
mqChan, err := amqpConn.Channel()
if err != nil {
Expand All @@ -74,17 +76,21 @@ func DeclareTopology(ctx context.Context, amqpConn AMQPConnection, topology Topo
if mqChanErr != nil && !errors.Is(mqChanErr, amqp.ErrClosed) {
err = errors.Join(err, mqChanErr)
}
errChan <- err

select {
case <-shouldLog:
topology.Log(ctx, slog.LevelWarn, logPrefix+" completed after it's context finished. It took %s. Err: %+v", time.Since(start), err)
default:
errChan <- err
}
}()

select {
case <-ctx.Done():
// Log our leaked goroutine's response whenever it finally finishes since it may have useful debugging information.
if topology.Log != nil {
internal.WrapLogFunc(&topology.Log)
go func() {
topology.Log(ctx, slog.LevelWarn, logPrefix+" completed after it's context finished. It took %s. Err: %+v", time.Since(start), <-errChan)
}()
close(shouldLog)
}
return fmt.Errorf(logPrefix+" unable to complete before context due to %w", context.Cause(ctx))
case err := <-errChan:
Expand Down

0 comments on commit bf71af9

Please sign in to comment.