Skip to content

Commit

Permalink
Add stream filtering support in STOMP
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Dec 6, 2023
1 parent 0077339 commit f1a4619
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 19 deletions.
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func RootCmd() *cobra.Command {
rootCmd.PersistentFlags().
BoolVarP(&cfg.MessageDurability, "message-durability", "d", true, "Mark messages as durable (default=true)")
rootCmd.PersistentFlags().StringVar(&cfg.StreamOffset, "stream-offset", "", "Stream consumer offset specification (default=next)")
rootCmd.PersistentFlags().StringVar(&cfg.StreamFilterValues, "stream-filter-values", "", "Stream consumer filter")
rootCmd.PersistentFlags().IntVar(&cfg.ConsumerCredits, "consumer-credits", 1, "AMQP-1.0 consumer credits / STOMP prefetch count")

rootCmd.AddCommand(amqp_amqp)
Expand Down
39 changes: 20 additions & 19 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,26 @@ type MqttOptions struct {
}

type Config struct {
PublisherUri string
ConsumerUri string
Publishers int
Consumers int
PublishCount int
ConsumeCount int
PublishTo string
ConsumeFrom string
ConsumerCredits int
Size int
Rate int
Duration time.Duration
UseMillis bool
QueueDurability AmqpDurabilityMode
MessageDurability bool
StreamOffset string
Amqp AmqpOptions
MqttPublisher MqttOptions
MqttConsumer MqttOptions
PublisherUri string
ConsumerUri string
Publishers int
Consumers int
PublishCount int
ConsumeCount int
PublishTo string
ConsumeFrom string
ConsumerCredits int
Size int
Rate int
Duration time.Duration
UseMillis bool
QueueDurability AmqpDurabilityMode
MessageDurability bool
StreamOffset string
StreamFilterValues string
Amqp AmqpOptions
MqttPublisher MqttOptions
MqttConsumer MqttOptions
}

func NewConfig() Config {
Expand Down
5 changes: 5 additions & 0 deletions pkg/stomp_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,10 @@ func buildSubscribeOpts(cfg config.Config) []func(*frame.Frame) error {
)
}

if cfg.StreamFilterValues != "" {
subscribeOpts = append(subscribeOpts,
stomp.SubscribeOpt.Header("x-stream-filter", cfg.StreamFilterValues))
}
log.Info("subscribe options", "filter", cfg.StreamFilterValues)
return subscribeOpts
}

0 comments on commit f1a4619

Please sign in to comment.