Skip to content

Commit

Permalink
Publishers start synchronously; refactors
Browse files Browse the repository at this point in the history
* publishers now start one by one and then wait for a signal to start
  publishing
* --time should be more accurate/intuitive, at least with AMQP
  • Loading branch information
mkuratczyk committed Dec 6, 2024
1 parent a01b262 commit 4b1a433
Show file tree
Hide file tree
Showing 12 changed files with 95 additions and 65 deletions.
20 changes: 13 additions & 7 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,6 @@ func start(cfg config.Config) {

var wg sync.WaitGroup

// TODO
// refactor; make consumer startup delay more accurate

if cfg.Queues != config.Predeclared {
rmqMgmt.DeclareQueues(cfg)
}
Expand All @@ -369,11 +366,14 @@ func start(cfg config.Config) {
}
}()
}
startPublishers(ctx, &wg)

startPublishing := make(chan bool)
startPublishers(ctx, &wg, startPublishing)
close(startPublishing)

if cfg.Duration > 0 {
log.Debug("will stop all consumers and publishers at " + time.Now().Add(cfg.Duration).String())
time.AfterFunc(cfg.Duration, func() { cancel() })
log.Debug("will stop all consumers and publishers at " + time.Now().Add(cfg.Duration).String())
}

// every second, print the current values of the metrics
Expand Down Expand Up @@ -475,12 +475,13 @@ func startConsumers(ctx context.Context, wg *sync.WaitGroup) {
}
}

func startPublishers(ctx context.Context, wg *sync.WaitGroup) {
func startPublishers(ctx context.Context, wg *sync.WaitGroup, startPublishing chan bool) {
for i := 1; i <= cfg.Publishers; i++ {
select {
case <-ctx.Done():
return
default:
publisherReady := make(chan bool)
n := i
wg.Add(1)
go func() {
Expand All @@ -490,8 +491,13 @@ func startPublishers(ctx context.Context, wg *sync.WaitGroup) {
log.Error("Error creating publisher: ", "error", err)
os.Exit(1)
}
p.Start(ctx)
// publisher are started one by one and synchronously
// then we close the channel to allow all of them to start publishing "at once"
// but each publisher sleeps for a random sub-second time, to avoid burts of
// messages being published
p.Start(ctx, publisherReady, startPublishing)
}()
<-publisherReady
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ var _ = Describe("OMQ CLI", func() {
"--publish-to=time/%d",
"--rate=1",
"--consumers=100",
"--consume-from=/queues/time",
"--binding-key=time.#",
"--consume-from=time/%d",
"--time", "5s",
}
session := omq(args)
Expand Down Expand Up @@ -330,9 +329,9 @@ var _ = Describe("OMQ CLI", func() {

output, _ := io.ReadAll(session.Out)
buf := bytes.NewReader(output)
Expect(metricValue(buf, `omq_messages_consumed_total{priority="normal"}`)).Should((BeNumerically(">", 0)))
Expect(metricValue(buf, `omq_messages_consumed_total{priority="normal"}`)).Should((BeNumerically(">", 0.0)))
buf.Reset(output)
Expect(metricValue(buf, `omq_messages_published_total`)).Should((BeNumerically(">", 0)))
Expect(metricValue(buf, `omq_messages_published_total`)).Should((BeNumerically(">", 0.0)))
},
Entry("MQTT v3.1", "3", "MQTT 3-1"),
Entry("MQTT v3.1.1", "4", "MQTT 3-1-1"),
Expand Down
4 changes: 2 additions & 2 deletions pkg/amqp10/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ func (c *Amqp10Consumer) CreateReceiver(ctx context.Context) {
}
}

func (c *Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) {
func (c *Amqp10Consumer) Start(ctx context.Context, consumerReady chan bool) {
c.CreateReceiver(ctx)
close(subscribed)
close(consumerReady)
log.Info("consumer started", "id", c.Id, "terminus", c.Terminus)
previousMessageTimeSent := time.Unix(0, 0)

Expand Down
26 changes: 16 additions & 10 deletions pkg/amqp10/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,7 @@ func (p *Amqp10Publisher) CreateSender() {
}
}

func (p *Amqp10Publisher) Start(ctx context.Context) {
// sleep random interval to avoid all publishers publishing at the same time
s := rand.Intn(1000)
time.Sleep(time.Duration(s) * time.Millisecond)

func (p *Amqp10Publisher) Start(ctx context.Context, publisherReady chan bool, startPublishing chan bool) {
p.msg = utils.MessageBody(p.Config.Size)

var err error
Expand All @@ -164,6 +160,16 @@ func (p *Amqp10Publisher) Start(ctx context.Context) {
}
defer p.pool.Release()

close(publisherReady)

select {
case <-ctx.Done():
return
case <-startPublishing:
// short random delay to avoid all publishers publishing at the same time
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}

log.Info("publisher started", "id", p.Id, "rate", utils.Rate(p.Config.Rate), "destination", p.Terminus)
var farewell string
if p.Config.Rate == 0 {
Expand All @@ -189,14 +195,14 @@ func (p *Amqp10Publisher) StartPublishing(ctx context.Context) string {
return "--pmessages value reached"
}
if p.Config.Rate > 0 {
_ = limiter.Wait(context.Background())
_ = limiter.Wait(ctx)
}
p.poolWg.Add(1)
_ = p.pool.Submit(func() {
defer func() {
p.poolWg.Done()
}()
err := p.Send()
err := p.Send(ctx)
if err != nil {
p.Connect()
}
Expand All @@ -205,7 +211,7 @@ func (p *Amqp10Publisher) StartPublishing(ctx context.Context) string {
}
}

func (p *Amqp10Publisher) Send() error {
func (p *Amqp10Publisher) Send(ctx context.Context) error {
utils.UpdatePayload(p.Config.UseMillis, &p.msg)
msg := amqp.NewMessage(p.msg)

Expand Down Expand Up @@ -236,7 +242,7 @@ func (p *Amqp10Publisher) Send() error {
}

startTime := time.Now()
receipt, err := p.Sender.SendWithReceipt(context.TODO(), msg, nil)
receipt, err := p.Sender.SendWithReceipt(ctx, msg, nil)
if err != nil {
var connErr *amqp.ConnError
var linkErr *amqp.LinkError
Expand Down Expand Up @@ -288,7 +294,7 @@ func (p *Amqp10Publisher) handleSent(receipt *amqp.SendReceipt, published time.T

func (p *Amqp10Publisher) Stop(reason string) {
p.poolWg.Wait()
log.Debug("closing connection", "id", p.Id, "reason", reason)
log.Debug("closing publisher connection", "id", p.Id, "reason", reason)
if p.Session != nil {
_ = p.Session.Close(context.Background())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type Publisher interface {
Start(context.Context)
Start(context.Context, chan bool, chan bool)
}

type Consumer interface {
Expand Down
2 changes: 1 addition & 1 deletion pkg/mqtt/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Consumer interface {
}

type Publisher interface {
Start(context.Context)
Start(context.Context, chan bool, chan bool)
}

func NewConsumer(ctx context.Context, cfg config.Config, id int) Consumer {
Expand Down
6 changes: 3 additions & 3 deletions pkg/mqtt/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewMqttConsumer(cfg config.Config, id int) MqttConsumer {
}
}

func (c MqttConsumer) Start(ctx context.Context, subscribed chan bool) {
func (c MqttConsumer) Start(ctx context.Context, cosumerReady chan bool) {
msgsReceived := 0
previousMessageTimeSent := time.Unix(0, 0)

Expand Down Expand Up @@ -87,7 +87,7 @@ func (c MqttConsumer) Start(ctx context.Context, subscribed chan bool) {
log.Error("failed to connect", "id", c.Id, "error", token.Error())
}

close(subscribed)
close(cosumerReady)

// TODO: currently we can consume more than ConsumerCount messages
for msgsReceived < c.Config.ConsumeCount {
Expand All @@ -104,7 +104,7 @@ func (c MqttConsumer) Start(ctx context.Context, subscribed chan bool) {
}

func (c MqttConsumer) Stop(reason string) {
log.Debug("closing connection", "id", c.Id, "reason", reason)
log.Debug("closing consumer connection", "id", c.Id, "reason", reason)
c.Connection.Disconnect(250)
}

Expand Down
8 changes: 3 additions & 5 deletions pkg/mqtt/consumer_v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewMqtt5Consumer(cfg config.Config, id int) Mqtt5Consumer {
}
}

func (c Mqtt5Consumer) Start(ctx context.Context, subscribed chan bool) {
func (c Mqtt5Consumer) Start(ctx context.Context, consumerReady chan bool) {
msgsReceived := 0
previousMessageTimeSent := time.Unix(0, 0)

Expand Down Expand Up @@ -90,8 +90,6 @@ func (c Mqtt5Consumer) Start(ctx context.Context, subscribed chan bool) {
},
}

defer c.Stop("shutting down")

var err error
c.Connection, err = autopaho.NewConnection(ctx, opts)
if err != nil {
Expand All @@ -102,7 +100,7 @@ func (c Mqtt5Consumer) Start(ctx context.Context, subscribed chan bool) {
// AwaitConnection only returns an error if the context is cancelled
return
}
close(subscribed)
close(consumerReady)

// TODO: currently we can consume more than ConsumerCount messages
for msgsReceived < c.Config.ConsumeCount {
Expand All @@ -119,7 +117,7 @@ func (c Mqtt5Consumer) Start(ctx context.Context, subscribed chan bool) {
}

func (c Mqtt5Consumer) Stop(reason string) {
log.Debug("closing connection", "id", c.Id, "reason", reason)
log.Debug("closing consumer connection", "id", c.Id, "reason", reason)
if c.Connection != nil {
_ = c.Connection.Disconnect(context.TODO())
}
Expand Down
22 changes: 14 additions & 8 deletions pkg/mqtt/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package mqtt

import (
"context"
"math/rand"
"sync/atomic"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/rabbitmq/omq/pkg/config"
"github.com/rabbitmq/omq/pkg/log"
"github.com/rabbitmq/omq/pkg/utils"
"golang.org/x/exp/rand"

"github.com/rabbitmq/omq/pkg/metrics"
)
Expand Down Expand Up @@ -77,11 +77,7 @@ func (p MqttPublisher) connectionOptions() *mqtt.ClientOptions {
return opts
}

func (p MqttPublisher) Start(ctx context.Context) {
// sleep random interval to avoid all publishers publishing at the same time
s := rand.Intn(1000)
time.Sleep(time.Duration(s) * time.Millisecond)

func (p MqttPublisher) Start(ctx context.Context, publisherReady chan bool, startPublishing chan bool) {
defer func() {
if p.Connection != nil {
p.Connection.Disconnect(250)
Expand All @@ -92,6 +88,16 @@ func (p MqttPublisher) Start(ctx context.Context) {

p.msg = utils.MessageBody(p.Config.Size)

close(publisherReady)

select {
case <-ctx.Done():
return
case <-startPublishing:
// short random delay to avoid all publishers publishing at the same time
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}

log.Info("publisher started", "id", p.Id, "rate", p.Config.Rate, "destination", p.Topic)

var farewell string
Expand All @@ -118,7 +124,7 @@ func (p MqttPublisher) StartPublishing(ctx context.Context) string {
return "--pmessages value reached"
}
if p.Config.Rate > 0 {
_ = limiter.Wait(context.Background())
_ = limiter.Wait(ctx)
}
p.Send()
}
Expand All @@ -145,7 +151,7 @@ func (p MqttPublisher) Send() {
}

func (p MqttPublisher) Stop(reason string) {
log.Debug("closing connection", "id", p.Id, "reason", reason)
log.Debug("closing publisher connection", "id", p.Id, "reason", reason)
if p.Connection != nil {
p.Connection.Disconnect(250)
}
Expand Down
37 changes: 23 additions & 14 deletions pkg/mqtt/publisher_v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package mqtt

import (
"context"
"math/rand"
"strings"
"sync/atomic"
"time"

Expand All @@ -11,6 +11,7 @@ import (
"github.com/rabbitmq/omq/pkg/config"
"github.com/rabbitmq/omq/pkg/log"
"github.com/rabbitmq/omq/pkg/utils"
"golang.org/x/exp/rand"

"github.com/rabbitmq/omq/pkg/metrics"
)
Expand Down Expand Up @@ -76,17 +77,21 @@ func (p Mqtt5Publisher) connectionOptions() autopaho.ClientConfig {
return opts
}

func (p Mqtt5Publisher) Start(ctx context.Context) {
// sleep random interval to avoid all publishers publishing at the same time
s := rand.Intn(1000)
time.Sleep(time.Duration(s) * time.Millisecond)

defer p.Stop("shutting down")

func (p Mqtt5Publisher) Start(ctx context.Context, publisherReady chan bool, startPublishing chan bool) {
p.Connect(ctx)

p.msg = utils.MessageBody(p.Config.Size)

close(publisherReady)

select {
case <-ctx.Done():
return
case <-startPublishing:
// short random delay to avoid all publishers publishing at the same time
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}

log.Info("publisher started", "id", p.Id, "rate", p.Config.Rate, "destination", p.Topic)

var farewell string
Expand Down Expand Up @@ -117,23 +122,27 @@ func (p Mqtt5Publisher) StartPublishing(ctx context.Context) string {
return "--pmessages value reached"
}
if p.Config.Rate > 0 {
_ = limiter.Wait(context.Background())
_ = limiter.Wait(ctx)
}
p.Send()
p.Send(ctx)
}
}
}

func (p Mqtt5Publisher) Send() {
func (p Mqtt5Publisher) Send(ctx context.Context) {
utils.UpdatePayload(p.Config.UseMillis, &p.msg)
startTime := time.Now()
_, err := p.Connection.Publish(context.TODO(), &paho.Publish{
_, err := p.Connection.Publish(ctx, &paho.Publish{
QoS: byte(p.Config.MqttPublisher.QoS),
Topic: p.Topic,
Payload: p.msg,
})
if err != nil {
log.Error("message sending failure", "id", p.Id, "error", err)
// I couldn't find any way to prevent publishing just after omq
// initiated the shutdown procedure, so we have to ignore this
if !strings.Contains(err.Error(), "use of closed network connection") {
log.Error("message sending failure", "id", p.Id, "error", err)
}
return
}
latency := time.Since(startTime)
Expand All @@ -143,7 +152,7 @@ func (p Mqtt5Publisher) Send() {
}

func (p Mqtt5Publisher) Stop(reason string) {
log.Debug("closing connection", "id", p.Id, "reason", reason)
log.Debug("closing publisher connection", "id", p.Id, "reason", reason)
if p.Connection != nil {
_ = p.Connection.Disconnect(context.TODO())
}
Expand Down
Loading

0 comments on commit 4b1a433

Please sign in to comment.