Skip to content

Commit

Permalink
add ClientInfoEmitter
Browse files Browse the repository at this point in the history
  • Loading branch information
ajatprabha committed Dec 4, 2023
1 parent d1a422a commit e5485c9
Show file tree
Hide file tree
Showing 8 changed files with 338 additions and 111 deletions.
29 changes: 28 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Client struct {
rndPool *sync.Pool
clientMu sync.RWMutex
subMu sync.RWMutex

stopInfoEmitter context.CancelFunc
}

// NewClient creates the Client struct with the clientOptions provided,
Expand All @@ -54,6 +56,10 @@ func NewClient(opts ...ClientOption) (*Client, error) {
return nil, fmt.Errorf("at least WithAddress or WithResolver ClientOption should be used")
}

if co.infoEmitterCfg != nil && co.infoEmitterCfg.Emitter != nil && co.infoEmitterCfg.Interval.Seconds() < 1 {
return nil, fmt.Errorf("client info emitter interval must be greater than or equal to 1s")
}

c := &Client{
options: co,
subscriptions: map[string]*subscriptionMeta{},
Expand Down Expand Up @@ -97,6 +103,8 @@ func (c *Client) Start() error {
return c.runResolver()
}

c.handleInfoEmitter()

return nil
}

Expand All @@ -122,11 +130,26 @@ func (c *Client) Run(ctx context.Context) error {
}

func (c *Client) stop() error {
return c.execute(func(cc mqtt.Client) error {
err := c.execute(func(cc mqtt.Client) error {
cc.Disconnect(uint(c.options.gracefulShutdownPeriod / time.Millisecond))

return nil
}, execAll)

if c.stopInfoEmitter != nil {
c.stopInfoEmitter()
}

return err
}

func (c *Client) handleInfoEmitter() {
if c.options.infoEmitterCfg != nil && c.options.infoEmitterCfg.Emitter != nil {
ctx, cancel := context.WithCancel(context.Background())
c.stopInfoEmitter = cancel

go c.runBrokerInfoEmitter(ctx)
}
}

func (c *Client) handleToken(ctx context.Context, t mqtt.Token, timeoutErr error) error {
Expand Down Expand Up @@ -169,6 +192,8 @@ func (c *Client) runResolver() error {
}
}

c.handleInfoEmitter()

go c.watchAddressUpdates(c.options.resolver)

return nil
Expand Down Expand Up @@ -272,6 +297,8 @@ func reconnectHandler(client PubSub, o *clientOptions) mqtt.ReconnectHandler {

func connectionLostHandler(o *clientOptions) mqtt.ConnectionLostHandler {
return func(_ mqtt.Client, err error) {
o.logger.Error(context.Background(), err, map[string]any{"handler": "OnConnectionLostHandler"})

if o.onConnectionLostHandler != nil {
o.onConnectionLostHandler(err)
}
Expand Down
1 change: 1 addition & 0 deletions client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ type clientOptions struct {
onReconnectHandler OnReconnectHandler
sharedSubscriptionPredicate SharedSubscriptionPredicate
logger Logger
infoEmitterCfg *ClientInfoEmitterConfig

newEncoder EncoderFunc
newDecoder DecoderFunc
Expand Down
89 changes: 89 additions & 0 deletions client_telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package courier

import (
"net/url"
"sort"
"strconv"

mqtt "github.com/eclipse/paho.mqtt.golang"

"github.com/gojekfarm/xtools/generic/slice"
)

// MQTTClientInfo contains information about the internal MQTT client
type MQTTClientInfo struct {
Addresses []TCPAddress `json:"addresses"`
ClientID string `json:"client_id"`
Username string `json:"username"`
ResumeSubs bool `json:"resume_subs"`
CleanSession bool `json:"clean_session"`
AutoReconnect bool `json:"auto_reconnect"`
Connected bool `json:"connected"`
}

type clientIntoList []MQTTClientInfo

func (c *Client) allClientInfo() clientIntoList {
if c.options.multiConnectionMode {
return c.multiClientInfo()
}

return c.singleClientInfo()
}

func (c *Client) multiClientInfo() clientIntoList {
c.clientMu.RLock()
bCh := make(chan MQTTClientInfo, len(c.mqttClients))
c.clientMu.RUnlock()

_ = c.execute(func(cc mqtt.Client) error {
bCh <- transformClientInfo(cc)

return nil
}, execAll)

close(bCh)

bl := make(clientIntoList, 0, len(bCh))

for b := range bCh {
bl = append(bl, b)
}

sort.Slice(bl, func(i, j int) bool { return bl[i].ClientID < bl[j].ClientID })

return bl
}

func (c *Client) singleClientInfo() clientIntoList {
var bi MQTTClientInfo

_ = c.execute(func(cc mqtt.Client) error {
bi = transformClientInfo(cc)

return nil
}, execAll)

return clientIntoList{bi}
}

func transformClientInfo(cc mqtt.Client) MQTTClientInfo {
opts := cc.OptionsReader()

return MQTTClientInfo{
Addresses: slice.Map(opts.Servers(), func(u *url.URL) TCPAddress {
i, _ := strconv.Atoi(u.Port())

return TCPAddress{
Host: u.Hostname(),
Port: uint16(i),
}
}),
ClientID: opts.ClientID(),
Username: opts.Username(),
ResumeSubs: opts.ResumeSubs(),
CleanSession: opts.CleanSession(),
AutoReconnect: opts.AutoReconnect(),
Connected: cc.IsConnected(),
}
}
60 changes: 52 additions & 8 deletions docs/docs/sdk/SDK.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ Package courier contains the client that can be used to interact with the courie
- [func \(c \*Client\) UsePublisherMiddleware\(mwf ...PublisherMiddlewareFunc\)](#Client.UsePublisherMiddleware)
- [func \(c \*Client\) UseSubscriberMiddleware\(mwf ...SubscriberMiddlewareFunc\)](#Client.UseSubscriberMiddleware)
- [func \(c \*Client\) UseUnsubscriberMiddleware\(mwf ...UnsubscriberMiddlewareFunc\)](#Client.UseUnsubscriberMiddleware)
- [type ClientInfoEmitter](#ClientInfoEmitter)
- [type ClientInfoEmitterConfig](#ClientInfoEmitterConfig)
- [type ClientOption](#ClientOption)
- [func WithAddress\(host string, port uint16\) ClientOption](#WithAddress)
- [func WithAutoReconnect\(autoReconnect bool\) ClientOption](#WithAutoReconnect)
Expand Down Expand Up @@ -64,6 +66,7 @@ Package courier contains the client that can be used to interact with the courie
- [func DefaultEncoderFunc\(\_ context.Context, w io.Writer\) Encoder](#DefaultEncoderFunc)
- [type EncoderFunc](#EncoderFunc)
- [type Logger](#Logger)
- [type MQTTClientInfo](#MQTTClientInfo)
- [type Message](#Message)
- [func NewMessageWithDecoder\(payloadDecoder Decoder\) \*Message](#NewMessageWithDecoder)
- [func \(m \*Message\) DecodePayload\(v interface\{\}\) error](#Message.DecodePayload)
Expand Down Expand Up @@ -164,7 +167,7 @@ func WaitForConnection(c ConnectionInformer, waitFor time.Duration, tick time.Du
WaitForConnection checks if the Client is connected, it calls ConnectionInformer.IsConnected after every tick and waitFor is the maximum duration it can block. Returns true only when ConnectionInformer.IsConnected returns true

<a name="Client"></a>
## type [Client](https://github.com/gojek/courier-go/blob/main/client.go#L22-L41)
## type [Client](https://github.com/gojek/courier-go/blob/main/client.go#L22-L43)

Client allows to communicate with an MQTT broker

Expand All @@ -175,7 +178,7 @@ type Client struct {
```

<a name="NewClient"></a>
### func [NewClient](https://github.com/gojek/courier-go/blob/main/client.go#L46)
### func [NewClient](https://github.com/gojek/courier-go/blob/main/client.go#L48)

```go
func NewClient(opts ...ClientOption) (*Client, error)
Expand Down Expand Up @@ -234,7 +237,7 @@ c.Stop()
</details>

<a name="Client.IsConnected"></a>
### func \(\*Client\) [IsConnected](https://github.com/gojek/courier-go/blob/main/client.go#L78)
### func \(\*Client\) [IsConnected](https://github.com/gojek/courier-go/blob/main/client.go#L84)

```go
func (c *Client) IsConnected() bool
Expand All @@ -252,7 +255,7 @@ func (c *Client) Publish(ctx context.Context, topic string, message interface{},
Publish allows to publish messages to an MQTT broker

<a name="Client.Run"></a>
### func \(\*Client\) [Run](https://github.com/gojek/courier-go/blob/main/client.go#L110)
### func \(\*Client\) [Run](https://github.com/gojek/courier-go/blob/main/client.go#L118)

```go
func (c *Client) Run(ctx context.Context) error
Expand All @@ -261,7 +264,7 @@ func (c *Client) Run(ctx context.Context) error
Run will start running the Client. This makes Client compatible with github.com/gojekfarm/xrun package. https://pkg.go.dev/github.com/gojekfarm/xrun

<a name="Client.Start"></a>
### func \(\*Client\) [Start](https://github.com/gojek/courier-go/blob/main/client.go#L91)
### func \(\*Client\) [Start](https://github.com/gojek/courier-go/blob/main/client.go#L97)

```go
func (c *Client) Start() error
Expand All @@ -270,7 +273,7 @@ func (c *Client) Start() error
Start will attempt to connect to the broker.

<a name="Client.Stop"></a>
### func \(\*Client\) [Stop](https://github.com/gojek/courier-go/blob/main/client.go#L106)
### func \(\*Client\) [Stop](https://github.com/gojek/courier-go/blob/main/client.go#L114)

```go
func (c *Client) Stop()
Expand All @@ -297,13 +300,13 @@ func (c *Client) SubscribeMultiple(ctx context.Context, topicsWithQos map[string
SubscribeMultiple allows to subscribe to messages on multiple topics from an MQTT broker

<a name="Client.TelemetryHandler"></a>
### func \(\*Client\) [TelemetryHandler](https://github.com/gojek/courier-go/blob/main/http.go#L29)
### func \(\*Client\) [TelemetryHandler](https://github.com/gojek/courier-go/blob/main/http.go#L9)

```go
func (c *Client) TelemetryHandler() http.Handler
```

TelemetryHandler returns a http.Handler that exposes the connected brokers information
TelemetryHandler returns a http.Handler that exposes the connected clients information

<a name="Client.Unsubscribe"></a>
### func \(\*Client\) [Unsubscribe](https://github.com/gojek/courier-go/blob/main/client_unsubscribe.go#L10)
Expand Down Expand Up @@ -341,6 +344,30 @@ func (c *Client) UseUnsubscriberMiddleware(mwf ...UnsubscriberMiddlewareFunc)

UseUnsubscriberMiddleware appends a UnsubscriberMiddlewareFunc to the chain. Middleware can be used to intercept or otherwise modify, process or skip subscriptions. They are executed in the order that they are applied to the Client.

<a name="ClientInfoEmitter"></a>
## type [ClientInfoEmitter](https://github.com/gojek/courier-go/blob/main/metrics.go#L10-L12)

ClientInfoEmitter emits broker info. This can be called concurrently, implementations should be concurrency safe.

```go
type ClientInfoEmitter interface {
Emit(ctx context.Context, info MQTTClientInfo)
}
```

<a name="ClientInfoEmitterConfig"></a>
## type [ClientInfoEmitterConfig](https://github.com/gojek/courier-go/blob/main/metrics.go#L15-L19)

ClientInfoEmitterConfig is used to configure the broker info emitter.

```go
type ClientInfoEmitterConfig struct {
// Interval is the interval at which the broker info emitter emits broker info.
Interval time.Duration
Emitter ClientInfoEmitter
}
```

<a name="ClientOption"></a>
## type [ClientOption](https://github.com/gojek/courier-go/blob/main/client_options.go#L17)

Expand Down Expand Up @@ -686,6 +713,23 @@ type Logger interface {
}
```

<a name="MQTTClientInfo"></a>
## type [MQTTClientInfo](https://github.com/gojek/courier-go/blob/main/client_telemetry.go#L14-L22)

MQTTClientInfo contains information about the internal MQTT client

```go
type MQTTClientInfo struct {
Addresses []TCPAddress `json:"addresses"`
ClientID string `json:"client_id"`
Username string `json:"username"`
ResumeSubs bool `json:"resume_subs"`
CleanSession bool `json:"clean_session"`
AutoReconnect bool `json:"auto_reconnect"`
Connected bool `json:"connected"`
}
```

<a name="Message"></a>
## type [Message](https://github.com/gojek/courier-go/blob/main/message.go#L4-L12)

Expand Down
Loading

0 comments on commit e5485c9

Please sign in to comment.