diff --git a/pkg/amqp10/publisher.go b/pkg/amqp10/publisher.go index 2ad9b56..ab314de 100644 --- a/pkg/amqp10/publisher.go +++ b/pkg/amqp10/publisher.go @@ -13,7 +13,6 @@ import ( "github.com/rabbitmq/omq/pkg/config" "github.com/rabbitmq/omq/pkg/log" "github.com/rabbitmq/omq/pkg/utils" - "golang.org/x/time/rate" "github.com/rabbitmq/omq/pkg/metrics" @@ -158,7 +157,7 @@ func (p *Amqp10Publisher) Start(ctx context.Context) { p.msg = utils.MessageBody(p.Config.Size) var err error - p.pool, err = ants.NewPool(p.Config.MaxInFlight, ants.WithExpiryDuration(time.Duration(10*time.Second)), ants.WithNonblocking(false)) + p.pool, err = utils.AntsPool(p.Config.MaxInFlight) if err != nil { log.Error("Can't initialize a pool for handling send receipts", "error", err) return @@ -166,38 +165,28 @@ func (p *Amqp10Publisher) Start(ctx context.Context) { defer p.pool.Release() log.Info("publisher started", "id", p.Id, "rate", utils.Rate(p.Config.Rate), "destination", p.Terminus) - switch p.Config.Rate { - case 0: - p.StartIdle(ctx) - default: - p.StartPublishing(ctx) + var farewell string + if p.Config.Rate == 0 { + // idle connection + <-ctx.Done() + farewell = "context cancelled" + } else { + farewell = p.StartPublishing(ctx) } + p.Stop(farewell) } -func (p *Amqp10Publisher) StartIdle(ctx context.Context) { - <-ctx.Done() -} - -func (p *Amqp10Publisher) StartPublishing(ctx context.Context) { - var limit rate.Limit - if p.Config.Rate == -1 { - limit = rate.Inf - } else { - limit = rate.Limit(p.Config.Rate) - } - // burst will probably need to be adjusted/dynamic, but for now it's fine - limiter := rate.NewLimiter(limit, 1) +func (p *Amqp10Publisher) StartPublishing(ctx context.Context) string { + limiter := utils.RateLimiter(p.Config.Rate) var msgSent atomic.Int64 for { select { case <-ctx.Done(): - p.Stop("time limit reached") - return + return "context cancelled" default: if msgSent.Add(1) > int64(p.Config.PublishCount) { - p.Stop("--pmessages value reached") - return + return "--pmessages value reached" } if p.Config.Rate > 0 { _ = limiter.Wait(context.Background()) diff --git a/pkg/mqtt/publisher.go b/pkg/mqtt/publisher.go index 324b3ae..24d0d09 100644 --- a/pkg/mqtt/publisher.go +++ b/pkg/mqtt/publisher.go @@ -3,6 +3,7 @@ package mqtt import ( "context" "math/rand" + "sync/atomic" "time" mqtt "github.com/eclipse/paho.mqtt.golang" @@ -31,32 +32,6 @@ func NewMqttPublisher(cfg config.Config, id int) MqttPublisher { } } -func (p MqttPublisher) Start(ctx context.Context) { - // sleep random interval to avoid all publishers publishing at the same time - s := rand.Intn(1000) - time.Sleep(time.Duration(s) * time.Millisecond) - - defer func() { - if p.Connection != nil { - p.Connection.Disconnect(250) - } - }() - - p.Connect(ctx) - - p.msg = utils.MessageBody(p.Config.Size) - - switch p.Config.Rate { - case -1: - p.StartFullSpeed(ctx) - case 0: - p.StartIdle(ctx) - default: - p.StartRateLimited(ctx) - } - log.Debug("publisher stopped", "id", p.Id) -} - func (p *MqttPublisher) Connect(ctx context.Context) { var token mqtt.Token @@ -102,42 +77,50 @@ func (p MqttPublisher) connectionOptions() *mqtt.ClientOptions { return opts } -func (p MqttPublisher) StartFullSpeed(ctx context.Context) { - log.Info("publisher started", "id", p.Id, "rate", "unlimited", "destination", p.Topic) +func (p MqttPublisher) Start(ctx context.Context) { + // sleep random interval to avoid all publishers publishing at the same time + s := rand.Intn(1000) + time.Sleep(time.Duration(s) * time.Millisecond) - for msgSent := 0; msgSent < p.Config.PublishCount; msgSent++ { - select { - case <-ctx.Done(): - return - default: - p.Send() + defer func() { + if p.Connection != nil { + p.Connection.Disconnect(250) } - } -} + }() -func (p MqttPublisher) StartIdle(ctx context.Context) { - log.Info("publisher started", "id", p.Id, "rate", "-", "destination", p.Topic) + p.Connect(ctx) - <-ctx.Done() -} + p.msg = utils.MessageBody(p.Config.Size) -func (p MqttPublisher) StartRateLimited(ctx context.Context) { log.Info("publisher started", "id", p.Id, "rate", p.Config.Rate, "destination", p.Topic) - ticker := utils.RateTicker(p.Config.Rate) - msgSent := 0 + var farewell string + if p.Config.Rate == 0 { + // idle connection + <-ctx.Done() + farewell = "context cancelled" + } else { + farewell = p.StartPublishing(ctx) + } + p.Stop(farewell) +} + +func (p MqttPublisher) StartPublishing(ctx context.Context) string { + limiter := utils.RateLimiter(p.Config.Rate) + + var msgSent atomic.Int64 for { select { case <-ctx.Done(): - p.Stop("time limit reached") - return - case <-ticker.C: - p.Send() - msgSent++ - if msgSent >= p.Config.PublishCount { - p.Stop("--pmessages value reached") - return + return "time limit reached" + default: + if msgSent.Add(1) > int64(p.Config.PublishCount) { + return "--pmessages value reached" } + if p.Config.Rate > 0 { + _ = limiter.Wait(context.Background()) + } + p.Send() } } } diff --git a/pkg/mqtt/publisher_v5.go b/pkg/mqtt/publisher_v5.go index 7f83cfa..b77295a 100644 --- a/pkg/mqtt/publisher_v5.go +++ b/pkg/mqtt/publisher_v5.go @@ -3,6 +3,7 @@ package mqtt import ( "context" "math/rand" + "sync/atomic" "time" "github.com/eclipse/paho.golang/autopaho" @@ -32,32 +33,6 @@ func NewMqtt5Publisher(cfg config.Config, id int) Mqtt5Publisher { } } -func (p Mqtt5Publisher) Start(ctx context.Context) { - // sleep random interval to avoid all publishers publishing at the same time - s := rand.Intn(1000) - time.Sleep(time.Duration(s) * time.Millisecond) - - defer p.Stop("shutting down") - - p.Connect(ctx) - - p.msg = utils.MessageBody(p.Config.Size) - - switch p.Config.Rate { - case -1: - p.StartFullSpeed(ctx) - case 0: - p.StartIdle(ctx) - default: - p.StartRateLimited(ctx) - } - // TODO it seems that sometimes if we stop quickly after sending - // a message, this message is not delivered, even though Publish - // is supposed to by synchronous; to be investigated - time.Sleep(500 * time.Millisecond) - log.Debug("publisher stopped", "id", p.Id) -} - func (p *Mqtt5Publisher) Connect(ctx context.Context) { opts := p.connectionOptions() connection, err := autopaho.NewConnection(ctx, opts) @@ -101,42 +76,50 @@ func (p Mqtt5Publisher) connectionOptions() autopaho.ClientConfig { return opts } -func (p Mqtt5Publisher) StartFullSpeed(ctx context.Context) { - log.Info("publisher started", "id", p.Id, "rate", "unlimited", "destination", p.Topic) +func (p Mqtt5Publisher) Start(ctx context.Context) { + // sleep random interval to avoid all publishers publishing at the same time + s := rand.Intn(1000) + time.Sleep(time.Duration(s) * time.Millisecond) - for msgSent := 0; msgSent < p.Config.PublishCount; msgSent++ { - select { - case <-ctx.Done(): - return - default: - p.Send() - } - } -} + defer p.Stop("shutting down") -func (p Mqtt5Publisher) StartIdle(ctx context.Context) { - log.Info("publisher started", "id", p.Id, "rate", "-", "destination", p.Topic) + p.Connect(ctx) - <-ctx.Done() -} + p.msg = utils.MessageBody(p.Config.Size) -func (p Mqtt5Publisher) StartRateLimited(ctx context.Context) { log.Info("publisher started", "id", p.Id, "rate", p.Config.Rate, "destination", p.Topic) - ticker := utils.RateTicker(p.Config.Rate) - msgSent := 0 + var farewell string + if p.Config.Rate == 0 { + // idle connection + <-ctx.Done() + farewell = "context cancelled" + } else { + farewell = p.StartPublishing(ctx) + } + // TODO it seems that sometimes if we stop quickly after sending + // a message, this message is not delivered, even though Publish + // is supposed to by synchronous; to be investigated + time.Sleep(500 * time.Millisecond) + p.Stop(farewell) +} + +func (p Mqtt5Publisher) StartPublishing(ctx context.Context) string { + limiter := utils.RateLimiter(p.Config.Rate) + + var msgSent atomic.Int64 for { select { case <-ctx.Done(): - p.Stop("time limit reached") - return - case <-ticker.C: - p.Send() - msgSent++ - if msgSent >= p.Config.PublishCount { - p.Stop("--pmessages value reached") - return + return "time limit reached" + default: + if msgSent.Add(1) > int64(p.Config.PublishCount) { + return "--pmessages value reached" } + if p.Config.Rate > 0 { + _ = limiter.Wait(context.Background()) + } + p.Send() } } } diff --git a/pkg/stomp/publisher.go b/pkg/stomp/publisher.go index 5cb7e44..c3778cf 100644 --- a/pkg/stomp/publisher.go +++ b/pkg/stomp/publisher.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/rand" + "sync/atomic" "time" "github.com/rabbitmq/omq/pkg/config" @@ -77,61 +78,37 @@ func (p *StompPublisher) Start(ctx context.Context) { p.msg = utils.MessageBody(p.Config.Size) - switch p.Config.Rate { - case -1: - p.StartFullSpeed(ctx) - case 0: - p.StartIdle(ctx) - default: - p.StartRateLimited(ctx) - } -} - -func (p *StompPublisher) StartFullSpeed(ctx context.Context) { log.Info("publisher started", "id", p.Id, "rate", "unlimited", "destination", p.Topic) - for msgSent := 0; msgSent < p.Config.PublishCount; { - select { - case <-ctx.Done(): - return - default: - err := p.Send() - if err != nil { - p.Connect() - } else { - msgSent++ - } - } + var farewell string + if p.Config.Rate == 0 { + // idle connection + <-ctx.Done() + farewell = "context cancelled" + } else { + farewell = p.StartPublishing(ctx) } - log.Debug("publisher completed", "id", p.Id) + p.Stop(farewell) } -func (p *StompPublisher) StartIdle(ctx context.Context) { - log.Info("publisher started", "id", p.Id, "rate", "-", "destination", p.Topic) - - <-ctx.Done() -} +func (p *StompPublisher) StartPublishing(ctx context.Context) string { + limiter := utils.RateLimiter(p.Config.Rate) -func (p *StompPublisher) StartRateLimited(ctx context.Context) { - log.Info("publisher started", "id", p.Id, "rate", p.Config.Rate, "destination", p.Topic) - ticker := utils.RateTicker(p.Config.Rate) - - msgSent := 0 + var msgSent atomic.Int64 for { select { case <-ctx.Done(): - p.Stop("time limit reached") - return - case <-ticker.C: + return "context cancelled" + default: + if msgSent.Add(1) > int64(p.Config.PublishCount) { + return "--pmessages value reached" + } + if p.Config.Rate > 0 { + _ = limiter.Wait(context.Background()) + } err := p.Send() if err != nil { p.Connect() - } else { - msgSent++ - if msgSent >= p.Config.PublishCount { - p.Stop("--pmessages value reached") - return - } } } } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index e9bd54b..9c4cd48 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -9,6 +9,9 @@ import ( "strings" "time" + "golang.org/x/time/rate" + + "github.com/panjf2000/ants/v2" "github.com/rabbitmq/omq/pkg/log" ) @@ -118,9 +121,17 @@ func Rate(rate float32) string { } } -func RateTicker(rate float32) *time.Ticker { - if rate == 0 { - return nil +func RateLimiter(publishRate float32) *rate.Limiter { + var limit rate.Limit + if publishRate == -1 { + limit = rate.Inf + } else { + limit = rate.Limit(publishRate) } - return time.NewTicker(time.Duration(1_000_000/float64(rate)) * time.Microsecond) + // burst may need to be adjusted/dynamic, but for now it works pretty well + return rate.NewLimiter(limit, 1) +} + +func AntsPool(maxInFlight int) (*ants.Pool, error) { + return ants.NewPool(maxInFlight, ants.WithExpiryDuration(time.Duration(10*time.Second)), ants.WithNonblocking(false)) }