Skip to content

Commit

Permalink
Refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Dec 6, 2024
1 parent 56b3720 commit fcdd23f
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 174 deletions.
37 changes: 13 additions & 24 deletions pkg/amqp10/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/rabbitmq/omq/pkg/config"
"github.com/rabbitmq/omq/pkg/log"
"github.com/rabbitmq/omq/pkg/utils"
"golang.org/x/time/rate"

"github.com/rabbitmq/omq/pkg/metrics"

Expand Down Expand Up @@ -158,46 +157,36 @@ func (p *Amqp10Publisher) Start(ctx context.Context) {
p.msg = utils.MessageBody(p.Config.Size)

var err error
p.pool, err = ants.NewPool(p.Config.MaxInFlight, ants.WithExpiryDuration(time.Duration(10*time.Second)), ants.WithNonblocking(false))
p.pool, err = utils.AntsPool(p.Config.MaxInFlight)
if err != nil {
log.Error("Can't initialize a pool for handling send receipts", "error", err)
return
}
defer p.pool.Release()

log.Info("publisher started", "id", p.Id, "rate", utils.Rate(p.Config.Rate), "destination", p.Terminus)
switch p.Config.Rate {
case 0:
p.StartIdle(ctx)
default:
p.StartPublishing(ctx)
var farewell string
if p.Config.Rate == 0 {
// idle connection
<-ctx.Done()
farewell = "context cancelled"
} else {
farewell = p.StartPublishing(ctx)
}
p.Stop(farewell)
}

func (p *Amqp10Publisher) StartIdle(ctx context.Context) {
<-ctx.Done()
}

func (p *Amqp10Publisher) StartPublishing(ctx context.Context) {
var limit rate.Limit
if p.Config.Rate == -1 {
limit = rate.Inf
} else {
limit = rate.Limit(p.Config.Rate)
}
// burst will probably need to be adjusted/dynamic, but for now it's fine
limiter := rate.NewLimiter(limit, 1)
func (p *Amqp10Publisher) StartPublishing(ctx context.Context) string {
limiter := utils.RateLimiter(p.Config.Rate)

var msgSent atomic.Int64
for {
select {
case <-ctx.Done():
p.Stop("time limit reached")
return
return "context cancelled"
default:
if msgSent.Add(1) > int64(p.Config.PublishCount) {
p.Stop("--pmessages value reached")
return
return "--pmessages value reached"
}
if p.Config.Rate > 0 {
_ = limiter.Wait(context.Background())
Expand Down
85 changes: 34 additions & 51 deletions pkg/mqtt/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mqtt
import (
"context"
"math/rand"
"sync/atomic"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
Expand Down Expand Up @@ -31,32 +32,6 @@ func NewMqttPublisher(cfg config.Config, id int) MqttPublisher {
}
}

func (p MqttPublisher) Start(ctx context.Context) {
// sleep random interval to avoid all publishers publishing at the same time
s := rand.Intn(1000)
time.Sleep(time.Duration(s) * time.Millisecond)

defer func() {
if p.Connection != nil {
p.Connection.Disconnect(250)
}
}()

p.Connect(ctx)

p.msg = utils.MessageBody(p.Config.Size)

switch p.Config.Rate {
case -1:
p.StartFullSpeed(ctx)
case 0:
p.StartIdle(ctx)
default:
p.StartRateLimited(ctx)
}
log.Debug("publisher stopped", "id", p.Id)
}

func (p *MqttPublisher) Connect(ctx context.Context) {
var token mqtt.Token

Expand Down Expand Up @@ -102,42 +77,50 @@ func (p MqttPublisher) connectionOptions() *mqtt.ClientOptions {
return opts
}

func (p MqttPublisher) StartFullSpeed(ctx context.Context) {
log.Info("publisher started", "id", p.Id, "rate", "unlimited", "destination", p.Topic)
func (p MqttPublisher) Start(ctx context.Context) {
// sleep random interval to avoid all publishers publishing at the same time
s := rand.Intn(1000)
time.Sleep(time.Duration(s) * time.Millisecond)

for msgSent := 0; msgSent < p.Config.PublishCount; msgSent++ {
select {
case <-ctx.Done():
return
default:
p.Send()
defer func() {
if p.Connection != nil {
p.Connection.Disconnect(250)
}
}
}
}()

func (p MqttPublisher) StartIdle(ctx context.Context) {
log.Info("publisher started", "id", p.Id, "rate", "-", "destination", p.Topic)
p.Connect(ctx)

<-ctx.Done()
}
p.msg = utils.MessageBody(p.Config.Size)

func (p MqttPublisher) StartRateLimited(ctx context.Context) {
log.Info("publisher started", "id", p.Id, "rate", p.Config.Rate, "destination", p.Topic)
ticker := utils.RateTicker(p.Config.Rate)

msgSent := 0
var farewell string
if p.Config.Rate == 0 {
// idle connection
<-ctx.Done()
farewell = "context cancelled"
} else {
farewell = p.StartPublishing(ctx)
}
p.Stop(farewell)
}

func (p MqttPublisher) StartPublishing(ctx context.Context) string {
limiter := utils.RateLimiter(p.Config.Rate)

var msgSent atomic.Int64
for {
select {
case <-ctx.Done():
p.Stop("time limit reached")
return
case <-ticker.C:
p.Send()
msgSent++
if msgSent >= p.Config.PublishCount {
p.Stop("--pmessages value reached")
return
return "time limit reached"
default:
if msgSent.Add(1) > int64(p.Config.PublishCount) {
return "--pmessages value reached"
}
if p.Config.Rate > 0 {
_ = limiter.Wait(context.Background())
}
p.Send()
}
}
}
Expand Down
87 changes: 35 additions & 52 deletions pkg/mqtt/publisher_v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mqtt
import (
"context"
"math/rand"
"sync/atomic"
"time"

"github.com/eclipse/paho.golang/autopaho"
Expand Down Expand Up @@ -32,32 +33,6 @@ func NewMqtt5Publisher(cfg config.Config, id int) Mqtt5Publisher {
}
}

func (p Mqtt5Publisher) Start(ctx context.Context) {
// sleep random interval to avoid all publishers publishing at the same time
s := rand.Intn(1000)
time.Sleep(time.Duration(s) * time.Millisecond)

defer p.Stop("shutting down")

p.Connect(ctx)

p.msg = utils.MessageBody(p.Config.Size)

switch p.Config.Rate {
case -1:
p.StartFullSpeed(ctx)
case 0:
p.StartIdle(ctx)
default:
p.StartRateLimited(ctx)
}
// TODO it seems that sometimes if we stop quickly after sending
// a message, this message is not delivered, even though Publish
// is supposed to by synchronous; to be investigated
time.Sleep(500 * time.Millisecond)
log.Debug("publisher stopped", "id", p.Id)
}

func (p *Mqtt5Publisher) Connect(ctx context.Context) {
opts := p.connectionOptions()
connection, err := autopaho.NewConnection(ctx, opts)
Expand Down Expand Up @@ -101,42 +76,50 @@ func (p Mqtt5Publisher) connectionOptions() autopaho.ClientConfig {
return opts
}

func (p Mqtt5Publisher) StartFullSpeed(ctx context.Context) {
log.Info("publisher started", "id", p.Id, "rate", "unlimited", "destination", p.Topic)
func (p Mqtt5Publisher) Start(ctx context.Context) {
// sleep random interval to avoid all publishers publishing at the same time
s := rand.Intn(1000)
time.Sleep(time.Duration(s) * time.Millisecond)

for msgSent := 0; msgSent < p.Config.PublishCount; msgSent++ {
select {
case <-ctx.Done():
return
default:
p.Send()
}
}
}
defer p.Stop("shutting down")

func (p Mqtt5Publisher) StartIdle(ctx context.Context) {
log.Info("publisher started", "id", p.Id, "rate", "-", "destination", p.Topic)
p.Connect(ctx)

<-ctx.Done()
}
p.msg = utils.MessageBody(p.Config.Size)

func (p Mqtt5Publisher) StartRateLimited(ctx context.Context) {
log.Info("publisher started", "id", p.Id, "rate", p.Config.Rate, "destination", p.Topic)
ticker := utils.RateTicker(p.Config.Rate)

msgSent := 0
var farewell string
if p.Config.Rate == 0 {
// idle connection
<-ctx.Done()
farewell = "context cancelled"
} else {
farewell = p.StartPublishing(ctx)
}
// TODO it seems that sometimes if we stop quickly after sending
// a message, this message is not delivered, even though Publish
// is supposed to by synchronous; to be investigated
time.Sleep(500 * time.Millisecond)
p.Stop(farewell)
}

func (p Mqtt5Publisher) StartPublishing(ctx context.Context) string {
limiter := utils.RateLimiter(p.Config.Rate)

var msgSent atomic.Int64
for {
select {
case <-ctx.Done():
p.Stop("time limit reached")
return
case <-ticker.C:
p.Send()
msgSent++
if msgSent >= p.Config.PublishCount {
p.Stop("--pmessages value reached")
return
return "time limit reached"
default:
if msgSent.Add(1) > int64(p.Config.PublishCount) {
return "--pmessages value reached"
}
if p.Config.Rate > 0 {
_ = limiter.Wait(context.Background())
}
p.Send()
}
}
}
Expand Down
63 changes: 20 additions & 43 deletions pkg/stomp/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/rand"
"sync/atomic"
"time"

"github.com/rabbitmq/omq/pkg/config"
Expand Down Expand Up @@ -77,61 +78,37 @@ func (p *StompPublisher) Start(ctx context.Context) {

p.msg = utils.MessageBody(p.Config.Size)

switch p.Config.Rate {
case -1:
p.StartFullSpeed(ctx)
case 0:
p.StartIdle(ctx)
default:
p.StartRateLimited(ctx)
}
}

func (p *StompPublisher) StartFullSpeed(ctx context.Context) {
log.Info("publisher started", "id", p.Id, "rate", "unlimited", "destination", p.Topic)

for msgSent := 0; msgSent < p.Config.PublishCount; {
select {
case <-ctx.Done():
return
default:
err := p.Send()
if err != nil {
p.Connect()
} else {
msgSent++
}
}
var farewell string
if p.Config.Rate == 0 {
// idle connection
<-ctx.Done()
farewell = "context cancelled"
} else {
farewell = p.StartPublishing(ctx)
}
log.Debug("publisher completed", "id", p.Id)
p.Stop(farewell)
}

func (p *StompPublisher) StartIdle(ctx context.Context) {
log.Info("publisher started", "id", p.Id, "rate", "-", "destination", p.Topic)

<-ctx.Done()
}
func (p *StompPublisher) StartPublishing(ctx context.Context) string {
limiter := utils.RateLimiter(p.Config.Rate)

func (p *StompPublisher) StartRateLimited(ctx context.Context) {
log.Info("publisher started", "id", p.Id, "rate", p.Config.Rate, "destination", p.Topic)
ticker := utils.RateTicker(p.Config.Rate)

msgSent := 0
var msgSent atomic.Int64
for {
select {
case <-ctx.Done():
p.Stop("time limit reached")
return
case <-ticker.C:
return "context cancelled"
default:
if msgSent.Add(1) > int64(p.Config.PublishCount) {
return "--pmessages value reached"
}
if p.Config.Rate > 0 {
_ = limiter.Wait(context.Background())
}
err := p.Send()
if err != nil {
p.Connect()
} else {
msgSent++
if msgSent >= p.Config.PublishCount {
p.Stop("--pmessages value reached")
return
}
}
}
}
Expand Down
Loading

0 comments on commit fcdd23f

Please sign in to comment.