Skip to content

Commit

Permalink
feat: add shared subscriptions called information
Browse files Browse the repository at this point in the history
  • Loading branch information
ajatprabha committed Feb 7, 2024
1 parent f974fc1 commit 670f8a7
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 6 deletions.
26 changes: 23 additions & 3 deletions client_telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type MQTTClientInfo struct {
CleanSession bool `json:"clean_session"`
AutoReconnect bool `json:"auto_reconnect"`
Connected bool `json:"connected"`
// Subscriptions contains the topics the client is subscribed to
// Note: Currently, this field only holds shared subscriptions.
Subscriptions []string `json:"subscriptions,omitempty"`
}

type infoResponse struct {
Expand Down Expand Up @@ -73,13 +76,29 @@ func (c *Client) readSubscriptionMeta() map[string]QOSLevel {
func (c *Client) multiClientInfo() []MQTTClientInfo {
c.clientMu.RLock()

if len(c.mqttClients) == 0 {
cls := c.mqttClients

if len(cls) == 0 {
c.clientMu.RUnlock()

return nil
}

bCh := make(chan MQTTClientInfo, len(c.mqttClients))
bCh := make(chan MQTTClientInfo, len(cls))

_ = c.execute(func(cc mqtt.Client) error {
for _, is := range cls {
if is.client == cc {
is.mu.Lock()
subs := is.subsCalled.Values()
is.mu.Unlock()

bCh <- transformClientInfo(cc, subs...)

return nil
}
}

bCh <- transformClientInfo(cc)

return nil
Expand Down Expand Up @@ -119,7 +138,7 @@ func (c *Client) singleClientInfo() []MQTTClientInfo {
return []MQTTClientInfo{bi}
}

func transformClientInfo(cc mqtt.Client) MQTTClientInfo {
func transformClientInfo(cc mqtt.Client, subscribedTopics ...string) MQTTClientInfo {
opts := cc.OptionsReader()

return MQTTClientInfo{
Expand All @@ -137,5 +156,6 @@ func transformClientInfo(cc mqtt.Client) MQTTClientInfo {
CleanSession: opts.CleanSession(),
AutoReconnect: opts.AutoReconnect(),
Connected: cc.IsConnected(),
Subscriptions: subscribedTopics,
}
}
5 changes: 4 additions & 1 deletion docs/docs/sdk/SDK.md
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ type Logger interface {
```

<a name="MQTTClientInfo"></a>
## type [MQTTClientInfo](https://github.com/gojek/courier-go/blob/main/client_telemetry.go#L14-L22)
## type [MQTTClientInfo](https://github.com/gojek/courier-go/blob/main/client_telemetry.go#L14-L25)

MQTTClientInfo contains information about the internal MQTT client

Expand All @@ -767,6 +767,9 @@ type MQTTClientInfo struct {
CleanSession bool `json:"clean_session"`
AutoReconnect bool `json:"auto_reconnect"`
Connected bool `json:"connected"`
// Subscriptions contains the topics the client is subscribed to
// Note: Currently, this field only holds shared subscriptions.
Subscriptions []string `json:"subscriptions,omitempty"`
}
```

Expand Down
1 change: 1 addition & 0 deletions exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
var errInvalidExecOpt = errors.New("courier: invalid exec option")

type internalState struct {
// currently holds only shared subscriptions info
subsCalled generic.Set[string]
client mqtt.Client
mu sync.Mutex
Expand Down
11 changes: 9 additions & 2 deletions http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestClient_TelemetryHandler(t *testing.T) {
name: "single connection mode",
status: http.StatusOK,
opts: func(t *testing.T) []ClientOption { return nil },
body: fmt.Sprintf(`{"multi":false,"clients":[{"addresses":[{"host":"%s","port":%d}],"client_id":"clientID","username":"","resume_subs":false,"clean_session":false,"auto_reconnect":true,"connected":true}]}
body: fmt.Sprintf(`{"multi":false,"clients":[{"addresses":[{"host":"%s","port":%d}],"client_id":"clientID","username":"","resume_subs":false,"clean_session":false,"auto_reconnect":true,"connected":true}],"subscriptions":{"$share/test/test-topic":0}}
`, testBrokerAddress.Host, testBrokerAddress.Port),
},
{
Expand Down Expand Up @@ -50,7 +50,7 @@ func TestClient_TelemetryHandler(t *testing.T) {
UseMultiConnectionMode,
}
},
body: fmt.Sprintf(`{"multi":true,"clients":[{"addresses":[{"host":"%s","port":%d}],"client_id":"clientID-0-1","username":"","resume_subs":false,"clean_session":false,"auto_reconnect":true,"connected":true},{"addresses":[{"host":"%s","port":%d}],"client_id":"clientID-1-1","username":"","resume_subs":false,"clean_session":false,"auto_reconnect":true,"connected":true}]}
body: fmt.Sprintf(`{"multi":true,"clients":[{"addresses":[{"host":"%s","port":%d}],"client_id":"clientID-0-1","username":"","resume_subs":false,"clean_session":false,"auto_reconnect":true,"connected":true,"subscriptions":["$share/test/test-topic"]},{"addresses":[{"host":"%s","port":%d}],"client_id":"clientID-1-1","username":"","resume_subs":false,"clean_session":false,"auto_reconnect":true,"connected":true,"subscriptions":["$share/test/test-topic"]}],"subscriptions":{"$share/test/test-topic":0}}
`, testBrokerAddress.Host, testBrokerAddress.Port, testBrokerAddress.Host, testBrokerAddress.Port),
},
}
Expand All @@ -69,6 +69,13 @@ func TestClient_TelemetryHandler(t *testing.T) {
_ = c.Run(ctx)
}()

WaitForConnection(c, 2*time.Second, 100*time.Millisecond)

assert.NoError(t, c.Subscribe(context.TODO(), "$share/test/test-topic",
func(ctx context.Context, ps PubSub, msg *Message) {
// do nothing
}))

h := c.InfoHandler()

assert.True(t, WaitForConnection(c, 2*time.Second, 100*time.Millisecond))
Expand Down

0 comments on commit 670f8a7

Please sign in to comment.