From fde730dde06fd7ba093221f0af0f2da089519e29 Mon Sep 17 00:00:00 2001 From: Danlock Date: Tue, 22 Oct 2024 18:30:02 -0400 Subject: [PATCH] added clarifications, improve log --- README.md | 4 ++-- hang_int_test.go | 2 ++ publisher.go | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 54a4b01..a65fa9e 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ This package attempts to provide a wrapper of useful features on top of amqp091, - Minimal API that doesn't get in the way of lower level access. The amqp091.Connection is there if you need it. amqp-091 knowledge is more transferable since danlock/rmq builds on top of those concepts rather than encapsulating things it doesn't need to. -- Network aware message delivery. Infra can fail so danlock/rmq uses context.Context and default timeouts wherever possible. +- Network aware message delivery. Networks fail so danlock/rmq uses context.Context and default timeouts wherever possible, and tries to redeliver across network failures, unlike amqp091-go. - One dependency (rabbitmq/amqp091-go). @@ -21,7 +21,7 @@ This package attempts to provide a wrapper of useful features on top of amqp091, # Examples -Using an AMQP publisher to publish a message with at least once delivery. +Using an AMQP publisher to publish a message with at least once delivery, that retries for up to a minute on failures. ``` ctx, cancel := context.WithTimeout(context.TODO(), time.Minute) diff --git a/hang_int_test.go b/hang_int_test.go index e2afbef..ffde92e 100644 --- a/hang_int_test.go +++ b/hang_int_test.go @@ -108,4 +108,6 @@ func Example_hanging() { if chanDur > (hangTime - (hangTime / 10)) { log.Fatalf("rmqConn.Channel hung for (%s)", chanDur) } + // A caveat here is that rmqConn has leaked a goroutine that blocks until the connection sorts itself out. + // If amqp091-go ever fixes https://github.com/rabbitmq/amqp091-go/issues/225 then we can improve this situation. } diff --git a/publisher.go b/publisher.go index f647809..93079ea 100644 --- a/publisher.go +++ b/publisher.go @@ -282,7 +282,7 @@ func (p *Publisher) PublishUntilAcked(ctx context.Context, confirmTimeout time.D // resending if it's been longer than confirmTimeout or if they've been nacked. // confirmTimeout defaults to 1 minute. Recommended to call with context.WithTimeout. func (p *Publisher) PublishBatchUntilAcked(ctx context.Context, confirmTimeout time.Duration, pubs ...Publishing) error { - logPrefix := "rmq.Publisher.PublishBatchUntilConfirmed" + logPrefix := "rmq.Publisher.PublishBatchUntilAcked" if len(pubs) == 0 { return nil