diff --git a/README.md b/README.md index e6f3581..a9d14bf 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ An AMQP library for Go, built on top of amqp091. [streadway/amqp](https://github.com/streadway/amqp), the library the RabbitMQ maintainers forked to [amqp-091](https://github.com/rabbitmq/amqp091-go), is a stable, thin client for communicating to RabbitMQ, but lacks many of the features present in RabbitMQ libraries from other languages. Many redialable AMQP connections have been reinvented in Go codebases everywhere. -This package attempts to provide a wrapper of useful features on top of amqp091, in the hopes of preventing at least one more unnecessary reinvention. +This package attempts to provide a wrapper of useful features on top of amqp091, in the hopes of preventing at least one more unnecessary reinvention (other than itself!) # Design Goals @@ -15,7 +15,7 @@ This package attempts to provide a wrapper of useful features on top of amqp091, - Network aware message delivery. Infra can fail so danlock/rmq uses context.Context and default timeouts wherever possible. -- As few dependencies as possible. +- One dependency (rabbitmq/amqp091-go). - Prioritize readability. This means no functions with 5 boolean args. @@ -72,8 +72,8 @@ All classes accept a Log function pointer that can be ignored entirely, set easi Here is an example logrus wrapper. danlock/rmq only uses the predefined slog.Level's, and doesn't send any args. ``` - PublisherConfig{ - Log: func(ctx context.Context, level slog.Level, msg string, args ...any) { + CommonConfig{ + Log: func(ctx context.Context, level slog.Level, msg string, _ ...any) { logruslevel, _ := logrus.ParseLevel(level.String()) logrus.StandardLogger().WithContext(ctx).Logf(logruslevel, msg) } diff --git a/benchmark_int_test.go b/benchmark_int_test.go index 03a5fa7..7fd9da0 100644 --- a/benchmark_int_test.go +++ b/benchmark_int_test.go @@ -48,8 +48,8 @@ func BenchmarkPublishAndConsumeMany(b *testing.B) { }}, } - subRMQConn := rmq.ConnectWithURL(ctx, rmq.ConnectConfig{CommonConfig: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI")) - pubRMQConn := rmq.ConnectWithURL(ctx, rmq.ConnectConfig{CommonConfig: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI")) + subRMQConn := rmq.ConnectWithURLs(ctx, rmq.ConnectConfig{CommonConfig: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI")) + pubRMQConn := rmq.ConnectWithURLs(ctx, rmq.ConnectConfig{CommonConfig: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI")) consumer := rmq.NewConsumer(subRMQConn, rmq.ConsumerConfig{ CommonConfig: baseCfg, @@ -211,5 +211,4 @@ func BenchmarkPublishAndConsumeMany(b *testing.B) { } }) } - } diff --git a/connection.go b/connection.go index 53e09c4..a018b07 100644 --- a/connection.go +++ b/connection.go @@ -27,12 +27,9 @@ type ConnectConfig struct { CommonConfig // Topology will be declared each connection to mitigate downed RabbitMQ nodes. Recommended to set, but not required. Topology Topology -} - -func ConnectWithURL(ctx context.Context, conf ConnectConfig, amqpURL string) *Connection { - return Connect(ctx, conf, func() (AMQPConnection, error) { - return amqp.Dial(amqpURL) - }) + // DisableAMQP091Logs ensures the Connection's logs will not include rabbitmq/amqp091-go log's. + // Note only the first Connection created will include rabbitmq/amqp091-go logs, due to rabbitmq/amqp091-go using a single global logger. + DisableAMQP091Logs bool } func ConnectWithURLs(ctx context.Context, conf ConnectConfig, amqpURLs ...string) *Connection { @@ -40,15 +37,15 @@ func ConnectWithURLs(ctx context.Context, conf ConnectConfig, amqpURLs ...string panic("ConnectWithURLs needs amqpURLs!") } return Connect(ctx, conf, func() (AMQPConnection, error) { - var errs error + errs := make([]error, 0, len(amqpURLs)) for _, amqpURL := range amqpURLs { amqpConn, err := amqp.Dial(amqpURL) if err == nil { return amqpConn, nil } - errs = errors.Join(err) + errs = append(errs, err) } - return nil, errs + return nil, errors.Join(errs...) }) } @@ -68,8 +65,11 @@ func Connect(ctx context.Context, conf ConnectConfig, dialFn func() (AMQPConnect if dialFn == nil || ctx == nil { panic("Connect requires a ctx and a dialFn") } - // Thread safely set the amqp091 logger so it's included within danlock/rmq Connection Logs. - if conf.Log != nil { + // Thread safely set the amqp091 logger once so it's included within danlock/rmq Connection Logs. + // We use a sync.Once to avoid races, but this also means just the first Connection logs these errors. + // It could be useful to have every Connection log these but practically that would lead to unneccessary log spam. + // If this behaviour causes an issue your only solution is to set DisableAMQP091Logs and call amqp.SetLogger yourself + if conf.Log != nil && !conf.DisableAMQP091Logs { setAMQP091Logger.Do(func() { amqp.SetLogger(internal.AMQP091Logger{ctx, conf.Log}) }) @@ -191,6 +191,7 @@ func (c *Connection) redial(dialFn func() (AMQPConnection, error)) { c.config.Log(c.ctx, slog.LevelError, logPrefix+" failed, retrying after %s. err: %+v", dialDelay.String(), err) continue } + logPrefix = fmt.Sprintf("rmq.Connection.redial's AMQPConnection (%s -> %s)", amqpConn.LocalAddr(), amqpConn.RemoteAddr()) // Redeclare Topology if we have one. This has the bonus aspect of making sure the connection is actually usable, better than a Ping. if err := DeclareTopology(c.ctx, amqpConn, c.config.Topology); err != nil { diff --git a/consumer.go b/consumer.go index 1b79d06..9b561e9 100644 --- a/consumer.go +++ b/consumer.go @@ -176,7 +176,7 @@ func (c *Consumer) declareAndConsume(ctx context.Context, mqChan *amqp.Channel) // Consume uses the rmq.Consumer config to declare and consume from an AMQP queue, forwarding deliveries to it's returned channel. // On errors Consume reconnects to AMQP, redeclares and resumes consumption and forwarding of deliveries. // Consume returns an unbuffered channel, and will block on sending to it if no ones listening. -// The returned channel is closed only after the context finishes. +// The returned channel is closed only after the context finishes and the amqp.Channel.Consume's Go channel delivers it's messages. func (c *Consumer) Consume(ctx context.Context) <-chan amqp.Delivery { outChan := make(chan amqp.Delivery) go func() { diff --git a/healthcheck_int_test.go b/healthcheck_int_test.go index 000c1a8..7eaea90 100644 --- a/healthcheck_int_test.go +++ b/healthcheck_int_test.go @@ -31,8 +31,8 @@ func Example() { // If we want to use a different log library instead of log/slog.Log, wrap the function instead. // If call depth is being logged, add to it so it doesn't just print this log function. // Here we use log instead of slog - customLog := func(ctx context.Context, level slog.Level, msg string, args ...any) { - log.Printf("[%s] trace_id=%v msg="+msg, append([]any{level, ctx.Value("your_embedded_trace_id")}, args...)...) + customLog := func(ctx context.Context, level slog.Level, msg string, _ ...any) { + log.Printf("[%s] trace_id=%v msg="+msg, level, ctx.Value("your_embedded_trace_id")) } commonCfg := rmq.CommonConfig{Log: customLog} // Create an AMQP topology for our healthcheck, which uses a temporary exchange. @@ -46,8 +46,8 @@ func Example() { // danlock/rmq best practice is including your applications topology in your ConnectConfig cfg := rmq.ConnectConfig{CommonConfig: commonCfg, Topology: topology} // RabbitMQ best practice is to pub and sub on different AMQP connections to avoid TCP backpressure causing issues with message consumption. - pubRMQConn := rmq.ConnectWithURL(ctx, cfg, os.Getenv("TEST_AMQP_URI")) - subRMQConn := rmq.ConnectWithURL(ctx, cfg, os.Getenv("TEST_AMQP_URI")) + pubRMQConn := rmq.ConnectWithURLs(ctx, cfg, os.Getenv("TEST_AMQP_URI")) + subRMQConn := rmq.ConnectWithURLs(ctx, cfg, os.Getenv("TEST_AMQP_URI")) // A rudimentary healthcheck of a rmq.Connection is to ensure it can get a Channel, but we can do better _, err := subRMQConn.MustChannel(ctx) diff --git a/topology.go b/topology.go index de85929..4d92d5b 100644 --- a/topology.go +++ b/topology.go @@ -13,16 +13,6 @@ import ( amqp "github.com/rabbitmq/amqp091-go" ) -// Topology contains all the exchange, queue and binding information needed for your application to use RabbitMQ. -type Topology struct { - CommonConfig - - Exchanges []Exchange - ExchangeBindings []ExchangeBinding - Queues []Queue - QueueBindings []QueueBinding -} - // Exchange contains args for amqp.Channel.ExchangeDeclare type Exchange struct { Name string @@ -47,7 +37,7 @@ type ExchangeBinding struct { // DeclareTopology declares an AMQP topology once. // If you want this to be redeclared automatically on connections, add your Topology to ConnectConfig instead. func DeclareTopology(ctx context.Context, amqpConn AMQPConnection, topology Topology) error { - logPrefix := fmt.Sprintf("rmq.DeclareTopology") + logPrefix := fmt.Sprintf("rmq.DeclareTopology for AMQPConnection (%s -> %s)", amqpConn.LocalAddr(), amqpConn.RemoteAddr()) if topology.empty() { return nil @@ -98,6 +88,23 @@ func DeclareTopology(ctx context.Context, amqpConn AMQPConnection, topology Topo } } +// ImportJSONTopology reads in a Topology from a file. Useful for setting Exchanges, ExchangeBindings, Queues and QueueBindings from a config, +// although rabbitmqctl is probably a better candidate for this since it can also export your cuurrent RabbitMQ schema. +// Decoding files to structs is easy in Golang, so feel free to write your own as desired for XML, YAML, TOML or any other desired config format. +func ImportJSONTopology(topologyReader io.Reader) (top Topology, _ error) { + return top, json.NewDecoder(topologyReader).Decode(&top) +} + +// Topology contains all the exchange, queue and binding information needed for your application to use RabbitMQ. +type Topology struct { + CommonConfig + + Exchanges []Exchange + ExchangeBindings []ExchangeBinding + Queues []Queue + QueueBindings []QueueBinding +} + func (t *Topology) empty() bool { return len(t.Exchanges) == 0 && len(t.Queues) == 0 && len(t.ExchangeBindings) == 0 && len(t.QueueBindings) == 0 @@ -162,10 +169,3 @@ func (t *Topology) declare(ctx context.Context, mqChan *amqp.Channel) (err error return nil } - -// ImportJSONTopology reads in a Topology from a file. Useful for setting Exchanges, ExchangeBindings, Queues and QueueBindings from a config, -// although rabbitmqctl is probably a better candidate for this since it can also export your cuurrent RabbitMQ schema. -// Decoding files to structs is easy in Golang, so feel free to write your own as desired for XML, YAML, TOML or any other desired config format. -func ImportJSONTopology(topologyReader io.Reader) (top Topology, _ error) { - return top, json.NewDecoder(topologyReader).Decode(&top) -}