Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add InfoHandler and ClientInfoEmitter for exposing connected clients information #34

Merged
merged 12 commits into from
Feb 22, 2024
41 changes: 31 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Client struct {
rndPool *sync.Pool
clientMu sync.RWMutex
subMu sync.RWMutex

stopInfoEmitter context.CancelFunc
}

// NewClient creates the Client struct with the clientOptions provided,
Expand All @@ -54,6 +56,10 @@ func NewClient(opts ...ClientOption) (*Client, error) {
return nil, fmt.Errorf("at least WithAddress or WithResolver ClientOption should be used")
}

if co.infoEmitterCfg != nil && co.infoEmitterCfg.Emitter != nil && co.infoEmitterCfg.Interval.Seconds() < 1 {
return nil, fmt.Errorf("client info emitter interval must be greater than or equal to 1s")
}

c := &Client{
options: co,
subscriptions: map[string]*subscriptionMeta{},
Expand Down Expand Up @@ -89,15 +95,11 @@ func (c *Client) IsConnected() bool {

// Start will attempt to connect to the broker.
func (c *Client) Start() error {
if err := c.runConnect(); err != nil {
return err
}

if c.options.resolver != nil {
return c.runResolver()
}

return nil
return c.runConnect()
}

// Stop will disconnect from the broker and finish up any pending work on internal
Expand Down Expand Up @@ -128,6 +130,10 @@ func (c *Client) stop() error {
return nil
}, execAll)

if c.stopInfoEmitter != nil {
c.stopInfoEmitter()
}

if err == nil {
c.clientMu.Lock()
defer c.clientMu.Unlock()
Expand All @@ -139,6 +145,15 @@ func (c *Client) stop() error {
return err
}

func (c *Client) handleInfoEmitter() {
if c.options.infoEmitterCfg != nil && c.options.infoEmitterCfg.Emitter != nil {
ctx, cancel := context.WithCancel(context.Background())
c.stopInfoEmitter = cancel

go c.runBrokerInfoEmitter(ctx)
}
}

func (c *Client) handleToken(ctx context.Context, t mqtt.Token, timeoutErr error) error {
if err := c.waitForToken(ctx, t, timeoutErr); err != nil {
return err
Expand Down Expand Up @@ -179,24 +194,30 @@ func (c *Client) runResolver() error {
}
}

c.handleInfoEmitter()

go c.watchAddressUpdates(c.options.resolver)

return nil
}

func (c *Client) runConnect() error {
if len(c.options.brokerAddress) == 0 {
return nil
}

return c.execute(func(cc mqtt.Client) error {
err := c.execute(func(cc mqtt.Client) error {
t := cc.Connect()
if !t.WaitTimeout(c.options.connectTimeout) {
return ErrConnectTimeout
}

return t.Error()
}, execAll)

if err != nil {
return err
}

c.handleInfoEmitter()

return nil
}

func (c *Client) attemptSingleConnection(addrs []TCPAddress) error {
Expand Down
1 change: 1 addition & 0 deletions client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ type clientOptions struct {
onReconnectHandler OnReconnectHandler
sharedSubscriptionPredicate SharedSubscriptionPredicate
logger Logger
infoEmitterCfg *ClientInfoEmitterConfig

newEncoder EncoderFunc
newDecoder DecoderFunc
Expand Down
3 changes: 2 additions & 1 deletion client_publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"sync"
"testing"
Expand Down Expand Up @@ -260,7 +261,7 @@ func (s *ClientPublishSuite) TestPublishWithMultiConnectionMode() {
tba := testBrokerAddress
tba.Port = uint16(1883 + i)

clients[tba.String()] = mc
clients[fmt.Sprintf("%s-%d", tba.String(), ii)] = mc

mcks = append(mcks, mc, mt)
}
Expand Down
4 changes: 2 additions & 2 deletions client_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (

// TCPAddress specifies Host and Port for remote broker
type TCPAddress struct {
Host string
Port uint16
Host string `json:"host"`
Port uint16 `json:"port"`
}

func (t TCPAddress) String() string { return fmt.Sprintf("%s:%d", t.Host, t.Port) }
Expand Down
157 changes: 157 additions & 0 deletions client_telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package courier

import (
"net/url"
"sort"
"strconv"

mqtt "github.com/eclipse/paho.mqtt.golang"

"github.com/gojekfarm/xtools/generic/slice"
"github.com/gojekfarm/xtools/generic/xmap"
)

// MQTTClientInfo contains information about the internal MQTT client
type MQTTClientInfo struct {
Addresses []TCPAddress `json:"addresses"`
ClientID string `json:"client_id"`
Username string `json:"username"`
ResumeSubs bool `json:"resume_subs"`
CleanSession bool `json:"clean_session"`
AutoReconnect bool `json:"auto_reconnect"`
Connected bool `json:"connected"`
// Subscriptions contains the topics the client is subscribed to
// Note: Currently, this field only holds shared subscriptions.
Subscriptions []string `json:"subscriptions,omitempty"`
}

type infoResponse struct {
MultiConnMode bool `json:"multi"`
Clients []MQTTClientInfo `json:"clients,omitempty"`
Subscriptions map[string]QOSLevel `json:"subscriptions,omitempty"`
}

func (c *Client) infoResponse() *infoResponse {
subs := c.readSubscriptionMeta()
ci := c.clientInfo()

return &infoResponse{
MultiConnMode: c.options.multiConnectionMode,
Clients: ci,
Subscriptions: subs,
}
}

func (c *Client) clientInfo() []MQTTClientInfo {
if c.options.multiConnectionMode {
return c.multiClientInfo()
}

return c.singleClientInfo()
}

func (c *Client) readSubscriptionMeta() map[string]QOSLevel {
c.subMu.RLock()

subs := make(map[string]QOSLevel, len(c.subscriptions))

for topic, sub := range c.subscriptions {
for _, opt := range sub.options {
switch v := opt.(type) {
case QOSLevel:
subs[topic] = v
}
}

// if no QOSLevel Option is provided, default to QOSZero
if _, ok := subs[topic]; !ok {
subs[topic] = QOSZero
}
}

c.subMu.RUnlock()

return subs
}

func (c *Client) multiClientInfo() []MQTTClientInfo {
c.clientMu.RLock()

cls := xmap.Values(c.mqttClients)

if len(cls) == 0 {
c.clientMu.RUnlock()

return nil
}

bCh := make(chan MQTTClientInfo, len(cls))

_ = c.execute(
func(cc mqtt.Client) error { return nil },
execOptWithState(func(f func(mqtt.Client) error, is *internalState) error {
is.mu.Lock()
subs := is.subsCalled.Values()
is.mu.Unlock()

bCh <- transformClientInfo(is.client, subs...)

return f(is.client)
}),
)

c.clientMu.RUnlock()

close(bCh)

bl := make([]MQTTClientInfo, 0, len(bCh))

for b := range bCh {
bl = append(bl, b)
}

sort.Slice(bl, func(i, j int) bool { return bl[i].ClientID < bl[j].ClientID })

return bl
}

func (c *Client) singleClientInfo() []MQTTClientInfo {
c.clientMu.RLock()
defer c.clientMu.RUnlock()

if c.mqttClient == nil {
return nil
}

var bi MQTTClientInfo

_ = c.execute(func(cc mqtt.Client) error {
bi = transformClientInfo(cc)

return nil
}, execAll)

return []MQTTClientInfo{bi}
}

func transformClientInfo(cc mqtt.Client, subscribedTopics ...string) MQTTClientInfo {
opts := cc.OptionsReader()

return MQTTClientInfo{
Addresses: slice.Map(opts.Servers(), func(u *url.URL) TCPAddress {
i, _ := strconv.Atoi(u.Port())

return TCPAddress{
Host: u.Hostname(),
Port: uint16(i),
}
}),
ClientID: opts.ClientID(),
Username: opts.Username(),
ResumeSubs: opts.ResumeSubs(),
CleanSession: opts.CleanSession(),
AutoReconnect: opts.AutoReconnect(),
Connected: cc.IsConnectionOpen(),
Subscriptions: subscribedTopics,
}
}
49 changes: 49 additions & 0 deletions client_telemetry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package courier

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

func TestClient_readSubscriptionMeta(t *testing.T) {
c := &Client{
subMu: sync.RWMutex{},
subscriptions: map[string]*subscriptionMeta{
"topic1": {
topic: "topic1",
options: []Option{QOSOne},
},
"topic2": {
topic: "topic2",
},
"topic3": {
topic: "topic3",
options: []Option{QOSTwo},
},
},
}

subs := c.readSubscriptionMeta()

assert.Equal(t, map[string]QOSLevel{
"topic1": 1,
"topic2": 0,
"topic3": 2,
}, subs)
}

func TestClient_clientInfo(t *testing.T) {
t.Run("single connection mode", func(t *testing.T) {
c := &Client{options: &clientOptions{}}
ci := c.clientInfo()
assert.Equal(t, []MQTTClientInfo(nil), ci)
})

t.Run("multi connection mode", func(t *testing.T) {
c := &Client{options: &clientOptions{multiConnectionMode: true}}
ci := c.clientInfo()
assert.Equal(t, []MQTTClientInfo(nil), ci)
})
}
Loading
Loading