-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
command_wait.go
226 lines (186 loc) · 4.67 KB
/
command_wait.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
// Copyright 2021-22 Kirill Scherba <[email protected]>. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Teonet wait command module.
package teonet
import (
"encoding/binary"
"time"
"github.com/teonet-go/tru"
)
// CheckDataFunc is a callback that inspects packet data and reports whether
// the data is the expected answer (ok is true) or should be skipped.
type CheckDataFunc func(data []byte) (ok bool)
// WaitData is the type of the values sent through a wait channel: the bytes of
// a received answer.
type WaitData []byte
// WaitFrom waits for an answer from the address from and returns the answer
// data, or an error if the subscription fails or the wait times out. Attr holds
// additional attributes, recognized by type:
//
//	byte or int:       wait command number in answer
//	uint32:            wait packet id in answer
//	func([]byte) bool: check packet data with callback, data without command and id
//	time.Duration:     wait timeout (the default is chosen by MakeWaitReader)
//
// Answer packet data structure: [cmd][id][data]; it depends on the service api.
func (teo *Teonet) WaitFrom(from string, attr ...interface{}) (data []byte, err error) {
	// Build a private attribute list instead of appending to attr: when the
	// caller passes an existing slice (attr...), appending in place could
	// write the wait flag into the caller's backing array.
	args := make([]interface{}, 0, len(attr)+1)
	args = append(args, attr...)
	args = append(args, true) // ask MakeWaitReader to create the wait channel

	wr := teo.MakeWaitReader(args...)
	scr, err := teo.Subscribe(from, wr.Reader())
	if err != nil {
		return nil, err
	}
	defer teo.Unsubscribe(scr)

	return teo.WaitReaderAnswer(wr.Wait(), wr.Timeout())
}
// WaitReaderAnswer waits for data on the wait channel. It returns the received
// data, or ErrTimeout if nothing arrives within timeout.
func (teo *Teonet) WaitReaderAnswer(wait chan WaitData, timeout time.Duration) (data []byte, err error) {
	// Use an explicit timer so it can be stopped as soon as the answer
	// arrives; a timer created by time.After stays pending until it fires.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case data = <-wait:
		return data, nil
	case <-timer.C:
		return nil, ErrTimeout
	}
}
// WaitReader holds the reader callback, the wait channel and the timeout
// produced by MakeWaitReader.
type WaitReader struct {
// wait is the channel the reader sends matched packet data to; it is created
// by MakeWaitReader only when a true bool attribute is given
wait chan WaitData
// reader is the callback that filters incoming packets
reader func(c *Channel, p *Packet, e *Event) (processed bool)
// timeout is how long to wait for an answer
timeout time.Duration
}
// MakeWaitReader creates a reader callback, a wait channel and a timeout from
// attr. Each attribute is recognized by its dynamic type:
//
//	byte or int:       wait command number in answer (an int is truncated to byte)
//	uint32:            wait packet id in answer (little endian, after the command)
//	func([]byte) bool
//	or CheckDataFunc:  check packet data with callback, data without command and id
//	time.Duration:     wait timeout (default tru.ClientConnectTimeout)
//	bool:              create the wait channel and send matched data to it if true
//
// Answer packet data structure: [cmd][id][data]; it depends on the service api.
//
// An attribute of any other type is reported with log.Error.Panicf.
func (teo *Teonet) MakeWaitReader(attr ...interface{}) (wr *WaitReader) {
	wr = new(WaitReader)

	// Bits of param.check marking which checks were requested
	const (
		validCmd byte = 1 << iota
		validID
		validF
	)
	var param struct {
		cmd   byte
		id    uint32
		f     CheckDataFunc
		check byte
		wait  bool
	}

	// Parse attr
	wr.timeout = tru.ClientConnectTimeout
	for _, a := range attr {
		switch v := a.(type) {
		case byte:
			param.cmd = v
			param.check |= validCmd
		case int:
			param.cmd = byte(v)
			param.check |= validCmd
		case uint32:
			param.id = v
			param.check |= validID
		case func([]byte) bool:
			param.f = v
			param.check |= validF
		case CheckDataFunc:
			// A value declared with the named type has a different dynamic
			// type than a plain func([]byte) bool and needs its own case.
			param.f = v
			param.check |= validF
		case bool:
			param.wait = v
		case time.Duration:
			wr.timeout = v
		default:
			log.Error.Panicf("wrong reader attribute with type %T\n", v)
		}
	}

	if param.wait {
		wr.wait = make(chan WaitData)
	}

	wr.reader = func(c *Channel, p *Packet, e *Event) (processed bool) {
		// Skip not Data Events
		if e.Event != EventData {
			return false
		}

		// data is narrowed step by step to the part that follows the
		// command and id
		data := p.Data()

		// Check Command; an empty packet cannot carry the expected command
		if param.check&validCmd != 0 {
			if len(data) == 0 || data[0] != param.cmd {
				return false
			}
			data = data[1:]
		}

		// Check ID
		if param.check&validID != 0 {
			if len(data) < 4 || binary.LittleEndian.Uint32(data) != param.id {
				return false
			}
			data = data[4:]
		}

		// Check data func
		if param.check&validF != 0 && !param.f(data) {
			return false
		}

		// Without a wait channel there is nowhere to deliver the answer
		if !param.wait {
			return false
		}

		// Send answer to wait channel without blocking the reader: the
		// channel is unbuffered, so the send succeeds only while a receiver
		// is waiting on it
		select {
		case wr.wait <- data:
			// Valid packet
			return true
		default:
			msg := "!!! can't send message to wait channel, skip it"
			teo.Log().Debug.Println(msg)
			return false
		}
	}

	return wr
}
// Wait returns the wait channel of the reader.
func (wr WaitReader) Wait() chan WaitData {
	ch := wr.wait
	return ch
}
// Reader returns the reader callback stored in the wait reader.
func (wr WaitReader) Reader() func(c *Channel, p *Packet, e *Event) (processed bool) {
	callback := wr.reader
	return callback
}
// Timeout returns the wait timeout stored in the wait reader.
func (wr WaitReader) Timeout() time.Duration {
	d := wr.timeout
	return d
}
// MakeWaitAttr returns a new, empty wait attribute builder.
func (teo *Teonet) MakeWaitAttr() *WaitAttr {
	return &WaitAttr{}
}
// WaitAttr accumulates wait attributes (command, id, check function, timeout)
// added by its builder methods; GetAttr returns the collected list.
type WaitAttr struct {
// attr is the list of attributes in the order they were appended
attr []interface{}
}
// Cmd adds the command number cmd to the wait attributes and returns w so
// calls can be chained.
func (w *WaitAttr) Cmd(cmd byte) *WaitAttr {
	var item interface{} = cmd
	w.attr = append(w.attr, item)
	return w
}
// ID adds the packet id to the wait attributes and returns w so calls can be
// chained.
func (w *WaitAttr) ID(id uint32) *WaitAttr {
	var item interface{} = id
	w.attr = append(w.attr, item)
	return w
}
// Func adds the data check callback f to the wait attributes and returns w so
// calls can be chained.
func (w *WaitAttr) Func(f func([]byte) bool) *WaitAttr {
	var item interface{} = f
	w.attr = append(w.attr, item)
	return w
}
// Timeout adds the wait timeout t to the wait attributes and returns w so
// calls can be chained.
func (w *WaitAttr) Timeout(t time.Duration) *WaitAttr {
	var item interface{} = t
	w.attr = append(w.attr, item)
	return w
}
// GetAttr returns the collected wait attributes. The returned slice is the
// builder's own slice, not a copy.
func (w *WaitAttr) GetAttr() []interface{} {
	collected := w.attr
	return collected
}