diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 15274ff..de5c3ca 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,12 +42,12 @@ jobs: coverage-threshold: 80 test-args: '["-tags=rabbit"]' - - name: update coverage badge - env: - COVERAGE_PATH: ${{ steps.coverage.outputs.gocov-agg-pathname }} - run: make update-readme-badge - - - uses: stefanzweifel/git-auto-commit-action@v4 - with: - file_pattern: 'README.md' - commit_message: "Update coverage badge" + # - name: update coverage badge + # env: + # COVERAGE_PATH: ${{ steps.coverage.outputs.gocov-agg-pathname }} + # run: make update-readme-badge + + # - uses: stefanzweifel/git-auto-commit-action@v4 + # with: + # file_pattern: 'README.md' + # commit_message: "Update coverage badge" diff --git a/connection.go b/connection.go index d35c6c4..53e09c4 100644 --- a/connection.go +++ b/connection.go @@ -96,17 +96,17 @@ func request[T any](connCtx, ctx context.Context, reqChan chan internal.ChanReq[ respChan := make(chan internal.ChanResp[T], 1) select { case <-connCtx.Done(): - return t, fmt.Errorf("rmq.Connection context finished: %w", context.Cause(connCtx)) + return t, fmt.Errorf("rmq.Connection context timed out because %w", context.Cause(connCtx)) case <-ctx.Done(): - return t, context.Cause(ctx) + return t, fmt.Errorf("request context timed out because %w", context.Cause(ctx)) case reqChan <- internal.ChanReq[T]{Ctx: ctx, RespChan: respChan}: } select { case <-connCtx.Done(): - return t, fmt.Errorf("rmq.Connection context finished: %w", context.Cause(connCtx)) + return t, fmt.Errorf("rmq.Connection context timed out because %w", context.Cause(connCtx)) case <-ctx.Done(): - return t, context.Cause(ctx) + return t, fmt.Errorf("request context timed out because %w", context.Cause(ctx)) case resp := <-respChan: return resp.Val, resp.Err } @@ -165,12 +165,12 @@ func (c *Connection) CurrentConnection(ctx context.Context) (AMQPConnection, err defer cancel() conn, err := request(c.ctx, ctx, c.currentConReqChan) if err != nil { - return conn, err + return nil, err } if conn.IsClosed() { - return conn, amqp.ErrClosed + return nil, amqp.ErrClosed } - return conn, err + return conn, nil } func (c *Connection) redial(dialFn func() (AMQPConnection, error)) { diff --git a/consumer.go b/consumer.go index 21b70e6..5ae11c1 100644 --- a/consumer.go +++ b/consumer.go @@ -225,7 +225,7 @@ func (c *Consumer) forwardDeliveries(ctx context.Context, mqChan *amqp.Channel, // If the client never listens to outChan, this blocks forever // Other options include using select with a default and dropping the message if the client doesn't listen, dropping the message after a timeout, // or buffering messages and sending them again later. Of course the buffer could grow forever in that case without listeners. - // The only thing blocked would be the rmq.Consumer.Consume goroutine listening for reconnects and logging errors, which seem unneccessary without a listener anyway. + // The only thing blocked would be the rmq.Consumer.Consume goroutine listening for reconnects and logging errors, which seem unnecessary without a listener anyway. // Alls well since we don't lock up the entire amqp.Connection like streadway/amqp with Notify* channels... outChan <- msg } diff --git a/consumer_int_test.go b/consumer_int_test.go index dad8ad3..37d1c77 100644 --- a/consumer_int_test.go +++ b/consumer_int_test.go @@ -20,6 +20,15 @@ import ( amqp "github.com/rabbitmq/amqp091-go" ) +func ForceRedial(ctx context.Context, rmqConn *rmq.Connection) error { + amqpConn, err := rmqConn.CurrentConnection(ctx) + if err != nil { + return fmt.Errorf("rmqConn.CurrentConnection failed because %w", err) + } + // close the current connection to force a redial + return amqpConn.CloseDeadline(time.Now().Add(time.Minute)) +} + func TestConsumer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -57,15 +66,7 @@ func TestConsumer(t *testing.T) { unreliableRMQPub.Publish(ctx, rmq.Publishing{Exchange: "amq.fanout"}) rmqPub := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherConfig{CommonConfig: connectCfg.CommonConfig}) - forceRedial := func() { - amqpConn, err := rmqConn.CurrentConnection(ctx) - if err != nil { - t.Fatalf("failed to get rmqConn's current connection %v", err) - } - // close the current connection to force a redial - amqpConn.CloseDeadline(time.Now().Add(time.Minute)) - } - forceRedial() + ForceRedial(ctx, rmqConn) pubCtx, pubCancel := context.WithTimeout(ctx, 20*time.Second) defer pubCancel() @@ -79,14 +80,14 @@ func TestConsumer(t *testing.T) { errChan <- rmqPub.PublishUntilAcked(pubCtx, 0, wantedPub) }() } - forceRedial() + ForceRedial(ctx, rmqConn) for i := 0; i < pubCount; i++ { if err := <-errChan; err != nil { t.Fatalf("PublishUntilAcked returned unexpected error %v", err) } if i%2 == 0 { - forceRedial() + ForceRedial(ctx, rmqConn) } } @@ -131,8 +132,7 @@ func TestConsumer_Load(t *testing.T) { case <-ctx.Done(): return case <-time.After(time.Second): - amqpConn, _ := rmqConn.CurrentConnection(ctx) - amqpConn.CloseDeadline(time.Now().Add(time.Minute)) + ForceRedial(ctx, rmqConn) } } } diff --git a/healthcheck_int_test.go b/healthcheck_int_test.go index 71b25fc..4999e0e 100644 --- a/healthcheck_int_test.go +++ b/healthcheck_int_test.go @@ -96,7 +96,7 @@ func Example() { panic("where's my message?") case msg := <-deliveries: if !reflect.DeepEqual(msg.Body, msgOne.Body) && !reflect.DeepEqual(msg.Body, msgTwo.Body) { - panic("realistically this would probably be an error with another instance using this healthcheck simultaenously. Prevent this with an unique exchange or topic exchange with unique routing keys.") + panic("realistically this would probably be an error with another instance using this healthcheck simultaneously. Prevent this with an unique exchange or topic exchange with unique routing keys.") } } } diff --git a/publisher.go b/publisher.go index abd94ec..ea5759a 100644 --- a/publisher.go +++ b/publisher.go @@ -238,8 +238,7 @@ func (p *Publisher) PublishUntilConfirmed(ctx context.Context, confirmTimeout ti continue } } - // reset the delay on success - pubDelay, attempt = 0, 0 + attempt = 0 confirmTimeout := time.NewTimer(confirmTimeout) defer confirmTimeout.Stop() diff --git a/publisher_int_test.go b/publisher_int_test.go index d4b61df..8a6e7aa 100644 --- a/publisher_int_test.go +++ b/publisher_int_test.go @@ -33,15 +33,7 @@ func TestPublisher(t *testing.T) { NotifyReturn: returnChan, LogReturns: true, }) - forceRedial := func() { - amqpConn, err := rmqConn.CurrentConnection(ctx) - if err != nil { - t.Fatalf("failed to get rmqConn's current connection %v", err) - } - // close the current connection to force a redial - amqpConn.CloseDeadline(time.Now().Add(time.Minute)) - } - forceRedial() + ForceRedial(ctx, rmqConn) pubCtx, pubCancel := context.WithTimeout(ctx, 10*time.Second) defer pubCancel() @@ -59,7 +51,7 @@ func TestPublisher(t *testing.T) { if err != nil { t.Fatalf("PublishUntilAcked failed with %v", err) } - forceRedial() + ForceRedial(ctx, rmqConn) select { case <-pubCtx.Done(): t.Fatalf("didnt get return") diff --git a/topology.go b/topology.go index b1f365e..a67a68a 100644 --- a/topology.go +++ b/topology.go @@ -68,7 +68,7 @@ func DeclareTopology(ctx context.Context, amqpConn AMQPConnection, topology Topo return } err = topology.declare(ctx, mqChan) - // An amqp.Channel must not be used from multiple goroutines simultaenously, so close it inside this goroutine to prevent cryptic RabbitMQ errors. + // An amqp.Channel must not be used from multiple goroutines simultaneously, so close it inside this goroutine to prevent cryptic RabbitMQ errors. mqChanErr := mqChan.Close() // Should we join mqChanErr if err is nil? When declare succeeeds a Close error is fairly inconsequential. Maybe just log it in that case? Food for thought. if mqChanErr != nil && !errors.Is(mqChanErr, amqp.ErrClosed) {