diff --git a/benchmark_int_test.go b/benchmark_int_test.go index 554c748..a0bbd96 100644 --- a/benchmark_int_test.go +++ b/benchmark_int_test.go @@ -16,8 +16,10 @@ import ( amqp "github.com/rabbitmq/amqp091-go" ) +const benchNumPubs = 1000 + func generatePublishings(num int, routingKey string) []rmq.Publishing { - publishings := make([]rmq.Publishing, 100) + publishings := make([]rmq.Publishing, num) for i := range publishings { publishings[i] = rmq.Publishing{ RoutingKey: routingKey, @@ -61,19 +63,19 @@ func BenchmarkPublishAndConsumeMany(b *testing.B) { LogReturns: true, }) - publisher2, publisher3 := rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherArgs{ - Args: baseCfg, - LogReturns: true, - }), rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherArgs{ - Args: baseCfg, - LogReturns: true, - }) + // publisher2, publisher3 := rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherArgs{ + // Args: baseCfg, + // LogReturns: true, + // }), rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherArgs{ + // Args: baseCfg, + // LogReturns: true, + // }) dot := []byte(".") errChan := make(chan error) consumeChan := consumer.Consume(ctx) - publishings := generatePublishings(10_000, queueName) + publishings := generatePublishings(benchNumPubs, queueName) cases := []struct { name string @@ -87,86 +89,86 @@ func BenchmarkPublishAndConsumeMany(b *testing.B) { } }, }, - { - "PublishBatchUntilAcked into thirds", - func(b *testing.B) { - errChan := make(chan error) - publishers := []*rmq.Publisher{publisher, publisher, publisher} - for i := range publishers { - go func(i int) { - errChan <- publishers[i].PublishBatchUntilAcked(ctx, 0, publishings[i:i+1]...) - }(i) - } - successes := 0 - for { - select { - case err := <-errChan: - if err != nil { - b.Fatalf("PublishBatchUntilAcked err %v", err) - } - successes++ - if successes == len(publishers) { - return - } - case <-ctx.Done(): - b.Fatalf("PublishBatchUntilAcked timed out") - } - } - }, - }, - { - "PublishBatchUntilAcked on three Publishers", - func(b *testing.B) { - errChan := make(chan error) - publishers := []*rmq.Publisher{publisher, publisher2, publisher3} - for i := range publishers { - go func(i int) { - errChan <- publishers[i].PublishBatchUntilAcked(ctx, 0, publishings[i:i+1]...) - }(i) - } - successes := 0 - for { - select { - case err := <-errChan: - if err != nil { - b.Fatalf("PublishBatchUntilAcked err %v", err) - } - successes++ - if successes == len(publishers) { - return - } - case <-ctx.Done(): - b.Fatalf("PublishBatchUntilAcked timed out") - } - } - }, - }, - { - "Concurrent PublishUntilAcked", - func(b *testing.B) { - errChan := make(chan error) - for i := range publishings { - go func(i int) { - errChan <- publisher.PublishUntilAcked(ctx, 0, publishings[i]) - }(i) - } - successes := 0 - for { - select { - case err := <-errChan: - if err != nil { - b.Fatalf("PublishUntilAcked err %v", err) - } - successes++ - if successes == len(publishings) { - return - } - case <-ctx.Done(): - b.Fatalf("PublishUntilAcked timed out") - } - } - }, - }, + // { + // "PublishBatchUntilAcked into thirds", + // func(b *testing.B) { + // errChan := make(chan error) + // publishers := []*rmq.Publisher{publisher, publisher, publisher} + // for i := range publishers { + // go func(i int) { + // errChan <- publishers[i].PublishBatchUntilAcked(ctx, 0, publishings[i:i+1]...) + // }(i) + // } + // successes := 0 + // for { + // select { + // case err := <-errChan: + // if err != nil { + // b.Fatalf("PublishBatchUntilAcked err %v", err) + // } + // successes++ + // if successes == len(publishers) { + // return + // } + // case <-ctx.Done(): + // b.Fatalf("PublishBatchUntilAcked timed out") + // } + // } + // }, + // }, + // { + // "PublishBatchUntilAcked on three Publishers", + // func(b *testing.B) { + // errChan := make(chan error) + // publishers := []*rmq.Publisher{publisher, publisher2, publisher3} + // for i := range publishers { + // go func(i int) { + // errChan <- publishers[i].PublishBatchUntilAcked(ctx, 0, publishings[i:i+1]...) + // }(i) + // } + // successes := 0 + // for { + // select { + // case err := <-errChan: + // if err != nil { + // b.Fatalf("PublishBatchUntilAcked err %v", err) + // } + // successes++ + // if successes == len(publishers) { + // return + // } + // case <-ctx.Done(): + // b.Fatalf("PublishBatchUntilAcked timed out") + // } + // } + // }, + // }, + // { + // "Concurrent PublishUntilAcked", + // func(b *testing.B) { + // errChan := make(chan error) + // for i := range publishings { + // go func(i int) { + // errChan <- publisher.PublishUntilAcked(ctx, 0, publishings[i]) + // }(i) + // } + // successes := 0 + // for { + // select { + // case err := <-errChan: + // if err != nil { + // b.Fatalf("PublishUntilAcked err %v", err) + // } + // successes++ + // if successes == len(publishings) { + // return + // } + // case <-ctx.Done(): + // b.Fatalf("PublishUntilAcked timed out") + // } + // } + // }, + // }, } for _, bb := range cases { @@ -196,9 +198,8 @@ func BenchmarkPublishAndConsumeMany(b *testing.B) { } }(i) - if err := publisher.PublishBatchUntilAcked(ctx, 0, publishings...); err != nil { - b.Fatalf("PublishBatchUntilAcked err %v", err) - } + bb.publishFunc(b) + select { case <-ctx.Done(): b.Fatalf("timed out on bench run %d", i)