Skip to content

Commit

Permalink
Add stream filtering for STOMP and AMQP-1.0 publishers
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Dec 12, 2023
1 parent b0f3330 commit 4be3e5e
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 27 deletions.
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func RootCmd() *cobra.Command {
BoolVarP(&cfg.MessageDurability, "message-durability", "d", true, "Mark messages as durable (default=true)")
rootCmd.PersistentFlags().StringVar(&cfg.StreamOffset, "stream-offset", "", "Stream consumer offset specification (default=next)")
rootCmd.PersistentFlags().StringVar(&cfg.StreamFilterValues, "stream-filter-values", "", "Stream consumer filter")
rootCmd.PersistentFlags().StringVar(&cfg.StreamFilterValueSet, "stream-filter-value-set", "", "Stream filter value for publisher")
rootCmd.PersistentFlags().IntVar(&cfg.ConsumerCredits, "consumer-credits", 1, "AMQP-1.0 consumer credits / STOMP prefetch count")

rootCmd.AddCommand(amqp_amqp)
Expand Down
5 changes: 5 additions & 0 deletions pkg/amqp10_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ func (p Amqp10Publisher) Send() {
if p.Config.Amqp.Subject != "" {
msg.Properties = &amqp.MessageProperties{Subject: &p.Config.Amqp.Subject}
}

if p.Config.StreamFilterValueSet != "" {
msg.Annotations = amqp.Annotations{"x-stream-filter-value": p.Config.StreamFilterValueSet}
}

msg.Header = &amqp.MessageHeader{
Durable: p.Config.MessageDurability}
timer := prometheus.NewTimer(metrics.PublishingLatency.With(prometheus.Labels{"protocol": "amqp-1.0"}))
Expand Down
41 changes: 21 additions & 20 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,27 @@ type MqttOptions struct {
}

type Config struct {
PublisherUri string
ConsumerUri string
Publishers int
Consumers int
PublishCount int
ConsumeCount int
PublishTo string
ConsumeFrom string
ConsumerCredits int
Size int
Rate int
Duration time.Duration
UseMillis bool
QueueDurability AmqpDurabilityMode
MessageDurability bool
StreamOffset string
StreamFilterValues string
Amqp AmqpOptions
MqttPublisher MqttOptions
MqttConsumer MqttOptions
PublisherUri string
ConsumerUri string
Publishers int
Consumers int
PublishCount int
ConsumeCount int
PublishTo string
ConsumeFrom string
ConsumerCredits int
Size int
Rate int
Duration time.Duration
UseMillis bool
QueueDurability AmqpDurabilityMode
MessageDurability bool
StreamOffset string
StreamFilterValues string
StreamFilterValueSet string
Amqp AmqpOptions
MqttPublisher MqttOptions
MqttConsumer MqttOptions
}

func NewConfig() Config {
Expand Down
29 changes: 22 additions & 7 deletions pkg/stomp_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/rabbitmq/omq/pkg/utils"

"github.com/go-stomp/stomp/v3"
"github.com/go-stomp/stomp/v3/frame"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -98,15 +99,9 @@ func (p StompPublisher) StartRateLimited(ctx context.Context) {

func (p StompPublisher) Send() {
utils.UpdatePayload(p.Config.UseMillis, &p.msg)
var msgDurability string
if p.Config.MessageDurability {
msgDurability = "true"
} else {
msgDurability = "false"
}

timer := prometheus.NewTimer(metrics.PublishingLatency.With(prometheus.Labels{"protocol": "stomp"}))
err := p.Connection.Send(p.Topic, "", p.msg, stomp.SendOpt.Receipt, stomp.SendOpt.Header("persistent", msgDurability))
err := p.Connection.Send(p.Topic, "", p.msg, buildHeaders(p.Config)...)
timer.ObserveDuration()
if err != nil {
log.Error("message sending failure", "protocol", "STOMP", "publisherId", p.Id, "error", err)
Expand All @@ -121,3 +116,23 @@ func (p StompPublisher) Stop(reason string) {
log.Debug("closing connection", "protocol", "stomp", "publisherId", p.Id, "reason", reason)
_ = p.Connection.Disconnect()
}

func buildHeaders(cfg config.Config) []func(*frame.Frame) error {
var headers []func(*frame.Frame) error

headers = append(headers, stomp.SendOpt.Receipt)

var msgDurability string
if cfg.MessageDurability {
msgDurability = "true"
} else {
msgDurability = "false"
}
headers = append(headers, stomp.SendOpt.Header("persistent", msgDurability))

if cfg.StreamFilterValueSet != "" {
headers = append(headers, stomp.SendOpt.Header("x-stream-filter-value", cfg.StreamFilterValueSet))
}

return headers
}

0 comments on commit 4be3e5e

Please sign in to comment.