Skip to content

Commit

Permalink
Add --spread-connections (default: true)
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Aug 13, 2024
1 parent 951ae6f commit aabaab9
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 13 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ messages published with perf-test can be consumed by `omq` or vice versa, and th
--queue-durability queue-durability Queue durability (default: configuration - the queue definition is durable) (default configuration)
-r, --rate float Messages per second (-1 = unlimited) (default -1)
-s, --size int Message payload size in bytes (default 12)
--spread-connections Spread connections across URIs (default true)
--stream-filter-value-set string Stream filter value for publisher
--stream-filter-values string Stream consumer filter
--stream-offset string Stream consumer offset specification (default=next)
Expand Down
2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ func RootCmd() *cobra.Command {
rootCmd.PersistentFlags().StringSliceVar(&metricTags, "metric-tags", []string{}, "Prometheus label-value pairs, eg. l1=v1,l2=v2")
rootCmd.PersistentFlags().
BoolVar(&cfg.LogOutOfOrder, "log-out-of-order-messages", false, "Print a log line when a message is received that is older than the previously received message")
rootCmd.PersistentFlags().
BoolVar(&cfg.SpreadConnections, "spread-connections", true, "Spread connections across URIs")

rootCmd.AddCommand(amqp_amqp)
rootCmd.AddCommand(amqp_stomp)
Expand Down
6 changes: 5 additions & 1 deletion pkg/amqp10_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ func NewConsumer(cfg config.Config, id int) *Amqp10Consumer {
whichUri: 0,
}

// TODO
if cfg.SpreadConnections {
consumer.whichUri = id - 1%len(cfg.ConsumerUri)
}

// TODO: context?
consumer.Connect(context.TODO())

return consumer
Expand Down
10 changes: 7 additions & 3 deletions pkg/amqp10_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,20 @@ type Amqp10Publisher struct {
whichUri int
}

func NewPublisher(cfg config.Config, n int) *Amqp10Publisher {
func NewPublisher(cfg config.Config, id int) *Amqp10Publisher {
publisher := &Amqp10Publisher{
Id: n,
Id: id,
Connection: nil,
Sender: nil,
Config: cfg,
Terminus: topic.CalculateTopic(cfg.PublishTo, n),
Terminus: topic.CalculateTopic(cfg.PublishTo, id),
whichUri: 0,
}

if cfg.SpreadConnections {
publisher.whichUri = id - 1%len(cfg.PublisherUri)
}

publisher.Connect()

return publisher
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Config struct {
ConsumerUri []string
Publishers int
Consumers int
SpreadConnections bool
PublishCount int
ConsumeCount int
PublishTo string
Expand Down
4 changes: 2 additions & 2 deletions pkg/mqtt_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func NewConsumer(cfg config.Config, id int) *MqttConsumer {
}).
SetProtocolVersion(4)

for _, uri := range cfg.ConsumerUri {
parsedUri := utils.ParseURI(uri, "mqtt", "1883")
for _, n := range utils.WrappedSequence(len(cfg.ConsumerUri), id-1) {
parsedUri := utils.ParseURI(cfg.ConsumerUri[n], "mqtt", "1883")
opts.AddBroker(parsedUri.Broker).
SetUsername(parsedUri.Username).
SetPassword(parsedUri.Password)
Expand Down
14 changes: 7 additions & 7 deletions pkg/mqtt_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ type MqttPublisher struct {
msg []byte
}

func NewPublisher(cfg config.Config, n int) *MqttPublisher {
func NewPublisher(cfg config.Config, id int) *MqttPublisher {
var token mqtt.Token

opts := mqtt.NewClientOptions().
SetClientID(fmt.Sprintf("omq-pub-%d", n)).
SetClientID(fmt.Sprintf("omq-pub-%d", id)).
SetAutoReconnect(true).
SetCleanSession(cfg.MqttPublisher.CleanSession).
SetConnectionLostHandler(func(client mqtt.Client, reason error) {
log.Info("connection lost", "protocol", "MQTT", "publisherId", n)
log.Info("connection lost", "protocol", "MQTT", "publisherId", id)
}).
SetProtocolVersion(4)

for _, uri := range cfg.PublisherUri {
parsedUri := utils.ParseURI(uri, "mqtt", "1883")
for _, n := range utils.WrappedSequence(len(cfg.PublisherUri), id-1) {
parsedUri := utils.ParseURI(cfg.PublisherUri[n], "mqtt", "1883")
opts.AddBroker(parsedUri.Broker).
SetUsername(parsedUri.Username).
SetPassword(parsedUri.Password)
Expand All @@ -49,15 +49,15 @@ func NewPublisher(cfg config.Config, n int) *MqttPublisher {
token = connection.Connect()
token.Wait()

topic := topic.CalculateTopic(cfg.PublishTo, n)
topic := topic.CalculateTopic(cfg.PublishTo, id)
// AMQP-1.0 and STOMP allow /exchange/amq.topic/ prefix
// since MQTT has no concept of exchanges, we need to remove it
// this should get more flexible in the future
topic = strings.TrimPrefix(topic, "/exchange/amq.topic/")
topic = strings.TrimPrefix(topic, "/topic/")

return &MqttPublisher{
Id: n,
Id: id,
Connection: connection,
Topic: topic,
Config: cfg,
Expand Down
4 changes: 4 additions & 0 deletions pkg/stomp_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func NewConsumer(cfg config.Config, id int) *StompConsumer {
whichUri: 0,
}

if cfg.SpreadConnections {
consumer.whichUri = id - 1%len(cfg.ConsumerUri)
}

consumer.Connect()

return consumer
Expand Down
4 changes: 4 additions & 0 deletions pkg/stomp_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func NewPublisher(cfg config.Config, id int) *StompPublisher {
Config: cfg,
}

if cfg.SpreadConnections {
publisher.whichUri = id - 1%len(cfg.PublisherUri)
}

publisher.Connect()

return publisher
Expand Down
13 changes: 13 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,16 @@ func ParseURI(rawURI string, defaultScheme string, defaultPort string) uri {

return *result
}

// generate a sequence of `len` integres starting with `start`
// and wrapped, so that it contains all values from 0 to len-1
func WrappedSequence(len int, start int) []int {
if start > len {
start = start % len
}
seq := make([]int, len)
for i := 0; i < len; i++ {
seq[i] = (start + i) % len
}
return seq
}
23 changes: 23 additions & 0 deletions pkg/utils/utils_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package utils_test

import (
"strconv"
"testing"

"github.com/rabbitmq/omq/pkg/utils"
Expand Down Expand Up @@ -33,3 +34,25 @@ func TestURIParsing(t *testing.T) {
})
}
}

func TestWrappedSequence(t *testing.T) {
type test struct {
length int
start int
expectedSequence []int
}

tests := []test{
{length: 5, start: 0, expectedSequence: []int{0, 1, 2, 3, 4}},
{length: 5, start: 1, expectedSequence: []int{1, 2, 3, 4, 0}},
{length: 3, start: 1, expectedSequence: []int{1, 2, 0}},
{length: 1, start: 2, expectedSequence: []int{0}},
}

for n, tc := range tests {
t.Run(strconv.Itoa(n), func(t *testing.T) {
assert.Equal(t, tc.expectedSequence, utils.WrappedSequence(tc.length, tc.start))
})
}

}

0 comments on commit aabaab9

Please sign in to comment.