Skip to content

Commit

Permalink
add TelemetryHandler for exposing connected brokers information
Browse files Browse the repository at this point in the history
  • Loading branch information
ajatprabha committed Dec 4, 2023
1 parent 69a9d69 commit bd26961
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 1 deletion.
2 changes: 1 addition & 1 deletion client_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (c *Client) multipleClients(addrs []TCPAddress) (map[string]mqtt.Client, er
return err
}

clients.Store(ia.addr.String(), cc)
clients.Store(fmt.Sprintf("%s-%d", ia.addr.String(), ia.index), cc)

return nil
}), accumulateErrors); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions docs/docs/sdk/SDK.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Package courier contains the client that can be used to interact with the courie
- [func \(c \*Client\) Stop\(\)](#Client.Stop)
- [func \(c \*Client\) Subscribe\(ctx context.Context, topic string, callback MessageHandler, opts ...Option\) error](#Client.Subscribe)
- [func \(c \*Client\) SubscribeMultiple\(ctx context.Context, topicsWithQos map\[string\]QOSLevel, callback MessageHandler\) error](#Client.SubscribeMultiple)
- [func \(c \*Client\) TelemetryHandler\(\) http.Handler](#Client.TelemetryHandler)
- [func \(c \*Client\) Unsubscribe\(ctx context.Context, topics ...string\) error](#Client.Unsubscribe)
- [func \(c \*Client\) UsePublisherMiddleware\(mwf ...PublisherMiddlewareFunc\)](#Client.UsePublisherMiddleware)
- [func \(c \*Client\) UseSubscriberMiddleware\(mwf ...SubscriberMiddlewareFunc\)](#Client.UseSubscriberMiddleware)
Expand Down Expand Up @@ -295,6 +296,15 @@ func (c *Client) SubscribeMultiple(ctx context.Context, topicsWithQos map[string

SubscribeMultiple allows to subscribe to messages on multiple topics from an MQTT broker

<a name="Client.TelemetryHandler"></a>
### func \(\*Client\) [TelemetryHandler](https://github.com/gojek/courier-go/blob/main/http.go#L29)

```go
func (c *Client) TelemetryHandler() http.Handler
```

TelemetryHandler returns a http.Handler that exposes the connected brokers information

<a name="Client.Unsubscribe"></a>
### func \(\*Client\) [Unsubscribe](https://github.com/gojek/courier-go/blob/main/client_unsubscribe.go#L10)

Expand Down
101 changes: 101 additions & 0 deletions http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package courier

import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"sort"
"strconv"

mqtt "github.com/eclipse/paho.mqtt.golang"

"github.com/gojekfarm/xtools/generic/slice"
)

type brokerList []broker

type broker struct {
Addresses []TCPAddress `json:"addresses"`
ClientID string `json:"client_id"`
Username string `json:"username"`
ResumeSubs bool `json:"resume_subs"`
CleanSession bool `json:"clean_session"`
AutoReconnect bool `json:"auto_reconnect"`
Connected bool `json:"connected"`
}

// TelemetryHandler returns a http.Handler that exposes the connected brokers information
func (c *Client) TelemetryHandler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !c.options.multiConnectionMode {
var bi broker

err := c.execute(func(cc mqtt.Client) error {
bi = brokerInfo(cc)

return nil
}, execAll)

writeResponse(w, brokerList{bi}, err, false)

return
}

var bl brokerList
bCh := make(chan broker, len(c.mqttClients))

err := c.execute(func(cc mqtt.Client) error {
bCh <- brokerInfo(cc)

return nil
}, execAll)

close(bCh)

for b := range bCh {
bl = append(bl, b)
}

sort.Slice(bl, func(i, j int) bool { return bl[i].ClientID < bl[j].ClientID })

writeResponse(w, bl, err, true)
})
}

func writeResponse(w http.ResponseWriter, list brokerList, err error, multi bool) {
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = fmt.Fprintf(w, `{"error": "%s"}`, err.Error())

return
}

w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]any{
"multi": multi,
"brokers": list,
})
}

func brokerInfo(cc mqtt.Client) broker {
opts := cc.OptionsReader()

return broker{
Addresses: slice.Map(opts.Servers(), func(u *url.URL) TCPAddress {
i, _ := strconv.Atoi(u.Port())

return TCPAddress{
Host: u.Hostname(),
Port: uint16(i),
}
}),
ClientID: opts.ClientID(),
Username: opts.Username(),
ResumeSubs: opts.ResumeSubs(),
CleanSession: opts.CleanSession(),
AutoReconnect: opts.AutoReconnect(),
Connected: cc.IsConnected(),
}
}
95 changes: 95 additions & 0 deletions http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package courier

import (
"context"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestClient_TelemetryHandler(t *testing.T) {
tests := []struct {
name string
status int
body string
opts func(*testing.T) []ClientOption
}{
{
name: "single connection mode",
status: http.StatusOK,
opts: func(t *testing.T) []ClientOption { return nil },
body: fmt.Sprintf(`{"brokers":[{"addresses":[{"Host":"%s","Port":%d}],"client_id":"clientID","username":"","resume_subs":false,"clean_session":false,"auto_reconnect":true,"connected":true}],"multi":false}
`, testBrokerAddress.Host, testBrokerAddress.Port),
},
{
name: "multi connection mode",
status: http.StatusOK,
opts: func(t *testing.T) []ClientOption {
ch := make(chan []TCPAddress, 1)
dCh := make(chan struct{})
mr := newMockResolver(t)

mr.On("UpdateChan").Return(ch)
mr.On("Done").Return(dCh)

go func() {
ch <- []TCPAddress{testBrokerAddress, testBrokerAddress}

<-time.After(2 * time.Second)
close(ch)

dCh <- struct{}{}
}()

return []ClientOption{
WithResolver(mr),
UseMultiConnectionMode,
}
},
body: fmt.Sprintf(`{"brokers":[{"addresses":[{"Host":"%s","Port":%d}],"client_id":"clientID-0-1","username":"","resume_subs":false,"clean_session":false,"auto_reconnect":true,"connected":true},{"addresses":[{"Host":"%s","Port":%d}],"client_id":"clientID-1-1","username":"","resume_subs":false,"clean_session":false,"auto_reconnect":true,"connected":true}],"multi":true}
`, testBrokerAddress.Host, testBrokerAddress.Port, testBrokerAddress.Host, testBrokerAddress.Port),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
eOpts := tt.opts(t)

c, err := NewClient(append(defOpts, eOpts...)...)
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
_ = c.Run(ctx)
}()

h := c.TelemetryHandler()

assert.True(t, WaitForConnection(c, 2*time.Second, 100*time.Millisecond))

req := httptest.NewRequest(http.MethodGet, "/telemetry", nil)
rr := httptest.NewRecorder()

h.ServeHTTP(rr, req)

assert.Equal(t, tt.status, rr.Code)
assert.Equal(t, tt.body, rr.Body.String())
})
}
}

func Test_writeResponse_error(t *testing.T) {
rr := httptest.NewRecorder()

writeResponse(rr, nil, errors.New("error"), false)

assert.Equal(t, http.StatusInternalServerError, rr.Code)
assert.Equal(t, `{"error": "error"}`, rr.Body.String())
}

0 comments on commit bd26961

Please sign in to comment.