Skip to content

Commit

Permalink
Add support for consumers with multiple filters
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Sep 22, 2023
1 parent 93217ee commit 92ccb76
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 1 deletion.
11 changes: 10 additions & 1 deletion controllers/jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ func consumerSpecToOpts(spec apis.ConsumerSpec) ([]jsm.ConsumerOption, error) {
opts := []jsm.ConsumerOption{
jsm.DurableName(spec.DurableName),
jsm.DeliverySubject(spec.DeliverSubject),
jsm.FilterStreamBySubject(spec.FilterSubject),
jsm.RateLimitBitsPerSecond(uint64(spec.RateLimitBps)),
jsm.MaxAckPending(uint(spec.MaxAckPending)),
jsm.ConsumerDescription(spec.Description),
Expand All @@ -319,6 +318,16 @@ func consumerSpecToOpts(spec apis.ConsumerSpec) ([]jsm.ConsumerOption, error) {
jsm.ConsumerOverrideReplicas(spec.Replicas),
}

if spec.FilterSubject != "" && len(spec.FilterSubjects) > 0 {
return nil, fmt.Errorf("cannot specify both FilterSubject and FilterSubjects")
}

if spec.FilterSubject != "" {
opts = append(opts, jsm.FilterStreamBySubject(spec.FilterSubject))
} else if len(spec.FilterSubjects) > 0 {
opts = append(opts, jsm.FilterStreamBySubject(spec.FilterSubjects...))
}

switch spec.DeliverPolicy {
case "all":
opts = append(opts, jsm.DeliverAllAvailable())
Expand Down
5 changes: 5 additions & 0 deletions deploy/crds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,11 @@ spec:
filterSubject:
description: Select only a specific incoming subjects, supports wildcards.
type: string
filterSubjects:
description: List of incoming subjects, supports wildcards. Available since 2.10.
type: array
items:
type: string
replayPolicy:
description: How messages are sent.
type: string
Expand Down
1 change: 1 addition & 0 deletions pkg/jetstream/apis/jetstream/v1beta2/consumertypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ConsumerSpec struct {
PreventUpdate bool `json:"preventUpdate"`
DurableName string `json:"durableName"`
FilterSubject string `json:"filterSubject"`
FilterSubjects []string `json:"filterSubjects"`
FlowControl bool `json:"flowControl"`
HeadersOnly bool `json:"headersOnly"`
HeartbeatInterval string `json:"heartbeatInterval"`
Expand Down
5 changes: 5 additions & 0 deletions pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 92ccb76

Please sign in to comment.