Skip to content

Commit

Permalink
Add AMQP-1.0 stream consumption filtering support
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Dec 12, 2023
1 parent f1a4619 commit b0f3330
Showing 1 changed file with 20 additions and 11 deletions.
31 changes: 20 additions & 11 deletions pkg/amqp10_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,7 @@ func (c Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) {
case config.UnsettledState:
durability = amqp.DurabilityUnsettledState
}
var filters []amqp.LinkFilter
if c.Config.StreamOffset != "" {
// parse stream offset
offset, err := parseStreamOffset(c.Config.StreamOffset)
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: %s\n", err)
os.Exit(1)
}
filters = []amqp.LinkFilter{amqp.NewLinkFilter("rabbitmq:stream-offset-spec", 0, offset)}
}
receiver, err := c.Session.NewReceiver(context.TODO(), c.Topic, &amqp.ReceiverOptions{SourceDurability: durability, Credit: int32(c.Config.ConsumerCredits), Filters: filters})
receiver, err := c.Session.NewReceiver(context.TODO(), c.Topic, &amqp.ReceiverOptions{SourceDurability: durability, Credit: int32(c.Config.ConsumerCredits), Filters: buildLinkFilters(c.Config)})
if err != nil {
log.Error("consumer failed to create a receiver", "protocol", "amqp-1.0", "consumerId", c.Id, "error", err.Error())
return
Expand Down Expand Up @@ -121,6 +111,25 @@ func (c Amqp10Consumer) Stop(reason string) {
_ = c.Connection.Close()
}

func buildLinkFilters(cfg config.Config) []amqp.LinkFilter {
var filters []amqp.LinkFilter

if cfg.StreamOffset != "" {
// parse stream offset
offset, err := parseStreamOffset(cfg.StreamOffset)
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: %s\n", err)
os.Exit(1)
}
filters = append(filters, amqp.NewLinkFilter("rabbitmq:stream-offset-spec", 0, offset))
}

if cfg.StreamFilterValues != "" {
filters = append(filters, amqp.NewLinkFilter("rabbitmq:stream-filter", 0, cfg.StreamFilterValues))
}
return filters
}

func parseStreamOffset(offset string) (any, error) {
switch offset {
case "":
Expand Down

0 comments on commit b0f3330

Please sign in to comment.