Skip to content

Commit

Permalink
updated benchmark to only use the function that actually works. this …
Browse files Browse the repository at this point in the history
…branch will only contain benchmarks for danlock/rmq to keep dependencies light, even if they would only be test dependencies anyway
  • Loading branch information
Danlock committed Nov 17, 2024
1 parent fde730d commit 0c29d93
Showing 1 changed file with 93 additions and 92 deletions.
185 changes: 93 additions & 92 deletions benchmark_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ import (
amqp "github.com/rabbitmq/amqp091-go"
)

const benchNumPubs = 1000

func generatePublishings(num int, routingKey string) []rmq.Publishing {
publishings := make([]rmq.Publishing, 100)
publishings := make([]rmq.Publishing, num)
for i := range publishings {
publishings[i] = rmq.Publishing{
RoutingKey: routingKey,
Expand Down Expand Up @@ -61,19 +63,19 @@ func BenchmarkPublishAndConsumeMany(b *testing.B) {
LogReturns: true,
})

publisher2, publisher3 := rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherArgs{
Args: baseCfg,
LogReturns: true,
}), rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherArgs{
Args: baseCfg,
LogReturns: true,
})
// publisher2, publisher3 := rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherArgs{
// Args: baseCfg,
// LogReturns: true,
// }), rmq.NewPublisher(ctx, pubRMQConn, rmq.PublisherArgs{
// Args: baseCfg,
// LogReturns: true,
// })

dot := []byte(".")
errChan := make(chan error)
consumeChan := consumer.Consume(ctx)

publishings := generatePublishings(10_000, queueName)
publishings := generatePublishings(benchNumPubs, queueName)

cases := []struct {
name string
Expand All @@ -87,86 +89,86 @@ func BenchmarkPublishAndConsumeMany(b *testing.B) {
}
},
},
{
"PublishBatchUntilAcked into thirds",
func(b *testing.B) {
errChan := make(chan error)
publishers := []*rmq.Publisher{publisher, publisher, publisher}
for i := range publishers {
go func(i int) {
errChan <- publishers[i].PublishBatchUntilAcked(ctx, 0, publishings[i:i+1]...)
}(i)
}
successes := 0
for {
select {
case err := <-errChan:
if err != nil {
b.Fatalf("PublishBatchUntilAcked err %v", err)
}
successes++
if successes == len(publishers) {
return
}
case <-ctx.Done():
b.Fatalf("PublishBatchUntilAcked timed out")
}
}
},
},
{
"PublishBatchUntilAcked on three Publishers",
func(b *testing.B) {
errChan := make(chan error)
publishers := []*rmq.Publisher{publisher, publisher2, publisher3}
for i := range publishers {
go func(i int) {
errChan <- publishers[i].PublishBatchUntilAcked(ctx, 0, publishings[i:i+1]...)
}(i)
}
successes := 0
for {
select {
case err := <-errChan:
if err != nil {
b.Fatalf("PublishBatchUntilAcked err %v", err)
}
successes++
if successes == len(publishers) {
return
}
case <-ctx.Done():
b.Fatalf("PublishBatchUntilAcked timed out")
}
}
},
},
{
"Concurrent PublishUntilAcked",
func(b *testing.B) {
errChan := make(chan error)
for i := range publishings {
go func(i int) {
errChan <- publisher.PublishUntilAcked(ctx, 0, publishings[i])
}(i)
}
successes := 0
for {
select {
case err := <-errChan:
if err != nil {
b.Fatalf("PublishUntilAcked err %v", err)
}
successes++
if successes == len(publishings) {
return
}
case <-ctx.Done():
b.Fatalf("PublishUntilAcked timed out")
}
}
},
},
// {
// "PublishBatchUntilAcked into thirds",
// func(b *testing.B) {
// errChan := make(chan error)
// publishers := []*rmq.Publisher{publisher, publisher, publisher}
// for i := range publishers {
// go func(i int) {
// errChan <- publishers[i].PublishBatchUntilAcked(ctx, 0, publishings[i:i+1]...)
// }(i)
// }
// successes := 0
// for {
// select {
// case err := <-errChan:
// if err != nil {
// b.Fatalf("PublishBatchUntilAcked err %v", err)
// }
// successes++
// if successes == len(publishers) {
// return
// }
// case <-ctx.Done():
// b.Fatalf("PublishBatchUntilAcked timed out")
// }
// }
// },
// },
// {
// "PublishBatchUntilAcked on three Publishers",
// func(b *testing.B) {
// errChan := make(chan error)
// publishers := []*rmq.Publisher{publisher, publisher2, publisher3}
// for i := range publishers {
// go func(i int) {
// errChan <- publishers[i].PublishBatchUntilAcked(ctx, 0, publishings[i:i+1]...)
// }(i)
// }
// successes := 0
// for {
// select {
// case err := <-errChan:
// if err != nil {
// b.Fatalf("PublishBatchUntilAcked err %v", err)
// }
// successes++
// if successes == len(publishers) {
// return
// }
// case <-ctx.Done():
// b.Fatalf("PublishBatchUntilAcked timed out")
// }
// }
// },
// },
// {
// "Concurrent PublishUntilAcked",
// func(b *testing.B) {
// errChan := make(chan error)
// for i := range publishings {
// go func(i int) {
// errChan <- publisher.PublishUntilAcked(ctx, 0, publishings[i])
// }(i)
// }
// successes := 0
// for {
// select {
// case err := <-errChan:
// if err != nil {
// b.Fatalf("PublishUntilAcked err %v", err)
// }
// successes++
// if successes == len(publishings) {
// return
// }
// case <-ctx.Done():
// b.Fatalf("PublishUntilAcked timed out")
// }
// }
// },
// },
}

for _, bb := range cases {
Expand Down Expand Up @@ -196,9 +198,8 @@ func BenchmarkPublishAndConsumeMany(b *testing.B) {
}
}(i)

if err := publisher.PublishBatchUntilAcked(ctx, 0, publishings...); err != nil {
b.Fatalf("PublishBatchUntilAcked err %v", err)
}
bb.publishFunc(b)

select {
case <-ctx.Done():
b.Fatalf("timed out on bench run %d", i)
Expand Down

0 comments on commit 0c29d93

Please sign in to comment.