Skip to content

Commit

Permalink
[ADDED] Creating iterators for sync subscriptions (#1728)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio authored Dec 17, 2024
1 parent 6bc4159 commit d6eaa84
Show file tree
Hide file tree
Showing 9 changed files with 663 additions and 60 deletions.
2 changes: 1 addition & 1 deletion context.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, dat
inbox := nc.NewInbox()
ch := make(chan *Msg, RequestChanLen)

s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, nil, true, nil)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscriptio
cbValue.Call(oV)
}

return c.Conn.subscribe(subject, queue, natsCB, nil, false, nil)
return c.Conn.subscribe(subject, queue, natsCB, nil, nil, false, nil)
}

// FlushTimeout allows a Flush operation to have an associated timeout.
Expand Down
4 changes: 2 additions & 2 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -1839,7 +1839,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
ocb := cb
cb = func(m *Msg) { ocb(m); m.Ack() }
}
sub, err := nc.subscribe(deliver, queue, cb, ch, isSync, jsi)
sub, err := nc.subscribe(deliver, queue, cb, ch, nil, isSync, jsi)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1910,7 +1910,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
jsi.hbi = info.Config.Heartbeat

// Recreate the subscription here.
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, nil, isSync, jsi)
if err != nil {
return nil, err
}
Expand Down
185 changes: 159 additions & 26 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ var (
ErrAuthorization = errors.New("nats: authorization violation")
ErrAuthExpired = errors.New("nats: authentication expired")
ErrAuthRevoked = errors.New("nats: authentication revoked")
ErrPermissionViolation = errors.New("nats: permissions violation")
ErrAccountAuthExpired = errors.New("nats: account authentication expired")
ErrNoServers = errors.New("nats: no servers available for connection")
ErrJsonParse = errors.New("nats: connect message, json parse error")
Expand Down Expand Up @@ -510,6 +511,11 @@ type Options struct {

// SkipHostLookup skips the DNS lookup for the server hostname.
SkipHostLookup bool

// PermissionErrOnSubscribe - if set to true, the client will return ErrPermissionViolation
// from SubscribeSync if the server returns a permissions error for a subscription.
// Defaults to false.
PermissionErrOnSubscribe bool
}

const (
Expand Down Expand Up @@ -618,17 +624,19 @@ type Subscription struct {
// For holding information about a JetStream consumer.
jsi *jsSub

delivered uint64
max uint64
conn *Conn
mcb MsgHandler
mch chan *Msg
closed bool
sc bool
connClosed bool
draining bool
status SubStatus
statListeners map[chan SubStatus][]SubStatus
delivered uint64
max uint64
conn *Conn
mcb MsgHandler
mch chan *Msg
errCh chan (error)
closed bool
sc bool
connClosed bool
draining bool
status SubStatus
statListeners map[chan SubStatus][]SubStatus
permissionsErr error

// Type of Subscription
typ SubscriptionType
Expand Down Expand Up @@ -1401,6 +1409,13 @@ func SkipHostLookup() Option {
}
}

func PermissionErrOnSubscribe(enabled bool) Option {
return func(o *Options) error {
o.PermissionErrOnSubscribe = enabled
return nil
}
}

// TLSHandshakeFirst is an Option to perform the TLS handshake first, that is
// before receiving the INFO protocol. This requires the server to also be
// configured with such option, otherwise the connection will fail.
Expand Down Expand Up @@ -3435,6 +3450,9 @@ slowConsumer:
}
}

var permissionsRe = regexp.MustCompile(`Subscription to "(\S+)"`)
var permissionsQueueRe = regexp.MustCompile(`using queue "(\S+)"`)

// processTransientError is called when the server signals a non terminal error
// which does not close the connection or trigger a reconnect.
// This will trigger the async error callback if set.
Expand All @@ -3444,6 +3462,27 @@ slowConsumer:
func (nc *Conn) processTransientError(err error) {
nc.mu.Lock()
nc.err = err
if errors.Is(err, ErrPermissionViolation) {
matches := permissionsRe.FindStringSubmatch(err.Error())
if len(matches) >= 2 {
queueMatches := permissionsQueueRe.FindStringSubmatch(err.Error())
var q string
if len(queueMatches) >= 2 {
q = queueMatches[1]
}
subject := matches[1]
for _, sub := range nc.subs {
if sub.Subject == subject && sub.Queue == q && sub.permissionsErr == nil {
sub.mu.Lock()
if sub.errCh != nil {
sub.errCh <- err
}
sub.permissionsErr = err
sub.mu.Unlock()
}
}
}
}
if nc.Opts.AsyncErrorCB != nil {
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) })
}
Expand Down Expand Up @@ -3685,7 +3724,7 @@ func (nc *Conn) processErr(ie string) {
} else if e == MAX_CONNECTIONS_ERR {
close = nc.processOpErr(ErrMaxConnectionsExceeded)
} else if strings.HasPrefix(e, PERMISSIONS_ERR) {
nc.processTransientError(fmt.Errorf("nats: %s", ne))
nc.processTransientError(fmt.Errorf("%w: %s", ErrPermissionViolation, ne))
} else if strings.HasPrefix(e, MAX_SUBSCRIPTIONS_ERR) {
nc.processTransientError(ErrMaxSubscriptionsExceeded)
} else if authErr := checkAuthError(e); authErr != nil {
Expand Down Expand Up @@ -4042,7 +4081,7 @@ func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Ms
// Create the response subscription we will use for all new style responses.
// This will be on an _INBOX with an additional terminal token. The subscription
// will be on a wildcard.
s, err := nc.subscribeLocked(nc.respSub, _EMPTY_, nc.respHandler, nil, false, nil)
s, err := nc.subscribeLocked(nc.respSub, _EMPTY_, nc.respHandler, nil, nil, false, nil)
if err != nil {
nc.mu.Unlock()
return nil, token, err
Expand Down Expand Up @@ -4140,7 +4179,7 @@ func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration)
inbox := nc.NewInbox()
ch := make(chan *Msg, RequestChanLen)

s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, nil, true, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -4246,14 +4285,14 @@ func (nc *Conn) respToken(respInbox string) string {
// since it can't match more than one token.
// Messages will be delivered to the associated MsgHandler.
func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error) {
return nc.subscribe(subj, _EMPTY_, cb, nil, false, nil)
return nc.subscribe(subj, _EMPTY_, cb, nil, nil, false, nil)
}

// ChanSubscribe will express interest in the given subject and place
// all messages received on the channel.
// You should not close the channel until sub.Unsubscribe() has been called.
func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) {
return nc.subscribe(subj, _EMPTY_, nil, ch, false, nil)
return nc.subscribe(subj, _EMPTY_, nil, ch, nil, false, nil)
}

// ChanQueueSubscribe will express interest in the given subject.
Expand All @@ -4263,7 +4302,7 @@ func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error)
// You should not close the channel until sub.Unsubscribe() has been called.
// Note: This is the same than QueueSubscribeSyncWithChan.
func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error) {
return nc.subscribe(subj, group, nil, ch, false, nil)
return nc.subscribe(subj, group, nil, ch, nil, false, nil)
}

// SubscribeSync will express interest on the given subject. Messages will
Expand All @@ -4273,15 +4312,19 @@ func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) {
return nil, ErrInvalidConnection
}
mch := make(chan *Msg, nc.Opts.SubChanLen)
return nc.subscribe(subj, _EMPTY_, nil, mch, true, nil)
var errCh chan error
if nc.Opts.PermissionErrOnSubscribe {
errCh = make(chan error, 100)
}
return nc.subscribe(subj, _EMPTY_, nil, mch, errCh, true, nil)
}

// QueueSubscribe creates an asynchronous queue subscriber on the given subject.
// All subscribers with the same queue name will form the queue group and
// only one member of the group will be selected to receive any given
// message asynchronously.
func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error) {
return nc.subscribe(subj, queue, cb, nil, false, nil)
return nc.subscribe(subj, queue, cb, nil, nil, false, nil)
}

// QueueSubscribeSync creates a synchronous queue subscriber on the given
Expand All @@ -4290,7 +4333,11 @@ func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription
// given message synchronously using Subscription.NextMsg().
func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) {
mch := make(chan *Msg, nc.Opts.SubChanLen)
return nc.subscribe(subj, queue, nil, mch, true, nil)
var errCh chan error
if nc.Opts.PermissionErrOnSubscribe {
errCh = make(chan error, 100)
}
return nc.subscribe(subj, queue, nil, mch, errCh, true, nil)
}

// QueueSubscribeSyncWithChan will express interest in the given subject.
Expand All @@ -4300,7 +4347,7 @@ func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) {
// You should not close the channel until sub.Unsubscribe() has been called.
// Note: This is the same than ChanQueueSubscribe.
func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error) {
return nc.subscribe(subj, queue, nil, ch, false, nil)
return nc.subscribe(subj, queue, nil, ch, nil, false, nil)
}

// badSubject will do quick test on whether a subject is acceptable.
Expand All @@ -4324,16 +4371,16 @@ func badQueue(qname string) bool {
}

// subscribe is the internal subscribe function that indicates interest in a subject.
func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) {
func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, errCh chan (error), isSync bool, js *jsSub) (*Subscription, error) {
if nc == nil {
return nil, ErrInvalidConnection
}
nc.mu.Lock()
defer nc.mu.Unlock()
return nc.subscribeLocked(subj, queue, cb, ch, isSync, js)
return nc.subscribeLocked(subj, queue, cb, ch, errCh, isSync, js)
}

func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) {
func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, errCh chan (error), isSync bool, js *jsSub) (*Subscription, error) {
if nc == nil {
return nil, ErrInvalidConnection
}
Expand Down Expand Up @@ -4384,6 +4431,7 @@ func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg,
} else { // Sync Subscription
sub.typ = SyncSubscription
sub.mch = ch
sub.errCh = errCh
}

nc.subsMu.Lock()
Expand Down Expand Up @@ -4828,16 +4876,92 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) {
t := globalTimerPool.Get(timeout)
defer globalTimerPool.Put(t)

if s.errCh != nil {
select {
case msg, ok = <-mch:
if !ok {
return nil, s.getNextMsgErr()
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
}
case err := <-s.errCh:
return nil, err
case <-t.C:
return nil, ErrTimeout
}
} else {
select {
case msg, ok = <-mch:
if !ok {
return nil, s.getNextMsgErr()
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
}
case <-t.C:
return nil, ErrTimeout
}
}

return msg, nil
}

// nextMsgNoTimeout works similarly to Subscription.NextMsg() but will not
// time out. It is only used internally for non-timeout subscription iterator.
func (s *Subscription) nextMsgNoTimeout() (*Msg, error) {
if s == nil {
return nil, ErrBadSubscription
}

s.mu.Lock()
err := s.validateNextMsgState(false)
if err != nil {
s.mu.Unlock()
return nil, err
}

// snapshot
mch := s.mch
s.mu.Unlock()

var ok bool
var msg *Msg

// If something is available right away, let's optimize that case.
select {
case msg, ok = <-mch:
if !ok {
return nil, s.getNextMsgErr()
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
} else {
return msg, nil
}
default:
}

if s.errCh != nil {
select {
case msg, ok = <-mch:
if !ok {
return nil, s.getNextMsgErr()
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
}
case err := <-s.errCh:
return nil, err
}
} else {
msg, ok = <-mch
if !ok {
return nil, s.getNextMsgErr()
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
}
case <-t.C:
return nil, ErrTimeout
}

return msg, nil
Expand All @@ -4860,6 +4984,12 @@ func (s *Subscription) validateNextMsgState(pullSubInternal bool) error {
if s.mcb != nil {
return ErrSyncSubRequired
}
// if this subscription previously had a permissions error
// and no reconnect has been attempted, return the permissions error
// since the subscription does not exist on the server
if s.conn.Opts.PermissionErrOnSubscribe && s.permissionsErr != nil {
return s.permissionsErr
}
if s.sc {
s.changeSubStatus(SubscriptionActive)
s.sc = false
Expand Down Expand Up @@ -5235,6 +5365,9 @@ func (nc *Conn) resendSubscriptions() {
for _, s := range subs {
adjustedMax := uint64(0)
s.mu.Lock()
// when resending subscriptions, the permissions error should be cleared
// since the user may have fixed the permissions issue
s.permissionsErr = nil
if s.max > 0 {
if s.delivered < s.max {
adjustedMax = s.max - s.delivered
Expand Down
Loading

0 comments on commit d6eaa84

Please sign in to comment.