diff --git a/Makefile b/Makefile index b317856..2c201e7 100644 --- a/Makefile +++ b/Makefile @@ -20,16 +20,16 @@ version: @echo $(SHORTBUILDTAG) unit-test: - @go test -race -count=3 ./... + @go test -failfast -race -count=3 ./... test: - @TEST_AMQP_URI=$(TEST_AMQP_URI) go test -v -race -count=2 -tags="rabbit" ./... + @TEST_AMQP_URI=$(TEST_AMQP_URI) go test -failfast -v -race -count=2 -tags="rabbit" ./... bench: - @TEST_AMQP_URI=$(TEST_AMQP_URI) go test -benchmem -run=^$ -v -count=2 -tags="rabbit" -bench . ./... + @TEST_AMQP_URI=$(TEST_AMQP_URI) go test -failfast -benchmem -run=^$$ -v -count=2 -tags="rabbit" -bench . ./... coverage: - @TEST_AMQP_URI=$(TEST_AMQP_URI) go test -covermode=count -tags="rabbit" -coverprofile=$(COVERAGE_PATH) + @TEST_AMQP_URI=$(TEST_AMQP_URI) go test -failfast -covermode=count -tags="rabbit" -coverprofile=$(COVERAGE_PATH) coverage-html: @rm $(COVERAGE_PATH) || true diff --git a/connection.go b/connection.go index 5eb2077..4418045 100644 --- a/connection.go +++ b/connection.go @@ -174,38 +174,23 @@ func (c *Connection) CurrentConnection(ctx context.Context) (AMQPConnection, err } func (c *Connection) redial(dialFn func() (AMQPConnection, error)) { - logPrefix := "rmq.Connection.redial" - var dialDelay time.Duration - attempt := 0 - for { - select { - case <-c.ctx.Done(): - return - case <-time.After(dialDelay): - } - + internal.Retry(c.ctx, c.config.Delay, func(delay time.Duration) (time.Duration, bool) { + logPrefix := "rmq.Connection.redial" amqpConn, err := dialFn() if err != nil { - dialDelay = c.config.Delay(attempt) - attempt++ - c.config.Log(c.ctx, slog.LevelError, logPrefix+" failed, retrying after %s. 
err: %+v", dialDelay.String(), err) - continue + c.config.Log(c.ctx, slog.LevelError, logPrefix+" failed, retrying after %s due to %+v", delay.String(), err) + return 0, false } logPrefix = fmt.Sprintf("rmq.Connection.redial's AMQPConnection (%s -> %s)", amqpConn.LocalAddr(), amqpConn.RemoteAddr()) - // Redeclare Topology if we have one. This has the bonus aspect of making sure the connection is actually usable, better than a Ping. if err := DeclareTopology(c.ctx, amqpConn, c.config.Topology); err != nil { - dialDelay = c.config.Delay(attempt) - attempt++ - c.config.Log(c.ctx, slog.LevelError, logPrefix+" DeclareTopology failed, retrying after %s. err: %+v", dialDelay.String(), err) - continue + c.config.Log(c.ctx, slog.LevelError, logPrefix+" DeclareTopology failed, retrying after %s due to %+v", delay.String(), err) + return 0, false } - - // After a successful dial and topology declare, reset our attempts and delay - dialDelay, attempt = 0, 0 - + start := time.Now() c.listen(amqpConn) - } + return time.Since(start), true + }) } // listen listens and responds to Channel and Connection requests. It returns on any failure to prompt another redial. @@ -231,7 +216,7 @@ func (c *Connection) listen(amqpConn AMQPConnection) { resp.Val, resp.Err = c.safeChannel(chanReq.Ctx, amqpConn) chanReq.RespChan <- resp if resp.Err != nil { - // redial on failed Channel requests + c.config.Log(c.ctx, slog.LevelDebug, logPrefix+" redialing due to %+v", resp.Err) return } } diff --git a/consumer.go b/consumer.go index 0e63908..0cbbd9f 100644 --- a/consumer.go +++ b/consumer.go @@ -76,14 +76,14 @@ func NewConsumer(rmqConn *Connection, config ConsumerArgs) *Consumer { // Closes the amqp.Channel on errors. 
func (c *Consumer) safeDeclareAndConsume(ctx context.Context) (_ *amqp.Channel, _ <-chan amqp.Delivery, err error) { logPrefix := fmt.Sprintf("rmq.Consumer.safeDeclareAndConsume for queue %s", c.config.Queue.Name) - ctx, cancel := context.WithTimeout(ctx, c.config.AMQPTimeout) + timeoutCtx, cancel := context.WithTimeout(ctx, c.config.AMQPTimeout) defer cancel() - mqChan, err := c.conn.Channel(ctx) + mqChan, err := c.conn.Channel(timeoutCtx) if err != nil { return nil, nil, fmt.Errorf(logPrefix+" failed to get a channel due to err %w", err) } - // Network calls that don't take a context can block indefintely. + // Network calls that don't take a context can block indefinitely. // Call them in a goroutine so we can timeout if necessary respChan := make(chan internal.ChanResp[<-chan amqp.Delivery], 1) @@ -91,7 +91,7 @@ func (c *Consumer) safeDeclareAndConsume(ctx context.Context) (_ *amqp.Channel, go func() { var r internal.ChanResp[<-chan amqp.Delivery] r.Val, r.Err = c.declareAndConsume(ctx, mqChan) - ctxDone := ctx.Err() != nil + ctxDone := timeoutCtx.Err() != nil // Close the channel on errors or if the context times out, so the amqp channel isn't leaked if r.Err != nil || ctxDone { mqChanErr := mqChan.Close() @@ -108,7 +108,7 @@ func (c *Consumer) safeDeclareAndConsume(ctx context.Context) (_ *amqp.Channel, }() select { - case <-ctx.Done(): - return nil, nil, fmt.Errorf(logPrefix+" unable to complete before context did due to %w", context.Cause(ctx)) + case <-timeoutCtx.Done(): + return nil, nil, fmt.Errorf(logPrefix+" unable to complete before context did due to %w", context.Cause(timeoutCtx)) case r := <-respChan: return mqChan, r.Val, r.Err @@ -154,7 +154,9 @@ func (c *Consumer) declareAndConsume(ctx context.Context, mqChan *amqp.Channel) } } - // https://github.com/rabbitmq/amqp091-go/pull/192 Channel.ConsumeWithContext doesn't hold up under scrutiny. The actual network call doesn't respect the passed in context. + // https://github.com/rabbitmq/amqp091-go/pull/192 Channel.ConsumeWithContext doesn't hold up under scrutiny. 
The actual network call (ch.call()) doesn't respect the passed in context. + // As of amqp091-go 1.9.0 it doesn't look like we can use ConsumeWithContext to time out network calls, so we're stuck with this wrapper. + // Now ConsumeWithContext cancels itself when the context is finished, which seems unnecessary since callers can call Cancel, or in danlock/rmq, Close(), themselves. deliveries, err := mqChan.ConsumeWithContext( ctx, c.config.Queue.Name, @@ -179,29 +181,19 @@ func (c *Consumer) declareAndConsume(ctx context.Context, mqChan *amqp.Channel) func (c *Consumer) Consume(ctx context.Context) <-chan amqp.Delivery { outChan := make(chan amqp.Delivery) go func() { - logPrefix := fmt.Sprintf("rmq.Consumer.Consume for queue (%s)", c.config.Queue.Name) - var delay time.Duration - attempt := 0 - for { - select { - case <-ctx.Done(): - close(outChan) - return - case <-time.After(delay): - } + internal.Retry(ctx, c.config.Delay, func(delay time.Duration) (time.Duration, bool) { + logPrefix := fmt.Sprintf("rmq.Consumer.Consume for queue (%s)", c.config.Queue.Name) mqChan, inChan, err := c.safeDeclareAndConsume(ctx) if err != nil { - delay = c.config.Delay(attempt) - attempt++ - c.config.Log(ctx, slog.LevelError, logPrefix+" failed to safeDeclareAndConsume. 
Retrying in %s due to %v", delay.String(), err) - continue + c.config.Log(ctx, slog.LevelError, logPrefix+" failed to safeDeclareAndConsume, retrying in %s due to %v", delay.String(), err) + return 0, false } - // Successfully redeclared our topology, so reset the backoff - delay, attempt = 0, 0 - + start := time.Now() c.forwardDeliveries(ctx, mqChan, inChan, outChan) - } + return time.Since(start), true + }) + close(outChan) }() return outChan } @@ -224,6 +216,7 @@ func (c *Consumer) forwardDeliveries(ctx context.Context, mqChan *amqp.Channel, } case msg, ok := <-inChan: if !ok { + c.config.Log(ctx, slog.LevelDebug, logPrefix+" amqp.Channel.ConsumeWithContext channel closed") return } // If the client never listens to outChan, this blocks forever diff --git a/consumer_int_test.go b/consumer_int_test.go index ee4742b..5167a5b 100644 --- a/consumer_int_test.go +++ b/consumer_int_test.go @@ -31,7 +31,8 @@ func ForceRedial(ctx context.Context, rmqConn *rmq.Connection) error { func TestConsumer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - connectCfg := rmq.ConnectArgs{Args: rmq.Args{Log: slog.Log}} + slogLog := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})).Log + connectCfg := rmq.ConnectArgs{Args: rmq.Args{Log: slogLog}} rmqConn := rmq.ConnectWithURLs(ctx, connectCfg, "amqp://dont.exist", os.Getenv("TEST_AMQP_URI")) baseConsConfig := rmq.ConsumerArgs{ @@ -72,7 +73,7 @@ func TestConsumer(t *testing.T) { wantedPub := rmq.Publishing{RoutingKey: baseConsConfig.Queue.Name} wantedPub.Body = []byte("TestRMQPublisher") - pubCount := 10 + pubCount := 3 errChan := make(chan error, pubCount) for i := 0; i < pubCount; i++ { go func() { @@ -107,7 +108,6 @@ func TestConsumer(t *testing.T) { func TestConsumer_Load(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute/2) defer cancel() - logf := slog.Log baseName := fmt.Sprint("TestRMQConsumer_Load_Base_", rand.Uint64()) 
@@ -228,8 +228,9 @@ func TestConsumer_Load(t *testing.T) { // RabbitMQ behaviour around auto generated names and restricting declaring queues with amq prefix func TestRMQConsumer_AutogeneratedQueueNames(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() + slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))) baseCfg := rmq.Args{Log: slog.Log} rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmq.ConnectArgs{Args: baseCfg}, os.Getenv("TEST_AMQP_URI"), amqp.Config{}) diff --git a/internal/util.go b/internal/util.go index 39ad6b6..7f05de1 100644 --- a/internal/util.go +++ b/internal/util.go @@ -54,3 +54,26 @@ type AMQP091Logger struct { func (l AMQP091Logger) Printf(format string, v ...interface{}) { l.Log(l.Ctx, slog.LevelError, "rabbitmq/amqp091-go: "+fmt.Sprintf(format, v...)) } + +const healthyLifetime = 20 * time.Millisecond + +// Retry calls do repeatedly until ctx ends. Before each call it computes the next backoff with delayForAttempt and passes it to do; that delay is waited out before the following call unless the backoff was reset. +// The backoff is reset only when do returns true with a lifetime of at least healthyLifetime. do returns its own lifetime, since it may do some setup beforehand. +func Retry(ctx context.Context, delayForAttempt func(int) time.Duration, do func(time.Duration) (time.Duration, bool)) { + var delay time.Duration + var attempt = 0 + for { + select { + case <-ctx.Done(): + return + case <-time.After(delay): + } + + delay = delayForAttempt(attempt) + attempt++ + lifetime, ok := do(delay) + if ok && lifetime >= healthyLifetime { + delay, attempt = 0, 0 + } + } +} diff --git a/publisher.go b/publisher.go index c2cdb07..f647809 100644 --- a/publisher.go +++ b/publisher.go @@ -49,36 +49,25 @@ func NewPublisher(ctx context.Context, rmqConn *Connection, config PublisherArgs // connect grabs an amqp.Channel from rmq.Connection. 
It does so repeatedly on any error until it's context finishes. func (p *Publisher) connect(rmqConn *Connection) { - logPrefix := "rmq.Publisher.connect" - var delay time.Duration - attempt := 0 - for { - select { - case <-p.ctx.Done(): - return - case <-time.After(delay): - } + internal.Retry(p.ctx, p.config.Delay, func(delay time.Duration) (time.Duration, bool) { + logPrefix := "rmq.Publisher.connect" mqChan, err := rmqConn.Channel(p.ctx) if err != nil { - delay = p.config.Delay(attempt) - attempt++ - p.config.Log(p.ctx, slog.LevelError, logPrefix+" failed to get amqp.Channel. Retrying in %s due to err %+v", delay.String(), err) - continue + p.config.Log(p.ctx, slog.LevelError, logPrefix+" failed to get amqp.Channel, retrying in %s due to %+v", delay.String(), err) + return 0, false } if !p.config.DontConfirm { if err := mqChan.Confirm(false); err != nil { - delay = p.config.Delay(attempt) - attempt++ - p.config.Log(p.ctx, slog.LevelError, logPrefix+" failed to put amqp.Channel in confirm mode. Retrying in %s due to err %+v", delay.String(), err) - continue + p.config.Log(p.ctx, slog.LevelError, logPrefix+" failed to put amqp.Channel in confirm mode, retrying in %s due to %+v", delay.String(), err) + return 0, false } } - // Successfully got a channel for publishing, reset delay - delay, attempt = 0, 0 + start := time.Now() p.handleReturns(mqChan) p.listen(mqChan) - } + return time.Since(start), true + }) } const dropReturnsAfter = 10 * time.Millisecond