Skip to content

Commit

Permalink
add InfoHandler and ClientInfoEmitter for exposing connected clients …
Browse files Browse the repository at this point in the history
…information (#34)

* add TelemetryHandler for exposing connected brokers information

* add ClientInfoEmitter

* chore: add json tags to TCPAddress

* feat: add subscriptions to info response

* test: add spec for multi conn mode

* feat: add shared subscriptions called information

* use IsConnectionOpen to determine connected state

* refactor: use execOptWithState to read internalState in multiClientInfo
  • Loading branch information
ajatprabha authored Feb 22, 2024
1 parent fcaba40 commit 6e2933c
Show file tree
Hide file tree
Showing 12 changed files with 598 additions and 21 deletions.
41 changes: 31 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Client struct {
rndPool *sync.Pool
clientMu sync.RWMutex
subMu sync.RWMutex

stopInfoEmitter context.CancelFunc
}

// NewClient creates the Client struct with the clientOptions provided,
Expand All @@ -54,6 +56,10 @@ func NewClient(opts ...ClientOption) (*Client, error) {
return nil, fmt.Errorf("at least WithAddress or WithResolver ClientOption should be used")
}

if co.infoEmitterCfg != nil && co.infoEmitterCfg.Emitter != nil && co.infoEmitterCfg.Interval.Seconds() < 1 {
return nil, fmt.Errorf("client info emitter interval must be greater than or equal to 1s")
}

c := &Client{
options: co,
subscriptions: map[string]*subscriptionMeta{},
Expand Down Expand Up @@ -89,15 +95,11 @@ func (c *Client) IsConnected() bool {

// Start will attempt to connect to the broker.
func (c *Client) Start() error {
if err := c.runConnect(); err != nil {
return err
}

if c.options.resolver != nil {
return c.runResolver()
}

return nil
return c.runConnect()
}

// Stop will disconnect from the broker and finish up any pending work on internal
Expand Down Expand Up @@ -128,6 +130,10 @@ func (c *Client) stop() error {
return nil
}, execAll)

if c.stopInfoEmitter != nil {
c.stopInfoEmitter()
}

if err == nil {
c.clientMu.Lock()
defer c.clientMu.Unlock()
Expand All @@ -139,6 +145,15 @@ func (c *Client) stop() error {
return err
}

func (c *Client) handleInfoEmitter() {
if c.options.infoEmitterCfg != nil && c.options.infoEmitterCfg.Emitter != nil {
ctx, cancel := context.WithCancel(context.Background())
c.stopInfoEmitter = cancel

go c.runBrokerInfoEmitter(ctx)
}
}

func (c *Client) handleToken(ctx context.Context, t mqtt.Token, timeoutErr error) error {
if err := c.waitForToken(ctx, t, timeoutErr); err != nil {
return err
Expand Down Expand Up @@ -179,24 +194,30 @@ func (c *Client) runResolver() error {
}
}

c.handleInfoEmitter()

go c.watchAddressUpdates(c.options.resolver)

return nil
}

func (c *Client) runConnect() error {
if len(c.options.brokerAddress) == 0 {
return nil
}

return c.execute(func(cc mqtt.Client) error {
err := c.execute(func(cc mqtt.Client) error {
t := cc.Connect()
if !t.WaitTimeout(c.options.connectTimeout) {
return ErrConnectTimeout
}

return t.Error()
}, execAll)

if err != nil {
return err
}

c.handleInfoEmitter()

return nil
}

func (c *Client) attemptSingleConnection(addrs []TCPAddress) error {
Expand Down
1 change: 1 addition & 0 deletions client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ type clientOptions struct {
onReconnectHandler OnReconnectHandler
sharedSubscriptionPredicate SharedSubscriptionPredicate
logger Logger
infoEmitterCfg *ClientInfoEmitterConfig

newEncoder EncoderFunc
newDecoder DecoderFunc
Expand Down
3 changes: 2 additions & 1 deletion client_publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"sync"
"testing"
Expand Down Expand Up @@ -260,7 +261,7 @@ func (s *ClientPublishSuite) TestPublishWithMultiConnectionMode() {
tba := testBrokerAddress
tba.Port = uint16(1883 + i)

clients[tba.String()] = mc
clients[fmt.Sprintf("%s-%d", tba.String(), ii)] = mc

mcks = append(mcks, mc, mt)
}
Expand Down
4 changes: 2 additions & 2 deletions client_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (

// TCPAddress specifies Host and Port for remote broker
type TCPAddress struct {
Host string
Port uint16
Host string `json:"host"`
Port uint16 `json:"port"`
}

func (t TCPAddress) String() string { return fmt.Sprintf("%s:%d", t.Host, t.Port) }
Expand Down
157 changes: 157 additions & 0 deletions client_telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package courier

import (
"net/url"
"sort"
"strconv"

mqtt "github.com/eclipse/paho.mqtt.golang"

"github.com/gojekfarm/xtools/generic/slice"
"github.com/gojekfarm/xtools/generic/xmap"
)

// MQTTClientInfo contains information about the internal MQTT client
type MQTTClientInfo struct {
Addresses []TCPAddress `json:"addresses"`
ClientID string `json:"client_id"`
Username string `json:"username"`
ResumeSubs bool `json:"resume_subs"`
CleanSession bool `json:"clean_session"`
AutoReconnect bool `json:"auto_reconnect"`
Connected bool `json:"connected"`
// Subscriptions contains the topics the client is subscribed to
// Note: Currently, this field only holds shared subscriptions.
Subscriptions []string `json:"subscriptions,omitempty"`
}

type infoResponse struct {
MultiConnMode bool `json:"multi"`
Clients []MQTTClientInfo `json:"clients,omitempty"`
Subscriptions map[string]QOSLevel `json:"subscriptions,omitempty"`
}

func (c *Client) infoResponse() *infoResponse {
subs := c.readSubscriptionMeta()
ci := c.clientInfo()

return &infoResponse{
MultiConnMode: c.options.multiConnectionMode,
Clients: ci,
Subscriptions: subs,
}
}

func (c *Client) clientInfo() []MQTTClientInfo {
if c.options.multiConnectionMode {
return c.multiClientInfo()
}

return c.singleClientInfo()
}

func (c *Client) readSubscriptionMeta() map[string]QOSLevel {
c.subMu.RLock()

subs := make(map[string]QOSLevel, len(c.subscriptions))

for topic, sub := range c.subscriptions {
for _, opt := range sub.options {
switch v := opt.(type) {
case QOSLevel:
subs[topic] = v
}
}

// if no QOSLevel Option is provided, default to QOSZero
if _, ok := subs[topic]; !ok {
subs[topic] = QOSZero
}
}

c.subMu.RUnlock()

return subs
}

func (c *Client) multiClientInfo() []MQTTClientInfo {
c.clientMu.RLock()

cls := xmap.Values(c.mqttClients)

if len(cls) == 0 {
c.clientMu.RUnlock()

return nil
}

bCh := make(chan MQTTClientInfo, len(cls))

_ = c.execute(
func(cc mqtt.Client) error { return nil },
execOptWithState(func(f func(mqtt.Client) error, is *internalState) error {
is.mu.Lock()
subs := is.subsCalled.Values()
is.mu.Unlock()

bCh <- transformClientInfo(is.client, subs...)

return f(is.client)
}),
)

c.clientMu.RUnlock()

close(bCh)

bl := make([]MQTTClientInfo, 0, len(bCh))

for b := range bCh {
bl = append(bl, b)
}

sort.Slice(bl, func(i, j int) bool { return bl[i].ClientID < bl[j].ClientID })

return bl
}

func (c *Client) singleClientInfo() []MQTTClientInfo {
c.clientMu.RLock()
defer c.clientMu.RUnlock()

if c.mqttClient == nil {
return nil
}

var bi MQTTClientInfo

_ = c.execute(func(cc mqtt.Client) error {
bi = transformClientInfo(cc)

return nil
}, execAll)

return []MQTTClientInfo{bi}
}

func transformClientInfo(cc mqtt.Client, subscribedTopics ...string) MQTTClientInfo {
opts := cc.OptionsReader()

return MQTTClientInfo{
Addresses: slice.Map(opts.Servers(), func(u *url.URL) TCPAddress {
i, _ := strconv.Atoi(u.Port())

return TCPAddress{
Host: u.Hostname(),
Port: uint16(i),
}
}),
ClientID: opts.ClientID(),
Username: opts.Username(),
ResumeSubs: opts.ResumeSubs(),
CleanSession: opts.CleanSession(),
AutoReconnect: opts.AutoReconnect(),
Connected: cc.IsConnectionOpen(),
Subscriptions: subscribedTopics,
}
}
49 changes: 49 additions & 0 deletions client_telemetry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package courier

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

func TestClient_readSubscriptionMeta(t *testing.T) {
c := &Client{
subMu: sync.RWMutex{},
subscriptions: map[string]*subscriptionMeta{
"topic1": {
topic: "topic1",
options: []Option{QOSOne},
},
"topic2": {
topic: "topic2",
},
"topic3": {
topic: "topic3",
options: []Option{QOSTwo},
},
},
}

subs := c.readSubscriptionMeta()

assert.Equal(t, map[string]QOSLevel{
"topic1": 1,
"topic2": 0,
"topic3": 2,
}, subs)
}

func TestClient_clientInfo(t *testing.T) {
t.Run("single connection mode", func(t *testing.T) {
c := &Client{options: &clientOptions{}}
ci := c.clientInfo()
assert.Equal(t, []MQTTClientInfo(nil), ci)
})

t.Run("multi connection mode", func(t *testing.T) {
c := &Client{options: &clientOptions{multiConnectionMode: true}}
ci := c.clientInfo()
assert.Equal(t, []MQTTClientInfo(nil), ci)
})
}
Loading

0 comments on commit 6e2933c

Please sign in to comment.