Skip to content

Commit

Permalink
Consumer latency (#8)
Browse files Browse the repository at this point in the history
* Add --consumer-latency option

* Remove defaults that are generated

Cobra already prints the defaults for some flags
  • Loading branch information
mkuratczyk authored Feb 9, 2024
1 parent 3d2f0da commit c43ae66
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 9 deletions.
24 changes: 15 additions & 9 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func RootCmd() *cobra.Command {
}
amqp_mqtt.Flags().IntVar(&cfg.MqttConsumer.QoS, "mqtt-consumer-qos", 0, "MQTT consumer QoS level (0, 1 or 2; default=0)")
amqp_mqtt.Flags().
BoolVar(&cfg.MqttConsumer.CleanSession, "mqtt-consumer-clean-session", true, "MQTT consumer clean session (default = true)")
BoolVar(&cfg.MqttConsumer.CleanSession, "mqtt-consumer-clean-session", true, "MQTT consumer clean session")

stomp_stomp = &cobra.Command{
Use: "stomp-stomp",
Expand All @@ -95,7 +95,7 @@ func RootCmd() *cobra.Command {
}
stomp_mqtt.Flags().IntVar(&cfg.MqttConsumer.QoS, "mqtt-consumer-qos", 0, "MQTT consumer QoS level (0, 1 or 2; default=0)")
stomp_mqtt.Flags().
BoolVar(&cfg.MqttConsumer.CleanSession, "mqtt-consumer-clean-session", true, "MQTT consumer clean session (default = true)")
BoolVar(&cfg.MqttConsumer.CleanSession, "mqtt-consumer-clean-session", true, "MQTT consumer clean session")

mqtt_mqtt = &cobra.Command{
Use: "mqtt-mqtt",
Expand All @@ -107,9 +107,9 @@ func RootCmd() *cobra.Command {
mqtt_mqtt.Flags().IntVar(&cfg.MqttPublisher.QoS, "mqtt-publisher-qos", 0, "MQTT publisher QoS level (0, 1 or 2; default=0)")
mqtt_mqtt.Flags().IntVar(&cfg.MqttConsumer.QoS, "mqtt-consumer-qos", 0, "MQTT consumer QoS level (0, 1 or 2; default=0)")
mqtt_mqtt.Flags().
BoolVar(&cfg.MqttPublisher.CleanSession, "mqtt-publisher-clean-session", true, "MQTT publisher clean session (default = true)")
BoolVar(&cfg.MqttPublisher.CleanSession, "mqtt-publisher-clean-session", true, "MQTT publisher clean session")
mqtt_mqtt.Flags().
BoolVar(&cfg.MqttConsumer.CleanSession, "mqtt-consumer-clean-session", true, "MQTT consumer clean session (default = true)")
BoolVar(&cfg.MqttConsumer.CleanSession, "mqtt-consumer-clean-session", true, "MQTT consumer clean session")

mqtt_amqp = &cobra.Command{
Use: "mqtt-amqp",
Expand All @@ -119,7 +119,7 @@ func RootCmd() *cobra.Command {
}
mqtt_amqp.Flags().IntVar(&cfg.MqttPublisher.QoS, "mqtt-qos", 0, "MQTT publisher QoS level (0, 1 or 2; default=0)")
mqtt_amqp.Flags().
BoolVar(&cfg.MqttPublisher.CleanSession, "mqtt-publisher-clean-session", true, "MQTT publisher clean session (default = true)")
BoolVar(&cfg.MqttPublisher.CleanSession, "mqtt-publisher-clean-session", true, "MQTT publisher clean session")

mqtt_stomp = &cobra.Command{
Use: "mqtt-stomp",
Expand All @@ -129,7 +129,7 @@ func RootCmd() *cobra.Command {
}
mqtt_stomp.Flags().IntVar(&cfg.MqttPublisher.QoS, "mqtt-qos", 0, "MQTT publisher QoS level (0, 1 or 2; default=0)")
mqtt_stomp.Flags().
BoolVar(&cfg.MqttPublisher.CleanSession, "mqtt-publisher-clean-session", true, "MQTT publisher clean session (default = true)")
BoolVar(&cfg.MqttPublisher.CleanSession, "mqtt-publisher-clean-session", true, "MQTT publisher clean session")

versionCmd = &cobra.Command{
Use: "version",
Expand Down Expand Up @@ -166,28 +166,29 @@ func RootCmd() *cobra.Command {
rootCmd.PersistentFlags().IntVarP(&cfg.Publishers, "publishers", "x", 1, "The number of publishers to start")
rootCmd.PersistentFlags().IntVarP(&cfg.Consumers, "consumers", "y", 1, "The number of consumers to start")
rootCmd.PersistentFlags().
IntVarP(&cfg.PublishCount, "pmessages", "C", math.MaxInt, "The number of messages to send per publisher (default=MaxInt)")
IntVarP(&cfg.PublishCount, "pmessages", "C", math.MaxInt, "The number of messages to send per publisher")
rootCmd.PersistentFlags().
IntVarP(&cfg.ConsumeCount, "cmessages", "D", math.MaxInt, "The number of messages to consume per consumer (default=MaxInt)")
rootCmd.PersistentFlags().
StringVarP(&cfg.PublishTo, "publish-to", "t", "/topic/omq", "The topic/terminus to publish to (%d will be replaced with the publisher's id)")
rootCmd.PersistentFlags().
StringVarP(&cfg.ConsumeFrom, "consume-from", "T", "/topic/omq", "The queue/topic/terminus to consume from (%d will be replaced with the consumer's id)")
rootCmd.PersistentFlags().IntVarP(&cfg.Size, "size", "s", 12, "Message payload size in bytes")
rootCmd.PersistentFlags().IntVarP(&cfg.Rate, "rate", "r", -1, "Messages per second (-1 = unlimited; default=-1)")
rootCmd.PersistentFlags().IntVarP(&cfg.Rate, "rate", "r", -1, "Messages per second (-1 = unlimited)")
rootCmd.PersistentFlags().DurationVarP(&cfg.Duration, "time", "z", 0, "Run duration (eg. 10s, 5m, 2h)")
rootCmd.PersistentFlags().
BoolVarP(&cfg.UseMillis, "use-millis", "m", false, "Use milliseconds for timestamps (automatically enabled when no publishers or no consumers)")
rootCmd.PersistentFlags().
VarP(enumflag.New(&cfg.QueueDurability, "queue-durability", config.AmqpDurabilityModes, enumflag.EnumCaseInsensitive), "queue-durability", "", "Queue durability (default: configuration - the queue definition is durable)")
rootCmd.PersistentFlags().StringVar(&cfg.Amqp.Subject, "amqp-subject", "", "AMQP 1.0 message subject")
rootCmd.PersistentFlags().
BoolVarP(&cfg.MessageDurability, "message-durability", "d", true, "Mark messages as durable (default=true)")
BoolVarP(&cfg.MessageDurability, "message-durability", "d", true, "Mark messages as durable")
rootCmd.PersistentFlags().StringVar(&cfg.MessagePriority, "message-priority", "", "Message priority (0-255, default=unset)")
rootCmd.PersistentFlags().StringVar(&cfg.StreamOffset, "stream-offset", "", "Stream consumer offset specification (default=next)")
rootCmd.PersistentFlags().StringVar(&cfg.StreamFilterValues, "stream-filter-values", "", "Stream consumer filter")
rootCmd.PersistentFlags().StringVar(&cfg.StreamFilterValueSet, "stream-filter-value-set", "", "Stream filter value for publisher")
rootCmd.PersistentFlags().IntVar(&cfg.ConsumerCredits, "consumer-credits", 1, "AMQP-1.0 consumer credits / STOMP prefetch count")
rootCmd.PersistentFlags().DurationVarP(&cfg.ConsumerLatency, "consumer-latency", "L", 0*time.Second, "consumer latency (time to accept message)")

rootCmd.AddCommand(amqp_amqp)
rootCmd.AddCommand(amqp_stomp)
Expand Down Expand Up @@ -260,6 +261,11 @@ func start(cfg config.Config, publisherProto common.Protocol, consumerProto comm
}
}

if cfg.ConsumerLatency != 0 && consumerProto == common.MQTT {
log.Error("Consumer latency is not supported for MQTT consumers")
os.Exit(1)
}

if cfg.Duration > 0 {
log.Debug("Will stop all consumers and publishers at " + time.Now().Add(cfg.Duration).String())
time.AfterFunc(cfg.Duration, func() { cancel() })
Expand Down
3 changes: 3 additions & 0 deletions pkg/amqp10_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"strconv"
"time"

"github.com/rabbitmq/omq/pkg/config"
"github.com/rabbitmq/omq/pkg/log"
Expand Down Expand Up @@ -93,6 +94,8 @@ func (c Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) {

log.Debug("message received", "protocol", "amqp-1.0", "consumerId", c.Id, "terminus", c.Topic, "size", len(payload))

log.Debug("consumer latency", "protocol", "amqp-1.0", "consumerId", c.Id, "latency", c.Config.ConsumerLatency)
time.Sleep(c.Config.ConsumerLatency)
err = receiver.AcceptMessage(context.TODO(), msg)
if err != nil {
log.Error("message NOT accepted", "protocol", "amqp-1.0", "consumerId", c.Id, "terminus", c.Topic)
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Config struct {
PublishTo string
ConsumeFrom string
ConsumerCredits int
ConsumerLatency time.Duration
Size int
Rate int
Duration time.Duration
Expand Down
3 changes: 3 additions & 0 deletions pkg/stomp_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stomp_client
import (
"context"
"strconv"
"time"

"github.com/rabbitmq/omq/pkg/config"
"github.com/rabbitmq/omq/pkg/log"
Expand Down Expand Up @@ -72,6 +73,8 @@ func (c StompConsumer) Start(ctx context.Context, subscribed chan bool) {
m.Observe(utils.CalculateEndToEndLatency(c.Config.UseMillis, &msg.Body))
log.Debug("message received", "protocol", "stomp", "consumerId", c.Id, "destination", c.Topic, "size", len(msg.Body), "ack required", msg.ShouldAck())

log.Debug("consumer latency", "protocol", "stomp", "consumerId", c.Id, "latency", c.Config.ConsumerLatency)
time.Sleep(c.Config.ConsumerLatency)
err = c.Connection.Ack(msg)
if err != nil {
log.Error("message NOT acknowledged", "protocol", "stomp", "consumerId", c.Id, "destination", c.Topic)
Expand Down

0 comments on commit c43ae66

Please sign in to comment.