From 4b1a4339b99f063b44b55496bbc277fd864b80ce Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Fri, 6 Dec 2024 17:17:47 +0100 Subject: [PATCH] Publishers start synchronously; refactors * publishers now start one by one and then wait for a signal to start publishing * --time should be more accurate/intuitive, at least with AMQP --- cmd/root.go | 20 +++++++++++++------- main_test.go | 7 +++---- pkg/amqp10/consumer.go | 4 ++-- pkg/amqp10/publisher.go | 26 ++++++++++++++++---------- pkg/common/common.go | 2 +- pkg/mqtt/common.go | 2 +- pkg/mqtt/consumer.go | 6 +++--- pkg/mqtt/consumer_v5.go | 8 +++----- pkg/mqtt/publisher.go | 22 ++++++++++++++-------- pkg/mqtt/publisher_v5.go | 37 +++++++++++++++++++++++-------------- pkg/stomp/consumer.go | 4 ++-- pkg/stomp/publisher.go | 22 ++++++++++++++-------- 12 files changed, 95 insertions(+), 65 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index bb0f99b..ce3d0dd 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -349,9 +349,6 @@ func start(cfg config.Config) { var wg sync.WaitGroup - // TODO - // refactor; make consumer startup delay more accurate - if cfg.Queues != config.Predeclared { rmqMgmt.DeclareQueues(cfg) } @@ -369,11 +366,14 @@ func start(cfg config.Config) { } }() } - startPublishers(ctx, &wg) + + startPublishing := make(chan bool) + startPublishers(ctx, &wg, startPublishing) + close(startPublishing) if cfg.Duration > 0 { - log.Debug("will stop all consumers and publishers at " + time.Now().Add(cfg.Duration).String()) time.AfterFunc(cfg.Duration, func() { cancel() }) + log.Debug("will stop all consumers and publishers at " + time.Now().Add(cfg.Duration).String()) } // every second, print the current values of the metrics @@ -475,12 +475,13 @@ func startConsumers(ctx context.Context, wg *sync.WaitGroup) { } } -func startPublishers(ctx context.Context, wg *sync.WaitGroup) { +func startPublishers(ctx context.Context, wg *sync.WaitGroup, startPublishing chan bool) { for i := 1; i <= cfg.Publishers; i++ { select { case <-ctx.Done(): return default: + publisherReady := make(chan bool) n := i wg.Add(1) go func() { @@ -490,8 +491,13 @@ func startPublishers(ctx context.Context, wg *sync.WaitGroup) { log.Error("Error creating publisher: ", "error", err) os.Exit(1) } - p.Start(ctx) + // publisher are started one by one and synchronously + // then we close the channel to allow all of them to start publishing "at once" + // but each publisher sleeps for a random sub-second time, to avoid burts of + // messages being published + p.Start(ctx, publisherReady, startPublishing) }() + <-publisherReady } } } diff --git a/main_test.go b/main_test.go index 7114b26..d43abbd 100644 --- a/main_test.go +++ b/main_test.go @@ -33,8 +33,7 @@ var _ = Describe("OMQ CLI", func() { "--publish-to=time/%d", "--rate=1", "--consumers=100", - "--consume-from=/queues/time", - "--binding-key=time.#", + "--consume-from=time/%d", "--time", "5s", } session := omq(args) @@ -330,9 +329,9 @@ var _ = Describe("OMQ CLI", func() { output, _ := io.ReadAll(session.Out) buf := bytes.NewReader(output) - Expect(metricValue(buf, `omq_messages_consumed_total{priority="normal"}`)).Should((BeNumerically(">", 0))) + Expect(metricValue(buf, `omq_messages_consumed_total{priority="normal"}`)).Should((BeNumerically(">", 0.0))) buf.Reset(output) - Expect(metricValue(buf, `omq_messages_published_total`)).Should((BeNumerically(">", 0))) + Expect(metricValue(buf, `omq_messages_published_total`)).Should((BeNumerically(">", 0.0))) }, Entry("MQTT v3.1", "3", "MQTT 3-1"), Entry("MQTT v3.1.1", "4", "MQTT 3-1-1"), diff --git a/pkg/amqp10/consumer.go b/pkg/amqp10/consumer.go index ab5b13f..89d27c5 100644 --- a/pkg/amqp10/consumer.go +++ b/pkg/amqp10/consumer.go @@ -143,9 +143,9 @@ func (c *Amqp10Consumer) CreateReceiver(ctx context.Context) { } } -func (c *Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) { +func (c *Amqp10Consumer) Start(ctx context.Context, consumerReady chan bool) { c.CreateReceiver(ctx) - close(subscribed) + close(consumerReady) log.Info("consumer started", "id", c.Id, "terminus", c.Terminus) previousMessageTimeSent := time.Unix(0, 0) diff --git a/pkg/amqp10/publisher.go b/pkg/amqp10/publisher.go index ab314de..57483af 100644 --- a/pkg/amqp10/publisher.go +++ b/pkg/amqp10/publisher.go @@ -149,11 +149,7 @@ func (p *Amqp10Publisher) CreateSender() { } } -func (p *Amqp10Publisher) Start(ctx context.Context) { - // sleep random interval to avoid all publishers publishing at the same time - s := rand.Intn(1000) - time.Sleep(time.Duration(s) * time.Millisecond) - +func (p *Amqp10Publisher) Start(ctx context.Context, publisherReady chan bool, startPublishing chan bool) { p.msg = utils.MessageBody(p.Config.Size) var err error @@ -164,6 +160,16 @@ func (p *Amqp10Publisher) Start(ctx context.Context) { } defer p.pool.Release() + close(publisherReady) + + select { + case <-ctx.Done(): + return + case <-startPublishing: + // short random delay to avoid all publishers publishing at the same time + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + } + log.Info("publisher started", "id", p.Id, "rate", utils.Rate(p.Config.Rate), "destination", p.Terminus) var farewell string if p.Config.Rate == 0 { @@ -189,14 +195,14 @@ func (p *Amqp10Publisher) StartPublishing(ctx context.Context) string { return "--pmessages value reached" } if p.Config.Rate > 0 { - _ = limiter.Wait(context.Background()) + _ = limiter.Wait(ctx) } p.poolWg.Add(1) _ = p.pool.Submit(func() { defer func() { p.poolWg.Done() }() - err := p.Send() + err := p.Send(ctx) if err != nil { p.Connect() } @@ -205,7 +211,7 @@ func (p *Amqp10Publisher) StartPublishing(ctx context.Context) string { } } -func (p *Amqp10Publisher) Send() error { +func (p *Amqp10Publisher) Send(ctx context.Context) error { utils.UpdatePayload(p.Config.UseMillis, &p.msg) msg := amqp.NewMessage(p.msg) @@ -236,7 +242,7 @@ func (p *Amqp10Publisher) Send() error { } startTime := time.Now() - receipt, err := p.Sender.SendWithReceipt(context.TODO(), msg, nil) + receipt, err := p.Sender.SendWithReceipt(ctx, msg, nil) if err != nil { var connErr *amqp.ConnError var linkErr *amqp.LinkError @@ -288,7 +294,7 @@ func (p *Amqp10Publisher) handleSent(receipt *amqp.SendReceipt, published time.T func (p *Amqp10Publisher) Stop(reason string) { p.poolWg.Wait() - log.Debug("closing connection", "id", p.Id, "reason", reason) + log.Debug("closing publisher connection", "id", p.Id, "reason", reason) if p.Session != nil { _ = p.Session.Close(context.Background()) } diff --git a/pkg/common/common.go b/pkg/common/common.go index a32e398..857aaea 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -11,7 +11,7 @@ import ( ) type Publisher interface { - Start(context.Context) + Start(context.Context, chan bool, chan bool) } type Consumer interface { diff --git a/pkg/mqtt/common.go b/pkg/mqtt/common.go index e34fc1f..8a32ae7 100644 --- a/pkg/mqtt/common.go +++ b/pkg/mqtt/common.go @@ -14,7 +14,7 @@ type Consumer interface { } type Publisher interface { - Start(context.Context) + Start(context.Context, chan bool, chan bool) } func NewConsumer(ctx context.Context, cfg config.Config, id int) Consumer { diff --git a/pkg/mqtt/consumer.go b/pkg/mqtt/consumer.go index f827af6..594f88e 100644 --- a/pkg/mqtt/consumer.go +++ b/pkg/mqtt/consumer.go @@ -29,7 +29,7 @@ func NewMqttConsumer(cfg config.Config, id int) MqttConsumer { } } -func (c MqttConsumer) Start(ctx context.Context, subscribed chan bool) { +func (c MqttConsumer) Start(ctx context.Context, cosumerReady chan bool) { msgsReceived := 0 previousMessageTimeSent := time.Unix(0, 0) @@ -87,7 +87,7 @@ func (c MqttConsumer) Start(ctx context.Context, subscribed chan bool) { log.Error("failed to connect", "id", c.Id, "error", token.Error()) } - close(subscribed) + close(cosumerReady) // TODO: currently we can consume more than ConsumerCount messages for msgsReceived < c.Config.ConsumeCount { @@ -104,7 +104,7 @@ func (c MqttConsumer) Start(ctx context.Context, subscribed chan bool) { } func (c MqttConsumer) Stop(reason string) { - log.Debug("closing connection", "id", c.Id, "reason", reason) + log.Debug("closing consumer connection", "id", c.Id, "reason", reason) c.Connection.Disconnect(250) } diff --git a/pkg/mqtt/consumer_v5.go b/pkg/mqtt/consumer_v5.go index bc8e104..63e5463 100644 --- a/pkg/mqtt/consumer_v5.go +++ b/pkg/mqtt/consumer_v5.go @@ -30,7 +30,7 @@ func NewMqtt5Consumer(cfg config.Config, id int) Mqtt5Consumer { } } -func (c Mqtt5Consumer) Start(ctx context.Context, subscribed chan bool) { +func (c Mqtt5Consumer) Start(ctx context.Context, consumerReady chan bool) { msgsReceived := 0 previousMessageTimeSent := time.Unix(0, 0) @@ -90,8 +90,6 @@ func (c Mqtt5Consumer) Start(ctx context.Context, subscribed chan bool) { }, } - defer c.Stop("shutting down") - var err error c.Connection, err = autopaho.NewConnection(ctx, opts) if err != nil { @@ -102,7 +100,7 @@ func (c Mqtt5Consumer) Start(ctx context.Context, subscribed chan bool) { // AwaitConnection only returns an error if the context is cancelled return } - close(subscribed) + close(consumerReady) // TODO: currently we can consume more than ConsumerCount messages for msgsReceived < c.Config.ConsumeCount { @@ -119,7 +117,7 @@ func (c Mqtt5Consumer) Start(ctx context.Context, subscribed chan bool) { } func (c Mqtt5Consumer) Stop(reason string) { - log.Debug("closing connection", "id", c.Id, "reason", reason) + log.Debug("closing consumer connection", "id", c.Id, "reason", reason) if c.Connection != nil { _ = c.Connection.Disconnect(context.TODO()) } diff --git a/pkg/mqtt/publisher.go b/pkg/mqtt/publisher.go index 24d0d09..c912f99 100644 --- a/pkg/mqtt/publisher.go +++ b/pkg/mqtt/publisher.go @@ -2,7 +2,6 @@ package mqtt import ( "context" - "math/rand" "sync/atomic" "time" @@ -10,6 +9,7 @@ import ( "github.com/rabbitmq/omq/pkg/config" "github.com/rabbitmq/omq/pkg/log" "github.com/rabbitmq/omq/pkg/utils" + "golang.org/x/exp/rand" "github.com/rabbitmq/omq/pkg/metrics" ) @@ -77,11 +77,7 @@ func (p MqttPublisher) connectionOptions() *mqtt.ClientOptions { return opts } -func (p MqttPublisher) Start(ctx context.Context) { - // sleep random interval to avoid all publishers publishing at the same time - s := rand.Intn(1000) - time.Sleep(time.Duration(s) * time.Millisecond) - +func (p MqttPublisher) Start(ctx context.Context, publisherReady chan bool, startPublishing chan bool) { defer func() { if p.Connection != nil { p.Connection.Disconnect(250) @@ -92,6 +88,16 @@ func (p MqttPublisher) Start(ctx context.Context) { p.msg = utils.MessageBody(p.Config.Size) + close(publisherReady) + + select { + case <-ctx.Done(): + return + case <-startPublishing: + // short random delay to avoid all publishers publishing at the same time + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + } + log.Info("publisher started", "id", p.Id, "rate", p.Config.Rate, "destination", p.Topic) var farewell string @@ -118,7 +124,7 @@ func (p MqttPublisher) StartPublishing(ctx context.Context) string { return "--pmessages value reached" } if p.Config.Rate > 0 { - _ = limiter.Wait(context.Background()) + _ = limiter.Wait(ctx) } p.Send() } @@ -145,7 +151,7 @@ func (p MqttPublisher) Send() { } func (p MqttPublisher) Stop(reason string) { - log.Debug("closing connection", "id", p.Id, "reason", reason) + log.Debug("closing publisher connection", "id", p.Id, "reason", reason) if p.Connection != nil { p.Connection.Disconnect(250) } diff --git a/pkg/mqtt/publisher_v5.go b/pkg/mqtt/publisher_v5.go index b77295a..71516c2 100644 --- a/pkg/mqtt/publisher_v5.go +++ b/pkg/mqtt/publisher_v5.go @@ -2,7 +2,7 @@ package mqtt import ( "context" - "math/rand" + "strings" "sync/atomic" "time" @@ -11,6 +11,7 @@ import ( "github.com/rabbitmq/omq/pkg/config" "github.com/rabbitmq/omq/pkg/log" "github.com/rabbitmq/omq/pkg/utils" + "golang.org/x/exp/rand" "github.com/rabbitmq/omq/pkg/metrics" ) @@ -76,17 +77,21 @@ func (p Mqtt5Publisher) connectionOptions() autopaho.ClientConfig { return opts } -func (p Mqtt5Publisher) Start(ctx context.Context) { - // sleep random interval to avoid all publishers publishing at the same time - s := rand.Intn(1000) - time.Sleep(time.Duration(s) * time.Millisecond) - - defer p.Stop("shutting down") - +func (p Mqtt5Publisher) Start(ctx context.Context, publisherReady chan bool, startPublishing chan bool) { p.Connect(ctx) p.msg = utils.MessageBody(p.Config.Size) + close(publisherReady) + + select { + case <-ctx.Done(): + return + case <-startPublishing: + // short random delay to avoid all publishers publishing at the same time + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + } + log.Info("publisher started", "id", p.Id, "rate", p.Config.Rate, "destination", p.Topic) var farewell string @@ -117,23 +122,27 @@ func (p Mqtt5Publisher) StartPublishing(ctx context.Context) string { return "--pmessages value reached" } if p.Config.Rate > 0 { - _ = limiter.Wait(context.Background()) + _ = limiter.Wait(ctx) } - p.Send() + p.Send(ctx) } } } -func (p Mqtt5Publisher) Send() { +func (p Mqtt5Publisher) Send(ctx context.Context) { utils.UpdatePayload(p.Config.UseMillis, &p.msg) startTime := time.Now() - _, err := p.Connection.Publish(context.TODO(), &paho.Publish{ + _, err := p.Connection.Publish(ctx, &paho.Publish{ QoS: byte(p.Config.MqttPublisher.QoS), Topic: p.Topic, Payload: p.msg, }) if err != nil { - log.Error("message sending failure", "id", p.Id, "error", err) + // I couldn't find any way to prevent publishing just after omq + // initiated the shutdown procedure, so we have to ignore this + if !strings.Contains(err.Error(), "use of closed network connection") { + log.Error("message sending failure", "id", p.Id, "error", err) + } return } latency := time.Since(startTime) @@ -143,7 +152,7 @@ func (p Mqtt5Publisher) Send() { } func (p Mqtt5Publisher) Stop(reason string) { - log.Debug("closing connection", "id", p.Id, "reason", reason) + log.Debug("closing publisher connection", "id", p.Id, "reason", reason) if p.Connection != nil { _ = p.Connection.Disconnect(context.TODO()) } diff --git a/pkg/stomp/consumer.go b/pkg/stomp/consumer.go index 3cd6139..095b8ac 100644 --- a/pkg/stomp/consumer.go +++ b/pkg/stomp/consumer.go @@ -89,9 +89,9 @@ func (c *StompConsumer) Subscribe() { c.Subscription = sub } -func (c *StompConsumer) Start(ctx context.Context, subscribed chan bool) { +func (c *StompConsumer) Start(ctx context.Context, consumerReady chan bool) { c.Subscribe() - close(subscribed) + close(consumerReady) log.Info("consumer started", "id", c.Id, "destination", c.Topic) previousMessageTimeSent := time.Unix(0, 0) diff --git a/pkg/stomp/publisher.go b/pkg/stomp/publisher.go index c3778cf..20b26cc 100644 --- a/pkg/stomp/publisher.go +++ b/pkg/stomp/publisher.go @@ -3,7 +3,6 @@ package stomp import ( "context" "fmt" - "math/rand" "sync/atomic" "time" @@ -11,6 +10,7 @@ import ( "github.com/rabbitmq/omq/pkg/log" "github.com/rabbitmq/omq/pkg/metrics" "github.com/rabbitmq/omq/pkg/utils" + "golang.org/x/exp/rand" "github.com/go-stomp/stomp/v3" "github.com/go-stomp/stomp/v3/frame" @@ -71,13 +71,19 @@ func (p *StompPublisher) Connect() { } } -func (p *StompPublisher) Start(ctx context.Context) { - // sleep random interval to avoid all publishers publishing at the same time - s := rand.Intn(1000) - time.Sleep(time.Duration(s) * time.Millisecond) - +func (p *StompPublisher) Start(ctx context.Context, publisherReady chan bool, startPublishing chan bool) { p.msg = utils.MessageBody(p.Config.Size) + close(publisherReady) + + select { + case <-ctx.Done(): + return + case <-startPublishing: + // short random delay to avoid all publishers publishing at the same time + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + } + log.Info("publisher started", "id", p.Id, "rate", "unlimited", "destination", p.Topic) var farewell string @@ -104,7 +110,7 @@ func (p *StompPublisher) StartPublishing(ctx context.Context) string { return "--pmessages value reached" } if p.Config.Rate > 0 { - _ = limiter.Wait(context.Background()) + _ = limiter.Wait(ctx) } err := p.Send() if err != nil { @@ -131,7 +137,7 @@ func (p *StompPublisher) Send() error { } func (p *StompPublisher) Stop(reason string) { - log.Debug("closing connection", "id", p.Id, "reason", reason) + log.Debug("closing publisher connection", "id", p.Id, "reason", reason) _ = p.Connection.Disconnect() }