Skip to content

Commit

Permalink
WIP: AMQP-0.9.1 consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Jan 2, 2025
1 parent e8429a4 commit 437b591
Show file tree
Hide file tree
Showing 10 changed files with 338 additions and 47 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
## omq

`omq` is a messaging system client for testing purposes. It currently supports AMQP-1.0, STOMP and MQTT 3.1/3.1.1/5.0
and partially AMQP 0.9.1 (only for publishing). It is developed mostly for RabbitMQ but might be useful for other brokers
`omq` is a messaging system client for testing purposes. It currently supports AMQP 1.0, AMQP 0.9.1, STOMP and MQTT 3.1/3.1.1/5.0. It is developed mostly for RabbitMQ but might be useful for other brokers
as well (some tests against ActiveMQ were performed).

`omq` starts a group of publishers and a group of consumers, in both cases all publishers/consumers are identical,
Expand Down
76 changes: 75 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@ import (
var cfg config.Config
var (
amqp_amqp = &cobra.Command{}
amqp_amqp091 = &cobra.Command{}
amqp_stomp = &cobra.Command{}
amqp_mqtt = &cobra.Command{}
stomp_stomp = &cobra.Command{}
stomp_amqp = &cobra.Command{}
stomp_amqp091 = &cobra.Command{}
stomp_mqtt = &cobra.Command{}
mqtt_mqtt = &cobra.Command{}
mqtt_amqp = &cobra.Command{}
mqtt_amqp091 = &cobra.Command{}
mqtt_stomp = &cobra.Command{}
amqp091_amqp091 = &cobra.Command{}
amqp091_amqp = &cobra.Command{}
Expand All @@ -53,6 +56,7 @@ var (
amqpAppProperties []string
amqpAppPropertyFilters []string
amqpPropertyFilters []string
streamOffset string
)

var (
Expand Down Expand Up @@ -114,6 +118,9 @@ func RootCmd() *cobra.Command {
amqpConsumerFlags.StringArrayVar(&amqpPropertyFilters, "amqp-property-filter", []string{},
"AMQP property filters, eg. key1=&p:prefix")

amqp091PublisherFlags := pflag.NewFlagSet("amqp091-publisher", pflag.ContinueOnError)
amqp091ConsumerFlags := pflag.NewFlagSet("amqp091-consumer", pflag.ContinueOnError)

amqp_amqp = &cobra.Command{
Use: "amqp-amqp",
Aliases: []string{"amqp"},
Expand All @@ -126,6 +133,17 @@ func RootCmd() *cobra.Command {
amqp_amqp.Flags().AddFlagSet(amqpPublisherFlags)
amqp_amqp.Flags().AddFlagSet(amqpConsumerFlags)

amqp_amqp091 = &cobra.Command{
Use: "amqp-amqp091",
Run: func(cmd *cobra.Command, args []string) {
cfg.PublisherProto = config.AMQP
cfg.ConsumerProto = config.AMQP091
start(cfg)
},
}
amqp_amqp.Flags().AddFlagSet(amqpPublisherFlags)
amqp_amqp.Flags().AddFlagSet(amqp091ConsumerFlags)

amqp_stomp = &cobra.Command{
Use: "amqp-stomp",
Run: func(cmd *cobra.Command, args []string) {
Expand Down Expand Up @@ -167,6 +185,16 @@ func RootCmd() *cobra.Command {
}
stomp_amqp.Flags().AddFlagSet(amqpConsumerFlags)

stomp_amqp091 = &cobra.Command{
Use: "stomp-amqp091",
Run: func(cmd *cobra.Command, args []string) {
cfg.PublisherProto = config.STOMP
cfg.ConsumerProto = config.AMQP091
start(cfg)
},
}
stomp_amqp.Flags().AddFlagSet(amqp091ConsumerFlags)

stomp_mqtt = &cobra.Command{
Use: "stomp-mqtt",
Run: func(cmd *cobra.Command, args []string) {
Expand Down Expand Up @@ -200,6 +228,17 @@ func RootCmd() *cobra.Command {
mqtt_amqp.Flags().AddFlagSet(mqttPublisherFlags)
mqtt_amqp.Flags().AddFlagSet(amqpConsumerFlags)

mqtt_amqp091 = &cobra.Command{
Use: "mqtt-amqp091",
Run: func(cmd *cobra.Command, args []string) {
cfg.PublisherProto = config.MQTT
cfg.ConsumerProto = config.AMQP091
start(cfg)
},
}
mqtt_amqp.Flags().AddFlagSet(mqttPublisherFlags)
mqtt_amqp.Flags().AddFlagSet(amqp091ConsumerFlags)

mqtt_stomp = &cobra.Command{
Use: "mqtt-stomp",
Run: func(cmd *cobra.Command, args []string) {
Expand All @@ -219,6 +258,8 @@ func RootCmd() *cobra.Command {
start(cfg)
},
}
amqp091_amqp091.Flags().AddFlagSet(amqp091PublisherFlags)
amqp091_amqp091.Flags().AddFlagSet(amqp091ConsumerFlags)

amqp091_amqp = &cobra.Command{
Use: "amqp091-amqp",
Expand All @@ -228,6 +269,8 @@ func RootCmd() *cobra.Command {
start(cfg)
},
}
amqp091_amqp.Flags().AddFlagSet(amqp091PublisherFlags)
amqp091_amqp.Flags().AddFlagSet(amqpConsumerFlags)

amqp091_mqtt = &cobra.Command{
Use: "amqp091-mqtt",
Expand All @@ -237,6 +280,7 @@ func RootCmd() *cobra.Command {
start(cfg)
},
}
amqp091_mqtt.Flags().AddFlagSet(amqp091PublisherFlags)

amqp091_stomp = &cobra.Command{
Use: "amqp091-stomp",
Expand All @@ -246,6 +290,7 @@ func RootCmd() *cobra.Command {
start(cfg)
},
}
amqp091_stomp.Flags().AddFlagSet(amqp091PublisherFlags)

versionCmd = &cobra.Command{
Use: "version",
Expand Down Expand Up @@ -304,7 +349,7 @@ func RootCmd() *cobra.Command {
"The queue/topic/terminus to consume from (%d will be replaced with the consumer's id)")
rootCmd.PersistentFlags().StringVar(&cfg.ConsumerId, "consumer-id", "omq-consumer-%d",
"Client ID for AMQP and MQTT consumers (%d => consumer's id, %r => random)")
rootCmd.PersistentFlags().StringVar(&cfg.StreamOffset, "stream-offset", "",
rootCmd.PersistentFlags().StringVar(&streamOffset, "stream-offset", "",
"Stream consumer offset specification (default=next)")
rootCmd.PersistentFlags().Int32Var(&cfg.ConsumerPriority, "consumer-priority", 0, "Consumer priority")
rootCmd.PersistentFlags().IntVar(&cfg.ConsumerCredits, "consumer-credits", 1,
Expand Down Expand Up @@ -353,13 +398,16 @@ func RootCmd() *cobra.Command {
"The DNS name that will return members to synchronize with")

rootCmd.AddCommand(amqp_amqp)
rootCmd.AddCommand(amqp_amqp091)
rootCmd.AddCommand(amqp_stomp)
rootCmd.AddCommand(amqp_mqtt)
rootCmd.AddCommand(stomp_stomp)
rootCmd.AddCommand(stomp_amqp)
rootCmd.AddCommand(stomp_amqp091)
rootCmd.AddCommand(stomp_mqtt)
rootCmd.AddCommand(mqtt_mqtt)
rootCmd.AddCommand(mqtt_amqp)
rootCmd.AddCommand(mqtt_amqp091)
rootCmd.AddCommand(mqtt_stomp)
rootCmd.AddCommand(amqp091_amqp091)
rootCmd.AddCommand(amqp091_amqp)
Expand Down Expand Up @@ -645,6 +693,12 @@ func sanitizeConfig(cfg *config.Config) error {
}
}

offset, err := parseStreamOffset(streamOffset)
if err != nil {
return fmt.Errorf("invalid stream offset value")
}
cfg.StreamOffset = offset

// AMQP application properties
cfg.Amqp.AppProperties = make(map[string][]string)
for _, val := range amqpAppProperties {
Expand Down Expand Up @@ -688,6 +742,26 @@ func sanitizeConfig(cfg *config.Config) error {
return nil
}

func parseStreamOffset(offset string) (any, error) {
switch offset {
case "":
return "", nil
case "next", "first", "last":
return offset, nil
default:
// check if streamOffset can be parsed as unsigned integer (chunkID)
if chunkID, err := strconv.ParseInt(offset, 10, 64); err == nil {
return chunkID, nil
}
// check if streamOffset can be parsed as an ISO 8601 timestamp
if timestamp, err := time.Parse(time.RFC3339, offset); err == nil {
return timestamp, nil
}
}
// return "", fmt.Errorf("invalid stream offset: %s", offset)
return offset, nil //, fmt.Errorf("invalid stream offset: %s", offset)
}

func handleInterupt(ctx context.Context, cancel context.CancelFunc) {
go func() {
c := make(chan os.Signal, 1)
Expand Down
16 changes: 10 additions & 6 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,21 @@ var _ = Describe("OMQ CLI", func() {
Eventually(session).Should(gbytes.Say(`omq_messages_consumed_total{priority="normal"} 1`))
},
Entry("amqp -> amqp", "amqp", "/queues/", "amqp", "/queues/"),
Entry("stomp -> amqp", "stomp", "/topic/", "amqp", "/queues/"),
Entry("mqtt -> amqp", "mqtt", "/topic/", "amqp", "/queues/"),
Entry("amqp -> amqp091", "amqp", "/queues/", "amqp", "/queues/"),
Entry("amqp -> stomp", "amqp", "/exchanges/amq.topic/", "stomp", "/topic/"),
Entry("amqp -> mqtt", "amqp", "/exchanges/amq.topic/", "mqtt", "/topic/"),
Entry("stomp -> stomp", "stomp", "/topic/", "stomp", "/topic/"),
Entry("stomp -> mqtt", "stomp", "/topic/", "mqtt", "/topic/"),
Entry("mqtt -> mqtt", "mqtt", "/topic/", "mqtt", "/topic/"),
Entry("mqtt -> stomp", "mqtt", "/topic/", "stomp", "/topic/"),
Entry("amqp091 -> amqp", "amqp091", "/queues/", "amqp", "/queues/"),
Entry("amqp091 -> amqp091", "amqp091", "/queues/", "amqp", "/queues/"),
Entry("amqp091 -> mqtt", "amqp091", "/exchanges/amq.topic/", "mqtt", "/topic/"),
Entry("amqp091 -> stomp", "amqp091", "/exchanges/amq.topic/", "stomp", "/topic/"),
Entry("mqtt -> amqp", "mqtt", "/topic/", "amqp", "/queues/"),
Entry("mqtt -> amqp091", "mqtt", "/topic/", "amqp", "/queues/"),
Entry("mqtt -> mqtt", "mqtt", "/topic/", "mqtt", "/topic/"),
Entry("mqtt -> stomp", "mqtt", "/topic/", "stomp", "/topic/"),
Entry("stomp -> amqp", "stomp", "/topic/", "amqp", "/queues/"),
Entry("stomp -> amqp091", "stomp", "/topic/", "amqp", "/queues/"),
Entry("stomp -> stomp", "stomp", "/topic/", "stomp", "/topic/"),
Entry("stomp -> mqtt", "stomp", "/topic/", "mqtt", "/topic/"),
)

DescribeTable("supports message priorities for AMQP and STOMP",
Expand Down
Loading

0 comments on commit 437b591

Please sign in to comment.