Skip to content

Commit

Permalink
changed Config to Args since amqp091-go uses Config already. Also mak…
Browse files Browse the repository at this point in the history
…es rmq sound more like pirate. Release time!
  • Loading branch information
Danlock committed Sep 23, 2023
1 parent 8299fdb commit 4b49628
Show file tree
Hide file tree
Showing 13 changed files with 133 additions and 126 deletions.
14 changes: 13 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#! /usr/bin/make
SHELL = /bin/bash
BUILDTIME = $(shell date -u --rfc-3339=seconds)
GITHASH = $(shell git describe --dirty --always --tags)
GITCOMMITNO = $(shell git rev-list --all --count)
SHORTBUILDTAG = $(GITCOMMITNO).$(GITHASH)
SHORTBUILDTAG = v0.0.$(GITCOMMITNO)-$(GITHASH)
BUILDINFO = Build Time:$(BUILDTIME)
LDFLAGS = -X 'main.buildTag=$(SHORTBUILDTAG)' -X 'main.buildInfo=$(BUILDINFO)'

Expand Down Expand Up @@ -48,3 +49,14 @@ update-readme-badge:
# pkg.go.dev documentation is updated via go get
update-proxy-cache:
@GOPROXY=https://proxy.golang.org go get github.com/danlock/rmq

release:
ifeq ($(findstring dirty,$(SHORTBUILDTAG)),dirty)
@echo "Version $(SHORTBUILDTAG) is filthy, commit to clean it" && exit 1
endif
@read -t 5 -p "$(SHORTBUILDTAG) will be the new released version. Hit enter to proceed, CTRL-C to cancel."
@$(MAKE) deps
@$(MAKE) test
@$(MAKE) bench
@git tag $(SHORTBUILDTAG)
@git push origin $(SHORTBUILDTAG)
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# rmq
![Coverage](https://img.shields.io/badge/Coverage-86.1%25-brightgreen)
![Coverage](https://img.shields.io/badge/Coverage-85.6%25-brightgreen)
[![Go Report Card](https://goreportcard.com/badge/github.com/danlock/rmq)](https://goreportcard.com/report/github.com/danlock/rmq)
[![Go Reference](https://pkg.go.dev/badge/github.com/danlock/rmq.svg)](https://pkg.go.dev/github.com/danlock/rmq)

Expand All @@ -26,11 +26,11 @@ Using an AMQP publisher to publish a message with at least once delivery.
```
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
cfg := rmq.CommonConfig{Log: slog.Log}
cfg := rmq.Args{Log: slog.Log}
rmqConn := rmq.ConnectWithURLs(ctx, rmq.ConnectConfig{CommonConfig: cfg}, os.Getenv("AMQP_URL_1"), os.Getenv("AMQP_URL_2"))
rmqConn := rmq.ConnectWithURLs(ctx, rmq.ConnectArgs{Args: cfg}, os.Getenv("AMQP_URL_1"), os.Getenv("AMQP_URL_2"))
rmqPub := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherConfig{CommonConfig: cfg})
rmqPub := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherArgs{Args: cfg})
msg := rmq.Publishing{Exchange: "amq.topic", RoutingKey: "somewhere", Mandatory: true}
msg.Body = []byte(`{"life": 42}`)
Expand All @@ -44,12 +44,12 @@ Using a reliable AMQP consumer that receives deliveries through transient networ

```
ctx, := context.TODO()
cfg := rmq.CommonConfig{Log: slog.Log}
cfg := rmq.Args{Log: slog.Log}
rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmq.ConnectConfig{CommonConfig: cfg}, os.Getenv("AMQP_URL"), amqp.Config{})
rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmq.ConnectArgs{Args: cfg}, os.Getenv("AMQP_URL"), amqp.Config{})
consCfg := rmq.ConsumerConfig{
CommonConfig: cfg,
consCfg := rmq.ConsumerArgs{
Args: cfg,
Queue: rmq.Queue{Name: "q2d2", AutoDelete: true},
Qos: rmq.Qos{PrefetchCount: 1000},
}
Expand All @@ -72,7 +72,7 @@ All classes accept a Log function pointer that can be ignored entirely, set easi

Here is an example logrus wrapper. danlock/rmq only uses the predefined slog.Level's, and doesn't send any args.
```
CommonConfig{
Args{
Log: func(ctx context.Context, level slog.Level, msg string, _ ...any) {
logruslevel, _ := logrus.ParseLevel(level.String())
logrus.StandardLogger().WithContext(ctx).Logf(logruslevel, msg)
Expand Down
34 changes: 17 additions & 17 deletions benchmark_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func BenchmarkPublishAndConsumeMany(b *testing.B) {
randSuffix := fmt.Sprintf("%d.%p", time.Now().UnixNano(), b)

queueName := "BenchmarkPublishAndConsumeMany" + randSuffix
baseCfg := rmq.CommonConfig{Log: slog.Log}
baseCfg := rmq.Args{Log: slog.Log}
topology := rmq.Topology{
CommonConfig: baseCfg,
Args: baseCfg,
Queues: []rmq.Queue{{
Name: queueName,
Args: amqp.Table{
Expand All @@ -48,32 +48,32 @@ func BenchmarkPublishAndConsumeMany(b *testing.B) {
}},
}

subRMQConn := rmq.ConnectWithURLs(ctx, rmq.ConnectConfig{CommonConfig: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI"))
pubRMQConn := rmq.ConnectWithURLs(ctx, rmq.ConnectConfig{CommonConfig: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI"))
subRMQConn := rmq.ConnectWithURLs(ctx, rmq.ConnectArgs{Args: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI"))
pubRMQConn := rmq.ConnectWithURLs(ctx, rmq.ConnectArgs{Args: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI"))

consumer := rmq.NewConsumer(subRMQConn, rmq.ConsumerConfig{
CommonConfig: baseCfg,
Queue: topology.Queues[0],
consumer := rmq.NewConsumer(subRMQConn, rmq.ConsumerArgs{
Args: baseCfg,
Queue: topology.Queues[0],
})

publisher := rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherConfig{
CommonConfig: baseCfg,
LogReturns: true,
publisher := rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherArgs{
Args: baseCfg,
LogReturns: true,
})

publisher2, publisher3 := rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherConfig{
CommonConfig: baseCfg,
LogReturns: true,
}), rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherConfig{
CommonConfig: baseCfg,
LogReturns: true,
publisher2, publisher3 := rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherArgs{
Args: baseCfg,
LogReturns: true,
}), rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherArgs{
Args: baseCfg,
LogReturns: true,
})

dot := []byte(".")
errChan := make(chan error)
consumeChan := consumer.Consume(ctx)

publishings := generatePublishings(10000, queueName)
publishings := generatePublishings(10_000, queueName)

cases := []struct {
name string
Expand Down
6 changes: 3 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"github.com/danlock/rmq/internal"
)

// CommonConfig contains options shared by danlock/rmq classes.
type CommonConfig struct {
// Args contains common options shared by danlock/rmq classes.
type Args struct {
// AMQPTimeout sets a timeout on AMQP operations. Defaults to 1 minute.
AMQPTimeout time.Duration
// Delay returns the delay between retry attempts. Defaults to FibonacciDelay.
Expand All @@ -18,7 +18,7 @@ type CommonConfig struct {
Log func(ctx context.Context, level slog.Level, msg string, args ...any)
}

func (cfg *CommonConfig) setDefaults() {
func (cfg *Args) setDefaults() {
if cfg.AMQPTimeout == 0 {
cfg.AMQPTimeout = time.Minute
}
Expand Down
18 changes: 9 additions & 9 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ func TestConnect(t *testing.T) {
defer func() {
errs <- recover()
}()
_ = Connect(nil, ConnectConfig{}, nil)
_ = Connect(nil, ConnectArgs{}, nil)
}()
result := <-errs
if result == nil {
t.Fatalf("Connect should panic when missing required arguments")
}
rmqConn := Connect(context.Background(), ConnectConfig{}, func() (AMQPConnection, error) { return nil, fmt.Errorf("sike") })
rmqConn := Connect(context.Background(), ConnectArgs{}, func() (AMQPConnection, error) { return nil, fmt.Errorf("sike") })
if rmqConn == nil {
t.Fatalf("Connect failed to return a rmq.Connection")
}
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestConnection_CurrentConnection(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rmqConn := Connect(tt.connCtx, ConnectConfig{}, tt.connDialFn)
rmqConn := Connect(tt.connCtx, ConnectArgs{}, tt.connDialFn)
got, err := rmqConn.CurrentConnection(tt.reqCtx)
if !errors.Is(err, tt.wantErr) {
t.Fatalf("rmq.Connection.CurrentConnection() error = %v, wantErr %v", err, tt.wantErr)
Expand Down Expand Up @@ -120,9 +120,9 @@ func TestConnection_Channel(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
baseCfg := CommonConfig{Log: slog.Log}
connConf := ConnectConfig{
CommonConfig: baseCfg,
baseCfg := Args{Log: slog.Log}
connConf := ConnectArgs{
Args: baseCfg,
}
goodRMQConn := Connect(ctx, connConf, func() (AMQPConnection, error) {
return goodMockAMQP, nil
Expand All @@ -133,7 +133,7 @@ func TestConnection_Channel(t *testing.T) {
slowRMQConn := Connect(ctx, connConf, func() (AMQPConnection, error) {
return slowMockAMQP, nil
})
slowConfig := ConnectConfig{CommonConfig: CommonConfig{Log: slog.Log, AMQPTimeout: 50 * time.Millisecond}}
slowConfig := ConnectArgs{Args: Args{Log: slog.Log, AMQPTimeout: 50 * time.Millisecond}}
slowUsingTimeoutRMQConn := Connect(ctx, slowConfig, func() (AMQPConnection, error) {
return slowMockAMQP, nil
})
Expand Down Expand Up @@ -180,8 +180,8 @@ func TestConnection_Channel(t *testing.T) {
})
}

connConf = ConnectConfig{
CommonConfig: CommonConfig{
connConf = ConnectArgs{
Args: Args{
Log: connConf.Log,
Delay: func(attempt int) time.Duration { return time.Duration(attempt) * time.Millisecond },
},
Expand Down
12 changes: 6 additions & 6 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ type AMQPConnection interface {
IsClosed() bool
}

type ConnectConfig struct {
CommonConfig
type ConnectArgs struct {
Args
// Topology will be declared each connection to mitigate downed RabbitMQ nodes. Recommended to set, but not required.
Topology Topology
// DisableAMQP091Logs ensures the Connection's logs will not include rabbitmq/amqp091-go log's.
// Note only the first Connection created will include rabbitmq/amqp091-go logs, due to rabbitmq/amqp091-go using a single global logger.
DisableAMQP091Logs bool
}

func ConnectWithURLs(ctx context.Context, conf ConnectConfig, amqpURLs ...string) *Connection {
func ConnectWithURLs(ctx context.Context, conf ConnectArgs, amqpURLs ...string) *Connection {
if len(amqpURLs) == 0 {
panic("ConnectWithURLs needs amqpURLs!")
}
Expand All @@ -49,7 +49,7 @@ func ConnectWithURLs(ctx context.Context, conf ConnectConfig, amqpURLs ...string
})
}

func ConnectWithAMQPConfig(ctx context.Context, conf ConnectConfig, amqpURL string, amqpConf amqp.Config) *Connection {
func ConnectWithAMQPConfig(ctx context.Context, conf ConnectArgs, amqpURL string, amqpConf amqp.Config) *Connection {
return Connect(ctx, conf, func() (AMQPConnection, error) {
return amqp.DialConfig(amqpURL, amqpConf)
})
Expand All @@ -61,7 +61,7 @@ var setAMQP091Logger sync.Once
// Each Channel() call triggers rmq.Connection to return an amqp.Channel from it's CurrentConnection() or redial with the provided dialFn for a new AMQP Connection.
// ConnectWith* functions provide a few simple dialFn's for ease of use. They can be a simple wrapper around an amqp.Dial or much more complicated.
// If you want to ensure the Connection is working, call MustChannel with a timeout.
func Connect(ctx context.Context, conf ConnectConfig, dialFn func() (AMQPConnection, error)) *Connection {
func Connect(ctx context.Context, conf ConnectArgs, dialFn func() (AMQPConnection, error)) *Connection {
if dialFn == nil || ctx == nil {
panic("Connect requires a ctx and a dialFn")
}
Expand Down Expand Up @@ -119,7 +119,7 @@ type Connection struct {
chanReqChan chan internal.ChanReq[*amqp.Channel]
currentConReqChan chan internal.ChanReq[AMQPConnection]

config ConnectConfig
config ConnectArgs
}

// Channel requests an AMQP channel from the current AMQP Connection.
Expand Down
14 changes: 7 additions & 7 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
amqp "github.com/rabbitmq/amqp091-go"
)

// ConsumerConfig contains information needed to declare and consume deliveries from a queue.
type ConsumerConfig struct {
CommonConfig
// ConsumerArgs contains information needed to declare and consume deliveries from a queue.
type ConsumerArgs struct {
Args

Queue Queue
QueueBindings []QueueBinding // Only needed for anonymous queues since Consumer's do not return the generated RabbitMQ queue name
QueueBindings []QueueBinding
Consume Consume
Qos Qos
}
Expand Down Expand Up @@ -60,14 +60,14 @@ type Qos struct {

// Consumer enables reliable AMQP Queue consumption.
type Consumer struct {
config ConsumerConfig
config ConsumerArgs
conn *Connection
}

// NewConsumer takes in a ConsumerConfig that describes the AMQP topology of a single queue,
// NewConsumer takes in a ConsumerArgs that describes the AMQP topology of a single queue,
// and returns a rmq.Consumer that can redeclare this topology on any errors during queue consumption.
// This enables robust reconnections even on unreliable networks.
func NewConsumer(rmqConn *Connection, config ConsumerConfig) *Consumer {
func NewConsumer(rmqConn *Connection, config ConsumerArgs) *Consumer {
config.setDefaults()
return &Consumer{config: config, conn: rmqConn}
}
Expand Down
Loading

0 comments on commit 4b49628

Please sign in to comment.