Skip to content

Commit

Permalink
Replace DefaultServerCapabilities with NewDefaultServerCapabilities()…
Browse files Browse the repository at this point in the history
… to avoid data race (#360)

Co-authored-by: JB <[email protected]>
  • Loading branch information
thedevop and mochi-co authored Feb 4, 2024
1 parent 65c7853 commit 686c35a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 27 deletions.
38 changes: 22 additions & 16 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,8 @@ const (
)

var (
// DefaultServerCapabilities defines the default features and capabilities provided by the server.
DefaultServerCapabilities = &Capabilities{
MaximumSessionExpiryInterval: math.MaxUint32, // maximum number of seconds to keep disconnected sessions
MaximumMessageExpiryInterval: 60 * 60 * 24, // maximum message expiry if message expiry is 0 or over
ReceiveMaximum: 1024, // maximum number of concurrent qos messages per client
MaximumQos: 2, // maximum qos value available to clients
RetainAvailable: 1, // retain messages is available
MaximumPacketSize: 0, // no maximum packet size
TopicAliasMaximum: math.MaxUint16, // maximum topic alias value
WildcardSubAvailable: 1, // wildcard subscriptions are available
SubIDAvailable: 1, // subscription identifiers are available
SharedSubAvailable: 1, // shared subscriptions are available
MinimumProtocolVersion: 3, // minimum supported mqtt version (3.0.0)
MaximumClientWritesPending: 1024 * 8, // maximum number of pending message writes for a client
}
// Deprecated: Use NewDefaultServerCapabilities to avoid data race issue.
DefaultServerCapabilities = NewDefaultServerCapabilities()

ErrListenerIDExists = errors.New("listener id already exists") // a listener with the same id already exists
ErrConnectionClosed = errors.New("connection not open") // connection is closed
Expand All @@ -72,6 +59,25 @@ type Capabilities struct {
SubIDAvailable byte
}

// NewDefaultServerCapabilities defines the default features and capabilities provided by the server.
func NewDefaultServerCapabilities() *Capabilities {
return &Capabilities{
MaximumMessageExpiryInterval: 60 * 60 * 24, // maximum message expiry if message expiry is 0 or over
MaximumClientWritesPending: 1024 * 8, // maximum number of pending message writes for a client
MaximumSessionExpiryInterval: math.MaxUint32, // maximum number of seconds to keep disconnected sessions
MaximumPacketSize: 0, // no maximum packet size
maximumPacketID: math.MaxUint16,
ReceiveMaximum: 1024, // maximum number of concurrent qos messages per client
TopicAliasMaximum: math.MaxUint16, // maximum topic alias value
SharedSubAvailable: 1, // shared subscriptions are available
MinimumProtocolVersion: 3, // minimum supported mqtt version (3.0.0)
MaximumQos: 2, // maximum qos value available to clients
RetainAvailable: 1, // retain messages is available
WildcardSubAvailable: 1, // wildcard subscriptions are available
SubIDAvailable: 1, // subscription identifiers are available
}
}

// Compatibilities provides flags for using compatibility modes.
type Compatibilities struct {
ObscureNotAuthorized bool // return unspecified errors instead of not authorized
Expand Down Expand Up @@ -190,7 +196,7 @@ func New(opts *Options) *Server {
// ensureDefaults ensures that the server starts with sane default values, if none are provided.
func (o *Options) ensureDefaults() {
if o.Capabilities == nil {
o.Capabilities = DefaultServerCapabilities
o.Capabilities = NewDefaultServerCapabilities()
}

o.Capabilities.maximumPacketID = math.MaxUint16 // spec maximum is 65535
Expand Down
22 changes: 11 additions & 11 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,24 +96,24 @@ func (h *DelayHook) OnDisconnect(cl *Client, err error, expire bool) {
}

func newServer() *Server {
cc := *DefaultServerCapabilities
cc := NewDefaultServerCapabilities()
cc.MaximumMessageExpiryInterval = 0
cc.ReceiveMaximum = 0
s := New(&Options{
Logger: logger,
Capabilities: &cc,
Capabilities: cc,
})
_ = s.AddHook(new(AllowHook), nil)
return s
}

func newServerWithInlineClient() *Server {
cc := *DefaultServerCapabilities
cc := NewDefaultServerCapabilities()
cc.MaximumMessageExpiryInterval = 0
cc.ReceiveMaximum = 0
s := New(&Options{
Logger: logger,
Capabilities: &cc,
Capabilities: cc,
InlineClient: true,
})
_ = s.AddHook(new(AllowHook), nil)
Expand All @@ -125,7 +125,7 @@ func TestOptionsSetDefaults(t *testing.T) {
opts.ensureDefaults()

require.Equal(t, defaultSysTopicInterval, opts.SysTopicResendInterval)
require.Equal(t, DefaultServerCapabilities, opts.Capabilities)
require.Equal(t, NewDefaultServerCapabilities(), opts.Capabilities)

opts = new(Options)
opts.ensureDefaults()
Expand Down Expand Up @@ -1529,10 +1529,10 @@ func TestServerProcessPublishACLCheckDeny(t *testing.T) {

for _, tx := range tt {
t.Run(tx.name, func(t *testing.T) {
cc := *DefaultServerCapabilities
cc := NewDefaultServerCapabilities()
s := New(&Options{
Logger: logger,
Capabilities: &cc,
Capabilities: cc,
})
_ = s.AddHook(new(DenyHook), nil)
_ = s.Serve()
Expand Down Expand Up @@ -3131,22 +3131,22 @@ func TestServerLoadClients(t *testing.T) {
{ID: "v3-clean", ProtocolVersion: 4, Clean: true},
{ID: "v3-not-clean", ProtocolVersion: 4, Clean: false},
{
ID: "v5-clean",
ID: "v5-clean",
ProtocolVersion: 5,
Clean: true,
Clean: true,
Properties: storage.ClientProperties{
SessionExpiryInterval: 10,
},
},
{
ID: "v5-expire-interval-0",
ID: "v5-expire-interval-0",
ProtocolVersion: 5,
Properties: storage.ClientProperties{
SessionExpiryInterval: 0,
},
},
{
ID: "v5-expire-interval-not-0",
ID: "v5-expire-interval-not-0",
ProtocolVersion: 5,
Properties: storage.ClientProperties{
SessionExpiryInterval: 10,
Expand Down

0 comments on commit 686c35a

Please sign in to comment.