Skip to content

Commit

Permalink
removed ConnextWithURL since it's just a subset of ConnectWithURLs. F…
Browse files Browse the repository at this point in the history
…ixed README inconsitency
  • Loading branch information
Danlock committed Sep 21, 2023
1 parent 74a63b1 commit 8299fdb
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 41 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ An AMQP library for Go, built on top of amqp091.

[streadway/amqp](https://github.com/streadway/amqp), the library the RabbitMQ maintainers forked to [amqp-091](https://github.com/rabbitmq/amqp091-go), is a stable, thin client for communicating to RabbitMQ, but lacks many of the features present in RabbitMQ libraries from other languages. Many redialable AMQP connections have been reinvented in Go codebases everywhere.

This package attempts to provide a wrapper of useful features on top of amqp091, in the hopes of preventing at least one more unnecessary reinvention.
This package attempts to provide a wrapper of useful features on top of amqp091, in the hopes of preventing at least one more unnecessary reinvention (other than itself!)

# Design Goals

- Minimal API that doesn't get in the way of lower level access. The amqp091.Connection is there if you need it. amqp-091 knowledge is more transferable since danlock/rmq builds on top of those concepts rather than encapsulating things it doesn't need to.

- Network aware message delivery. Infra can fail so danlock/rmq uses context.Context and default timeouts wherever possible.

- As few dependencies as possible.
- One dependency (rabbitmq/amqp091-go).

- Prioritize readability. This means no functions with 5 boolean args.

Expand Down Expand Up @@ -72,8 +72,8 @@ All classes accept a Log function pointer that can be ignored entirely, set easi

Here is an example logrus wrapper. danlock/rmq only uses the predefined slog.Level's, and doesn't send any args.
```
PublisherConfig{
Log: func(ctx context.Context, level slog.Level, msg string, args ...any) {
CommonConfig{
Log: func(ctx context.Context, level slog.Level, msg string, _ ...any) {
logruslevel, _ := logrus.ParseLevel(level.String())
logrus.StandardLogger().WithContext(ctx).Logf(logruslevel, msg)
}
Expand Down
5 changes: 2 additions & 3 deletions benchmark_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func BenchmarkPublishAndConsumeMany(b *testing.B) {
}},
}

subRMQConn := rmq.ConnectWithURL(ctx, rmq.ConnectConfig{CommonConfig: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI"))
pubRMQConn := rmq.ConnectWithURL(ctx, rmq.ConnectConfig{CommonConfig: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI"))
subRMQConn := rmq.ConnectWithURLs(ctx, rmq.ConnectConfig{CommonConfig: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI"))
pubRMQConn := rmq.ConnectWithURLs(ctx, rmq.ConnectConfig{CommonConfig: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI"))

consumer := rmq.NewConsumer(subRMQConn, rmq.ConsumerConfig{
CommonConfig: baseCfg,
Expand Down Expand Up @@ -211,5 +211,4 @@ func BenchmarkPublishAndConsumeMany(b *testing.B) {
}
})
}

}
23 changes: 12 additions & 11 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,25 @@ type ConnectConfig struct {
CommonConfig
// Topology will be declared each connection to mitigate downed RabbitMQ nodes. Recommended to set, but not required.
Topology Topology
}

func ConnectWithURL(ctx context.Context, conf ConnectConfig, amqpURL string) *Connection {
return Connect(ctx, conf, func() (AMQPConnection, error) {
return amqp.Dial(amqpURL)
})
// DisableAMQP091Logs ensures the Connection's logs will not include rabbitmq/amqp091-go log's.
// Note only the first Connection created will include rabbitmq/amqp091-go logs, due to rabbitmq/amqp091-go using a single global logger.
DisableAMQP091Logs bool
}

func ConnectWithURLs(ctx context.Context, conf ConnectConfig, amqpURLs ...string) *Connection {
if len(amqpURLs) == 0 {
panic("ConnectWithURLs needs amqpURLs!")
}
return Connect(ctx, conf, func() (AMQPConnection, error) {
var errs error
errs := make([]error, 0, len(amqpURLs))
for _, amqpURL := range amqpURLs {
amqpConn, err := amqp.Dial(amqpURL)
if err == nil {
return amqpConn, nil
}
errs = errors.Join(err)
errs = append(errs, err)
}
return nil, errs
return nil, errors.Join(errs...)
})
}

Expand All @@ -68,8 +65,11 @@ func Connect(ctx context.Context, conf ConnectConfig, dialFn func() (AMQPConnect
if dialFn == nil || ctx == nil {
panic("Connect requires a ctx and a dialFn")
}
// Thread safely set the amqp091 logger so it's included within danlock/rmq Connection Logs.
if conf.Log != nil {
// Thread safely set the amqp091 logger once so it's included within danlock/rmq Connection Logs.
// We use a sync.Once to avoid races, but this also means just the first Connection logs these errors.
// It could be useful to have every Connection log these but practically that would lead to unneccessary log spam.
// If this behaviour causes an issue your only solution is to set DisableAMQP091Logs and call amqp.SetLogger yourself
if conf.Log != nil && !conf.DisableAMQP091Logs {
setAMQP091Logger.Do(func() {
amqp.SetLogger(internal.AMQP091Logger{ctx, conf.Log})
})
Expand Down Expand Up @@ -191,6 +191,7 @@ func (c *Connection) redial(dialFn func() (AMQPConnection, error)) {
c.config.Log(c.ctx, slog.LevelError, logPrefix+" failed, retrying after %s. err: %+v", dialDelay.String(), err)
continue
}
logPrefix = fmt.Sprintf("rmq.Connection.redial's AMQPConnection (%s -> %s)", amqpConn.LocalAddr(), amqpConn.RemoteAddr())

// Redeclare Topology if we have one. This has the bonus aspect of making sure the connection is actually usable, better than a Ping.
if err := DeclareTopology(c.ctx, amqpConn, c.config.Topology); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (c *Consumer) declareAndConsume(ctx context.Context, mqChan *amqp.Channel)
// Consume uses the rmq.Consumer config to declare and consume from an AMQP queue, forwarding deliveries to it's returned channel.
// On errors Consume reconnects to AMQP, redeclares and resumes consumption and forwarding of deliveries.
// Consume returns an unbuffered channel, and will block on sending to it if no ones listening.
// The returned channel is closed only after the context finishes.
// The returned channel is closed only after the context finishes and the amqp.Channel.Consume's Go channel delivers it's messages.
func (c *Consumer) Consume(ctx context.Context) <-chan amqp.Delivery {
outChan := make(chan amqp.Delivery)
go func() {
Expand Down
8 changes: 4 additions & 4 deletions healthcheck_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func Example() {
// If we want to use a different log library instead of log/slog.Log, wrap the function instead.
// If call depth is being logged, add to it so it doesn't just print this log function.
// Here we use log instead of slog
customLog := func(ctx context.Context, level slog.Level, msg string, args ...any) {
log.Printf("[%s] trace_id=%v msg="+msg, append([]any{level, ctx.Value("your_embedded_trace_id")}, args...)...)
customLog := func(ctx context.Context, level slog.Level, msg string, _ ...any) {
log.Printf("[%s] trace_id=%v msg="+msg, level, ctx.Value("your_embedded_trace_id"))
}
commonCfg := rmq.CommonConfig{Log: customLog}
// Create an AMQP topology for our healthcheck, which uses a temporary exchange.
Expand All @@ -46,8 +46,8 @@ func Example() {
// danlock/rmq best practice is including your applications topology in your ConnectConfig
cfg := rmq.ConnectConfig{CommonConfig: commonCfg, Topology: topology}
// RabbitMQ best practice is to pub and sub on different AMQP connections to avoid TCP backpressure causing issues with message consumption.
pubRMQConn := rmq.ConnectWithURL(ctx, cfg, os.Getenv("TEST_AMQP_URI"))
subRMQConn := rmq.ConnectWithURL(ctx, cfg, os.Getenv("TEST_AMQP_URI"))
pubRMQConn := rmq.ConnectWithURLs(ctx, cfg, os.Getenv("TEST_AMQP_URI"))
subRMQConn := rmq.ConnectWithURLs(ctx, cfg, os.Getenv("TEST_AMQP_URI"))

// A rudimentary healthcheck of a rmq.Connection is to ensure it can get a Channel, but we can do better
_, err := subRMQConn.MustChannel(ctx)
Expand Down
36 changes: 18 additions & 18 deletions topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,6 @@ import (
amqp "github.com/rabbitmq/amqp091-go"
)

// Topology contains all the exchange, queue and binding information needed for your application to use RabbitMQ.
type Topology struct {
CommonConfig

Exchanges []Exchange
ExchangeBindings []ExchangeBinding
Queues []Queue
QueueBindings []QueueBinding
}

// Exchange contains args for amqp.Channel.ExchangeDeclare
type Exchange struct {
Name string
Expand All @@ -47,7 +37,7 @@ type ExchangeBinding struct {
// DeclareTopology declares an AMQP topology once.
// If you want this to be redeclared automatically on connections, add your Topology to ConnectConfig instead.
func DeclareTopology(ctx context.Context, amqpConn AMQPConnection, topology Topology) error {
logPrefix := fmt.Sprintf("rmq.DeclareTopology")
logPrefix := fmt.Sprintf("rmq.DeclareTopology for AMQPConnection (%s -> %s)", amqpConn.LocalAddr(), amqpConn.RemoteAddr())

if topology.empty() {
return nil
Expand Down Expand Up @@ -98,6 +88,23 @@ func DeclareTopology(ctx context.Context, amqpConn AMQPConnection, topology Topo
}
}

// ImportJSONTopology reads in a Topology from a file. Useful for setting Exchanges, ExchangeBindings, Queues and QueueBindings from a config,
// although rabbitmqctl is probably a better candidate for this since it can also export your cuurrent RabbitMQ schema.
// Decoding files to structs is easy in Golang, so feel free to write your own as desired for XML, YAML, TOML or any other desired config format.
func ImportJSONTopology(topologyReader io.Reader) (top Topology, _ error) {
return top, json.NewDecoder(topologyReader).Decode(&top)
}

// Topology contains all the exchange, queue and binding information needed for your application to use RabbitMQ.
type Topology struct {
CommonConfig

Exchanges []Exchange
ExchangeBindings []ExchangeBinding
Queues []Queue
QueueBindings []QueueBinding
}

func (t *Topology) empty() bool {
return len(t.Exchanges) == 0 && len(t.Queues) == 0 &&
len(t.ExchangeBindings) == 0 && len(t.QueueBindings) == 0
Expand Down Expand Up @@ -162,10 +169,3 @@ func (t *Topology) declare(ctx context.Context, mqChan *amqp.Channel) (err error

return nil
}

// ImportJSONTopology reads in a Topology from a file. Useful for setting Exchanges, ExchangeBindings, Queues and QueueBindings from a config,
// although rabbitmqctl is probably a better candidate for this since it can also export your cuurrent RabbitMQ schema.
// Decoding files to structs is easy in Golang, so feel free to write your own as desired for XML, YAML, TOML or any other desired config format.
func ImportJSONTopology(topologyReader io.Reader) (top Topology, _ error) {
return top, json.NewDecoder(topologyReader).Decode(&top)
}

0 comments on commit 8299fdb

Please sign in to comment.