Skip to content

Commit

Permalink
[FIXED] Closing connection on max subscriptions exceeded
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Sep 2, 2024
1 parent b61c7c5 commit cfeb28b
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 20 deletions.
50 changes: 30 additions & 20 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ const (

// MAX_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit
MAX_CONNECTIONS_ERR = "maximum connections exceeded"

// MAX_SUBSCRIPTIONS_ERR is for when nats server denies the connection due to server subscriptions limit
MAX_SUBSCRIPTIONS_ERR = "maximum subscriptions exceeded"
)

// Errors
Expand Down Expand Up @@ -140,6 +143,7 @@ var (
ErrNoResponders = errors.New("nats: no responders available for request")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
ErrConnectionNotTLS = errors.New("nats: connection is not tls")
ErrMaxSubscriptionsExceeded = errors.New("nats: server maximum subscriptions exceeded")
)

// GetDefaultOptions returns default configuration options for the client.
Expand Down Expand Up @@ -2952,11 +2956,11 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) {

// processOpErr handles errors from reading or parsing the protocol.
// The lock should not be held entering this function.
func (nc *Conn) processOpErr(err error) {
func (nc *Conn) processOpErr(err error) bool {
nc.mu.Lock()
defer nc.mu.Unlock()
if nc.isConnecting() || nc.isClosed() || nc.isReconnecting() {
nc.mu.Unlock()
return
return false
}

if nc.Opts.AllowReconnect && nc.status == CONNECTED {
Expand All @@ -2976,14 +2980,12 @@ func (nc *Conn) processOpErr(err error) {
nc.clearPendingFlushCalls()

go nc.doReconnect(err, false)
nc.mu.Unlock()
return
return false
}

nc.changeConnStatus(DISCONNECTED)
nc.err = err
nc.mu.Unlock()
nc.close(CLOSED, true, nil)
return true
}

// dispatch is responsible for calling any async callbacks
Expand Down Expand Up @@ -3080,7 +3082,9 @@ func (nc *Conn) readLoop() {
err = nc.parse(buf)
}
if err != nil {
nc.processOpErr(err)
if shouldClose := nc.processOpErr(err); shouldClose {
nc.close(CLOSED, true, nil)
}
break
}
}
Expand Down Expand Up @@ -3410,15 +3414,17 @@ slowConsumer:
}
}

// processPermissionsViolation is called when the server signals a subject
// permissions violation on either publish or subscribe.
func (nc *Conn) processPermissionsViolation(err string) {
// processTransientError is called when the server signals a non terminal error
// which does not close the connection or trigger a reconnect.
// This will trigger the async error callback if set.
// These errors include the following:
// - permissions violation on publish or subscribe
// - maximum subscriptions exceeded
func (nc *Conn) processTransientError(err error) {
nc.mu.Lock()
// create error here so we can pass it as a closure to the async cb dispatcher.
e := errors.New("nats: " + err)
nc.err = e
nc.err = err
if nc.Opts.AsyncErrorCB != nil {
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, e) })
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) })
}
nc.mu.Unlock()
}
Expand Down Expand Up @@ -3650,15 +3656,17 @@ func (nc *Conn) processErr(ie string) {
// convert to lower case.
e := strings.ToLower(ne)

close := false
var close bool

// FIXME(dlc) - process Slow Consumer signals special.
if e == STALE_CONNECTION {
nc.processOpErr(ErrStaleConnection)
close = nc.processOpErr(ErrStaleConnection)
} else if e == MAX_CONNECTIONS_ERR {
nc.processOpErr(ErrMaxConnectionsExceeded)
close = nc.processOpErr(ErrMaxConnectionsExceeded)
} else if strings.HasPrefix(e, PERMISSIONS_ERR) {
nc.processPermissionsViolation(ne)
nc.processTransientError(fmt.Errorf("nats: %s", ne))
} else if strings.HasPrefix(e, MAX_SUBSCRIPTIONS_ERR) {
nc.processTransientError(ErrMaxSubscriptionsExceeded)
} else if authErr := checkAuthError(e); authErr != nil {
nc.mu.Lock()
close = nc.processAuthError(authErr)
Expand Down Expand Up @@ -5107,7 +5115,9 @@ func (nc *Conn) processPingTimer() {
nc.pout++
if nc.pout > nc.Opts.MaxPingsOut {
nc.mu.Unlock()
nc.processOpErr(ErrStaleConnection)
if shouldClose := nc.processOpErr(ErrStaleConnection); shouldClose {
nc.close(CLOSED, true, nil)
}
return
}

Expand Down
33 changes: 33 additions & 0 deletions test/sub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package test
import (
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -1737,3 +1738,35 @@ func TestSubscriptionEvents(t *testing.T) {
close(blockChan)
})
}

func TestMaxSubscriptionsExceeded(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
max_subscriptions: 5
`))
defer os.Remove(conf)
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

ch := make(chan error)
nc, err := nats.Connect(s.ClientURL(), nats.ErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) {
ch <- err
}))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()

for i := 0; i < 6; i++ {
s, err := nc.Subscribe("foo", func(_ *nats.Msg) {})
if err != nil {
t.Fatalf("Error subscribing: %v", err)
}
defer s.Unsubscribe()
}

WaitOnChannel(t, ch, nats.ErrMaxSubscriptionsExceeded)

// wait for the server to process the SUBs
time.Sleep(100 * time.Millisecond)
}

0 comments on commit cfeb28b

Please sign in to comment.