diff --git a/client_telemetry.go b/client_telemetry.go index 6463b44..3d1027f 100644 --- a/client_telemetry.go +++ b/client_telemetry.go @@ -8,6 +8,7 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gojekfarm/xtools/generic/slice" + "github.com/gojekfarm/xtools/generic/xmap" ) // MQTTClientInfo contains information about the internal MQTT client @@ -76,7 +77,7 @@ func (c *Client) readSubscriptionMeta() map[string]QOSLevel { func (c *Client) multiClientInfo() []MQTTClientInfo { c.clientMu.RLock() - cls := c.mqttClients + cls := xmap.Values(c.mqttClients) if len(cls) == 0 { c.clientMu.RUnlock() @@ -86,23 +87,18 @@ func (c *Client) multiClientInfo() []MQTTClientInfo { bCh := make(chan MQTTClientInfo, len(cls)) - _ = c.execute(func(cc mqtt.Client) error { - for _, is := range cls { - if is.client == cc { - is.mu.Lock() - subs := is.subsCalled.Values() - is.mu.Unlock() - - bCh <- transformClientInfo(cc, subs...) - - return nil - } - } + _ = c.execute( + func(cc mqtt.Client) error { return nil }, + execOptWithState(func(f func(mqtt.Client) error, is *internalState) error { + is.mu.Lock() + subs := is.subsCalled.Values() + is.mu.Unlock() - bCh <- transformClientInfo(cc) + bCh <- transformClientInfo(is.client, subs...) - return nil - }, execAll) + return f(is.client) + }), + ) c.clientMu.RUnlock()