diff --git a/client.go b/client.go
index 55fa515..00dabff 100644
--- a/client.go
+++ b/client.go
@@ -38,6 +38,8 @@ type Client struct {
rndPool *sync.Pool
clientMu sync.RWMutex
subMu sync.RWMutex
+
+ stopInfoEmitter context.CancelFunc
}
// NewClient creates the Client struct with the clientOptions provided,
@@ -54,6 +56,10 @@ func NewClient(opts ...ClientOption) (*Client, error) {
return nil, fmt.Errorf("at least WithAddress or WithResolver ClientOption should be used")
}
+ if co.infoEmitterCfg != nil && co.infoEmitterCfg.Emitter != nil && co.infoEmitterCfg.Interval.Seconds() < 1 {
+ return nil, fmt.Errorf("client info emitter interval must be greater than or equal to 1s")
+ }
+
c := &Client{
options: co,
subscriptions: map[string]*subscriptionMeta{},
@@ -97,6 +103,8 @@ func (c *Client) Start() error {
return c.runResolver()
}
+ c.handleInfoEmitter()
+
return nil
}
@@ -122,11 +130,26 @@ func (c *Client) Run(ctx context.Context) error {
}
func (c *Client) stop() error {
- return c.execute(func(cc mqtt.Client) error {
+ err := c.execute(func(cc mqtt.Client) error {
cc.Disconnect(uint(c.options.gracefulShutdownPeriod / time.Millisecond))
return nil
}, execAll)
+
+ if c.stopInfoEmitter != nil {
+ c.stopInfoEmitter()
+ }
+
+ return err
+}
+
+func (c *Client) handleInfoEmitter() {
+ if c.options.infoEmitterCfg != nil && c.options.infoEmitterCfg.Emitter != nil {
+ ctx, cancel := context.WithCancel(context.Background())
+ c.stopInfoEmitter = cancel
+
+ go c.runBrokerInfoEmitter(ctx)
+ }
}
func (c *Client) handleToken(ctx context.Context, t mqtt.Token, timeoutErr error) error {
@@ -169,6 +192,8 @@ func (c *Client) runResolver() error {
}
}
+ c.handleInfoEmitter()
+
go c.watchAddressUpdates(c.options.resolver)
return nil
@@ -272,6 +297,8 @@ func reconnectHandler(client PubSub, o *clientOptions) mqtt.ReconnectHandler {
func connectionLostHandler(o *clientOptions) mqtt.ConnectionLostHandler {
return func(_ mqtt.Client, err error) {
+ o.logger.Error(context.Background(), err, map[string]any{"handler": "OnConnectionLostHandler"})
+
if o.onConnectionLostHandler != nil {
o.onConnectionLostHandler(err)
}
diff --git a/client_options.go b/client_options.go
index a5c4a7d..eed4ee4 100644
--- a/client_options.go
+++ b/client_options.go
@@ -236,6 +236,7 @@ type clientOptions struct {
onReconnectHandler OnReconnectHandler
sharedSubscriptionPredicate SharedSubscriptionPredicate
logger Logger
+ infoEmitterCfg *ClientInfoEmitterConfig
newEncoder EncoderFunc
newDecoder DecoderFunc
diff --git a/client_telemetry.go b/client_telemetry.go
new file mode 100644
index 0000000..851b738
--- /dev/null
+++ b/client_telemetry.go
@@ -0,0 +1,89 @@
+package courier
+
+import (
+ "net/url"
+ "sort"
+ "strconv"
+
+ mqtt "github.com/eclipse/paho.mqtt.golang"
+
+ "github.com/gojekfarm/xtools/generic/slice"
+)
+
+// MQTTClientInfo contains information about the internal MQTT client
+type MQTTClientInfo struct {
+ Addresses []TCPAddress `json:"addresses"`
+ ClientID string `json:"client_id"`
+ Username string `json:"username"`
+ ResumeSubs bool `json:"resume_subs"`
+ CleanSession bool `json:"clean_session"`
+ AutoReconnect bool `json:"auto_reconnect"`
+ Connected bool `json:"connected"`
+}
+
+type clientIntoList []MQTTClientInfo
+
+func (c *Client) allClientInfo() clientIntoList {
+ if c.options.multiConnectionMode {
+ return c.multiClientInfo()
+ }
+
+ return c.singleClientInfo()
+}
+
+func (c *Client) multiClientInfo() clientIntoList {
+ c.clientMu.RLock()
+ bCh := make(chan MQTTClientInfo, len(c.mqttClients))
+ c.clientMu.RUnlock()
+
+ _ = c.execute(func(cc mqtt.Client) error {
+ bCh <- transformClientInfo(cc)
+
+ return nil
+ }, execAll)
+
+ close(bCh)
+
+ bl := make(clientIntoList, 0, len(bCh))
+
+ for b := range bCh {
+ bl = append(bl, b)
+ }
+
+ sort.Slice(bl, func(i, j int) bool { return bl[i].ClientID < bl[j].ClientID })
+
+ return bl
+}
+
+func (c *Client) singleClientInfo() clientIntoList {
+ var bi MQTTClientInfo
+
+ _ = c.execute(func(cc mqtt.Client) error {
+ bi = transformClientInfo(cc)
+
+ return nil
+ }, execAll)
+
+ return clientIntoList{bi}
+}
+
+func transformClientInfo(cc mqtt.Client) MQTTClientInfo {
+ opts := cc.OptionsReader()
+
+ return MQTTClientInfo{
+ Addresses: slice.Map(opts.Servers(), func(u *url.URL) TCPAddress {
+ i, _ := strconv.Atoi(u.Port())
+
+ return TCPAddress{
+ Host: u.Hostname(),
+ Port: uint16(i),
+ }
+ }),
+ ClientID: opts.ClientID(),
+ Username: opts.Username(),
+ ResumeSubs: opts.ResumeSubs(),
+ CleanSession: opts.CleanSession(),
+ AutoReconnect: opts.AutoReconnect(),
+ Connected: cc.IsConnected(),
+ }
+}
diff --git a/docs/docs/sdk/SDK.md b/docs/docs/sdk/SDK.md
index d9db37f..cb5b03f 100644
--- a/docs/docs/sdk/SDK.md
+++ b/docs/docs/sdk/SDK.md
@@ -28,6 +28,8 @@ Package courier contains the client that can be used to interact with the courie
- [func \(c \*Client\) UsePublisherMiddleware\(mwf ...PublisherMiddlewareFunc\)](#Client.UsePublisherMiddleware)
- [func \(c \*Client\) UseSubscriberMiddleware\(mwf ...SubscriberMiddlewareFunc\)](#Client.UseSubscriberMiddleware)
- [func \(c \*Client\) UseUnsubscriberMiddleware\(mwf ...UnsubscriberMiddlewareFunc\)](#Client.UseUnsubscriberMiddleware)
+- [type ClientInfoEmitter](#ClientInfoEmitter)
+- [type ClientInfoEmitterConfig](#ClientInfoEmitterConfig)
- [type ClientOption](#ClientOption)
- [func WithAddress\(host string, port uint16\) ClientOption](#WithAddress)
- [func WithAutoReconnect\(autoReconnect bool\) ClientOption](#WithAutoReconnect)
@@ -64,6 +66,7 @@ Package courier contains the client that can be used to interact with the courie
- [func DefaultEncoderFunc\(\_ context.Context, w io.Writer\) Encoder](#DefaultEncoderFunc)
- [type EncoderFunc](#EncoderFunc)
- [type Logger](#Logger)
+- [type MQTTClientInfo](#MQTTClientInfo)
- [type Message](#Message)
- [func NewMessageWithDecoder\(payloadDecoder Decoder\) \*Message](#NewMessageWithDecoder)
- [func \(m \*Message\) DecodePayload\(v interface\{\}\) error](#Message.DecodePayload)
@@ -164,7 +167,7 @@ func WaitForConnection(c ConnectionInformer, waitFor time.Duration, tick time.Du
WaitForConnection checks if the Client is connected, it calls ConnectionInformer.IsConnected after every tick and waitFor is the maximum duration it can block. Returns true only when ConnectionInformer.IsConnected returns true
-## type [Client](https://github.com/gojek/courier-go/blob/main/client.go#L22-L41)
+## type [Client](https://github.com/gojek/courier-go/blob/main/client.go#L22-L43)
Client allows to communicate with an MQTT broker
@@ -175,7 +178,7 @@ type Client struct {
```
-### func [NewClient](https://github.com/gojek/courier-go/blob/main/client.go#L46)
+### func [NewClient](https://github.com/gojek/courier-go/blob/main/client.go#L48)
```go
func NewClient(opts ...ClientOption) (*Client, error)
@@ -234,7 +237,7 @@ c.Stop()
-### func \(\*Client\) [IsConnected](https://github.com/gojek/courier-go/blob/main/client.go#L78)
+### func \(\*Client\) [IsConnected](https://github.com/gojek/courier-go/blob/main/client.go#L84)
```go
func (c *Client) IsConnected() bool
@@ -252,7 +255,7 @@ func (c *Client) Publish(ctx context.Context, topic string, message interface{},
Publish allows to publish messages to an MQTT broker
-### func \(\*Client\) [Run](https://github.com/gojek/courier-go/blob/main/client.go#L110)
+### func \(\*Client\) [Run](https://github.com/gojek/courier-go/blob/main/client.go#L118)
```go
func (c *Client) Run(ctx context.Context) error
@@ -261,7 +264,7 @@ func (c *Client) Run(ctx context.Context) error
Run will start running the Client. This makes Client compatible with github.com/gojekfarm/xrun package. https://pkg.go.dev/github.com/gojekfarm/xrun
-### func \(\*Client\) [Start](https://github.com/gojek/courier-go/blob/main/client.go#L91)
+### func \(\*Client\) [Start](https://github.com/gojek/courier-go/blob/main/client.go#L97)
```go
func (c *Client) Start() error
@@ -270,7 +273,7 @@ func (c *Client) Start() error
Start will attempt to connect to the broker.
-### func \(\*Client\) [Stop](https://github.com/gojek/courier-go/blob/main/client.go#L106)
+### func \(\*Client\) [Stop](https://github.com/gojek/courier-go/blob/main/client.go#L114)
```go
func (c *Client) Stop()
@@ -297,13 +300,13 @@ func (c *Client) SubscribeMultiple(ctx context.Context, topicsWithQos map[string
SubscribeMultiple allows to subscribe to messages on multiple topics from an MQTT broker
-### func \(\*Client\) [TelemetryHandler](https://github.com/gojek/courier-go/blob/main/http.go#L29)
+### func \(\*Client\) [TelemetryHandler](https://github.com/gojek/courier-go/blob/main/http.go#L9)
```go
func (c *Client) TelemetryHandler() http.Handler
```
-TelemetryHandler returns a http.Handler that exposes the connected brokers information
+TelemetryHandler returns a http.Handler that exposes the connected clients information
### func \(\*Client\) [Unsubscribe](https://github.com/gojek/courier-go/blob/main/client_unsubscribe.go#L10)
@@ -341,6 +344,30 @@ func (c *Client) UseUnsubscriberMiddleware(mwf ...UnsubscriberMiddlewareFunc)
UseUnsubscriberMiddleware appends a UnsubscriberMiddlewareFunc to the chain. Middleware can be used to intercept or otherwise modify, process or skip subscriptions. They are executed in the order that they are applied to the Client.
+
+## type [ClientInfoEmitter](https://github.com/gojek/courier-go/blob/main/metrics.go#L10-L12)
+
+ClientInfoEmitter emits broker info. This can be called concurrently, implementations should be concurrency safe.
+
+```go
+type ClientInfoEmitter interface {
+ Emit(ctx context.Context, info MQTTClientInfo)
+}
+```
+
+
+## type [ClientInfoEmitterConfig](https://github.com/gojek/courier-go/blob/main/metrics.go#L15-L19)
+
+ClientInfoEmitterConfig is used to configure the broker info emitter.
+
+```go
+type ClientInfoEmitterConfig struct {
+ // Interval is the interval at which the broker info emitter emits broker info.
+ Interval time.Duration
+ Emitter ClientInfoEmitter
+}
+```
+
## type [ClientOption](https://github.com/gojek/courier-go/blob/main/client_options.go#L17)
@@ -686,6 +713,23 @@ type Logger interface {
}
```
+
+## type [MQTTClientInfo](https://github.com/gojek/courier-go/blob/main/client_telemetry.go#L14-L22)
+
+MQTTClientInfo contains information about the internal MQTT client
+
+```go
+type MQTTClientInfo struct {
+ Addresses []TCPAddress `json:"addresses"`
+ ClientID string `json:"client_id"`
+ Username string `json:"username"`
+ ResumeSubs bool `json:"resume_subs"`
+ CleanSession bool `json:"clean_session"`
+ AutoReconnect bool `json:"auto_reconnect"`
+ Connected bool `json:"connected"`
+}
+```
+
## type [Message](https://github.com/gojek/courier-go/blob/main/message.go#L4-L12)
diff --git a/http.go b/http.go
index 2cea2a8..e4ad159 100644
--- a/http.go
+++ b/http.go
@@ -2,100 +2,19 @@ package courier
import (
"encoding/json"
- "fmt"
"net/http"
- "net/url"
- "sort"
- "strconv"
-
- mqtt "github.com/eclipse/paho.mqtt.golang"
-
- "github.com/gojekfarm/xtools/generic/slice"
)
-type brokerList []broker
-
-type broker struct {
- Addresses []TCPAddress `json:"addresses"`
- ClientID string `json:"client_id"`
- Username string `json:"username"`
- ResumeSubs bool `json:"resume_subs"`
- CleanSession bool `json:"clean_session"`
- AutoReconnect bool `json:"auto_reconnect"`
- Connected bool `json:"connected"`
-}
-
-// TelemetryHandler returns a http.Handler that exposes the connected brokers information
+// TelemetryHandler returns a http.Handler that exposes the connected clients information
func (c *Client) TelemetryHandler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- if !c.options.multiConnectionMode {
- var bi broker
-
- err := c.execute(func(cc mqtt.Client) error {
- bi = brokerInfo(cc)
-
- return nil
- }, execAll)
-
- writeResponse(w, brokerList{bi}, err, false)
-
- return
- }
-
- var bl brokerList
- bCh := make(chan broker, len(c.mqttClients))
-
- err := c.execute(func(cc mqtt.Client) error {
- bCh <- brokerInfo(cc)
-
- return nil
- }, execAll)
-
- close(bCh)
-
- for b := range bCh {
- bl = append(bl, b)
- }
-
- sort.Slice(bl, func(i, j int) bool { return bl[i].ClientID < bl[j].ClientID })
-
- writeResponse(w, bl, err, true)
- })
-}
-
-func writeResponse(w http.ResponseWriter, list brokerList, err error, multi bool) {
- if err != nil {
- w.WriteHeader(http.StatusInternalServerError)
- _, _ = fmt.Fprintf(w, `{"error": "%s"}`, err.Error())
-
- return
- }
-
- w.WriteHeader(http.StatusOK)
- w.Header().Set("Content-Type", "application/json")
- _ = json.NewEncoder(w).Encode(map[string]any{
- "multi": multi,
- "brokers": list,
+ cl := c.allClientInfo()
+
+ w.WriteHeader(http.StatusOK)
+ w.Header().Set("Content-Type", "application/json")
+ _ = json.NewEncoder(w).Encode(map[string]any{
+ "multi": c.options.multiConnectionMode,
+ "clients": cl,
+ })
})
}
-
-func brokerInfo(cc mqtt.Client) broker {
- opts := cc.OptionsReader()
-
- return broker{
- Addresses: slice.Map(opts.Servers(), func(u *url.URL) TCPAddress {
- i, _ := strconv.Atoi(u.Port())
-
- return TCPAddress{
- Host: u.Hostname(),
- Port: uint16(i),
- }
- }),
- ClientID: opts.ClientID(),
- Username: opts.Username(),
- ResumeSubs: opts.ResumeSubs(),
- CleanSession: opts.CleanSession(),
- AutoReconnect: opts.AutoReconnect(),
- Connected: cc.IsConnected(),
- }
-}
diff --git a/http_test.go b/http_test.go
index 5d1c576..eb13ba5 100644
--- a/http_test.go
+++ b/http_test.go
@@ -2,7 +2,6 @@ package courier
import (
"context"
- "errors"
"fmt"
"net/http"
"net/http/httptest"
@@ -23,7 +22,7 @@ func TestClient_TelemetryHandler(t *testing.T) {
name: "single connection mode",
status: http.StatusOK,
opts: func(t *testing.T) []ClientOption { return nil },
- body: fmt.Sprintf(`{"brokers":[{"addresses":[{"Host":"%s","Port":%d}],"client_id":"clientID","username":"","resume_subs":false,"clean_session":false,"auto_reconnect":true,"connected":true}],"multi":false}
+ body: fmt.Sprintf(`{"clients":[{"addresses":[{"Host":"%s","Port":%d}],"client_id":"clientID","username":"","resume_subs":false,"clean_session":false,"auto_reconnect":true,"connected":true}],"multi":false}
`, testBrokerAddress.Host, testBrokerAddress.Port),
},
{
@@ -51,7 +50,7 @@ func TestClient_TelemetryHandler(t *testing.T) {
UseMultiConnectionMode,
}
},
- body: fmt.Sprintf(`{"brokers":[{"addresses":[{"Host":"%s","Port":%d}],"client_id":"clientID-0-1","username":"","resume_subs":false,"clean_session":false,"auto_reconnect":true,"connected":true},{"addresses":[{"Host":"%s","Port":%d}],"client_id":"clientID-1-1","username":"","resume_subs":false,"clean_session":false,"auto_reconnect":true,"connected":true}],"multi":true}
+ body: fmt.Sprintf(`{"clients":[{"addresses":[{"Host":"%s","Port":%d}],"client_id":"clientID-0-1","username":"","resume_subs":false,"clean_session":false,"auto_reconnect":true,"connected":true},{"addresses":[{"Host":"%s","Port":%d}],"client_id":"clientID-1-1","username":"","resume_subs":false,"clean_session":false,"auto_reconnect":true,"connected":true}],"multi":true}
`, testBrokerAddress.Host, testBrokerAddress.Port, testBrokerAddress.Host, testBrokerAddress.Port),
},
}
@@ -84,12 +83,3 @@ func TestClient_TelemetryHandler(t *testing.T) {
})
}
}
-
-func Test_writeResponse_error(t *testing.T) {
- rr := httptest.NewRecorder()
-
- writeResponse(rr, nil, errors.New("error"), false)
-
- assert.Equal(t, http.StatusInternalServerError, rr.Code)
- assert.Equal(t, `{"error": "error"}`, rr.Body.String())
-}
diff --git a/metrics.go b/metrics.go
new file mode 100644
index 0000000..e620dee
--- /dev/null
+++ b/metrics.go
@@ -0,0 +1,41 @@
+package courier
+
+import (
+ "context"
+ "time"
+)
+
+// ClientInfoEmitter emits broker info.
+// This can be called concurrently, implementations should be concurrency safe.
+type ClientInfoEmitter interface {
+ Emit(ctx context.Context, info MQTTClientInfo)
+}
+
+// ClientInfoEmitterConfig is used to configure the broker info emitter.
+type ClientInfoEmitterConfig struct {
+ // Interval is the interval at which the broker info emitter emits broker info.
+ Interval time.Duration
+ Emitter ClientInfoEmitter
+}
+
+func (cfg *ClientInfoEmitterConfig) apply(o *clientOptions) { o.infoEmitterCfg = cfg }
+
+func (c *Client) runBrokerInfoEmitter(ctx context.Context) {
+ tick := time.NewTicker(c.options.infoEmitterCfg.Interval)
+ defer tick.Stop()
+
+ em := c.options.infoEmitterCfg.Emitter
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-tick.C:
+ cl := c.allClientInfo()
+
+ for _, ci := range cl {
+ go em.Emit(ctx, ci)
+ }
+ }
+ }
+}
diff --git a/metrics_test.go b/metrics_test.go
new file mode 100644
index 0000000..4d37ae9
--- /dev/null
+++ b/metrics_test.go
@@ -0,0 +1,116 @@
+package courier
+
+import (
+ "context"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+)
+
+type mockEmitter struct {
+ mock.Mock
+}
+
+func newMockEmitter(t *testing.T) *mockEmitter {
+ m := &mockEmitter{}
+ m.Test(t)
+ return m
+}
+
+func (m *mockEmitter) Emit(ctx context.Context, info MQTTClientInfo) { m.Called(ctx, info) }
+
+func TestClient_ClientInfoEmitter(t *testing.T) {
+ tests := []struct {
+ name string
+ mock func(*sync.WaitGroup, *mock.Mock)
+ opts func(*testing.T) []ClientOption
+ }{
+ {
+ name: "single connection mode",
+ opts: func(t *testing.T) []ClientOption { return nil },
+ mock: func(wg *sync.WaitGroup, m *mock.Mock) {
+ wg.Add(1)
+
+ m.On("Emit", mock.Anything, mock.Anything).Return().Run(func(args mock.Arguments) {
+ wg.Done()
+ }).Once()
+ },
+ },
+ {
+ name: "multi connection mode",
+ opts: func(t *testing.T) []ClientOption {
+ ch := make(chan []TCPAddress, 1)
+ dCh := make(chan struct{})
+ mr := newMockResolver(t)
+
+ mr.On("UpdateChan").Return(ch)
+ mr.On("Done").Return(dCh)
+
+ go func() {
+ ch <- []TCPAddress{testBrokerAddress, testBrokerAddress}
+
+ <-time.After(2 * time.Second)
+ close(ch)
+
+ dCh <- struct{}{}
+ }()
+
+ return []ClientOption{
+ WithResolver(mr),
+ UseMultiConnectionMode,
+ }
+ },
+ mock: func(wg *sync.WaitGroup, m *mock.Mock) {
+ wg.Add(2)
+
+ m.On("Emit", mock.Anything, mock.Anything).Return().Run(func(args mock.Arguments) {
+ wg.Done()
+ }).Twice()
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ eOpts := tt.opts(t)
+ mc := newMockEmitter(t)
+ eOpts = append(eOpts, &ClientInfoEmitterConfig{
+ Interval: time.Second,
+ Emitter: mc,
+ })
+
+ wg := &sync.WaitGroup{}
+ if tt.mock != nil {
+ tt.mock(wg, &mc.Mock)
+ }
+
+ c, err := NewClient(append(defOpts, eOpts...)...)
+ assert.NoError(t, err)
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ go func() {
+ _ = c.Run(ctx)
+ }()
+
+ assert.True(t, WaitForConnection(c, 2*time.Second, 100*time.Millisecond))
+
+ wg.Wait()
+
+ cancel()
+
+ mc.AssertExpectations(t)
+ })
+ }
+
+ t.Run("NewClientWithLessThanOneSecondEmitterIntervalError", func(t *testing.T) {
+ _, err := NewClient(append(defOpts, &ClientInfoEmitterConfig{
+ Interval: 100 * time.Millisecond,
+ Emitter: newMockEmitter(t),
+ })...)
+ assert.EqualError(t, err, "client info emitter interval must be greater than or equal to 1s")
+ })
+}