From 76a013b9d56ee04fb322c4881df842f9a8f8ce4f Mon Sep 17 00:00:00 2001 From: Danlock Date: Sat, 16 Nov 2024 21:18:43 -0500 Subject: [PATCH] added wagslane benchmark --- README.md | 12 +++- benchmark_int_test.go | 130 ++++++++++++++++++++++++++++++++++++++++++ go.mod | 5 +- go.sum | 2 + 4 files changed, 147 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index a65fa9e..7bffd4a 100644 --- a/README.md +++ b/README.md @@ -96,4 +96,14 @@ Here is an example logrus wrapper. danlock/rmq only uses the predefined slog.Lev logrus.StandardLogger().WithContext(ctx).Logf(logruslevel, msg) } } -``` \ No newline at end of file +``` + +# TODO +Benchmark against other Go amqp wrappers. Test if they implement timeouts or just block indefinitely on network errors. + +Candidates: +AMQP-091 +https://github.com/wagslane/go-rabbitmq +https://github.com/ThreeDotsLabs/watermill-amqp +AMQP-1.0 +https://github.com/Azure/go-amqp \ No newline at end of file diff --git a/benchmark_int_test.go b/benchmark_int_test.go index a0bbd96..da29001 100644 --- a/benchmark_int_test.go +++ b/benchmark_int_test.go @@ -13,7 +13,10 @@ import ( "time" "github.com/danlock/rmq" + "github.com/danlock/rmq/internal/test" amqp "github.com/rabbitmq/amqp091-go" + "github.com/wagslane/go-rabbitmq" + wagslane "github.com/wagslane/go-rabbitmq" ) const benchNumPubs = 1000 @@ -213,3 +216,130 @@ func BenchmarkPublishAndConsumeMany(b *testing.B) { }) } } + +func BenchmarkPublishAndConsumeManyWagslane(b *testing.B) { + setupStart := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + randSuffix := fmt.Sprintf("%d.%p", time.Now().UnixNano(), b) + + queueName := "BenchmarkPublishAndConsumeManyWagslane" + randSuffix + + conn, err := wagslane.NewConn( + os.Getenv("TEST_AMQP_URI"), + wagslane.WithConnectionOptionsLogging, + ) + test.FailOnError(b, err) + defer conn.Close() + + consumer, err := wagslane.NewConsumer( + conn, + queueName, + wagslane.WithConsumerOptionsQueueArgs(wagslane.Table{ + amqp.QueueTTLArg: time.Minute.Milliseconds(), + }), + ) + test.FailOnError(b, err) + + defer consumer.Close() + + consumeChan := make(chan wagslane.Delivery) + go func() { + err = consumer.Run(func(d wagslane.Delivery) rabbitmq.Action { + consumeChan <- d + return rabbitmq.Manual + }) + test.FailOnError(b, err) + }() + + dot := []byte(".") + errChan := make(chan error) + + publishings := generatePublishings(1000, queueName) + + publisher, err := wagslane.NewPublisher( + conn, + wagslane.WithPublisherOptionsLogging, + wagslane.WithPublisherOptionsConfirm, + ) + test.FailOnError(b, err) + defer publisher.Close() + + b.Logf("setup took %s", time.Since(setupStart)) + cases := []struct { + name string + publishFunc func(b *testing.B) + }{ + { + "PublishBatchUntilAcked", + func(b *testing.B) { + // wagslane doesn't have a batch publisher, implement our own + toBeConfirmed := make([]wagslane.PublisherConfirmation, 0, len(publishings)) + for _, p := range publishings { + c, err := publisher.PublishWithDeferredConfirmWithContext( + ctx, + p.Body, + []string{p.RoutingKey}, + wagslane.WithPublishOptionsMandatory, + ) + test.FailOnError(b, err) + toBeConfirmed = append(toBeConfirmed, c) + } + + for i, confirms := range toBeConfirmed { + if len(confirms) != 1 { + b.Fatalf("wagslane published a single message to a single routing key and got %d confirms somehow?", len(confirms)) + } + acked, err := confirms[0].WaitContext(ctx) + test.FailOnError(b, err) + if !acked { + b.Errorf("wagslane published message %d got nacked", i) + } + } + }, + }, + } + + for _, bb := range cases { + b.Run(bb.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + go func(i int) (err error) { + received := make(map[uint64]struct{}, len(publishings)) + defer func() { errChan <- err }() + for { + select { + case msg := <-consumeChan: + rawIndex := bytes.Split(msg.Body, dot)[0] + index, err := strconv.ParseUint(string(rawIndex), 10, 64) + if err != nil { + return fmt.Errorf("strconv.ParseUint err %w", err) + } + received[index] = struct{}{} + if err := msg.Ack(false); err != nil { + return fmt.Errorf("msg.Ack err %w", err) + } + if len(received) == len(publishings) { + return nil + } + case <-ctx.Done(): + return fmt.Errorf("timed out after consuming %d publishings on bench run %d", len(received), i) + } + } + }(i) + + bb.publishFunc(b) + + select { + case <-ctx.Done(): + b.Fatalf("timed out on bench run %d", i) + case err := <-errChan: + if err != nil { + b.Fatalf("on bench run %d consumer err %v", i, err) + + } + } + } + }) + } +} diff --git a/go.mod b/go.mod index 4e1f1f3..ff32fa0 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module github.com/danlock/rmq go 1.21 -require github.com/rabbitmq/amqp091-go v1.10.0 +require ( + github.com/rabbitmq/amqp091-go v1.10.0 + github.com/wagslane/go-rabbitmq v0.14.2 +) diff --git a/go.sum b/go.sum index 024eebe..ebfcaf8 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +github.com/wagslane/go-rabbitmq v0.14.2 h1:3l75Unsy0b8sb3ILqJxMTXkQLUPI67BOuubV9YBjGLE= +github.com/wagslane/go-rabbitmq v0.14.2/go.mod h1:6sCLt2wZoxyC73G7u/yD6/RX/yYf+x5D8SQk8nsa4Lc= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=