-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.go
130 lines (111 loc) · 4.31 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package main
import (
"crypto/subtle"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/labstack/echo"
"github.com/labstack/echo/middleware"
"github.com/spf13/viper"
"net/http"
"time"
)
// ResponseMessage is the JSON body written back to HTTP callers.
//
// TODO: report the response time as a Unix timestamp.
type ResponseMessage struct {
	// Message is the text reported to the caller, e.g. the MQTT reply
	// payload or "timeout".
	Message string `json:"message"`

	// ResponseTime is a time.Duration rendered with its String method.
	ResponseTime string `json:"response_time"`
}
// Configurations is the root of the settings that main decodes from the
// config file via viper.
type Configurations struct {
	// ServerConfig holds the HTTP listener and basic-auth settings.
	ServerConfig HttpServerConfigurations

	// MqttConfig holds the MQTT broker connection settings.
	MqttConfig MqttConfigurations
}
// HttpServerConfigurations describes the HTTP side of the bridge.
type HttpServerConfigurations struct {
	// Port is the listen address handed to the Echo server's Start call.
	Port string

	// UserName is the HTTP basic-auth user that requests must present.
	UserName string

	// Password is the HTTP basic-auth password that requests must present.
	Password string

	// MessageTimeout is how long a request waits for an MQTT reply,
	// in milliseconds.
	MessageTimeout int
}
// MqttConfigurations describes how the bridge connects to the MQTT broker.
type MqttConfigurations struct {
	// BrokerAddress is the broker URI passed to the client options' AddBroker.
	BrokerAddress string

	// ClientID is the MQTT client identifier.
	ClientID string

	// UserName is the MQTT login user.
	UserName string

	// Password is the MQTT login password.
	Password string
}
// Package-level state shared by main, the HTTP handler and the MQTT callback.
var (
	// mqttClient is the broker connection created in main.
	mqttClient mqtt.Client

	// mqttMessageCallback passes received MQTT payloads to the HTTP handler
	// that is waiting for a reply. It is unbuffered.
	mqttMessageCallback = make(chan string)

	// http2mqtt is the Echo instance; its Logger is used for all logging.
	http2mqtt *echo.Echo

	// configuration holds the settings decoded from the config file in main.
	configuration Configurations
)
// main wires the bridge together: it reads the "config" yml file from the
// working directory, connects to the MQTT broker, installs HTTP basic auth and
// routes every GET path to updateMqMessage. Any startup failure is fatal.
func main() {
	viper.SetConfigName("config")
	viper.AddConfigPath(".")
	viper.AutomaticEnv()
	viper.SetConfigType("yml")

	http2mqtt = echo.New()
	http2mqtt.Use(middleware.Logger())
	http2mqtt.Use(middleware.Recover())
	//todo log library change
	// NOTE(review): 1 is presumably the logger's DEBUG level (the Debug calls
	// below rely on it) — confirm against the labstack/gommon log constants.
	http2mqtt.Logger.SetLevel(1)

	if err := viper.ReadInConfig(); err != nil {
		http2mqtt.Logger.Fatal("Error reading config file " + err.Error())
	}
	if err := viper.Unmarshal(&configuration); err != nil {
		// Fatalf rather than Fatal: the message carries a format verb.
		http2mqtt.Logger.Fatalf("Unable to decode into struct, %v", err)
	}

	serverCfg := configuration.ServerConfig
	mqttCfg := configuration.MqttConfig

	// Secrets are never written to the log; only whether they are configured.
	http2mqtt.Logger.Debug("http port " + serverCfg.Port)
	http2mqtt.Logger.Debug("basic auth username ", serverCfg.UserName)
	http2mqtt.Logger.Debugf("basic auth password set: %t", serverCfg.Password != "")
	http2mqtt.Logger.Debug("mqtt broker address ", mqttCfg.BrokerAddress)
	http2mqtt.Logger.Debug("mqtt clientID ", mqttCfg.ClientID)
	http2mqtt.Logger.Debug("mqtt username ", mqttCfg.UserName)
	http2mqtt.Logger.Debugf("mqtt password set: %t", mqttCfg.Password != "")

	mqttOptions := mqtt.NewClientOptions().AddBroker(mqttCfg.BrokerAddress)
	mqttOptions.SetClientID(mqttCfg.ClientID)
	mqttOptions.SetUsername(mqttCfg.UserName)
	mqttOptions.SetPassword(mqttCfg.Password)
	// Subscriptions are made without a per-topic callback, so every incoming
	// message is delivered to this default handler.
	mqttOptions.SetDefaultPublishHandler(mqttMessageHandler)

	mqttClient = mqtt.NewClient(mqttOptions)
	if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
		http2mqtt.Logger.Fatal(token.Error())
	}

	http2mqtt.Use(middleware.BasicAuth(func(username, password string, c echo.Context) (bool, error) {
		// Evaluate both comparisons unconditionally so that a wrong username
		// does not return sooner than a wrong password.
		userOK := subtle.ConstantTimeCompare([]byte(username), []byte(serverCfg.UserName))
		passOK := subtle.ConstantTimeCompare([]byte(password), []byte(serverCfg.Password))
		return userOK&passOK == 1, nil
	}))

	http2mqtt.GET("/*", updateMqMessage)
	http2mqtt.Logger.Fatal(http2mqtt.Start(serverCfg.Port))
}
// Previously sketched route: e.GET("/topic/:id", updateMqMessage)

// updateMqMessage bridges one HTTP GET to an MQTT request/reply exchange.
//
// It subscribes to "/get"+<request path>, publishes the "message" query
// parameter to "/post"+<request path>, and then waits for a payload on
// mqttMessageCallback.
//
// Responses (all JSON ResponseMessage bodies):
//   - 200 with the received payload and the elapsed time;
//   - 502 when the subscription to the reply topic fails;
//   - 504 with "timeout" when no payload arrives within the configured
//     MessageTimeout (milliseconds).
//
// NOTE(review): the escaped request path is used verbatim as the topic
// suffix, so a '+' in the path would act as an MQTT single-level wildcard in
// the subscription — consider validating the path.
// NOTE(review): mqttMessageCallback is shared by all requests, so concurrent
// requests are not correlated with their own replies.
func updateMqMessage(c echo.Context) error {
	responseTime := time.Now()

	path := c.Request().URL.EscapedPath()
	replyTopic := "/get" + path
	requestTopic := "/post" + path
	http2mqtt.Logger.Debug(replyTopic)
	http2mqtt.Logger.Debug(requestTopic)

	if token := mqttClient.Subscribe(replyTopic, 2, nil); token.Wait() && token.Error() != nil {
		// A broker-side failure affects only this request; it must not take
		// the whole server down.
		http2mqtt.Logger.Error(token.Error())
		return c.JSON(http.StatusBadGateway, ResponseMessage{
			Message:      "subscribe failed",
			ResponseTime: time.Since(responseTime).String(),
		})
	}
	// Drop the subscription on every exit path below.
	defer mqttClient.Unsubscribe(replyTopic)

	message := c.QueryParam("message")
	http2mqtt.Logger.Info(message)
	// The publish token is not awaited; a publish that never reaches the
	// broker shows up to the caller as a timeout.
	mqttClient.Publish(requestTopic, 2, false, message)

	timeout := time.Duration(configuration.ServerConfig.MessageTimeout) * time.Millisecond
	select {
	case msg := <-mqttMessageCallback:
		return c.JSON(http.StatusOK, ResponseMessage{
			Message:      msg,
			ResponseTime: time.Since(responseTime).String(),
		})
	case <-time.After(timeout):
		http2mqtt.Logger.Info("timeout seconds")
		return c.JSON(http.StatusGatewayTimeout, ResponseMessage{
			Message:      "timeout",
			ResponseTime: timeout.String(),
		})
	}
}
// mqttMessageHandler is installed as the MQTT client's default publish
// handler. It logs the topic and payload of each incoming message and offers
// the payload to mqttMessageCallback for an HTTP request that is waiting.
//
// The hand-off runs in its own goroutine so that this callback returns
// immediately. That goroutine ends as soon as a request receives the payload,
// or after the configured MessageTimeout (milliseconds) when nobody does, in
// which case the payload is dropped.
var mqttMessageHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	topic := msg.Topic()
	// Convert once: logging the raw []byte would print a list of numbers.
	payload := string(msg.Payload())
	http2mqtt.Logger.Info(topic)
	http2mqtt.Logger.Info(payload)

	wait := time.Duration(configuration.ServerConfig.MessageTimeout) * time.Millisecond
	go func() {
		// len() of an unbuffered channel is always 0, so it cannot tell us
		// whether a receiver is present; a bounded send can.
		select {
		case mqttMessageCallback <- payload:
		case <-time.After(wait):
			//todo test required
			http2mqtt.Logger.Info("channel full capacity")
		}
	}()
}