Skip to content

Commit

Permalink
amqp091 update brought ConsumeWithContext. It doesn't actually solve …
Browse files Browse the repository at this point in the history
…my main issues with the function, but it did expose some bugs in danlock/rmq's retry logic.

now use standardized retry function. All classes now count short successes as failures, to prevent effective failures with no backoff.
  • Loading branch information
Danlock committed Oct 14, 2023
1 parent 99f87ee commit 3bcfb30
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 77 deletions.
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ version:
@echo $(SHORTBUILDTAG)

unit-test:
@go test -race -count=3 ./...
@go test -failfast -race -count=3 ./...

test:
@TEST_AMQP_URI=$(TEST_AMQP_URI) go test -v -race -count=2 -tags="rabbit" ./...
@TEST_AMQP_URI=$(TEST_AMQP_URI) go test -failfast -v -race -count=2 -tags="rabbit" ./...

bench:
@TEST_AMQP_URI=$(TEST_AMQP_URI) go test -benchmem -run=^$ -v -count=2 -tags="rabbit" -bench . ./...
@TEST_AMQP_URI=$(TEST_AMQP_URI) go test -failfast -benchmem -run=^$ -v -count=2 -tags="rabbit" -bench . ./...

coverage:
@TEST_AMQP_URI=$(TEST_AMQP_URI) go test -covermode=count -tags="rabbit" -coverprofile=$(COVERAGE_PATH)
@TEST_AMQP_URI=$(TEST_AMQP_URI) go test -failfast -covermode=count -tags="rabbit" -coverprofile=$(COVERAGE_PATH)

coverage-html:
@rm $(COVERAGE_PATH) || true
Expand Down
35 changes: 10 additions & 25 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,38 +174,23 @@ func (c *Connection) CurrentConnection(ctx context.Context) (AMQPConnection, err
}

func (c *Connection) redial(dialFn func() (AMQPConnection, error)) {
logPrefix := "rmq.Connection.redial"
var dialDelay time.Duration
attempt := 0
for {
select {
case <-c.ctx.Done():
return
case <-time.After(dialDelay):
}

internal.Retry(c.ctx, c.config.Delay, func(delay time.Duration) (time.Duration, bool) {
logPrefix := "rmq.Connection.redial"
amqpConn, err := dialFn()
if err != nil {
dialDelay = c.config.Delay(attempt)
attempt++
c.config.Log(c.ctx, slog.LevelError, logPrefix+" failed, retrying after %s. err: %+v", dialDelay.String(), err)
continue
c.config.Log(c.ctx, slog.LevelError, logPrefix+" failed, retrying after %s due to %+v", delay.String(), err)
return 0, false
}
logPrefix = fmt.Sprintf("rmq.Connection.redial's AMQPConnection (%s -> %s)", amqpConn.LocalAddr(), amqpConn.RemoteAddr())

// Redeclare Topology if we have one. This has the bonus aspect of making sure the connection is actually usable, better than a Ping.
if err := DeclareTopology(c.ctx, amqpConn, c.config.Topology); err != nil {
dialDelay = c.config.Delay(attempt)
attempt++
c.config.Log(c.ctx, slog.LevelError, logPrefix+" DeclareTopology failed, retrying after %s. err: %+v", dialDelay.String(), err)
continue
c.config.Log(c.ctx, slog.LevelError, logPrefix+" DeclareTopology failed, retrying after %s due to %+v", delay.String(), err)
return 0, false
}

// After a successful dial and topology declare, reset our attempts and delay
dialDelay, attempt = 0, 0

start := time.Now()
c.listen(amqpConn)
}
return time.Since(start), true
})
}

// listen listens and responds to Channel and Connection requests. It returns on any failure to prompt another redial.
Expand All @@ -231,7 +216,7 @@ func (c *Connection) listen(amqpConn AMQPConnection) {
resp.Val, resp.Err = c.safeChannel(chanReq.Ctx, amqpConn)
chanReq.RespChan <- resp
if resp.Err != nil {
// redial on failed Channel requests
c.config.Log(c.ctx, slog.LevelDebug, logPrefix+" redialing due to %+v", resp.Err)
return
}
}
Expand Down
41 changes: 17 additions & 24 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,22 @@ func NewConsumer(rmqConn *Connection, config ConsumerArgs) *Consumer {
// Closes the amqp.Channel on errors.
func (c *Consumer) safeDeclareAndConsume(ctx context.Context) (_ *amqp.Channel, _ <-chan amqp.Delivery, err error) {
logPrefix := fmt.Sprintf("rmq.Consumer.safeDeclareAndConsume for queue %s", c.config.Queue.Name)
ctx, cancel := context.WithTimeout(ctx, c.config.AMQPTimeout)
timeoutCtx, cancel := context.WithTimeout(ctx, c.config.AMQPTimeout)
defer cancel()

mqChan, err := c.conn.Channel(ctx)
mqChan, err := c.conn.Channel(timeoutCtx)
if err != nil {
return nil, nil, fmt.Errorf(logPrefix+" failed to get a channel due to err %w", err)
}
// Network calls that don't take a context can block indefintely.
// Network calls that don't take a context can block indefinitely.
// Call them in a goroutine so we can timeout if necessary

respChan := make(chan internal.ChanResp[<-chan amqp.Delivery], 1)
start := time.Now()
go func() {
var r internal.ChanResp[<-chan amqp.Delivery]
r.Val, r.Err = c.declareAndConsume(ctx, mqChan)
ctxDone := ctx.Err() != nil
ctxDone := timeoutCtx.Err() != nil
// Close the channel on errors or if the context times out, so the amqp channel isn't leaked
if r.Err != nil || ctxDone {
mqChanErr := mqChan.Close()
Expand All @@ -108,7 +108,7 @@ func (c *Consumer) safeDeclareAndConsume(ctx context.Context) (_ *amqp.Channel,
}()

select {
case <-ctx.Done():
case <-timeoutCtx.Done():
return nil, nil, fmt.Errorf(logPrefix+" unable to complete before context did due to %w", context.Cause(ctx))
case r := <-respChan:
return mqChan, r.Val, r.Err
Expand Down Expand Up @@ -154,7 +154,9 @@ func (c *Consumer) declareAndConsume(ctx context.Context, mqChan *amqp.Channel)
}
}

// https://github.com/rabbitmq/amqp091-go/pull/192 Channel.ConsumeWithContext doesn't hold up under scrutiny. The actual network call doesn't respect the passed in context.
// https://github.com/rabbitmq/amqp091-go/pull/192 Channel.ConsumeWithContext doesn't hold up under scrutiny. The actual network call (ch.call()) doesn't respect the passed in context.
// As of amqp091-go 1.9.0 it doesn't look like we can use ConsumeWithContext to timeout network calls, so we're stuck with this wrapper.
// Now ConsumeWithContext cancels itself when the context is finished, which seems unneccessary since callers can call Cancel, or in danlock/rmq, Close(), themselves.
deliveries, err := mqChan.ConsumeWithContext(
ctx,
c.config.Queue.Name,
Expand All @@ -179,29 +181,19 @@ func (c *Consumer) declareAndConsume(ctx context.Context, mqChan *amqp.Channel)
func (c *Consumer) Consume(ctx context.Context) <-chan amqp.Delivery {
outChan := make(chan amqp.Delivery)
go func() {
logPrefix := fmt.Sprintf("rmq.Consumer.Consume for queue (%s)", c.config.Queue.Name)
var delay time.Duration
attempt := 0
for {
select {
case <-ctx.Done():
close(outChan)
return
case <-time.After(delay):
}
internal.Retry(ctx, c.config.Delay, func(delay time.Duration) (time.Duration, bool) {
logPrefix := fmt.Sprintf("rmq.Consumer.Consume for queue (%s)", c.config.Queue.Name)
mqChan, inChan, err := c.safeDeclareAndConsume(ctx)
if err != nil {
delay = c.config.Delay(attempt)
attempt++
c.config.Log(ctx, slog.LevelError, logPrefix+" failed to safeDeclareAndConsume. Retrying in %s due to %v", delay.String(), err)
continue
c.config.Log(ctx, slog.LevelError, logPrefix+" failed to safeDeclareAndConsume, retrying in %s due to %v", delay.String(), err)
return 0, false
}

// Successfully redeclared our topology, so reset the backoff
delay, attempt = 0, 0

start := time.Now()
c.forwardDeliveries(ctx, mqChan, inChan, outChan)
}
return time.Since(start), true
})
close(outChan)
}()
return outChan
}
Expand All @@ -224,6 +216,7 @@ func (c *Consumer) forwardDeliveries(ctx context.Context, mqChan *amqp.Channel,
}
case msg, ok := <-inChan:
if !ok {
c.config.Log(ctx, slog.LevelDebug, logPrefix+" amqp.Channel.ConsumeWithContext channel closed")
return
}
// If the client never listens to outChan, this blocks forever
Expand Down
9 changes: 5 additions & 4 deletions consumer_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func ForceRedial(ctx context.Context, rmqConn *rmq.Connection) error {
func TestConsumer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
connectCfg := rmq.ConnectArgs{Args: rmq.Args{Log: slog.Log}}
slogLog := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})).Log
connectCfg := rmq.ConnectArgs{Args: rmq.Args{Log: slogLog}}
rmqConn := rmq.ConnectWithURLs(ctx, connectCfg, "amqp://dont.exist", os.Getenv("TEST_AMQP_URI"))

baseConsConfig := rmq.ConsumerArgs{
Expand Down Expand Up @@ -72,7 +73,7 @@ func TestConsumer(t *testing.T) {
wantedPub := rmq.Publishing{RoutingKey: baseConsConfig.Queue.Name}
wantedPub.Body = []byte("TestRMQPublisher")

pubCount := 10
pubCount := 3
errChan := make(chan error, pubCount)
for i := 0; i < pubCount; i++ {
go func() {
Expand Down Expand Up @@ -107,7 +108,6 @@ func TestConsumer(t *testing.T) {
func TestConsumer_Load(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute/2)
defer cancel()

logf := slog.Log

baseName := fmt.Sprint("TestRMQConsumer_Load_Base_", rand.Uint64())
Expand Down Expand Up @@ -228,8 +228,9 @@ func TestConsumer_Load(t *testing.T) {

// RabbitMQ behaviour around auto generated names and restricting declaring queues with amq prefix
func TestRMQConsumer_AutogeneratedQueueNames(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})))
baseCfg := rmq.Args{Log: slog.Log}
rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmq.ConnectArgs{Args: baseCfg}, os.Getenv("TEST_AMQP_URI"), amqp.Config{})

Expand Down
23 changes: 23 additions & 0 deletions internal/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,26 @@ type AMQP091Logger struct {
func (l AMQP091Logger) Printf(format string, v ...interface{}) {
l.Log(l.Ctx, slog.LevelError, "rabbitmq/amqp091-go: "+fmt.Sprintf(format, v...))
}

const healthyLifetime = 20 * time.Millisecond

// Retry attempts do repeatedly until it's ctx ends. If do returns false delayForAttempt is used to backoff retries.
// If do is true and ran longer than healthyLifetime the backoff is reset. do returns it's own lifetime, since it may do some setup beforehand.
func Retry(ctx context.Context, delayForAttempt func(int) time.Duration, do func(time.Duration) (time.Duration, bool)) {
var delay time.Duration
var attempt = 0
for {
select {
case <-ctx.Done():
return
case <-time.After(delay):
}

delay = delayForAttempt(attempt)
attempt++
lifetime, ok := do(delay)
if ok && lifetime >= healthyLifetime {
delay, attempt = 0, 0
}
}
}
29 changes: 9 additions & 20 deletions publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,36 +49,25 @@ func NewPublisher(ctx context.Context, rmqConn *Connection, config PublisherArgs

// connect grabs an amqp.Channel from rmq.Connection. It does so repeatedly on any error until it's context finishes.
func (p *Publisher) connect(rmqConn *Connection) {
logPrefix := "rmq.Publisher.connect"
var delay time.Duration
attempt := 0
for {
select {
case <-p.ctx.Done():
return
case <-time.After(delay):
}
internal.Retry(p.ctx, p.config.Delay, func(delay time.Duration) (time.Duration, bool) {
logPrefix := "rmq.Publisher.connect"
mqChan, err := rmqConn.Channel(p.ctx)
if err != nil {
delay = p.config.Delay(attempt)
attempt++
p.config.Log(p.ctx, slog.LevelError, logPrefix+" failed to get amqp.Channel. Retrying in %s due to err %+v", delay.String(), err)
continue
p.config.Log(p.ctx, slog.LevelError, logPrefix+" failed to get amqp.Channel, retrying in %s due to %+v", delay.String(), err)
return 0, false
}
if !p.config.DontConfirm {
if err := mqChan.Confirm(false); err != nil {
delay = p.config.Delay(attempt)
attempt++
p.config.Log(p.ctx, slog.LevelError, logPrefix+" failed to put amqp.Channel in confirm mode. Retrying in %s due to err %+v", delay.String(), err)
continue
p.config.Log(p.ctx, slog.LevelError, logPrefix+" failed to put amqp.Channel in confirm mode, retrying in %s due to %+v", delay.String(), err)
return 0, false
}
}

// Successfully got a channel for publishing, reset delay
delay, attempt = 0, 0
start := time.Now()
p.handleReturns(mqChan)
p.listen(mqChan)
}
return time.Since(start), true
})
}

const dropReturnsAfter = 10 * time.Millisecond
Expand Down

0 comments on commit 3bcfb30

Please sign in to comment.