diff --git a/README.md b/README.md index 57ccde7..31caabf 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ ## omq -`omq` is a messaging system client for testing purposes. It currently supports AMQP-1.0, STOMP and MQTT 3.1/3.1.1/5.0. -It is developed mostly for RabbitMQ but might be useful for other brokers as well (some tests against ActiveMQ -were performed). +`omq` is a messaging system client for testing purposes. It currently supports AMQP-1.0, STOMP and MQTT 3.1/3.1.1/5.0 +and partially AMQP 0.9.1 (only for publishing). It is developed mostly for RabbitMQ but might be useful for other brokers +as well (some tests against ActiveMQ were performed). `omq` starts a group of publishers and a group of consumers, in both cases all publishers/consumers are identical, except for the target terminus/queue/routing key, which may be slightly different. The publishers can use @@ -65,6 +65,12 @@ For convenience, if either `--publish-to` or `--consume-from` starts with `/exch will remove that prefix. RabbitMQ only allows using a single topic exchange with MQTT (`amq.topic` by default), so this prefix doesn't make much sense. Removing it makes it easier to use the same parameters across protocols. +AMQP 0.9.1 publishers use the same target address as AMQP 1.0: +* `/queues/foo` will publish to the default exchange with the routing key `foo` +* `/exchange/bar` will publish to the `bar` exchange with an empty routing key +* `/exchange/bar/baz` will publish to the `bar` exchange with the routing key `baz` +* any `--publish-to` value that doesn't match any of the above formats is treated as a routing key for the default exchange + Read more about how RabbitMQ handles sources and targets in different protocols: * [AMQP 1.0](https://www.rabbitmq.com/docs/amqp#address-v1) format used by RabbitMQ 3.x * [AMQP 1.0](https://www.rabbitmq.com/docs/amqp#address-v2) format used by RabbitMQ 4.0+ (the old format is still supported but deprecated) @@ -127,53 +133,5 @@ messages published with perf-test can be consumed by `omq` or vice versa, and th ### Options -``` - --amqp-app-property stringArray AMQP application properties, eg. key1=val1,val2 - --amqp-app-property-filter stringArray AMQP application property filters, eg. key1=&p:prefix - --binding-key string AMQP 1.0 consumer binding key - --amqp-property-filter stringArray AMQP property filters, eg. subject=foo - --amqp-reject-rate int Rate of messages to reject (0-100%) - --amqp-release-rate int Rate of messages to release without accepting (0-100%) - --amqp-subject strings AMQP 1.0 message subject(s), eg. foo,bar,baz - --amqp-to strings AMQP 1.0 message To field (required for the anonymous terminus) - --mqtt-consumer-version int MQTT consumer protocol version (3, 4 or 5; default=5) (default 5) - --mqtt-publisher-clean-session MQTT publisher clean session (default true) - --mqtt-publisher-qos int MQTT publisher QoS level (0, 1 or 2; default=0) - --mqtt-publisher-version int MQTT consumer protocol version (3, 4 or 5; default=5) (default 5) - --binding-key string Binding key for queue declarations - --cleanup-queues Delete the queues at the end (omq only deletes the queues it explicitly declared) - -D, --cmessages int The number of messages to consume per consumer (default=MaxInt) (default 9223372036854775807) - -T, --consume-from string The queue/topic/terminus to consume from (%d will be replaced with the consumer's id) (default "/queues/omq-%d") - --consumer-credits int AMQP-1.0 consumer credits / STOMP prefetch count (default 1) - --consumer-id string Client ID for AMQP and MQTT consumers (%d => consumer's id, %r => random) (default "omq-consumer-%d") - -L, --consumer-latency duration consumer latency (time to accept message) - --consumer-priority int32 Consumer priority - --consumer-startup-delay duration Delay consumer startup to allow a backlog of messages to build up (eg. 10s) - --consumer-uri strings URI for consuming - -y, --consumers int The number of consumers to start (default 1) - --expected-instances int The number of instances to synchronize (default 1) - --expected-instances-endpoint string The DNS name that will return members to synchronize with - -h, --help help for omq - -l, --log-level log-level Log level (debug, info, error) (default info) - --log-out-of-order-messages Print a log line when a message is received that is older than the previously received message - -c, --max-in-flight int Maximum number of in-flight messages per publisher (default 1) - -d, --message-durability Mark messages as durable (default true) - --message-priority string Message priority (0-255, default=unset) - --message-ttl duration Message TTL (not set by default) - --metric-tags strings Prometheus label-value pairs, eg. l1=v1,l2=v2 - -C, --pmessages int The number of messages to send per publisher (default 9223372036854775807) - --print-all-metrics Print all metrics before exiting - -t, --publish-to string The topic/terminus to publish to (%d will be replaced with the publisher's id) (default "/queues/omq-%d") - --publisher-id string Client ID for AMQP and MQTT publishers (%d => consumer's id, %r => random) (default "omq-publisher-%d") - --publisher-uri strings URI for publishing - -x, --publishers int The number of publishers to start (default 1) - --queue-durability queue-durability Queue durability (default: configuration - the queue definition is durable) (default configuration) - --queues predeclared Type of queues to declare (or predeclared to use existing queues) (default predeclared) - -r, --rate float32 Messages per second (-1 = unlimited) (default -1) - -s, --size int Message payload size in bytes (default 12) - --spread-connections Spread connections across URIs (default true) - --stream-offset string Stream consumer offset specification (default=next) - -z, --time duration Run duration (eg. 10s, 5m, 2h) - --uri strings URI for both publishers and consumers - -m, --use-millis Use milliseconds for timestamps (automatically enabled when no publishers or no consumers) -``` +Use `omq --help` for the full list of options. Keep in mind that some options are protocol-specific and therefore will only +be printed with the corresponding subcommand. For example `omq mqtt --help` will additionally show MQTT-specific options. diff --git a/cmd/root.go b/cmd/root.go index 0198c66..2fea272 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -32,16 +32,20 @@ import ( var cfg config.Config var ( - amqp_amqp = &cobra.Command{} - amqp_stomp = &cobra.Command{} - amqp_mqtt = &cobra.Command{} - stomp_stomp = &cobra.Command{} - stomp_amqp = &cobra.Command{} - stomp_mqtt = &cobra.Command{} - mqtt_mqtt = &cobra.Command{} - mqtt_amqp = &cobra.Command{} - mqtt_stomp = &cobra.Command{} - versionCmd = &cobra.Command{} + amqp_amqp = &cobra.Command{} + amqp_stomp = &cobra.Command{} + amqp_mqtt = &cobra.Command{} + stomp_stomp = &cobra.Command{} + stomp_amqp = &cobra.Command{} + stomp_mqtt = &cobra.Command{} + mqtt_mqtt = &cobra.Command{} + mqtt_amqp = &cobra.Command{} + mqtt_stomp = &cobra.Command{} + amqp091_amqp091 = &cobra.Command{} + amqp091_amqp = &cobra.Command{} + amqp091_mqtt = &cobra.Command{} + amqp091_stomp = &cobra.Command{} + versionCmd = &cobra.Command{} ) var ( @@ -206,6 +210,43 @@ func RootCmd() *cobra.Command { } mqtt_stomp.Flags().AddFlagSet(mqttPublisherFlags) + amqp091_amqp091 = &cobra.Command{ + Use: "amqp091-amqp091", + Aliases: []string{"amqp091"}, + Run: func(cmd *cobra.Command, args []string) { + cfg.PublisherProto = config.AMQP091 + cfg.ConsumerProto = config.AMQP091 + start(cfg) + }, + } + + amqp091_amqp = &cobra.Command{ + Use: "amqp091-amqp", + Run: func(cmd *cobra.Command, args []string) { + cfg.PublisherProto = config.AMQP091 + cfg.ConsumerProto = config.AMQP + start(cfg) + }, + } + + amqp091_mqtt = &cobra.Command{ + Use: "amqp091-mqtt", + Run: func(cmd *cobra.Command, args []string) { + cfg.PublisherProto = config.AMQP091 + cfg.ConsumerProto = config.MQTT + start(cfg) + }, + } + + amqp091_stomp = &cobra.Command{ + Use: "amqp091-stomp", + Run: func(cmd *cobra.Command, args []string) { + cfg.PublisherProto = config.AMQP091 + cfg.ConsumerProto = config.STOMP + start(cfg) + }, + } + versionCmd = &cobra.Command{ Use: "version", Run: func(cmd *cobra.Command, args []string) { @@ -320,6 +361,10 @@ func RootCmd() *cobra.Command { rootCmd.AddCommand(mqtt_mqtt) rootCmd.AddCommand(mqtt_amqp) rootCmd.AddCommand(mqtt_stomp) + rootCmd.AddCommand(amqp091_amqp091) + rootCmd.AddCommand(amqp091_amqp) + rootCmd.AddCommand(amqp091_mqtt) + rootCmd.AddCommand(amqp091_stomp) rootCmd.AddCommand(versionCmd) return rootCmd @@ -502,7 +547,7 @@ func startPublishers(ctx context.Context, wg *sync.WaitGroup, startPublishing ch // then we close the channel to allow all of them to start publishing "at once" // but each publisher sleeps for a random sub-second time, to avoid burts of // messages being published - p.Start(ctx, publisherReady, startPublishing) + p.Start(publisherReady, startPublishing) }() <-publisherReady } @@ -552,6 +597,8 @@ func defaultUri(proto string) string { switch proto { case "amqp": uri = "amqp://localhost/" + case "amqp091": + uri = "amqp://localhost/" case "stomp": uri = "stomp://localhost:61613" case "mqtt": diff --git a/go.mod b/go.mod index 8ff05c9..79aa67e 100644 --- a/go.mod +++ b/go.mod @@ -49,6 +49,7 @@ require ( github.com/muesli/termenv v0.15.2 // indirect github.com/onsi/ginkgo v1.16.5 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/rabbitmq/amqp091-go v1.10.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect diff --git a/go.sum b/go.sum index c70a237..0ae6b21 100644 --- a/go.sum +++ b/go.sum @@ -184,6 +184,8 @@ github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8b github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241121093408-1a6679a20123 h1:vwJUkrY81ekNGCh2xN+ii16ZSRfsmClJkJhHGimiocA= github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241121093408-1a6679a20123/go.mod h1:Km231GyOZAw9I3SZIqkfB9VVzCsu8jvFWYdghmnwueM= github.com/relvacode/iso8601 v1.6.0 h1:eFXUhMJN3Gz8Rcq82f9DTMW0svjtAVuIEULglM7QHTU= diff --git a/main_test.go b/main_test.go index 11f795e..406d3f7 100644 --- a/main_test.go +++ b/main_test.go @@ -108,7 +108,7 @@ var _ = Describe("OMQ CLI", func() { Eventually(session.Err).Should(gbytes.Say(`TOTAL CONSUMED messages=1`)) Eventually(session).Should(gbytes.Say(`omq_messages_consumed_total{priority="normal"} 1`)) }, - Entry("amqp -> amqp", "amqp", "/queues/", "amqp", "/queues/"), // https://github.com/Azure/go-amqp/issues/313 + Entry("amqp -> amqp", "amqp", "/queues/", "amqp", "/queues/"), Entry("stomp -> amqp", "stomp", "/topic/", "amqp", "/queues/"), Entry("mqtt -> amqp", "mqtt", "/topic/", "amqp", "/queues/"), Entry("amqp -> stomp", "amqp", "/exchanges/amq.topic/", "stomp", "/topic/"), @@ -117,6 +117,9 @@ var _ = Describe("OMQ CLI", func() { Entry("stomp -> mqtt", "stomp", "/topic/", "mqtt", "/topic/"), Entry("mqtt -> mqtt", "mqtt", "/topic/", "mqtt", "/topic/"), Entry("mqtt -> stomp", "mqtt", "/topic/", "stomp", "/topic/"), + Entry("amqp091 -> amqp", "amqp091", "/queues/", "amqp", "/queues/"), + Entry("amqp091 -> mqtt", "amqp091", "/exchanges/amq.topic/", "mqtt", "/topic/"), + Entry("amqp091 -> stomp", "amqp091", "/exchanges/amq.topic/", "stomp", "/topic/"), ) DescribeTable("supports message priorities for AMQP and STOMP", diff --git a/pkg/amqp091/publisher.go b/pkg/amqp091/publisher.go new file mode 100644 index 0000000..9908ee4 --- /dev/null +++ b/pkg/amqp091/publisher.go @@ -0,0 +1,228 @@ +package amqp091 + +import ( + "context" + "math/rand" + "strings" + "sync" + "sync/atomic" + "time" + + amqp091 "github.com/rabbitmq/amqp091-go" + "github.com/rabbitmq/omq/pkg/config" + "github.com/rabbitmq/omq/pkg/log" + "github.com/rabbitmq/omq/pkg/metrics" + "github.com/rabbitmq/omq/pkg/utils" +) + +type Amqp091Publisher struct { + Id int + Connection *amqp091.Connection + Channel *amqp091.Channel + confirms chan amqp091.Confirmation + publishTimes map[uint64]time.Time + publishTimesLock sync.Mutex + exchange string + routingKey string + Config config.Config + msg []byte + whichUri int + ctx context.Context +} + +func NewPublisher(ctx context.Context, cfg config.Config, id int) *Amqp091Publisher { + exchange, routingKey := parseExchangeAndRoutingKey(cfg.PublishTo) + publisher := &Amqp091Publisher{ + Id: id, + Connection: nil, + Config: cfg, + exchange: exchange, + routingKey: utils.InjectId(routingKey, id), + publishTimes: make(map[uint64]time.Time), + whichUri: 0, + ctx: ctx, + } + + if cfg.SpreadConnections { + publisher.whichUri = (id - 1) % len(cfg.PublisherUri) + } + + publisher.Connect() + + return publisher +} + +func (p *Amqp091Publisher) Connect() { + var conn *amqp091.Connection + var err error + + if p.Connection != nil { + _ = p.Connection.Close() + } + p.Connection = nil + + for p.Connection == nil { + if p.whichUri >= len(p.Config.PublisherUri) { + p.whichUri = 0 + } + uri := p.Config.PublisherUri[p.whichUri] + p.whichUri++ + conn, err = amqp091.Dial(uri) + if err != nil { + log.Error("connection failed", "id", p.Id, "error", err.Error()) + select { + case <-time.After(1 * time.Second): + continue + case <-p.ctx.Done(): + return + } + } else { + log.Debug("publisher connected", "id", p.Id, "uri", uri) + p.Connection = conn + } + } + for p.Channel, err = p.Connection.Channel(); err != nil; { + log.Error("channel creation failed", "id", p.Id, "error", err.Error()) + time.Sleep(1 * time.Second) + } + p.confirms = make(chan amqp091.Confirmation) // p.Config.MaxInFlight ? + _ = p.Channel.Confirm(false) + p.Channel.NotifyPublish(p.confirms) +} + +func (p *Amqp091Publisher) Start(publisherReady chan bool, startPublishing chan bool) { + p.msg = utils.MessageBody(p.Config.Size) + + close(publisherReady) + + select { + case <-p.ctx.Done(): + return + case <-startPublishing: + // short random delay to avoid all publishers publishing at the same time + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + } + + log.Info("publisher started", "id", p.Id, "rate", utils.Rate(p.Config.Rate), "exchange", p.exchange, "routing key", p.routingKey) + var farewell string + if p.Config.Rate == 0 { + // idle connection + <-p.ctx.Done() + farewell = "context cancelled" + } else { + farewell = p.StartPublishing() + } + p.Stop(farewell) +} + +func (p *Amqp091Publisher) StartPublishing() string { + limiter := utils.RateLimiter(p.Config.Rate) + + go p.handleConfirms() + + var msgSent atomic.Int64 + for { + select { + case <-p.ctx.Done(): + return "context cancelled" + default: + n := uint64(msgSent.Add(1)) + if n > uint64(p.Config.PublishCount) { + return "--pmessages value reached" + } + if p.Config.Rate > 0 { + _ = limiter.Wait(p.ctx) + } + err := p.SendAsync(n) + if err != nil { + p.Connect() + } else { + metrics.MessagesPublished.Inc() + log.Debug("message sent", "id", p.Id, "deliveryTag", n) + } + } + } +} + +func (p *Amqp091Publisher) SendAsync(n uint64) error { + msg := p.prepareMessage() + + p.setPublishTime(n, time.Now()) + err := p.Channel.PublishWithContext(p.ctx, p.exchange, p.routingKey, false, false, msg) + return err +} + +func (p *Amqp091Publisher) handleConfirms() { + for confirm := range p.confirms { + if confirm.Ack { + pubTime := p.getPublishTime(confirm.DeliveryTag) + latency := time.Since(pubTime) + metrics.PublishingLatency.Update(latency.Seconds()) + metrics.MessagesConfirmed.Inc() + log.Debug("message confirmed", "id", p.Id, "delivery_tag", confirm.DeliveryTag, "latency", latency) + } else { + if confirm.DeliveryTag == 0 { + log.Debug("handleConfirms completed (channel closed)") + return + } + _ = p.getPublishTime(confirm.DeliveryTag) + log.Debug("message not confirmed by the broker", "id", p.Id, "delivery_tag", confirm.DeliveryTag) + } + } +} + +func (p *Amqp091Publisher) Stop(reason string) { + log.Debug("closing publisher connection", "id", p.Id, "reason", reason) + for len(p.publishTimes) > 0 { + time.Sleep(100 * time.Millisecond) + } + if p.Channel != nil { + _ = p.Channel.Close() + } + if p.Connection != nil { + _ = p.Connection.Close() + } +} + +func (p *Amqp091Publisher) prepareMessage() amqp091.Publishing { + utils.UpdatePayload(p.Config.UseMillis, &p.msg) + return amqp091.Publishing{ + DeliveryMode: amqp091.Persistent, + Body: p.msg, + } +} + +func (p *Amqp091Publisher) setPublishTime(deliveryTag uint64, t time.Time) { + p.publishTimesLock.Lock() + p.publishTimes[deliveryTag] = t + p.publishTimesLock.Unlock() +} + +func (p *Amqp091Publisher) getPublishTime(deliveryTag uint64) time.Time { + p.publishTimesLock.Lock() + t := p.publishTimes[deliveryTag] + delete(p.publishTimes, deliveryTag) + p.publishTimesLock.Unlock() + return t +} + +func parseExchangeAndRoutingKey(target string) (string, string) { + exchange := "" + routingKey := target + + parts := strings.SplitN(target, "/", 4) + + switch { + case len(parts) == 3 && parts[0] == "" && parts[1] == "queues": + // /queues/queueName -> default exchange and the queueName as routing key + routingKey = parts[2] + case len(parts) == 4 && parts[0] == "" && parts[1] == "exchanges": + exchange = parts[2] + routingKey = parts[3] + case len(parts) == 3 && parts[0] == "" && parts[1] == "exchanges": + exchange = parts[2] + routingKey = "" + } + + return exchange, routingKey +} diff --git a/pkg/amqp10/publisher.go b/pkg/amqp10/publisher.go index 7e99c2d..65fd425 100644 --- a/pkg/amqp10/publisher.go +++ b/pkg/amqp10/publisher.go @@ -149,7 +149,7 @@ func (p *Amqp10Publisher) CreateSender() { } } -func (p *Amqp10Publisher) Start(ctx context.Context, publisherReady chan bool, startPublishing chan bool) { +func (p *Amqp10Publisher) Start(publisherReady chan bool, startPublishing chan bool) { p.msg = utils.MessageBody(p.Config.Size) var err error @@ -163,7 +163,7 @@ func (p *Amqp10Publisher) Start(ctx context.Context, publisherReady chan bool, s close(publisherReady) select { - case <-ctx.Done(): + case <-p.ctx.Done(): return case <-startPublishing: // short random delay to avoid all publishers publishing at the same time @@ -174,28 +174,28 @@ func (p *Amqp10Publisher) Start(ctx context.Context, publisherReady chan bool, s var farewell string if p.Config.Rate == 0 { // idle connection - <-ctx.Done() + <-p.ctx.Done() farewell = "context cancelled" } else { - farewell = p.StartPublishing(ctx) + farewell = p.StartPublishing() } p.Stop(farewell) } -func (p *Amqp10Publisher) StartPublishing(ctx context.Context) string { +func (p *Amqp10Publisher) StartPublishing() string { limiter := utils.RateLimiter(p.Config.Rate) var msgSent atomic.Int64 for { select { - case <-ctx.Done(): + case <-p.ctx.Done(): return "context cancelled" default: if msgSent.Add(1) > int64(p.Config.PublishCount) { return "--pmessages value reached" } if p.Config.Rate > 0 { - _ = limiter.Wait(ctx) + _ = limiter.Wait(p.ctx) } p.poolWg.Add(1) _ = p.pool.Submit(func() { @@ -204,9 +204,9 @@ func (p *Amqp10Publisher) StartPublishing(ctx context.Context) string { }() var err error if p.Config.Amqp.SendSettled { - err = p.SendSync(ctx) + err = p.SendSync() } else { - err = p.SendAsync(ctx) + err = p.SendAsync() } if err != nil { p.Connect() @@ -216,20 +216,20 @@ func (p *Amqp10Publisher) StartPublishing(ctx context.Context) string { } } -func (p *Amqp10Publisher) SendAsync(ctx context.Context) error { +func (p *Amqp10Publisher) SendAsync() error { msg := p.prepareMessage() startTime := time.Now() if p.Sender == nil { return errors.New("sender is nil") } - receipt, err := p.Sender.SendWithReceipt(ctx, msg, nil) + receipt, err := p.Sender.SendWithReceipt(p.ctx, msg, nil) if err != nil { select { - case <-ctx.Done(): + case <-p.ctx.Done(): return nil default: - err = p.handleSendErrors(ctx, err) + err = p.handleSendErrors(p.ctx, err) if err != nil { return err } @@ -240,7 +240,7 @@ func (p *Amqp10Publisher) SendAsync(ctx context.Context) error { return nil } -func (p *Amqp10Publisher) SendSync(ctx context.Context) error { +func (p *Amqp10Publisher) SendSync() error { msg := p.prepareMessage() startTime := time.Now() if p.Sender == nil { @@ -249,7 +249,7 @@ func (p *Amqp10Publisher) SendSync(ctx context.Context) error { err := p.Sender.Send(context.TODO(), msg, nil) latency := time.Since(startTime) log.Debug("message sent", "id", p.Id, "destination", p.Terminus, "latency", latency, "appProps", msg.ApplicationProperties) - err = p.handleSendErrors(ctx, err) + err = p.handleSendErrors(p.ctx, err) if err != nil { return err } diff --git a/pkg/common/common.go b/pkg/common/common.go index 4c6b9e0..fb16be4 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/rabbitmq/omq/pkg/amqp091" "github.com/rabbitmq/omq/pkg/amqp10" "github.com/rabbitmq/omq/pkg/config" "github.com/rabbitmq/omq/pkg/mqtt" @@ -11,7 +12,7 @@ import ( ) type Publisher interface { - Start(context.Context, chan bool, chan bool) + Start(chan bool, chan bool) } type Consumer interface { @@ -26,6 +27,12 @@ func NewPublisher(ctx context.Context, cfg config.Config, id int) (Publisher, er return nil, fmt.Errorf("failed to create an AMQP-1.0 publisher") } return p, nil + case config.AMQP091: + p := amqp091.NewPublisher(ctx, cfg, id) + if p == nil { + return nil, fmt.Errorf("failed to create an AMQP-0.9.1 publisher") + } + return p, nil case config.STOMP: p := stomp.NewPublisher(ctx, cfg, id) if p == nil { diff --git a/pkg/config/config.go b/pkg/config/config.go index e1be980..e79aeb0 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -10,6 +10,7 @@ type Protocol int const ( AMQP Protocol = iota + AMQP091 STOMP MQTT MQTT5 diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 728336c..ded4564 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -34,6 +34,7 @@ var metricsServer *MetricsServer var ( CommandLineArgs *vmetrics.Gauge MessagesPublished *vmetrics.Counter + MessagesConfirmed *vmetrics.Counter MessagesConsumedNormalPriority *vmetrics.Counter MessagesConsumedHighPriority *vmetrics.Counter MessagesConsumedOutOfOrderNormalPriority *vmetrics.Counter @@ -88,6 +89,7 @@ func registerMetrics(globalLabels map[string]string) { highPriorityLabels := labelsToString(high) MessagesPublished = vmetrics.GetOrCreateCounter("omq_messages_published_total" + labelsToString(globalLabels)) + MessagesConfirmed = vmetrics.GetOrCreateCounter("omq_messages_confirmed_total" + labelsToString(globalLabels)) MessagesConsumedNormalPriority = vmetrics.GetOrCreateCounter(`omq_messages_consumed_total` + normalPriorityLabels) MessagesConsumedHighPriority = vmetrics.GetOrCreateCounter(`omq_messages_consumed_total` + highPriorityLabels) MessagesConsumedOutOfOrderNormalPriority = vmetrics.GetOrCreateCounter(`omq_messages_consumed_out_of_order` + normalPriorityLabels) @@ -182,6 +184,7 @@ func (m *MetricsServer) Stop() { func (m *MetricsServer) PrintSummary() { log.Print("TOTAL PUBLISHED", "messages", MessagesPublished.Get(), + "confirmed", MessagesConfirmed.Get(), "rate", fmt.Sprintf("%.2f/s", float64(MessagesPublished.Get())/time.Since(m.started).Seconds())) log.Print("TOTAL CONSUMED", "messages", MessagesConsumedNormalPriority.Get()+MessagesConsumedHighPriority.Get(), diff --git a/pkg/mgmt/mgmt.go b/pkg/mgmt/mgmt.go index edf18b3..3b4b190 100644 --- a/pkg/mgmt/mgmt.go +++ b/pkg/mgmt/mgmt.go @@ -74,7 +74,7 @@ func (m *Mgmt) connection() rmq.IConnection { func (m *Mgmt) DeclareQueues(cfg config.Config) { log.Info("Declaring queues...") // declare queues for AMQP publishers - if cfg.PublisherProto == config.AMQP && strings.HasPrefix(cfg.PublishTo, "/queues/") { + if (cfg.PublisherProto == config.AMQP || cfg.PublisherProto == config.AMQP091) && strings.HasPrefix(cfg.PublishTo, "/queues/") { queueName := strings.TrimPrefix(cfg.PublishTo, "/queues/") for i := 1; i <= cfg.Publishers; i++ { m.DeclareAndBind(cfg, utils.InjectId(queueName, i), i) @@ -195,6 +195,27 @@ func parsePublishTo(proto config.Protocol, publishTo string, id int) (string, st } } + if proto == config.AMQP091 { + if parts[1] == "queues" && len(parts) == 3 { + exchange = "amq.default" + routingKey = parts[2] + } + + if parts[1] == "exchanges" && len(parts) == 3 { + exchange = parts[2] + } + + if parts[1] == "exchanges" && len(parts) == 4 { + exchange = parts[2] + routingKey = parts[3] + } + + if len(parts) == 2 { + exchange = "amq.default" + routingKey = parts[1] + } + } + if proto == config.MQTT { exchange = "amq.topic" routingKey = strings.TrimPrefix(publishTo, "/topic/") diff --git a/pkg/mqtt/common.go b/pkg/mqtt/common.go index f3cb814..11d604b 100644 --- a/pkg/mqtt/common.go +++ b/pkg/mqtt/common.go @@ -14,7 +14,7 @@ type Consumer interface { } type Publisher interface { - Start(context.Context, chan bool, chan bool) + Start(chan bool, chan bool) } func NewConsumer(ctx context.Context, cfg config.Config, id int) Consumer { @@ -27,9 +27,9 @@ func NewConsumer(ctx context.Context, cfg config.Config, id int) Consumer { func NewPublisher(ctx context.Context, cfg config.Config, id int) Publisher { if cfg.MqttPublisher.Version == 5 { - return NewMqtt5Publisher(cfg, id) + return NewMqtt5Publisher(ctx, cfg, id) } else { - return NewMqttPublisher(cfg, id) + return NewMqttPublisher(ctx, cfg, id) } } diff --git a/pkg/mqtt/mqtt_client_test.go b/pkg/mqtt/mqtt_client_test.go index 3ba7268..cf4148e 100644 --- a/pkg/mqtt/mqtt_client_test.go +++ b/pkg/mqtt/mqtt_client_test.go @@ -1,6 +1,8 @@ package mqtt import ( + "context" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/rabbitmq/omq/pkg/config" @@ -16,7 +18,7 @@ var _ = Context("MQTT 3.1/3.1.1 client", func() { CleanSession: true, }, } - publisher := NewMqttPublisher(cfg, 1) + publisher := NewMqttPublisher(context.Background(), cfg, 1) opts := publisher.connectionOptions() Expect(opts.ClientID).To(Equal("my-client-id-1")) Expect(opts.Servers[0].Host).To(Equal("publisher:1883")) @@ -32,7 +34,7 @@ var _ = Context("MQTT 3.1/3.1.1 client", func() { CleanSession: true, }, } - publisher := NewMqtt5Publisher(cfg, 1) + publisher := NewMqtt5Publisher(context.Background(), cfg, 1) opts := publisher.connectionOptions() Expect(opts.ClientID).To(Equal("my-client-id-1")) Expect(opts.ServerUrls[0].Host).To(Equal("publisher:1883")) diff --git a/pkg/mqtt/publisher.go b/pkg/mqtt/publisher.go index c912f99..241bb89 100644 --- a/pkg/mqtt/publisher.go +++ b/pkg/mqtt/publisher.go @@ -19,20 +19,22 @@ type MqttPublisher struct { Connection mqtt.Client Topic string Config config.Config + ctx context.Context msg []byte } -func NewMqttPublisher(cfg config.Config, id int) MqttPublisher { +func NewMqttPublisher(ctx context.Context, cfg config.Config, id int) MqttPublisher { topic := publisherTopic(cfg.PublishTo, id) return MqttPublisher{ Id: id, Connection: nil, Topic: topic, Config: cfg, + ctx: ctx, } } -func (p *MqttPublisher) Connect(ctx context.Context) { +func (p *MqttPublisher) Connect() { var token mqtt.Token opts := p.connectionOptions() @@ -45,7 +47,7 @@ func (p *MqttPublisher) Connect(ctx context.Context) { } log.Error("publisher connection failed", "id", p.Id, "error", token.Error()) select { - case <-ctx.Done(): + case <-p.ctx.Done(): return case <-time.After(1 * time.Second): continue @@ -77,21 +79,21 @@ func (p MqttPublisher) connectionOptions() *mqtt.ClientOptions { return opts } -func (p MqttPublisher) Start(ctx context.Context, publisherReady chan bool, startPublishing chan bool) { +func (p MqttPublisher) Start(publisherReady chan bool, startPublishing chan bool) { defer func() { if p.Connection != nil { p.Connection.Disconnect(250) } }() - p.Connect(ctx) + p.Connect() p.msg = utils.MessageBody(p.Config.Size) close(publisherReady) select { - case <-ctx.Done(): + case <-p.ctx.Done(): return case <-startPublishing: // short random delay to avoid all publishers publishing at the same time @@ -103,28 +105,28 @@ func (p MqttPublisher) Start(ctx context.Context, publisherReady chan bool, star var farewell string if p.Config.Rate == 0 { // idle connection - <-ctx.Done() + <-p.ctx.Done() farewell = "context cancelled" } else { - farewell = p.StartPublishing(ctx) + farewell = p.StartPublishing() } p.Stop(farewell) } -func (p MqttPublisher) StartPublishing(ctx context.Context) string { +func (p MqttPublisher) StartPublishing() string { limiter := utils.RateLimiter(p.Config.Rate) var msgSent atomic.Int64 for { select { - case <-ctx.Done(): + case <-p.ctx.Done(): return "time limit reached" default: if msgSent.Add(1) > int64(p.Config.PublishCount) { return "--pmessages value reached" } if p.Config.Rate > 0 { - _ = limiter.Wait(ctx) + _ = limiter.Wait(p.ctx) } p.Send() } diff --git a/pkg/mqtt/publisher_v5.go b/pkg/mqtt/publisher_v5.go index 736db9f..231f28e 100644 --- a/pkg/mqtt/publisher_v5.go +++ b/pkg/mqtt/publisher_v5.go @@ -21,26 +21,28 @@ type Mqtt5Publisher struct { Connection *autopaho.ConnectionManager Topic string Config config.Config + ctx context.Context msg []byte } -func NewMqtt5Publisher(cfg config.Config, id int) Mqtt5Publisher { +func NewMqtt5Publisher(ctx context.Context, cfg config.Config, id int) Mqtt5Publisher { topic := publisherTopic(cfg.PublishTo, id) return Mqtt5Publisher{ Id: id, Connection: nil, Topic: topic, Config: cfg, + ctx: ctx, } } -func (p *Mqtt5Publisher) Connect(ctx context.Context) { +func (p *Mqtt5Publisher) Connect() { opts := p.connectionOptions() - connection, err := autopaho.NewConnection(ctx, opts) + connection, err := autopaho.NewConnection(p.ctx, opts) if err != nil { log.Error("publisher connection failed", "id", p.Id, "error", err) } - err = connection.AwaitConnection(ctx) + err = connection.AwaitConnection(p.ctx) if err != nil { // AwaitConnection only returns an error if the context is cancelled return @@ -78,15 +80,15 @@ func (p Mqtt5Publisher) connectionOptions() autopaho.ClientConfig { return opts } -func (p Mqtt5Publisher) Start(ctx context.Context, publisherReady chan bool, startPublishing chan bool) { - p.Connect(ctx) +func (p Mqtt5Publisher) Start(publisherReady chan bool, startPublishing chan bool) { + p.Connect() p.msg = utils.MessageBody(p.Config.Size) close(publisherReady) select { - case <-ctx.Done(): + case <-p.ctx.Done(): return case <-startPublishing: // short random delay to avoid all publishers publishing at the same time @@ -98,10 +100,10 @@ func (p Mqtt5Publisher) Start(ctx context.Context, publisherReady chan bool, sta var farewell string if p.Config.Rate == 0 { // idle connection - <-ctx.Done() + <-p.ctx.Done() farewell = "context cancelled" } else { - farewell = p.StartPublishing(ctx) + farewell = p.StartPublishing() } // TODO it seems that sometimes if we stop quickly after sending // a message, this message is not delivered, even though Publish @@ -110,30 +112,30 @@ func (p Mqtt5Publisher) Start(ctx context.Context, publisherReady chan bool, sta p.Stop(farewell) } -func (p Mqtt5Publisher) StartPublishing(ctx context.Context) string { +func (p Mqtt5Publisher) StartPublishing() string { limiter := utils.RateLimiter(p.Config.Rate) var msgSent atomic.Int64 for { select { - case <-ctx.Done(): + case <-p.ctx.Done(): return "time limit reached" default: if msgSent.Add(1) > int64(p.Config.PublishCount) { return "--pmessages value reached" } if p.Config.Rate > 0 { - _ = limiter.Wait(ctx) + _ = limiter.Wait(p.ctx) } - p.Send(ctx) + p.Send() } } } -func (p Mqtt5Publisher) Send(ctx context.Context) { +func (p Mqtt5Publisher) Send() { utils.UpdatePayload(p.Config.UseMillis, &p.msg) startTime := time.Now() - _, err := p.Connection.Publish(ctx, &paho.Publish{ + _, err := p.Connection.Publish(p.ctx, &paho.Publish{ QoS: byte(p.Config.MqttPublisher.QoS), Topic: p.Topic, Payload: p.msg, diff --git a/pkg/stomp/publisher.go b/pkg/stomp/publisher.go index 20b26cc..b66753c 100644 --- a/pkg/stomp/publisher.go +++ b/pkg/stomp/publisher.go @@ -21,6 +21,7 @@ type StompPublisher struct { Connection *stomp.Conn Topic string Config config.Config + ctx context.Context msg []byte whichUri int } @@ -31,6 +32,7 @@ func NewPublisher(ctx context.Context, cfg config.Config, id int) *StompPublishe Connection: nil, Topic: utils.InjectId(cfg.PublishTo, id), Config: cfg, + ctx: ctx, } if cfg.SpreadConnections { @@ -71,13 +73,13 @@ func (p *StompPublisher) Connect() { } } -func (p *StompPublisher) Start(ctx context.Context, publisherReady chan bool, startPublishing chan bool) { +func (p *StompPublisher) Start(publisherReady chan bool, startPublishing chan bool) { p.msg = utils.MessageBody(p.Config.Size) close(publisherReady) select { - case <-ctx.Done(): + case <-p.ctx.Done(): return case <-startPublishing: // short random delay to avoid all publishers publishing at the same time @@ -89,28 +91,28 @@ func (p *StompPublisher) Start(ctx context.Context, publisherReady chan bool, st var farewell string if p.Config.Rate == 0 { // idle connection - <-ctx.Done() + <-p.ctx.Done() farewell = "context cancelled" } else { - farewell = p.StartPublishing(ctx) + farewell = p.StartPublishing() } p.Stop(farewell) } -func (p *StompPublisher) StartPublishing(ctx context.Context) string { +func (p *StompPublisher) StartPublishing() string { limiter := utils.RateLimiter(p.Config.Rate) var msgSent atomic.Int64 for { select { - case <-ctx.Done(): + case <-p.ctx.Done(): return "context cancelled" default: if msgSent.Add(1) > int64(p.Config.PublishCount) { return "--pmessages value reached" } if p.Config.Rate > 0 { - _ = limiter.Wait(ctx) + _ = limiter.Wait(p.ctx) } err := p.Send() if err != nil {