diff --git a/Makefile b/Makefile index 6f73857..3c9399c 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,9 @@ #! /usr/bin/make +SHELL = /bin/bash BUILDTIME = $(shell date -u --rfc-3339=seconds) GITHASH = $(shell git describe --dirty --always --tags) GITCOMMITNO = $(shell git rev-list --all --count) -SHORTBUILDTAG = $(GITCOMMITNO).$(GITHASH) +SHORTBUILDTAG = v0.0.$(GITCOMMITNO)-$(GITHASH) BUILDINFO = Build Time:$(BUILDTIME) LDFLAGS = -X 'main.buildTag=$(SHORTBUILDTAG)' -X 'main.buildInfo=$(BUILDINFO)' @@ -48,3 +49,14 @@ update-readme-badge: # pkg.go.dev documentation is updated via go get update-proxy-cache: @GOPROXY=https://proxy.golang.org go get github.com/danlock/rmq + +release: +ifeq ($(findstring dirty,$(SHORTBUILDTAG)),dirty) + @echo "Version $(SHORTBUILDTAG) is filthy, commit to clean it" && exit 1 +endif + @read -t 5 -p "$(SHORTBUILDTAG) will be the new released version. Hit enter to proceed, CTRL-C to cancel." + @$(MAKE) deps + @$(MAKE) test + @$(MAKE) bench + @git tag $(SHORTBUILDTAG) + @git push origin $(SHORTBUILDTAG) diff --git a/README.md b/README.md index a9d14bf..bcb8a7c 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # rmq -![Coverage](https://img.shields.io/badge/Coverage-86.1%25-brightgreen) +![Coverage](https://img.shields.io/badge/Coverage-85.6%25-brightgreen) [![Go Report Card](https://goreportcard.com/badge/github.com/danlock/rmq)](https://goreportcard.com/report/github.com/danlock/rmq) [![Go Reference](https://pkg.go.dev/badge/github.com/danlock/rmq.svg)](https://pkg.go.dev/github.com/danlock/rmq) @@ -26,11 +26,11 @@ Using an AMQP publisher to publish a message with at least once delivery. ``` ctx, cancel := context.WithTimeout(context.TODO(), time.Minute) defer cancel() -cfg := rmq.CommonConfig{Log: slog.Log} +cfg := rmq.Args{Log: slog.Log} -rmqConn := rmq.ConnectWithURLs(ctx, rmq.ConnectConfig{CommonConfig: cfg}, os.Getenv("AMQP_URL_1"), os.Getenv("AMQP_URL_2")) +rmqConn := rmq.ConnectWithURLs(ctx, rmq.ConnectArgs{Args: cfg}, os.Getenv("AMQP_URL_1"), os.Getenv("AMQP_URL_2")) -rmqPub := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherConfig{CommonConfig: cfg}) +rmqPub := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherArgs{Args: cfg}) msg := rmq.Publishing{Exchange: "amq.topic", RoutingKey: "somewhere", Mandatory: true} msg.Body = []byte(`{"life": 42}`) @@ -44,12 +44,12 @@ Using a reliable AMQP consumer that receives deliveries through transient networ ``` ctx, := context.TODO() -cfg := rmq.CommonConfig{Log: slog.Log} +cfg := rmq.Args{Log: slog.Log} -rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmq.ConnectConfig{CommonConfig: cfg}, os.Getenv("AMQP_URL"), amqp.Config{}) +rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmq.ConnectArgs{Args: cfg}, os.Getenv("AMQP_URL"), amqp.Config{}) -consCfg := rmq.ConsumerConfig{ - CommonConfig: cfg, +consCfg := rmq.ConsumerArgs{ + Args: cfg, Queue: rmq.Queue{Name: "q2d2", AutoDelete: true}, Qos: rmq.Qos{PrefetchCount: 1000}, } @@ -72,7 +72,7 @@ All classes accept a Log function pointer that can be ignored entirely, set easi Here is an example logrus wrapper. danlock/rmq only uses the predefined slog.Level's, and doesn't send any args. ``` - CommonConfig{ + Args{ Log: func(ctx context.Context, level slog.Level, msg string, _ ...any) { logruslevel, _ := logrus.ParseLevel(level.String()) logrus.StandardLogger().WithContext(ctx).Logf(logruslevel, msg) diff --git a/benchmark_int_test.go b/benchmark_int_test.go index 7fd9da0..554c748 100644 --- a/benchmark_int_test.go +++ b/benchmark_int_test.go @@ -37,9 +37,9 @@ func BenchmarkPublishAndConsumeMany(b *testing.B) { randSuffix := fmt.Sprintf("%d.%p", time.Now().UnixNano(), b) queueName := "BenchmarkPublishAndConsumeMany" + randSuffix - baseCfg := rmq.CommonConfig{Log: slog.Log} + baseCfg := rmq.Args{Log: slog.Log} topology := rmq.Topology{ - CommonConfig: baseCfg, + Args: baseCfg, Queues: []rmq.Queue{{ Name: queueName, Args: amqp.Table{ @@ -48,32 +48,32 @@ func BenchmarkPublishAndConsumeMany(b *testing.B) { }}, } - subRMQConn := rmq.ConnectWithURLs(ctx, rmq.ConnectConfig{CommonConfig: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI")) - pubRMQConn := rmq.ConnectWithURLs(ctx, rmq.ConnectConfig{CommonConfig: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI")) + subRMQConn := rmq.ConnectWithURLs(ctx, rmq.ConnectArgs{Args: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI")) + pubRMQConn := rmq.ConnectWithURLs(ctx, rmq.ConnectArgs{Args: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI")) - consumer := rmq.NewConsumer(subRMQConn, rmq.ConsumerConfig{ - CommonConfig: baseCfg, - Queue: topology.Queues[0], + consumer := rmq.NewConsumer(subRMQConn, rmq.ConsumerArgs{ + Args: baseCfg, + Queue: topology.Queues[0], }) - publisher := rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherConfig{ - CommonConfig: baseCfg, - LogReturns: true, + publisher := rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherArgs{ + Args: baseCfg, + LogReturns: true, }) - publisher2, publisher3 := rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherConfig{ - CommonConfig: baseCfg, - LogReturns: true, - }), rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherConfig{ - CommonConfig: baseCfg, - LogReturns: true, + publisher2, publisher3 := rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherArgs{ + Args: baseCfg, + LogReturns: true, + }), rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherArgs{ + Args: baseCfg, + LogReturns: true, }) dot := []byte(".") errChan := make(chan error) consumeChan := consumer.Consume(ctx) - publishings := generatePublishings(10000, queueName) + publishings := generatePublishings(10_000, queueName) cases := []struct { name string diff --git a/config.go b/config.go index 48efca0..c685cea 100644 --- a/config.go +++ b/config.go @@ -8,8 +8,8 @@ import ( "github.com/danlock/rmq/internal" ) -// CommonConfig contains options shared by danlock/rmq classes. -type CommonConfig struct { +// Args contains common options shared by danlock/rmq classes. +type Args struct { // AMQPTimeout sets a timeout on AMQP operations. Defaults to 1 minute. AMQPTimeout time.Duration // Delay returns the delay between retry attempts. Defaults to FibonacciDelay. @@ -18,7 +18,7 @@ type CommonConfig struct { Log func(ctx context.Context, level slog.Level, msg string, args ...any) } -func (cfg *CommonConfig) setDefaults() { +func (cfg *Args) setDefaults() { if cfg.AMQPTimeout == 0 { cfg.AMQPTimeout = time.Minute } diff --git a/conn_test.go b/conn_test.go index 9815fdd..8d7062d 100644 --- a/conn_test.go +++ b/conn_test.go @@ -48,13 +48,13 @@ func TestConnect(t *testing.T) { defer func() { errs <- recover() }() - _ = Connect(nil, ConnectConfig{}, nil) + _ = Connect(nil, ConnectArgs{}, nil) }() result := <-errs if result == nil { t.Fatalf("Connect should panic when missing required arguments") } - rmqConn := Connect(context.Background(), ConnectConfig{}, func() (AMQPConnection, error) { return nil, fmt.Errorf("sike") }) + rmqConn := Connect(context.Background(), ConnectArgs{}, func() (AMQPConnection, error) { return nil, fmt.Errorf("sike") }) if rmqConn == nil { t.Fatalf("Connect failed to return a rmq.Connection") } @@ -90,7 +90,7 @@ func TestConnection_CurrentConnection(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - rmqConn := Connect(tt.connCtx, ConnectConfig{}, tt.connDialFn) + rmqConn := Connect(tt.connCtx, ConnectArgs{}, tt.connDialFn) got, err := rmqConn.CurrentConnection(tt.reqCtx) if !errors.Is(err, tt.wantErr) { t.Fatalf("rmq.Connection.CurrentConnection() error = %v, wantErr %v", err, tt.wantErr) @@ -120,9 +120,9 @@ func TestConnection_Channel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - baseCfg := CommonConfig{Log: slog.Log} - connConf := ConnectConfig{ - CommonConfig: baseCfg, + baseCfg := Args{Log: slog.Log} + connConf := ConnectArgs{ + Args: baseCfg, } goodRMQConn := Connect(ctx, connConf, func() (AMQPConnection, error) { return goodMockAMQP, nil @@ -133,7 +133,7 @@ func TestConnection_Channel(t *testing.T) { slowRMQConn := Connect(ctx, connConf, func() (AMQPConnection, error) { return slowMockAMQP, nil }) - slowConfig := ConnectConfig{CommonConfig: CommonConfig{Log: slog.Log, AMQPTimeout: 50 * time.Millisecond}} + slowConfig := ConnectArgs{Args: Args{Log: slog.Log, AMQPTimeout: 50 * time.Millisecond}} slowUsingTimeoutRMQConn := Connect(ctx, slowConfig, func() (AMQPConnection, error) { return slowMockAMQP, nil }) @@ -180,8 +180,8 @@ func TestConnection_Channel(t *testing.T) { }) } - connConf = ConnectConfig{ - CommonConfig: CommonConfig{ + connConf = ConnectArgs{ + Args: Args{ Log: connConf.Log, Delay: func(attempt int) time.Duration { return time.Duration(attempt) * time.Millisecond }, }, diff --git a/connection.go b/connection.go index a018b07..2d308cd 100644 --- a/connection.go +++ b/connection.go @@ -23,8 +23,8 @@ type AMQPConnection interface { IsClosed() bool } -type ConnectConfig struct { - CommonConfig +type ConnectArgs struct { + Args // Topology will be declared each connection to mitigate downed RabbitMQ nodes. Recommended to set, but not required. Topology Topology // DisableAMQP091Logs ensures the Connection's logs will not include rabbitmq/amqp091-go log's. @@ -32,7 +32,7 @@ type ConnectConfig struct { DisableAMQP091Logs bool } -func ConnectWithURLs(ctx context.Context, conf ConnectConfig, amqpURLs ...string) *Connection { +func ConnectWithURLs(ctx context.Context, conf ConnectArgs, amqpURLs ...string) *Connection { if len(amqpURLs) == 0 { panic("ConnectWithURLs needs amqpURLs!") } @@ -49,7 +49,7 @@ func ConnectWithURLs(ctx context.Context, conf ConnectConfig, amqpURLs ...string }) } -func ConnectWithAMQPConfig(ctx context.Context, conf ConnectConfig, amqpURL string, amqpConf amqp.Config) *Connection { +func ConnectWithAMQPConfig(ctx context.Context, conf ConnectArgs, amqpURL string, amqpConf amqp.Config) *Connection { return Connect(ctx, conf, func() (AMQPConnection, error) { return amqp.DialConfig(amqpURL, amqpConf) }) @@ -61,7 +61,7 @@ var setAMQP091Logger sync.Once // Each Channel() call triggers rmq.Connection to return an amqp.Channel from it's CurrentConnection() or redial with the provided dialFn for a new AMQP Connection. // ConnectWith* functions provide a few simple dialFn's for ease of use. They can be a simple wrapper around an amqp.Dial or much more complicated. // If you want to ensure the Connection is working, call MustChannel with a timeout. -func Connect(ctx context.Context, conf ConnectConfig, dialFn func() (AMQPConnection, error)) *Connection { +func Connect(ctx context.Context, conf ConnectArgs, dialFn func() (AMQPConnection, error)) *Connection { if dialFn == nil || ctx == nil { panic("Connect requires a ctx and a dialFn") } @@ -119,7 +119,7 @@ type Connection struct { chanReqChan chan internal.ChanReq[*amqp.Channel] currentConReqChan chan internal.ChanReq[AMQPConnection] - config ConnectConfig + config ConnectArgs } // Channel requests an AMQP channel from the current AMQP Connection. diff --git a/consumer.go b/consumer.go index 9b561e9..a2b7c55 100644 --- a/consumer.go +++ b/consumer.go @@ -11,12 +11,12 @@ import ( amqp "github.com/rabbitmq/amqp091-go" ) -// ConsumerConfig contains information needed to declare and consume deliveries from a queue. -type ConsumerConfig struct { - CommonConfig +// ConsumerArgs contains information needed to declare and consume deliveries from a queue. +type ConsumerArgs struct { + Args Queue Queue - QueueBindings []QueueBinding // Only needed for anonymous queues since Consumer's do not return the generated RabbitMQ queue name + QueueBindings []QueueBinding Consume Consume Qos Qos } @@ -60,14 +60,14 @@ type Qos struct { // Consumer enables reliable AMQP Queue consumption. type Consumer struct { - config ConsumerConfig + config ConsumerArgs conn *Connection } -// NewConsumer takes in a ConsumerConfig that describes the AMQP topology of a single queue, +// NewConsumer takes in a ConsumerArgs that describes the AMQP topology of a single queue, // and returns a rmq.Consumer that can redeclare this topology on any errors during queue consumption. // This enables robust reconnections even on unreliable networks. -func NewConsumer(rmqConn *Connection, config ConsumerConfig) *Consumer { +func NewConsumer(rmqConn *Connection, config ConsumerArgs) *Consumer { config.setDefaults() return &Consumer{config: config, conn: rmqConn} } diff --git a/consumer_int_test.go b/consumer_int_test.go index da7bc1d..ee4742b 100644 --- a/consumer_int_test.go +++ b/consumer_int_test.go @@ -16,7 +16,6 @@ import ( "time" "github.com/danlock/rmq" - "github.com/danlock/rmq/internal" amqp "github.com/rabbitmq/amqp091-go" ) @@ -32,11 +31,11 @@ func ForceRedial(ctx context.Context, rmqConn *rmq.Connection) error { func TestConsumer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - connectCfg := rmq.ConnectConfig{CommonConfig: rmq.CommonConfig{Log: slog.Log}} + connectCfg := rmq.ConnectArgs{Args: rmq.Args{Log: slog.Log}} rmqConn := rmq.ConnectWithURLs(ctx, connectCfg, "amqp://dont.exist", os.Getenv("TEST_AMQP_URI")) - baseConsConfig := rmq.ConsumerConfig{ - CommonConfig: connectCfg.CommonConfig, + baseConsConfig := rmq.ConsumerArgs{ + Args: connectCfg.Args, Queue: rmq.Queue{ Name: fmt.Sprintf("TestRMQConsumer.%p", t), Args: amqp.Table{amqp.QueueTTLArg: time.Minute.Milliseconds()}, @@ -62,9 +61,9 @@ func TestConsumer(t *testing.T) { _ = msg.Ack(false) }) time.Sleep(time.Second / 10) - unreliableRMQPub := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherConfig{DontConfirm: true}) + unreliableRMQPub := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherArgs{DontConfirm: true}) unreliableRMQPub.Publish(ctx, rmq.Publishing{Exchange: "amq.fanout"}) - rmqPub := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherConfig{CommonConfig: connectCfg.CommonConfig}) + rmqPub := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherArgs{Args: connectCfg.Args}) ForceRedial(ctx, rmqConn) pubCtx, pubCancel := context.WithTimeout(ctx, 20*time.Second) @@ -109,12 +108,13 @@ func TestConsumer_Load(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute/2) defer cancel() - logf := internal.SlogLog(nil) - internal.WrapLogFunc(&logf) + logf := slog.Log + baseName := fmt.Sprint("TestRMQConsumer_Load_Base_", rand.Uint64()) prefetchName := fmt.Sprint("TestRMQConsumer_Load_Prefetch_", rand.Uint64()) - baseCfg := rmq.CommonConfig{Log: logf} + baseCfg := rmq.Args{Log: logf} topology := rmq.Topology{ + Args: baseCfg, Queues: []rmq.Queue{{ Name: baseName, Args: amqp.Table{amqp.QueueTTLArg: time.Minute.Milliseconds()}, @@ -124,7 +124,7 @@ func TestConsumer_Load(t *testing.T) { }}, } - rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmq.ConnectConfig{CommonConfig: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI"), amqp.Config{}) + rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmq.ConnectArgs{Args: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI"), amqp.Config{}) periodicallyCloseConn := func() { for { @@ -138,9 +138,9 @@ func TestConsumer_Load(t *testing.T) { } go periodicallyCloseConn() - baseConsConfig := rmq.ConsumerConfig{ - CommonConfig: baseCfg, - Queue: topology.Queues[0], + baseConsConfig := rmq.ConsumerArgs{ + Args: baseCfg, + Queue: topology.Queues[0], Consume: rmq.Consume{ Consumer: baseName, }, @@ -151,8 +151,8 @@ func TestConsumer_Load(t *testing.T) { prefetchConsConfig.Qos.PrefetchCount = 10 prefetchConsConfig.Queue.Name = prefetchName - consumers := []rmq.ConsumerConfig{baseConsConfig, prefetchConsConfig} - publisher := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherConfig{CommonConfig: baseCfg}) + consumers := []rmq.ConsumerArgs{baseConsConfig, prefetchConsConfig} + publisher := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherArgs{Args: baseCfg}) msgCount := 5_000 errChan := make(chan error, (msgCount/2+1)*len(consumers)) @@ -230,13 +230,13 @@ func TestConsumer_Load(t *testing.T) { func TestRMQConsumer_AutogeneratedQueueNames(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - baseCfg := rmq.CommonConfig{Log: slog.Log} - rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmq.ConnectConfig{CommonConfig: baseCfg}, os.Getenv("TEST_AMQP_URI"), amqp.Config{}) + baseCfg := rmq.Args{Log: slog.Log} + rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmq.ConnectArgs{Args: baseCfg}, os.Getenv("TEST_AMQP_URI"), amqp.Config{}) // NewConsumer with an empty Queue.Name will declare a queue with a RabbitMQ generated name // This is useless unless the config also includes QueueBindings, since reconnections cause RabbitMQ to generate a different name anyway - cons := rmq.NewConsumer(rmqConn, rmq.ConsumerConfig{ - CommonConfig: baseCfg, + cons := rmq.NewConsumer(rmqConn, rmq.ConsumerArgs{ + Args: baseCfg, QueueBindings: []rmq.QueueBinding{ {ExchangeName: "amq.fanout", RoutingKey: "TestRMQConsumer_AutogeneratedQueueNames"}, }, @@ -256,7 +256,7 @@ func TestRMQConsumer_AutogeneratedQueueNames(t *testing.T) { // rmq.Consumer could remember the last queue name to consume from it again, but that wouldn't be reliable with auto-deleted or expiring queues. // It's simpler to disallow that use case by not making RabbitMQ generated queue names available from rmq.Consumer. secondDeliveries := cons.Consume(ctx) - publisher := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherConfig{CommonConfig: baseCfg, LogReturns: true}) + publisher := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherArgs{Args: baseCfg, LogReturns: true}) pubCount := 10 time.Sleep(time.Second / 3) diff --git a/healthcheck_int_test.go b/healthcheck_int_test.go index 7eaea90..cb8a867 100644 --- a/healthcheck_int_test.go +++ b/healthcheck_int_test.go @@ -34,17 +34,17 @@ func Example() { customLog := func(ctx context.Context, level slog.Level, msg string, _ ...any) { log.Printf("[%s] trace_id=%v msg="+msg, level, ctx.Value("your_embedded_trace_id")) } - commonCfg := rmq.CommonConfig{Log: customLog} + commonCfg := rmq.Args{Log: customLog} // Create an AMQP topology for our healthcheck, which uses a temporary exchange. // Design goals of danlock/rmq include reducing the amount of naked booleans in function signatures. topology := rmq.Topology{ - CommonConfig: commonCfg, + Args: commonCfg, Exchanges: []rmq.Exchange{{Name: "healthcheck", Kind: amqp.ExchangeDirect, AutoDelete: true}}, Queues: []rmq.Queue{{Name: "healthcheck", AutoDelete: true}}, QueueBindings: []rmq.QueueBinding{{QueueName: "healthcheck", ExchangeName: "healthcheck"}}, } // danlock/rmq best practice is including your applications topology in your ConnectConfig - cfg := rmq.ConnectConfig{CommonConfig: commonCfg, Topology: topology} + cfg := rmq.ConnectArgs{Args: commonCfg, Topology: topology} // RabbitMQ best practice is to pub and sub on different AMQP connections to avoid TCP backpressure causing issues with message consumption. pubRMQConn := rmq.ConnectWithURLs(ctx, cfg, os.Getenv("TEST_AMQP_URI")) subRMQConn := rmq.ConnectWithURLs(ctx, cfg, os.Getenv("TEST_AMQP_URI")) @@ -55,15 +55,15 @@ func Example() { panic("couldn't get a channel") } - rmqCons := rmq.NewConsumer(subRMQConn, rmq.ConsumerConfig{ - CommonConfig: commonCfg, - Queue: topology.Queues[0], - Qos: rmq.Qos{PrefetchCount: 10}, + rmqCons := rmq.NewConsumer(subRMQConn, rmq.ConsumerArgs{ + Args: commonCfg, + Queue: topology.Queues[0], + Qos: rmq.Qos{PrefetchCount: 10}, }) // Now we have a RabbitMQ queue with messages incoming on the deliveries channel, even if the network flakes. deliveries := rmqCons.Consume(ctx) - rmqPub := rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherConfig{CommonConfig: commonCfg}) + rmqPub := rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherArgs{Args: commonCfg}) // Now we have an AMQP publisher that can sends messages with at least once delivery. // Generate "unique" messages for our healthchecker to check later baseMsg := rmq.Publishing{Exchange: topology.Exchanges[0].Name, Mandatory: true} diff --git a/publisher.go b/publisher.go index e70b489..c7b63a2 100644 --- a/publisher.go +++ b/publisher.go @@ -11,12 +11,12 @@ import ( amqp "github.com/rabbitmq/amqp091-go" ) -type PublisherConfig struct { - CommonConfig +type PublisherArgs struct { + Args // NotifyReturn will receive amqp.Return's from any amqp.Channel this rmq.Publisher sends on. // Recommended to use a buffered channel. Closed after the publisher's context is done. NotifyReturn chan<- amqp.Return - // LogReturns without their amqp.Return.Body using CommonConfig.Log when true + // LogReturns without their amqp.Return.Body using Args.Log when true LogReturns bool // DontConfirm means the Publisher's amqp.Channel won't be in Confirm mode. Methods except for Publish will throw an error. @@ -25,13 +25,13 @@ type PublisherConfig struct { type Publisher struct { ctx context.Context - config PublisherConfig + config PublisherArgs in chan *Publishing } // NewPublisher creates a rmq.Publisher that will publish messages to AMQP on a single amqp.Channel at a time. // On error it reconnects via rmq.Connection. Shuts down when it's context is finished. -func NewPublisher(ctx context.Context, rmqConn *Connection, config PublisherConfig) *Publisher { +func NewPublisher(ctx context.Context, rmqConn *Connection, config PublisherArgs) *Publisher { if ctx == nil || rmqConn == nil { panic("rmq.NewPublisher called with nil ctx or rmqConn") } @@ -309,25 +309,18 @@ func (p *Publisher) PublishBatchUntilAcked(ctx context.Context, confirmTimeout t pendingPubs := make([]*amqp.DeferredConfirmation, len(pubs)) ackedPubs := make([]bool, len(pubs)) - remainingPubs := func() int { - unacks := 0 - for _, acked := range ackedPubs { - if !acked { - unacks++ - } - } - return unacks - } - for { select { + case <-p.ctx.Done(): + err := fmt.Errorf(logPrefix+"'s Publisher timed out because %w", context.Cause(p.ctx)) + return errors.Join(append(errs, err)...) case <-ctx.Done(): err := fmt.Errorf(logPrefix+" timed out because %w", context.Cause(ctx)) return errors.Join(append(errs, err)...) default: } - err := p.publishBatch(ctx, confirmTimeout, remainingPubs(), pubs, pendingPubs, ackedPubs, errs) + err := p.publishBatch(ctx, confirmTimeout, pubs, pendingPubs, ackedPubs, errs) if err == nil { return nil } @@ -341,35 +334,37 @@ func (p *Publisher) PublishBatchUntilAcked(ctx context.Context, confirmTimeout t func (p *Publisher) publishBatch( ctx context.Context, confirmTimeout time.Duration, - remaining int, pubs []Publishing, pendingPubs []*amqp.DeferredConfirmation, ackedPubs []bool, errs []error, ) (err error) { logPrefix := "rmq.Publisher.publishBatch" - published := 0 - attempt := 0 var delay time.Duration + attempt := 0 + published := 0 + remaining := 0 + for _, acked := range ackedPubs { + if !acked { + remaining++ + } + } + for published != remaining { for i, pub := range pubs { - // Skip if it's been successfully published or acked - if pendingPubs[i] != nil || ackedPubs[i] { + // Skip if it's been previously acked or published + if ackedPubs[i] || pendingPubs[i] != nil { continue } - select { - case <-ctx.Done(): - return fmt.Errorf(logPrefix+" timed out because %w", context.Cause(ctx)) - default: - } - pendingPubs[i], err = p.Publish(ctx, pub) if err != nil { errs = append(errs, err) delay = p.config.Delay(attempt) attempt++ select { + case <-p.ctx.Done(): + return fmt.Errorf(logPrefix+"'s Publisher timed out because %w", context.Cause(p.ctx)) case <-ctx.Done(): return fmt.Errorf(logPrefix+" timed out because %w", context.Cause(ctx)) case <-time.After(delay): @@ -385,12 +380,14 @@ func (p *Publisher) publishBatch( confirmed := 0 for confirmed != remaining { for i, pub := range pendingPubs { - // Skip if it's already been confirmed - if pendingPubs[i] == nil { + // Skip if it's been previously confirmed + if ackedPubs[i] || pendingPubs[i] == nil { continue } select { + case <-p.ctx.Done(): + return fmt.Errorf(logPrefix+"'s Publisher timed out because %w", context.Cause(p.ctx)) case <-ctx.Done(): return fmt.Errorf(logPrefix+" timed out because %w", context.Cause(ctx)) case <-confirmTimer: diff --git a/publisher_int_test.go b/publisher_int_test.go index 8a6e7aa..c74409f 100644 --- a/publisher_int_test.go +++ b/publisher_int_test.go @@ -18,18 +18,18 @@ func TestPublisher(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) defer cancel() - baseCfg := rmq.CommonConfig{Log: slog.Log} - rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmq.ConnectConfig{CommonConfig: baseCfg}, os.Getenv("TEST_AMQP_URI"), amqp.Config{}) + baseCfg := rmq.Args{Log: slog.Log} + rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmq.ConnectArgs{Args: baseCfg}, os.Getenv("TEST_AMQP_URI"), amqp.Config{}) - unreliableRMQPub := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherConfig{DontConfirm: true}) + unreliableRMQPub := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherArgs{DontConfirm: true}) _, err := unreliableRMQPub.PublishUntilConfirmed(ctx, time.Minute, rmq.Publishing{}) if err == nil { t.Fatalf("PublishUntilConfirmed succeeded despite the publisher set dont confirm") } returnChan := make(chan amqp.Return, 5) - rmqPub := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherConfig{ - CommonConfig: baseCfg, + rmqPub := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherArgs{ + Args: baseCfg, NotifyReturn: returnChan, LogReturns: true, }) diff --git a/topology.go b/topology.go index 4d92d5b..07ac1c0 100644 --- a/topology.go +++ b/topology.go @@ -9,14 +9,13 @@ import ( "log/slog" "time" - "github.com/danlock/rmq/internal" amqp "github.com/rabbitmq/amqp091-go" ) // Exchange contains args for amqp.Channel.ExchangeDeclare type Exchange struct { Name string - Kind string + Kind string // Kind is required by ExchangeDeclare. amqp091-go exports valid values like amqp.ExchangeDirect, etc Durable bool AutoDelete bool Internal bool @@ -35,14 +34,14 @@ type ExchangeBinding struct { } // DeclareTopology declares an AMQP topology once. -// If you want this to be redeclared automatically on connections, add your Topology to ConnectConfig instead. +// If you want this to be redeclared automatically on connections, add your Topology to ConnectArgs instead. func DeclareTopology(ctx context.Context, amqpConn AMQPConnection, topology Topology) error { logPrefix := fmt.Sprintf("rmq.DeclareTopology for AMQPConnection (%s -> %s)", amqpConn.LocalAddr(), amqpConn.RemoteAddr()) if topology.empty() { return nil } - + dontLog := topology.Log == nil topology.setDefaults() ctx, cancel := context.WithTimeout(ctx, topology.AMQPTimeout) defer cancel() @@ -62,7 +61,8 @@ func DeclareTopology(ctx context.Context, amqpConn AMQPConnection, topology Topo err = topology.declare(ctx, mqChan) // An amqp.Channel must not be used from multiple goroutines simultaneously, so close it inside this goroutine to prevent cryptic RabbitMQ errors. mqChanErr := mqChan.Close() - // Should we join mqChanErr if err is nil? When declare succeeeds a Close error is fairly inconsequential. Maybe just log it in that case? Food for thought. + // Should we join mqChanErr if err is nil? When declare succeeeds a Close error is fairly inconsequential. Unless it leaves the channel in a bad state... + // But we don't actually use the channel after this. Maybe just log it in that case? Food for thought. if mqChanErr != nil && !errors.Is(mqChanErr, amqp.ErrClosed) { err = errors.Join(err, mqChanErr) } @@ -78,8 +78,7 @@ func DeclareTopology(ctx context.Context, amqpConn AMQPConnection, topology Topo select { case <-ctx.Done(): // Log our leaked goroutine's response whenever it finally finishes since it may have useful debugging information. - if topology.Log != nil { - internal.WrapLogFunc(&topology.Log) + if !dontLog { close(shouldLog) } return fmt.Errorf(logPrefix+" unable to complete before context due to %w", context.Cause(ctx)) @@ -97,7 +96,7 @@ func ImportJSONTopology(topologyReader io.Reader) (top Topology, _ error) { // Topology contains all the exchange, queue and binding information needed for your application to use RabbitMQ. type Topology struct { - CommonConfig + Args Exchanges []Exchange ExchangeBindings []ExchangeBinding @@ -160,8 +159,7 @@ func (t *Topology) declare(ctx context.Context, mqChan *amqp.Channel) (err error for _, b := range t.QueueBindings { err = mqChan.QueueBind(b.QueueName, b.RoutingKey, b.ExchangeName, b.NoWait, b.Args) if err != nil { - err = fmt.Errorf(logPrefix+" unable to bind queue to exchange '%s' via key '%s' due to %w", b.ExchangeName, b.RoutingKey, err) - return + return fmt.Errorf(logPrefix+" unable to bind queue to exchange '%s' via key '%s' due to %w", b.ExchangeName, b.RoutingKey, err) } else if err = context.Cause(ctx); err != nil { return fmt.Errorf(logPrefix+" failed to declare queue bindings before context ended due to %w", err) } diff --git a/topology_int_test.go b/topology_int_test.go index 74f3431..efa9fe7 100644 --- a/topology_int_test.go +++ b/topology_int_test.go @@ -17,8 +17,8 @@ import ( func TestDeclareTopology(t *testing.T) { ctx := context.Background() - baseCfg := rmq.CommonConfig{Log: slog.Log} - rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmq.ConnectConfig{CommonConfig: baseCfg}, os.Getenv("TEST_AMQP_URI"), amqp.Config{}) + baseCfg := rmq.Args{Log: slog.Log} + rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmq.ConnectArgs{Args: baseCfg}, os.Getenv("TEST_AMQP_URI"), amqp.Config{}) suffix := fmt.Sprintf("%s|%p", time.Now(), t) baseTopology := rmq.Topology{ Exchanges: []rmq.Exchange{{