-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqtt.go
150 lines (139 loc) · 4.33 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// mqtt.go - MQTT Functionality
//
// ॐ भूर्भुवः स्वः
// तत्स॑वि॒तुर्वरे॑ण्यं॒
// भर्गो॑ दे॒वस्य॑ धीमहि।
// धियो॒ यो नः॑ प्रचो॒दया॑त्॥
//
//
// बोसजी के द्वारा रचित गो-मिल तन्त्राक्ष्
// ============================
//
// यह गो-क्रमादेश आधारित एम.क्यू.टी.टी अधिलेख में प्रचालेखन का तन्त्राक्ष् है।
//
// एक रचनात्मक भारतीय उत्पाद।
//
// go-mli - Boseji's Golang MQTT Logging command line
//
// Easy to use Golang based MQTT Command line logger.
//
// Sources
// -------
// https://github.com/boseji/go-mli
//
// License
// -------
//
// SPDX: GPL-3.0-or-later
//
// go-mli - Boseji's Golang MQTT Logging command line
// Copyright (C) 2024 by Abhijit Bose (aka. Boseji)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by the
// Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License along
// with this program. If not, see <https://www.gnu.org/licenses/>.
//
// MQTT Functionality
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"log"
"os"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// setupMQTT builds the MQTT client options from the supplied configuration
// and returns them.
//
// Parameters:
//   - m: configuration providing the broker address, client ID, optional
//     username/password and optional CA certificate file.
//   - cancel: invoked when the connection to the broker is lost.
//   - rec: invoked with the topic and payload (as a string) of every message
//     that reaches the default publish handler.
//
// If a CA file is configured but cannot be read, or contains no usable PEM
// certificate, the error is logged and the process exits.
func setupMQTT(m cfg,
	cancel context.CancelFunc,
	rec recorderFn) *mqtt.ClientOptions {
	// From (https://www.emqx.com/en/blog/how-to-use-mqtt-in-golang)
	opts := mqtt.NewClientOptions()
	opts.AddBroker(m.ADDR)
	opts.SetClientID(m.ClientID)

	// Credentials are only applied when a username is configured.
	if len(m.Username) > 0 {
		opts.SetUsername(m.Username)
		opts.SetPassword(m.Password)
	}

	// When a CA file is configured, trust only the certificates it contains.
	if len(m.CAFile) > 0 {
		certPool := x509.NewCertPool()
		ca, err := os.ReadFile(m.CAFile)
		if err != nil {
			log.Fatalln(err.Error())
		}
		// AppendCertsFromPEM reports whether any certificate was parsed.
		// Fail fast here instead of installing an empty root pool that would
		// only surface later as an opaque TLS handshake failure.
		if !certPool.AppendCertsFromPEM(ca) {
			log.Fatalf("no valid PEM certificates found in CA file %q", m.CAFile)
		}
		opts.SetTLSConfig(&tls.Config{
			RootCAs: certPool,
		})
	}

	// Set Callbacks
	opts.SetDefaultPublishHandler(
		func(client mqtt.Client, msg mqtt.Message) {
			log.Printf("[MQTT] Received message: %q from topic: %q\n",
				msg.Payload(), msg.Topic())
			// Send for Record
			rec(msg.Topic(), string(msg.Payload()))
		})
	opts.SetOnConnectHandler(
		func(client mqtt.Client) {
			log.Println("[MQTT] Connected to Broker")
		})
	opts.SetConnectionLostHandler(
		func(client mqtt.Client, err error) {
			log.Printf("[MQTT] Connect lost: %v", err)
			// Signal the rest of the program that the connection is gone.
			cancel()
		})

	// Clean Sessions for each run
	opts.SetCleanSession(true)
	return opts
}
// connectMQTT creates an MQTT client from the supplied options, connects it
// to the broker and returns the client.
//
// On failure it returns a nil client and an error that wraps the underlying
// connection error, so callers can inspect the cause with errors.Is or
// errors.As. The error text is the same as before the wrapping was added.
func connectMQTT(opts *mqtt.ClientOptions) (mqtt.Client, error) {
	client := mqtt.NewClient(opts)
	token := client.Connect()
	// The error is only checked after Wait has returned true.
	if token.Wait() && token.Error() != nil {
		return nil, fmt.Errorf("failed to connect to mqtt:\n %w", token.Error())
	}
	return client, nil
}
// disconnectMQTT disconnects the client, passing ms (a number of
// milliseconds) through to the client's Disconnect call.
//
// It returns an error when client is nil or when the client reports that it
// is not connected; otherwise it disconnects, logs the event and returns nil.
func disconnectMQTT(client mqtt.Client, ms uint) error {
	// Reject the cases in which there is nothing to disconnect.
	switch {
	case client == nil:
		return fmt.Errorf("no client")
	case !client.IsConnected():
		return fmt.Errorf("mqtt not connected")
	}

	client.Disconnect(ms)
	log.Println("[MQTT] Disconnected.")
	return nil
}
// subscribeMQTT subscribes the client to the supplied topic.
//
// It returns an error when client is nil or when the subscription fails. A
// subscription failure wraps the underlying error, so callers can inspect
// the cause with errors.Is or errors.As. On success the subscription is
// logged and nil is returned.
func subscribeMQTT(client mqtt.Client, topic string) error {
	if client == nil {
		return fmt.Errorf("no client")
	}
	// Subscribe with value 1 as the second argument (the QoS level in paho's
	// Subscribe signature) and no per-subscription callback.
	// NOTE(review): with a nil callback, messages are presumably routed to
	// the default publish handler — confirm against the paho docs.
	token := client.Subscribe(topic, 1, nil)
	if token.Wait() && token.Error() != nil {
		return fmt.Errorf("failed to subscribe to %q:\n %w",
			topic, token.Error())
	}
	log.Printf("[MQTT] Subscribed to topic: %q\n", topic)
	return nil
}