-
Notifications
You must be signed in to change notification settings - Fork 0
/
device_server.go
184 lines (144 loc) · 4.32 KB
/
device_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package msg2api
import (
"crypto/hmac"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"github.com/gorilla/websocket"
"net/http"
"time"
)
// DeviceServer is the server-side endpoint of a device connection. It embeds
// the shared apiBase (which provides the socket used by its methods) and holds
// the callbacks invoked for each command received from the device. A callback
// left nil causes the corresponding command to be answered with a
// "not supported" operation failure.
type DeviceServer struct {
	*apiBase

	// Update is invoked for the "update" command with the measurement
	// values sent by the device; the map is keyed by sensor ID.
	Update func(values map[string][]Measurement) *Error

	// AddSensor is invoked for the "addSensor" command, i.e. when the device
	// wants to register a new sensor. It is expected to be called only once
	// per sensor, with the backend persisting the registration.
	AddSensor func(name, unit string, port int32, factor float64) *Error

	// RemoveSensor is invoked for the "removeSensor" command, i.e. when the
	// device wants to deregister the named sensor.
	RemoveSensor func(name string) *Error

	// UpdateMetadata is invoked for the "updateMetadata" command with
	// metadata changes for sensors and for the device itself.
	UpdateMetadata func(metadata *DeviceMetadata) *Error
}
// errAuthenticationFailed is returned by authenticate when the response
// received from the device does not match the expected HMAC of the challenge.
var errAuthenticationFailed = errors.New("authentication failed")
// authenticate runs a challenge-response handshake with the device.
//
// It generates sha256.Size random bytes, sends them hex-encoded as the
// challenge, and expects the next received message to be the hex-encoded
// HMAC-SHA256 of the raw challenge bytes, keyed with key. On a match it sends
// "proceed" and returns the result of that write. On a mismatch it returns
// errAuthenticationFailed. Errors from generating the challenge, sending it,
// receiving the response, or hex-decoding the response are returned unchanged.
func (d *DeviceServer) authenticate(key []byte) error {
	var buf [sha256.Size]byte
	if _, err := rand.Read(buf[:]); err != nil {
		return err
	}

	// If the challenge cannot be delivered the device can never answer it,
	// so fail right away instead of blocking on Receive.
	if err := d.socket.Write(hex.EncodeToString(buf[:])); err != nil {
		return err
	}

	msgRaw, err := d.socket.Receive()
	if err != nil {
		return err
	}
	msg, err := hex.DecodeString(string(msgRaw))
	if err != nil {
		return err
	}

	// The MAC is computed over the raw challenge bytes, not their hex form.
	mac := hmac.New(sha256.New, key)
	mac.Write(buf[:]) // hash.Hash.Write is documented to never return an error
	expected := mac.Sum(nil)

	// hmac.Equal compares without leaking timing information.
	if !hmac.Equal(msg, expected) {
		return errAuthenticationFailed
	}
	return d.socket.Write("proceed")
}
// Run performs the authentication handshake with the device using key and,
// if that succeeds, processes device commands until receiving a message
// fails. Every command is answered with a MessageOut carrying either the
// operation's error or the current time in milliseconds since the Unix epoch.
// Whatever error ends the run is used as the reason when closing the socket
// with websocket.CloseProtocolError, and is then returned to the caller.
func (d *DeviceServer) Run(key []byte) error {
	err := d.authenticate(key)

	// The loop is skipped entirely when authentication failed and is left
	// only once a receive error occurs, so err is non-nil afterwards.
	for err == nil {
		var in MessageIn
		if err = d.socket.ReceiveJSON(&in); err != nil {
			break
		}

		var opErr *Error
		switch in.Command {
		case "update":
			opErr = d.doUpdate(&in)
		case "addSensor":
			opErr = d.doAddSensor(&in)
		case "removeSensor":
			opErr = d.doRemoveSensor(&in)
		case "updateMetadata":
			opErr = d.doUpdateMetadata(&in)
		default:
			opErr = badCommand(in.Command)
		}

		var reply MessageOut
		if opErr == nil {
			// Successful commands are acknowledged with the server time.
			millis := time.Now().UnixNano() / int64(time.Millisecond)
			reply.Now = &millis
		} else {
			reply.Error = opErr
		}
		d.socket.WriteJSON(reply)
	}

	d.socket.Close(websocket.CloseProtocolError, err.Error())
	return err
}
// RequestRealtimeUpdates sends a "requestRealtimeUpdates" command to the
// device, passing the given sensor IDs as the command arguments.
func (d *DeviceServer) RequestRealtimeUpdates(sensors []string) {
	request := MessageOut{
		Command: "requestRealtimeUpdates",
		Args:    sensors,
	}
	d.socket.WriteJSON(request)
}
// doUpdate handles the "update" command: it decodes the message arguments
// into DeviceCmdUpdateArgs and passes the decoded values to the Update
// callback. Malformed arguments yield an invalid-input error; a nil callback
// yields a "not supported" operation failure.
func (d *DeviceServer) doUpdate(msg *MessageIn) *Error {
	var parsed DeviceCmdUpdateArgs
	err := json.Unmarshal(msg.Args, &parsed)
	switch {
	case err != nil:
		return invalidInput(err.Error(), "")
	case d.Update == nil:
		return operationFailed("not supported")
	}
	return d.Update(parsed.Values)
}
// doAddSensor handles the "addSensor" command: it decodes the message
// arguments into DeviceCmdAddSensorArgs and passes name, unit, port and
// factor to the AddSensor callback. Malformed arguments yield an
// invalid-input error; a nil callback yields a "not supported" operation
// failure.
func (d *DeviceServer) doAddSensor(msg *MessageIn) *Error {
	var parsed DeviceCmdAddSensorArgs
	err := json.Unmarshal(msg.Args, &parsed)
	switch {
	case err != nil:
		return invalidInput(err.Error(), "")
	case d.AddSensor == nil:
		return operationFailed("not supported")
	}
	return d.AddSensor(parsed.Name, parsed.Unit, parsed.Port, parsed.Factor)
}
// doRemoveSensor handles the "removeSensor" command: it decodes the message
// arguments into DeviceCmdRemoveSensorArgs and passes the sensor name to the
// RemoveSensor callback. Malformed arguments yield an invalid-input error; a
// nil callback yields a "not supported" operation failure.
func (d *DeviceServer) doRemoveSensor(msg *MessageIn) *Error {
	var parsed DeviceCmdRemoveSensorArgs
	err := json.Unmarshal(msg.Args, &parsed)
	switch {
	case err != nil:
		return invalidInput(err.Error(), "")
	case d.RemoveSensor == nil:
		return operationFailed("not supported")
	}
	return d.RemoveSensor(parsed.Name)
}
// doUpdateMetadata handles the "updateMetadata" command: it decodes the
// message arguments into a DeviceMetadata value and passes a pointer to it to
// the UpdateMetadata callback. Malformed arguments yield an invalid-input
// error; a nil callback yields a "not supported" operation failure.
func (d *DeviceServer) doUpdateMetadata(msg *MessageIn) *Error {
	var metadata DeviceMetadata
	err := json.Unmarshal(msg.Args, &metadata)
	switch {
	case err != nil:
		return invalidInput(err.Error(), "")
	case d.UpdateMetadata == nil:
		return operationFailed("not supported")
	}
	// metadata is local to this call, so the callback gets its own value.
	return d.UpdateMetadata(&metadata)
}
// NewDeviceServer creates a DeviceServer for the given HTTP request by
// initializing the API base with the device API protocol v1 identifier as the
// only accepted protocol. If that initialization fails, its error is returned
// and no server is created. The returned server has no callbacks set.
func NewDeviceServer(w http.ResponseWriter, r *http.Request) (*DeviceServer, error) {
	protocols := []string{deviceAPIProtocolV1}
	base, err := initAPIBaseFromHTTP(w, r, protocols)
	if err != nil {
		return nil, err
	}
	return &DeviceServer{apiBase: base}, nil
}