Skip to content

Commit

Permalink
AMQP 0.9.1 publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Jan 2, 2025
1 parent b7a616b commit f343540
Show file tree
Hide file tree
Showing 16 changed files with 399 additions and 120 deletions.
64 changes: 11 additions & 53 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
## omq

`omq` is a messaging system client for testing purposes. It currently supports AMQP-1.0, STOMP and MQTT 3.1/3.1.1/5.0.
It is developed mostly for RabbitMQ but might be useful for other brokers as well (some tests against ActiveMQ
were performed).
`omq` is a messaging system client for testing purposes. It currently supports AMQP-1.0, STOMP and MQTT 3.1/3.1.1/5.0
and partially AMQP 0.9.1 (only for publishing). It is developed mostly for RabbitMQ but might be useful for other brokers
as well (some tests against ActiveMQ were performed).

`omq` starts a group of publishers and a group of consumers, in both cases all publishers/consumers are identical,
except for the target terminus/queue/routing key, which may be slightly different. The publishers can use
Expand Down Expand Up @@ -65,6 +65,12 @@ For convenience, if either `--publish-to` or `--consume-from` starts with `/exch
will remove that prefix. RabbitMQ only allows using a single topic exchange with MQTT (`amq.topic` by default), so this prefix doesn't make
much sense. Removing it makes it easier to use the same parameters across protocols.

AMQP 0.9.1 publishers use the same target address as AMQP 1.0:
* `/queues/foo` will publish to the default exchange with the routing key `foo`
* `/exchange/bar` will publish to the `bar` exchange with an empty routing key
* `/exchange/bar/baz` will publish to the `bar` exchange with the routing key `baz`
* any `--publish-to` value that doesn't match any of the above formats is treated as a routing key for the default exchange

Read more about how RabbitMQ handles sources and targets in different protocols:
* [AMQP 1.0](https://www.rabbitmq.com/docs/amqp#address-v1) format used by RabbitMQ 3.x
* [AMQP 1.0](https://www.rabbitmq.com/docs/amqp#address-v2) format used by RabbitMQ 4.0+ (the old format is still supported but deprecated)
Expand Down Expand Up @@ -127,53 +133,5 @@ messages published with perf-test can be consumed by `omq` or vice versa, and th

### Options

```
--amqp-app-property stringArray AMQP application properties, eg. key1=val1,val2
--amqp-app-property-filter stringArray AMQP application property filters, eg. key1=&p:prefix
--binding-key string AMQP 1.0 consumer binding key
--amqp-property-filter stringArray AMQP property filters, eg. subject=foo
--amqp-reject-rate int Rate of messages to reject (0-100%)
--amqp-release-rate int Rate of messages to release without accepting (0-100%)
--amqp-subject strings AMQP 1.0 message subject(s), eg. foo,bar,baz
--amqp-to strings AMQP 1.0 message To field (required for the anonymous terminus)
--mqtt-consumer-version int MQTT consumer protocol version (3, 4 or 5; default=5) (default 5)
--mqtt-publisher-clean-session MQTT publisher clean session (default true)
--mqtt-publisher-qos int MQTT publisher QoS level (0, 1 or 2; default=0)
--mqtt-publisher-version int MQTT consumer protocol version (3, 4 or 5; default=5) (default 5)
--binding-key string Binding key for queue declarations
--cleanup-queues Delete the queues at the end (omq only deletes the queues it explicitly declared)
-D, --cmessages int The number of messages to consume per consumer (default=MaxInt) (default 9223372036854775807)
-T, --consume-from string The queue/topic/terminus to consume from (%d will be replaced with the consumer's id) (default "/queues/omq-%d")
--consumer-credits int AMQP-1.0 consumer credits / STOMP prefetch count (default 1)
--consumer-id string Client ID for AMQP and MQTT consumers (%d => consumer's id, %r => random) (default "omq-consumer-%d")
-L, --consumer-latency duration consumer latency (time to accept message)
--consumer-priority int32 Consumer priority
--consumer-startup-delay duration Delay consumer startup to allow a backlog of messages to build up (eg. 10s)
--consumer-uri strings URI for consuming
-y, --consumers int The number of consumers to start (default 1)
--expected-instances int The number of instances to synchronize (default 1)
--expected-instances-endpoint string The DNS name that will return members to synchronize with
-h, --help help for omq
-l, --log-level log-level Log level (debug, info, error) (default info)
--log-out-of-order-messages Print a log line when a message is received that is older than the previously received message
-c, --max-in-flight int Maximum number of in-flight messages per publisher (default 1)
-d, --message-durability Mark messages as durable (default true)
--message-priority string Message priority (0-255, default=unset)
--message-ttl duration Message TTL (not set by default)
--metric-tags strings Prometheus label-value pairs, eg. l1=v1,l2=v2
-C, --pmessages int The number of messages to send per publisher (default 9223372036854775807)
--print-all-metrics Print all metrics before exiting
-t, --publish-to string The topic/terminus to publish to (%d will be replaced with the publisher's id) (default "/queues/omq-%d")
--publisher-id string Client ID for AMQP and MQTT publishers (%d => consumer's id, %r => random) (default "omq-publisher-%d")
--publisher-uri strings URI for publishing
-x, --publishers int The number of publishers to start (default 1)
--queue-durability queue-durability Queue durability (default: configuration - the queue definition is durable) (default configuration)
--queues predeclared Type of queues to declare (or predeclared to use existing queues) (default predeclared)
-r, --rate float32 Messages per second (-1 = unlimited) (default -1)
-s, --size int Message payload size in bytes (default 12)
--spread-connections Spread connections across URIs (default true)
--stream-offset string Stream consumer offset specification (default=next)
-z, --time duration Run duration (eg. 10s, 5m, 2h)
--uri strings URI for both publishers and consumers
-m, --use-millis Use milliseconds for timestamps (automatically enabled when no publishers or no consumers)
```
Use `omq --help` for the full list of options. Keep in mind that some options are protocol-specific and therefore will only
be printed with the corresponding subcommand. For example `omq mqtt --help` will additionally show MQTT-specific options.
69 changes: 58 additions & 11 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,20 @@ import (

var cfg config.Config
var (
amqp_amqp = &cobra.Command{}
amqp_stomp = &cobra.Command{}
amqp_mqtt = &cobra.Command{}
stomp_stomp = &cobra.Command{}
stomp_amqp = &cobra.Command{}
stomp_mqtt = &cobra.Command{}
mqtt_mqtt = &cobra.Command{}
mqtt_amqp = &cobra.Command{}
mqtt_stomp = &cobra.Command{}
versionCmd = &cobra.Command{}
amqp_amqp = &cobra.Command{}
amqp_stomp = &cobra.Command{}
amqp_mqtt = &cobra.Command{}
stomp_stomp = &cobra.Command{}
stomp_amqp = &cobra.Command{}
stomp_mqtt = &cobra.Command{}
mqtt_mqtt = &cobra.Command{}
mqtt_amqp = &cobra.Command{}
mqtt_stomp = &cobra.Command{}
amqp091_amqp091 = &cobra.Command{}
amqp091_amqp = &cobra.Command{}
amqp091_mqtt = &cobra.Command{}
amqp091_stomp = &cobra.Command{}
versionCmd = &cobra.Command{}
)

var (
Expand Down Expand Up @@ -206,6 +210,43 @@ func RootCmd() *cobra.Command {
}
mqtt_stomp.Flags().AddFlagSet(mqttPublisherFlags)

amqp091_amqp091 = &cobra.Command{
Use: "amqp091-amqp091",
Aliases: []string{"amqp091"},
Run: func(cmd *cobra.Command, args []string) {
cfg.PublisherProto = config.AMQP091
cfg.ConsumerProto = config.AMQP091
start(cfg)
},
}

amqp091_amqp = &cobra.Command{
Use: "amqp091-amqp",
Run: func(cmd *cobra.Command, args []string) {
cfg.PublisherProto = config.AMQP091
cfg.ConsumerProto = config.AMQP
start(cfg)
},
}

amqp091_mqtt = &cobra.Command{
Use: "amqp091-mqtt",
Run: func(cmd *cobra.Command, args []string) {
cfg.PublisherProto = config.AMQP091
cfg.ConsumerProto = config.MQTT
start(cfg)
},
}

amqp091_stomp = &cobra.Command{
Use: "amqp091-stomp",
Run: func(cmd *cobra.Command, args []string) {
cfg.PublisherProto = config.AMQP091
cfg.ConsumerProto = config.STOMP
start(cfg)
},
}

versionCmd = &cobra.Command{
Use: "version",
Run: func(cmd *cobra.Command, args []string) {
Expand Down Expand Up @@ -320,6 +361,10 @@ func RootCmd() *cobra.Command {
rootCmd.AddCommand(mqtt_mqtt)
rootCmd.AddCommand(mqtt_amqp)
rootCmd.AddCommand(mqtt_stomp)
rootCmd.AddCommand(amqp091_amqp091)
rootCmd.AddCommand(amqp091_amqp)
rootCmd.AddCommand(amqp091_mqtt)
rootCmd.AddCommand(amqp091_stomp)
rootCmd.AddCommand(versionCmd)

return rootCmd
Expand Down Expand Up @@ -502,7 +547,7 @@ func startPublishers(ctx context.Context, wg *sync.WaitGroup, startPublishing ch
// then we close the channel to allow all of them to start publishing "at once"
// but each publisher sleeps for a random sub-second time, to avoid burts of
// messages being published
p.Start(ctx, publisherReady, startPublishing)
p.Start(publisherReady, startPublishing)
}()
<-publisherReady
}
Expand Down Expand Up @@ -552,6 +597,8 @@ func defaultUri(proto string) string {
switch proto {
case "amqp":
uri = "amqp://localhost/"
case "amqp091":
uri = "amqp://localhost/"
case "stomp":
uri = "stomp://localhost:61613"
case "mqtt":
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ require (
github.com/muesli/termenv v0.15.2 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8b
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241121093408-1a6679a20123 h1:vwJUkrY81ekNGCh2xN+ii16ZSRfsmClJkJhHGimiocA=
github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241121093408-1a6679a20123/go.mod h1:Km231GyOZAw9I3SZIqkfB9VVzCsu8jvFWYdghmnwueM=
github.com/relvacode/iso8601 v1.6.0 h1:eFXUhMJN3Gz8Rcq82f9DTMW0svjtAVuIEULglM7QHTU=
Expand Down
5 changes: 4 additions & 1 deletion main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ var _ = Describe("OMQ CLI", func() {
Eventually(session.Err).Should(gbytes.Say(`TOTAL CONSUMED messages=1`))
Eventually(session).Should(gbytes.Say(`omq_messages_consumed_total{priority="normal"} 1`))
},
Entry("amqp -> amqp", "amqp", "/queues/", "amqp", "/queues/"), // https://github.com/Azure/go-amqp/issues/313
Entry("amqp -> amqp", "amqp", "/queues/", "amqp", "/queues/"),
Entry("stomp -> amqp", "stomp", "/topic/", "amqp", "/queues/"),
Entry("mqtt -> amqp", "mqtt", "/topic/", "amqp", "/queues/"),
Entry("amqp -> stomp", "amqp", "/exchanges/amq.topic/", "stomp", "/topic/"),
Expand All @@ -117,6 +117,9 @@ var _ = Describe("OMQ CLI", func() {
Entry("stomp -> mqtt", "stomp", "/topic/", "mqtt", "/topic/"),
Entry("mqtt -> mqtt", "mqtt", "/topic/", "mqtt", "/topic/"),
Entry("mqtt -> stomp", "mqtt", "/topic/", "stomp", "/topic/"),
Entry("amqp091 -> amqp", "amqp091", "/queues/", "amqp", "/queues/"),
Entry("amqp091 -> mqtt", "amqp091", "/exchanges/amq.topic/", "mqtt", "/topic/"),
Entry("amqp091 -> stomp", "amqp091", "/exchanges/amq.topic/", "stomp", "/topic/"),
)

DescribeTable("supports message priorities for AMQP and STOMP",
Expand Down
Loading

0 comments on commit f343540

Please sign in to comment.