diff --git a/cmd/root.go b/cmd/root.go index 9d348df..cbd8dd3 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -176,6 +176,7 @@ func RootCmd() *cobra.Command { BoolVarP(&cfg.MessageDurability, "message-durability", "d", true, "Mark messages as durable (default=true)") rootCmd.PersistentFlags().StringVar(&cfg.StreamOffset, "stream-offset", "", "Stream consumer offset specification (default=next)") rootCmd.PersistentFlags().StringVar(&cfg.StreamFilterValues, "stream-filter-values", "", "Stream consumer filter") + rootCmd.PersistentFlags().StringVar(&cfg.StreamFilterValueSet, "stream-filter-value-set", "", "Stream filter value for publisher") rootCmd.PersistentFlags().IntVar(&cfg.ConsumerCredits, "consumer-credits", 1, "AMQP-1.0 consumer credits / STOMP prefetch count") rootCmd.AddCommand(amqp_amqp) diff --git a/pkg/amqp10_client/publisher.go b/pkg/amqp10_client/publisher.go index 079a117..a71fa81 100644 --- a/pkg/amqp10_client/publisher.go +++ b/pkg/amqp10_client/publisher.go @@ -122,6 +122,11 @@ func (p Amqp10Publisher) Send() { if p.Config.Amqp.Subject != "" { msg.Properties = &amqp.MessageProperties{Subject: &p.Config.Amqp.Subject} } + + if p.Config.StreamFilterValueSet != "" { + msg.Annotations = amqp.Annotations{"x-stream-filter-value": p.Config.StreamFilterValueSet} + } + msg.Header = &amqp.MessageHeader{ Durable: p.Config.MessageDurability} timer := prometheus.NewTimer(metrics.PublishingLatency.With(prometheus.Labels{"protocol": "amqp-1.0"})) diff --git a/pkg/config/config.go b/pkg/config/config.go index a217616..cea4ceb 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -30,26 +30,27 @@ type MqttOptions struct { } type Config struct { - PublisherUri string - ConsumerUri string - Publishers int - Consumers int - PublishCount int - ConsumeCount int - PublishTo string - ConsumeFrom string - ConsumerCredits int - Size int - Rate int - Duration time.Duration - UseMillis bool - QueueDurability AmqpDurabilityMode - MessageDurability bool - StreamOffset string - StreamFilterValues string - Amqp AmqpOptions - MqttPublisher MqttOptions - MqttConsumer MqttOptions + PublisherUri string + ConsumerUri string + Publishers int + Consumers int + PublishCount int + ConsumeCount int + PublishTo string + ConsumeFrom string + ConsumerCredits int + Size int + Rate int + Duration time.Duration + UseMillis bool + QueueDurability AmqpDurabilityMode + MessageDurability bool + StreamOffset string + StreamFilterValues string + StreamFilterValueSet string + Amqp AmqpOptions + MqttPublisher MqttOptions + MqttConsumer MqttOptions } func NewConfig() Config { diff --git a/pkg/stomp_client/publisher.go b/pkg/stomp_client/publisher.go index 29648a8..5934d89 100644 --- a/pkg/stomp_client/publisher.go +++ b/pkg/stomp_client/publisher.go @@ -12,6 +12,7 @@ import ( "github.com/rabbitmq/omq/pkg/utils" "github.com/go-stomp/stomp/v3" + "github.com/go-stomp/stomp/v3/frame" "github.com/prometheus/client_golang/prometheus" ) @@ -98,15 +99,9 @@ func (p StompPublisher) StartRateLimited(ctx context.Context) { func (p StompPublisher) Send() { utils.UpdatePayload(p.Config.UseMillis, &p.msg) - var msgDurability string - if p.Config.MessageDurability { - msgDurability = "true" - } else { - msgDurability = "false" - } timer := prometheus.NewTimer(metrics.PublishingLatency.With(prometheus.Labels{"protocol": "stomp"})) - err := p.Connection.Send(p.Topic, "", p.msg, stomp.SendOpt.Receipt, stomp.SendOpt.Header("persistent", msgDurability)) + err := p.Connection.Send(p.Topic, "", p.msg, buildHeaders(p.Config)...) timer.ObserveDuration() if err != nil { log.Error("message sending failure", "protocol", "STOMP", "publisherId", p.Id, "error", err) @@ -121,3 +116,23 @@ func (p StompPublisher) Stop(reason string) { log.Debug("closing connection", "protocol", "stomp", "publisherId", p.Id, "reason", reason) _ = p.Connection.Disconnect() } + +func buildHeaders(cfg config.Config) []func(*frame.Frame) error { + var headers []func(*frame.Frame) error + + headers = append(headers, stomp.SendOpt.Receipt) + + var msgDurability string + if cfg.MessageDurability { + msgDurability = "true" + } else { + msgDurability = "false" + } + headers = append(headers, stomp.SendOpt.Header("persistent", msgDurability)) + + if cfg.StreamFilterValueSet != "" { + headers = append(headers, stomp.SendOpt.Header("x-stream-filter-value", cfg.StreamFilterValueSet)) + } + + return headers +}