Skip to content

Commit

Permalink
added wagslane benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
Danlock committed Nov 17, 2024
1 parent 0c29d93 commit 76a013b
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 2 deletions.
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,14 @@ Here is an example logrus wrapper. danlock/rmq only uses the predefined slog.Lev
logrus.StandardLogger().WithContext(ctx).Logf(logruslevel, msg)
}
}
```
```

# TODO
Benchmark against other Go amqp wrappers. Test if they implement timeouts or just block indefinitely on network errors.

Candidates:
AMQP-091
https://github.com/wagslane/go-rabbitmq
https://github.com/ThreeDotsLabs/watermill-amqp
AMQP-1.0
https://github.com/Azure/go-amqp
130 changes: 130 additions & 0 deletions benchmark_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import (
"time"

"github.com/danlock/rmq"
"github.com/danlock/rmq/internal/test"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq"
wagslane "github.com/wagslane/go-rabbitmq"
)

const benchNumPubs = 1000
Expand Down Expand Up @@ -213,3 +216,130 @@ func BenchmarkPublishAndConsumeMany(b *testing.B) {
})
}
}

func BenchmarkPublishAndConsumeManyWagslane(b *testing.B) {
setupStart := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

randSuffix := fmt.Sprintf("%d.%p", time.Now().UnixNano(), b)

queueName := "BenchmarkPublishAndConsumeManyWagslane" + randSuffix

conn, err := wagslane.NewConn(
os.Getenv("TEST_AMQP_URI"),
wagslane.WithConnectionOptionsLogging,
)
test.FailOnError(b, err)
defer conn.Close()

consumer, err := wagslane.NewConsumer(
conn,
queueName,
wagslane.WithConsumerOptionsQueueArgs(wagslane.Table{
amqp.QueueTTLArg: time.Minute.Milliseconds(),
}),
)
test.FailOnError(b, err)

defer consumer.Close()

consumeChan := make(chan wagslane.Delivery)
go func() {
err = consumer.Run(func(d wagslane.Delivery) rabbitmq.Action {
consumeChan <- d
return rabbitmq.Manual
})
test.FailOnError(b, err)
}()

dot := []byte(".")
errChan := make(chan error)

publishings := generatePublishings(1000, queueName)

publisher, err := wagslane.NewPublisher(
conn,
wagslane.WithPublisherOptionsLogging,
wagslane.WithPublisherOptionsConfirm,
)
test.FailOnError(b, err)
defer publisher.Close()

b.Logf("setup took %s", time.Since(setupStart))
cases := []struct {
name string
publishFunc func(b *testing.B)
}{
{
"PublishBatchUntilAcked",
func(b *testing.B) {
// wagslane doesn't have a batch publisher, implement our own
toBeConfirmed := make([]wagslane.PublisherConfirmation, 0, len(publishings))
for _, p := range publishings {
c, err := publisher.PublishWithDeferredConfirmWithContext(
ctx,
p.Body,
[]string{p.RoutingKey},
wagslane.WithPublishOptionsMandatory,
)
test.FailOnError(b, err)
toBeConfirmed = append(toBeConfirmed, c)
}

for i, confirms := range toBeConfirmed {
if len(confirms) != 1 {
b.Fatalf("wagslane published a single message to a single routing key and got %d confirms somehow?", len(confirms))
}
acked, err := confirms[0].WaitContext(ctx)
test.FailOnError(b, err)
if !acked {
b.Errorf("wagslane published message %d got nacked", i)
}
}
},
},
}

for _, bb := range cases {
b.Run(bb.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
go func(i int) (err error) {
received := make(map[uint64]struct{}, len(publishings))
defer func() { errChan <- err }()
for {
select {
case msg := <-consumeChan:
rawIndex := bytes.Split(msg.Body, dot)[0]
index, err := strconv.ParseUint(string(rawIndex), 10, 64)
if err != nil {
return fmt.Errorf("strconv.ParseUint err %w", err)
}
received[index] = struct{}{}
if err := msg.Ack(false); err != nil {
return fmt.Errorf("msg.Ack err %w", err)
}
if len(received) == len(publishings) {
return nil
}
case <-ctx.Done():
return fmt.Errorf("timed out after consuming %d publishings on bench run %d", len(received), i)
}
}
}(i)

bb.publishFunc(b)

select {
case <-ctx.Done():
b.Fatalf("timed out on bench run %d", i)
case err := <-errChan:
if err != nil {
b.Fatalf("on bench run %d consumer err %v", i, err)

}
}
}
})
}
}
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ module github.com/danlock/rmq

go 1.21

require github.com/rabbitmq/amqp091-go v1.10.0
require (
github.com/rabbitmq/amqp091-go v1.10.0
github.com/wagslane/go-rabbitmq v0.14.2
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/wagslane/go-rabbitmq v0.14.2 h1:3l75Unsy0b8sb3ILqJxMTXkQLUPI67BOuubV9YBjGLE=
github.com/wagslane/go-rabbitmq v0.14.2/go.mod h1:6sCLt2wZoxyC73G7u/yD6/RX/yYf+x5D8SQk8nsa4Lc=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=

0 comments on commit 76a013b

Please sign in to comment.