Skip to content

Commit

Permalink
added benchmarks, PublishBatchUntilAcked for publishing many messages…
Browse files Browse the repository at this point in the history
…, slight refactoring of Consumer to take the connection on constructor
  • Loading branch information
Danlock committed Sep 21, 2023
1 parent 3a7aee0 commit 7ef78d4
Show file tree
Hide file tree
Showing 7 changed files with 401 additions and 47 deletions.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,8 @@ coverage-browser:

update-readme-badge:
@go tool cover -func=$(COVERAGE_PATH) -o=$(COVERAGE_PATH).badge
@go run github.com/AlexBeauchemin/[email protected] -filename=$(COVERAGE_PATH).badge
@go run github.com/AlexBeauchemin/[email protected] -filename=$(COVERAGE_PATH).badge

# pkg.go.dev documentation is updated via go get
update-proxy-cache:
@GOPROXY=https://proxy.golang.org go get github.com/danlock/rmq
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ This package attempts to provide a wrapper of useful features on top of amqp091,
Using an AMQP publisher to publish a message with at least once delivery.

```
ctx := context.TODO()
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
cfg := rmq.CommonConfig{Log: slog.Log}
rmqConn := rmq.ConnectWithURLs(ctx, rmq.ConnectConfig{CommonConfig: cfg}, os.Getenv("AMQP_URL_1"), os.Getenv("AMQP_URL_2"))
Expand All @@ -39,21 +40,21 @@ if err := rmqPub.PublishUntilAcked(ctx, time.Minute, msg); err != nil {
}
```

Using a reliable AMQP consumer that delivers messages through transient network failures while processing work concurrently with bounded goroutines.
Using a reliable AMQP consumer that receives deliveries through transient network failures while processing work concurrently with bounded goroutines.

```
ctx := context.TODO()
ctx, := context.TODO()
cfg := rmq.CommonConfig{Log: slog.Log}
rmqConn := rmq.ConnectWithURL(ctx, rmq.ConnectConfig{CommonConfig: cfg}, os.Getenv("AMQP_URL"))
rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmq.ConnectConfig{CommonConfig: cfg}, os.Getenv("AMQP_URL"), amqp.Config{})
consCfg := rmq.ConsumerConfig{
CommonConfig: cfg,
Queue: rmq.Queue{Name: "q2d2", AutoDelete: true},
Qos: rmq.Qos{PrefetchCount: 100},
Qos: rmq.Qos{PrefetchCount: 1000},
}
rmq.NewConsumer(consCfg).ConsumeConcurrently(ctx, rmqConn, 50, func(ctx context.Context, msg amqp.Delivery) {
rmq.NewConsumer(rmqConn, consCfg).ConsumeConcurrently(ctx, 100, func(ctx context.Context, msg amqp.Delivery) {
process(msg)
if err := msg.Ack(false); err != nil {
handleErr(err)
Expand All @@ -74,7 +75,7 @@ Here is an example logrus wrapper. danlock/rmq only uses the predefined slog.Lev
PublisherConfig{
Log: func(ctx context.Context, level slog.Level, msg string, args ...any) {
logruslevel, _ := logrus.ParseLevel(level.String())
logrus.StandardLogger().WithContext(ctx).Logf(logruslevel, msg, args...)
logrus.StandardLogger().WithContext(ctx).Logf(logruslevel, msg)
}
}
```
215 changes: 215 additions & 0 deletions benchmark_int_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
//go:build rabbit

package rmq_test

import (
"bytes"
"context"
"fmt"
"log/slog"
"os"
"strconv"
"testing"
"time"

"github.com/danlock/rmq"
amqp "github.com/rabbitmq/amqp091-go"
)

func generatePublishings(num int, routingKey string) []rmq.Publishing {
publishings := make([]rmq.Publishing, 100)
for i := range publishings {
publishings[i] = rmq.Publishing{
RoutingKey: routingKey,
Mandatory: true,
Publishing: amqp.Publishing{
Body: []byte(fmt.Sprintf("%d.%d", i, time.Now().UnixNano())),
},
}
}
return publishings
}

func BenchmarkPublishAndConsumeMany(b *testing.B) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

randSuffix := fmt.Sprintf("%d.%p", time.Now().UnixNano(), b)

queueName := "BenchmarkPublishAndConsumeMany" + randSuffix
baseCfg := rmq.CommonConfig{Log: slog.Log}
topology := rmq.Topology{
CommonConfig: baseCfg,
Queues: []rmq.Queue{{
Name: queueName,
Args: amqp.Table{
amqp.QueueTTLArg: time.Minute.Milliseconds(),
},
}},
}

subRMQConn := rmq.ConnectWithURL(ctx, rmq.ConnectConfig{CommonConfig: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI"))
pubRMQConn := rmq.ConnectWithURL(ctx, rmq.ConnectConfig{CommonConfig: baseCfg, Topology: topology}, os.Getenv("TEST_AMQP_URI"))

consumer := rmq.NewConsumer(subRMQConn, rmq.ConsumerConfig{
CommonConfig: baseCfg,
Queue: topology.Queues[0],
})

publisher := rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherConfig{
CommonConfig: baseCfg,
LogReturns: true,
})

publisher2, publisher3 := rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherConfig{
CommonConfig: baseCfg,
LogReturns: true,
}), rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherConfig{
CommonConfig: baseCfg,
LogReturns: true,
})

dot := []byte(".")
errChan := make(chan error)
consumeChan := consumer.Consume(ctx)

publishings := generatePublishings(10000, queueName)

cases := []struct {
name string
publishFunc func(b *testing.B)
}{
{
"PublishBatchUntilAcked",
func(b *testing.B) {
if err := publisher.PublishBatchUntilAcked(ctx, 0, publishings...); err != nil {
b.Fatalf("PublishBatchUntilAcked err %v", err)
}
},
},
{
"PublishBatchUntilAcked into thirds",
func(b *testing.B) {
errChan := make(chan error)
publishers := []*rmq.Publisher{publisher, publisher, publisher}
for i := range publishers {
go func(i int) {
errChan <- publishers[i].PublishBatchUntilAcked(ctx, 0, publishings[i:i+1]...)
}(i)
}
successes := 0
for {
select {
case err := <-errChan:
if err != nil {
b.Fatalf("PublishBatchUntilAcked err %v", err)
}
successes++
if successes == len(publishers) {
return
}
case <-ctx.Done():
b.Fatalf("PublishBatchUntilAcked timed out")
}
}
},
},
{
"PublishBatchUntilAcked on three Publishers",
func(b *testing.B) {
errChan := make(chan error)
publishers := []*rmq.Publisher{publisher, publisher2, publisher3}
for i := range publishers {
go func(i int) {
errChan <- publishers[i].PublishBatchUntilAcked(ctx, 0, publishings[i:i+1]...)
}(i)
}
successes := 0
for {
select {
case err := <-errChan:
if err != nil {
b.Fatalf("PublishBatchUntilAcked err %v", err)
}
successes++
if successes == len(publishers) {
return
}
case <-ctx.Done():
b.Fatalf("PublishBatchUntilAcked timed out")
}
}
},
},
{
"Concurrent PublishUntilAcked",
func(b *testing.B) {
errChan := make(chan error)
for i := range publishings {
go func(i int) {
errChan <- publisher.PublishUntilAcked(ctx, 0, publishings[i])
}(i)
}
successes := 0
for {
select {
case err := <-errChan:
if err != nil {
b.Fatalf("PublishUntilAcked err %v", err)
}
successes++
if successes == len(publishings) {
return
}
case <-ctx.Done():
b.Fatalf("PublishUntilAcked timed out")
}
}
},
},
}

for _, bb := range cases {
b.Run(bb.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
go func(i int) (err error) {
received := make(map[uint64]struct{}, len(publishings))
defer func() { errChan <- err }()
for {
select {
case msg := <-consumeChan:
rawIndex := bytes.Split(msg.Body, dot)[0]
index, err := strconv.ParseUint(string(rawIndex), 10, 64)
if err != nil {
return fmt.Errorf("strconv.ParseUint err %w", err)
}
received[index] = struct{}{}
if err := msg.Ack(false); err != nil {
return fmt.Errorf("msg.Ack err %w", err)
}
if len(received) == len(publishings) {
return nil
}
case <-ctx.Done():
return fmt.Errorf("timed out after consuming %d publishings on bench run %d", len(received), i)
}
}
}(i)

if err := publisher.PublishBatchUntilAcked(ctx, 0, publishings...); err != nil {
b.Fatalf("PublishBatchUntilAcked err %v", err)
}
select {
case <-ctx.Done():
b.Fatalf("timed out on bench run %d", i)
case err := <-errChan:
if err != nil {
b.Fatalf("on bench run %d consumer err %v", i, err)

}
}
}
})
}

}
27 changes: 14 additions & 13 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type ConsumerConfig struct {
CommonConfig

Queue Queue
QueueBindings []QueueBinding // Should only be used for anonymous queues, otherwise QueueBinding's be declared with DeclareTopology
QueueBindings []QueueBinding // Only needed for anonymous queues since Consumer's do not return the generated RabbitMQ queue name
Consume Consume
Qos Qos
}
Expand Down Expand Up @@ -61,24 +61,25 @@ type Qos struct {
// Consumer enables reliable AMQP Queue consumption.
type Consumer struct {
config ConsumerConfig
conn *Connection
}

// NewConsumer takes in a ConsumerConfig that describes the AMQP topology of a single queue,
// and returns a rmq.Consumer that can redeclare this topology on any errors during queue consumption.
// This enables robust reconnections even on unreliable networks.
func NewConsumer(config ConsumerConfig) *Consumer {
func NewConsumer(rmqConn *Connection, config ConsumerConfig) *Consumer {
config.setDefaults()
return &Consumer{config: config}
return &Consumer{config: config, conn: rmqConn}
}

// safeDeclareAndConsume safely declares and consumes from an amqp.Queue
// Closes the amqp.Channel on errors.
func (c *Consumer) safeDeclareAndConsume(ctx context.Context, rmqConn *Connection) (_ *amqp.Channel, _ <-chan amqp.Delivery, err error) {
func (c *Consumer) safeDeclareAndConsume(ctx context.Context) (_ *amqp.Channel, _ <-chan amqp.Delivery, err error) {
logPrefix := fmt.Sprintf("rmq.Consumer.safeDeclareAndConsume for queue %s", c.config.Queue.Name)
ctx, cancel := context.WithTimeout(ctx, c.config.AMQPTimeout)
defer cancel()

mqChan, err := rmqConn.Channel(ctx)
mqChan, err := c.conn.Channel(ctx)
if err != nil {
return nil, nil, fmt.Errorf(logPrefix+" failed to get a channel due to err %w", err)
}
Expand Down Expand Up @@ -172,7 +173,7 @@ func (c *Consumer) declareAndConsume(ctx context.Context, mqChan *amqp.Channel)
// On errors Consume reconnects to AMQP, redeclares and resumes consumption and forwarding of deliveries.
// Consume returns an unbuffered channel, and will block on sending to it if no ones listening.
// The returned channel is closed only after the context finishes.
func (c *Consumer) Consume(ctx context.Context, rmqConn *Connection) <-chan amqp.Delivery {
func (c *Consumer) Consume(ctx context.Context) <-chan amqp.Delivery {
outChan := make(chan amqp.Delivery)
go func() {
logPrefix := fmt.Sprintf("rmq.Consumer.Consume for queue (%s)", c.config.Queue.Name)
Expand All @@ -185,7 +186,7 @@ func (c *Consumer) Consume(ctx context.Context, rmqConn *Connection) <-chan amqp
return
case <-time.After(delay):
}
mqChan, inChan, err := c.safeDeclareAndConsume(ctx, rmqConn)
mqChan, inChan, err := c.safeDeclareAndConsume(ctx)
if err != nil {
delay = c.config.Delay(attempt)
attempt++
Expand All @@ -202,7 +203,7 @@ func (c *Consumer) Consume(ctx context.Context, rmqConn *Connection) <-chan amqp
return outChan
}

// forwardDeliveries forwards from inChan until it closes. If the context finishes it closes the amqp Channel so that the delivery channel will close eventually.
// forwardDeliveries forwards from inChan until it closes. If the context finishes it closes the amqp Channel so that the delivery channel will close after sending it's deliveries.
func (c *Consumer) forwardDeliveries(ctx context.Context, mqChan *amqp.Channel, inChan <-chan amqp.Delivery, outChan chan<- amqp.Delivery) {
logPrefix := fmt.Sprintf("rmq.Consumer.forwardDeliveries for queue (%s)", c.config.Queue.Name)
closeNotifier := mqChan.NotifyClose(make(chan *amqp.Error, 6))
Expand Down Expand Up @@ -233,18 +234,18 @@ func (c *Consumer) forwardDeliveries(ctx context.Context, mqChan *amqp.Channel,
}

// ConsumeConcurrently simply runs the provided deliveryProcessor on each delivery from Consume in a new goroutine.
// maxGoroutines limits the amounts of goroutines spawned and defaults to 2000.
// maxGoroutines limits the amounts of goroutines spawned and defaults to 500.
// Qos.PrefetchCount can also limit goroutines spawned if deliveryProcessor properly Acks messages.
// Blocks until the context is finished and the Consume channel closes.
func (c *Consumer) ConsumeConcurrently(ctx context.Context, rmqConn *Connection, maxGoroutines uint64, deliveryProcessor func(ctx context.Context, msg amqp.Delivery)) {
func (c *Consumer) ConsumeConcurrently(ctx context.Context, maxGoroutines uint64, deliveryProcessor func(ctx context.Context, msg amqp.Delivery)) {
if maxGoroutines == 0 {
maxGoroutines = 2000
maxGoroutines = 500
}
// We use a simple semaphore here and a new goroutine each time.
// It may be more efficient to use a goroutine pool, but a concerned caller can probably do it better themselves.
// It may be more efficient to use a goroutine pool for small amounts of work, but a concerned caller can probably do it better themselves.
semaphore := make(chan struct{}, maxGoroutines)
deliverAndReleaseSemaphore := func(msg amqp.Delivery) { deliveryProcessor(ctx, msg); <-semaphore }
for msg := range c.Consume(ctx, rmqConn) {
for msg := range c.Consume(ctx) {
semaphore <- struct{}{}
go deliverAndReleaseSemaphore(msg)
}
Expand Down
Loading

0 comments on commit 7ef78d4

Please sign in to comment.