Skip to content

Commit

Permalink
Merge pull request #571 from ripienaar/adr_query_match
Browse files Browse the repository at this point in the history
Support ADR-44 metadata for stream and consumer matches
  • Loading branch information
ripienaar authored Sep 9, 2024
2 parents c79a81a + fdfd787 commit 543abd8
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 0 deletions.
6 changes: 6 additions & 0 deletions api/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ const (
JSMetricPrefix = "$JS.EVENT.METRIC"
JSAdvisoryPrefix = "$JS.EVENT.ADVISORY"
JSApiAccountInfo = "$JS.API.INFO"

JSMetaCreatedServerLevel = "_nats.created.server.api_level"
JSMetaCreatedServerVersion = "_nats.created.server.version"
JSMetaCurrentServerLevel = "_nats.server.api_level"
JSMetaCurrentServerVersion = "_nats.server.version"
JsMetaRequiredServerLevel = "_nats.server.require.api_level"
)

// Responses to requests sent to a server from a client.
Expand Down
28 changes: 28 additions & 0 deletions consumer_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package jsm
import (
"errors"
"fmt"
"strconv"
"time"

"github.com/expr-lang/expr"
"github.com/nats-io/jsm.go/api"
"gopkg.in/yaml.v3"
)

Expand All @@ -41,12 +43,21 @@ type consumerQuery struct {
ackPending int
pending uint64
leader string
apiLevel int
ageLimit time.Duration
lastDeliveryLimit time.Duration
}

type ConsumerQueryOpt func(query *consumerQuery) error

// ConsumerQueryApiLevelMin limits results to assets requiring API Level above or equal to level
func ConsumerQueryApiLevelMin(level int) ConsumerQueryOpt {
return func(q *consumerQuery) error {
q.apiLevel = level
return nil
}
}

// ConsumerQueryExpression filters the consumers using the expr expression language
func ConsumerQueryExpression(e string) ConsumerQueryOpt {
return func(q *consumerQuery) error {
Expand Down Expand Up @@ -177,6 +188,7 @@ func (s *Stream) QueryConsumers(opts ...ConsumerQueryOpt) ([]*Consumer, error) {
q.matchDelivery,
q.matchReplicas,
q.matchLeaderServer,
q.matchApiLevel,
}

var consumers []*Consumer
Expand Down Expand Up @@ -298,6 +310,22 @@ func (q *consumerQuery) matchPull(consumers []*Consumer) ([]*Consumer, error) {
})
}

func (q *consumerQuery) matchApiLevel(consumers []*Consumer) ([]*Consumer, error) {
return q.cbMatcher(consumers, q.apiLevel > 0, func(consumer *Consumer) bool {
var v string
var requiredLevel int

if len(consumer.Metadata()) > 0 {
v = consumer.Metadata()[api.JsMetaRequiredServerLevel]
if v != "" {
requiredLevel, _ = strconv.Atoi(v)
}
}

return (!q.invert && requiredLevel >= q.apiLevel) || (q.invert && requiredLevel < q.apiLevel)
})
}

func (q *consumerQuery) matchExpression(consumers []*Consumer) ([]*Consumer, error) {
if q.expression == "" {
return consumers, nil
Expand Down
39 changes: 39 additions & 0 deletions stream_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package jsm
import (
"fmt"
"regexp"
"strconv"
"strings"
"time"

"github.com/expr-lang/expr"
"github.com/nats-io/jsm.go/api"
"gopkg.in/yaml.v3"
)

Expand All @@ -42,10 +44,19 @@ type streamQuery struct {
expression string
leader string
matchers []streamMatcher
apiLevel int
}

type StreamQueryOpt func(query *streamQuery) error

// StreamQueryApiLevelMin limits results to assets requiring API Level above or equal to level
func StreamQueryApiLevelMin(level int) StreamQueryOpt {
return func(q *streamQuery) error {
q.apiLevel = level
return nil
}
}

// StreamQueryExpression filters the stream using the expr expression language
func StreamQueryExpression(e string) StreamQueryOpt {
return func(q *streamQuery) error {
Expand Down Expand Up @@ -195,6 +206,7 @@ func (m *Manager) QueryStreams(opts ...StreamQueryOpt) ([]*Stream, error) {
q.matchSourced,
q.matchMirrored,
q.matchLeaderServer,
q.matchApiLevel,
}

streams, _, err := m.Streams(nil)
Expand Down Expand Up @@ -454,6 +466,33 @@ func (q *streamQuery) matchConsumerLimit(streams []*Stream) ([]*Stream, error) {
return matched, nil
}

func (q *streamQuery) matchApiLevel(streams []*Stream) ([]*Stream, error) {
if q.apiLevel <= 0 {
return streams, nil
}

var matched []*Stream

for _, stream := range streams {
var v string
var requiredLevel int

meta := stream.Configuration().Metadata
if len(meta) > 0 {
v = meta[api.JsMetaRequiredServerLevel]
if v != "" {
requiredLevel, _ = strconv.Atoi(v)
}
}

if (!q.invert && requiredLevel >= q.apiLevel) || (q.invert && requiredLevel < q.apiLevel) {
matched = append(matched, stream)
}
}

return matched, nil
}

func (q *streamQuery) matchCluster(streams []*Stream) ([]*Stream, error) {
if q.cluster == nil {
return streams, nil
Expand Down
61 changes: 61 additions & 0 deletions test/consumer_query_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package test

import (
"testing"
"time"

"github.com/nats-io/jsm.go"
natsd "github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
)

func checkConsumerQueryMatched(t *testing.T, s *jsm.Stream, expect int, opts ...jsm.ConsumerQueryOpt) {
t.Helper()

matched, err := s.QueryConsumers(opts...)
checkErr(t, err, "query failed")
if len(matched) != expect {
t.Fatalf("expected %d matched, got %d", expect, len(matched))
}
}

func TestConsumerApiLevel(t *testing.T) {
withJSCluster(t, func(t *testing.T, _ []*natsd.Server, nc *nats.Conn, mgr *jsm.Manager) {
s, err := mgr.NewStream("q1", jsm.Subjects("in.q1"), jsm.MemoryStorage(), jsm.Replicas(2))
checkErr(t, err, "create failed")

_, err = s.NewConsumer(jsm.PauseUntil(time.Now().Add(time.Hour)), jsm.DurableName("PAUSED"))
checkErr(t, err, "create failed")

_, err = s.NewConsumer(jsm.DurableName("OLD"))
checkErr(t, err, "create failed")

checkConsumerQueryMatched(t, s, 1, jsm.ConsumerQueryApiLevelMin(1))
checkConsumerQueryMatched(t, s, 2, jsm.ConsumerQueryApiLevelMin(0))

res, err := s.QueryConsumers(jsm.ConsumerQueryApiLevelMin(1))
checkErr(t, err, "query failed")
if res[0].Name() != "PAUSED" {
t.Fatalf("did not match paused consumer")
}

res, err = s.QueryConsumers(jsm.ConsumerQueryApiLevelMin(1), jsm.ConsumerQueryInvert())
checkErr(t, err, "query failed")
if res[0].Name() != "OLD" {
t.Fatalf("did not match unpaused consumer")
}
})
}
13 changes: 13 additions & 0 deletions test/stream_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,16 @@ func TestStreamSubjectWildcardMatch(t *testing.T) {
checkStreamQueryMatched(t, mgr, 1, jsm.StreamQuerySubjectWildcard("in.*.*.>"), jsm.StreamQueryInvert())
})
}

func TestStreamApiLevelMatch(t *testing.T) {
withJSCluster(t, func(t *testing.T, _ []*natsd.Server, _ *nats.Conn, mgr *jsm.Manager) {
_, err := mgr.NewStream("q1", jsm.Subjects("in.q1", "in.q1.other"), jsm.MemoryStorage())
checkErr(t, err, "create failed")

_, err = mgr.NewStream("q2", jsm.Subjects("in.q2", "in.q2.other"), jsm.MemoryStorage())
checkErr(t, err, "create failed")

checkStreamQueryMatched(t, mgr, 0, jsm.StreamQueryApiLevelMin(1))
checkStreamQueryMatched(t, mgr, 2, jsm.StreamQueryApiLevelMin(1), jsm.StreamQueryInvert())
})
}

0 comments on commit 543abd8

Please sign in to comment.