-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathinput.go
executable file
·197 lines (169 loc) · 5.58 KB
/
input.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package thingiverseio
import (
"github.com/ThingiverseIO/thingiverseio/config"
"github.com/ThingiverseIO/thingiverseio/core"
"github.com/ThingiverseIO/thingiverseio/descriptor"
"github.com/ThingiverseIO/thingiverseio/message"
"github.com/ThingiverseIO/uuid"
"github.com/joernweissenborn/eventual2go"
"github.com/joernweissenborn/eventual2go/typedevents"
)
// Input is a ThingiverseIO node that imports functionality from the
// ThingiverseIO network. Every method on Input delegates to the wrapped core.
type Input struct {
	// core is the underlying input implementation all methods forward to.
	core core.InputCore
}
// NewInput creates a new Input instance for the given service descriptor.
// The configuration is obtained from config.Configure() of the
// thingiverseio/config package and handed to NewInputFromConfig.
func NewInput(desc string) (i *Input, err error) {
	return NewInputFromConfig(desc, config.Configure())
}
// NewInputFromConfig creates a new Input instance for the given service
// descriptor and configuration.
//
// desc is parsed with descriptor.Parse; the core is then built with the
// tracker and providers returned by core.DefaultBackends. If parsing the
// descriptor or constructing the core fails, the error is returned and the
// returned *Input is nil.
func NewInputFromConfig(desc string, cfg *config.UserConfig) (i *Input, err error) {
	var d descriptor.Descriptor
	if d, err = descriptor.Parse(desc); err != nil {
		return nil, err
	}

	tracker, provider := core.DefaultBackends()

	// Use a distinct local name so the core package is not shadowed.
	inputCore, err := core.NewInputCore(d, cfg, tracker, provider...)
	if err != nil {
		// Do not hand out an Input wrapping a core that failed to construct.
		return nil, err
	}

	return &Input{core: inputCore}, nil
}
// Remove shuts down the Input by shutting down its underlying core.
func (i *Input) Remove() {
	i.core.Shutdown()
}
// Run starts the underlying core, which begins service discovery.
func (i *Input) Run() {
	i.core.Run()
}
// UUID returns the UUID of this Input instance as reported by its core.
func (i *Input) UUID() uuid.UUID {
	id := i.core.UUID()
	return id
}
// Connected reports whether the Input is currently connected to at least one
// suitable Output.
func (i *Input) Connected() bool {
	connected := i.core.Connected()
	return connected
}
// ConnectedObservable returns the eventual2go/typedevents.BoolObservable
// which represents the connection state of the Input.
func (i *Input) ConnectedObservable() *typedevents.BoolObservable {
	observable := i.core.ConnectedObservable()
	return observable
}
// WaitUntilConnected blocks until the Input is connected. It returns
// immediately if the Input is already connected.
func (i *Input) WaitUntilConnected() {
	// Obtain the future for the next connection-state event BEFORE reading the
	// current state; presumably this avoids missing a change that happens
	// between the check and the subscription.
	nextChange := i.ConnectedObservable().Stream().First()
	if !i.Connected() {
		nextChange.WaitUntilComplete()
	}
}
// Call executes a ThingiverseIO Call and returns a ResultFuture, which gets
// completed if a suitable output responds.
//
// parameter is serialized with encode and sent as a message.CALL request for
// function. If encoding or issuing the request fails, the error is returned
// and result is nil.
func (i *Input) Call(function string, parameter interface{}) (result *ResultFuture, err error) {
	data, err := encode(parameter)
	if err != nil {
		return nil, err
	}

	r, _, _, err := i.core.Request(function, message.CALL, data)
	if err != nil {
		// r must not be touched when the request failed.
		return nil, err
	}

	return &ResultFuture{r.Future}, nil
}
// CallAll executes a ThingiverseIO CallAll and returns the stream on which
// results are delivered. The stream must be closed manually!
//
// parameter is serialized with encode and sent as a message.CALLALL request
// for function. If encoding or issuing the request fails, the error is
// returned and results is nil.
func (i *Input) CallAll(function string, parameter interface{}) (results *ResultStream, err error) {
	data, err := encode(parameter)
	if err != nil {
		return nil, err
	}

	_, r, _, err := i.core.Request(function, message.CALLALL, data)
	if err != nil {
		// r must not be touched when the request failed.
		return nil, err
	}

	return &ResultStream{r.Stream}, nil
}
// Trigger executes a ThingiverseIO Trigger. The parameter is serialized with
// encode and sent as a message.TRIGGER request for function; any encoding or
// request error is returned.
func (i *Input) Trigger(function string, parameter interface{}) (err error) {
	payload, encErr := encode(parameter)
	if encErr != nil {
		return encErr
	}
	// Only the error of the request is of interest for a trigger.
	_, _, _, err = i.core.Request(function, message.TRIGGER, payload)
	return err
}
// TriggerAll executes a ThingiverseIO TriggerAll. The parameter is serialized
// with encode and sent as a message.TRIGGERALL request for function; any
// encoding or request error is returned.
func (i *Input) TriggerAll(function string, parameter interface{}) (err error) {
	payload, encErr := encode(parameter)
	if encErr != nil {
		return encErr
	}
	// Only the error of the request is of interest for a trigger.
	_, _, _, err = i.core.Request(function, message.TRIGGERALL, payload)
	return err
}
// StartListen starts listening to the given function and returns the error
// reported by the core, if any.
func (i *Input) StartListen(function string) (err error) {
	return i.core.StartListen(function)
}
// StopListen stops listening to the given function.
func (i *Input) StopListen(function string) {
	i.core.StopListen(function)
}
// StartConsume starts consuming the given stream and returns the error
// reported by the core, if any.
func (i *Input) StartConsume(stream string) (err error) {
	return i.core.StartConsume(stream)
}
// StopConsume stops consuming the given stream.
func (i *Input) StopConsume(stream string) {
	i.core.StopConsume(stream)
}
// GetStream returns a stream of StreamEvent for the given stream name.
// The stream obtained from the core is transformed with toStreamEvent(stream)
// and wrapped in a StreamEventStream. If the core reports an error, s is nil.
func (i *Input) GetStream(stream string) (s *StreamEventStream, err error) {
	raw, err := i.core.GetStream(stream)
	if err != nil {
		return nil, err
	}
	events := raw.Transform(toStreamEvent(stream))
	return &StreamEventStream{events}, nil
}
// StartObservation starts observation of the given property and returns the
// error reported by the core, if any.
func (i *Input) StartObservation(property string) (err error) {
	return i.core.StartObservation(property)
}
// GetProperty gets the current value of the named property.
// On error from the core, the zero Property is returned together with the error.
//
// NOTE(review): the value held by the core is asserted to be []byte without a
// check; the assertion panics if the value has any other dynamic type
// (including nil) — confirm the core always stores []byte.
func (i *Input) GetProperty(property string) (p Property, err error) {
	observable, err := i.core.GetProperty(property)
	if err != nil {
		return Property{}, err
	}
	raw := observable.Value().([]byte)
	return Property{Name: property, value: raw}, nil
}
// GetPropertyObservable returns an observable linked to the named property,
// derived from the core's property observable via toProperty(property).
// The returned cancel completer comes from the Derive call; presumably
// completing it detaches the derived observable — confirm against eventual2go.
// On error from the core, p and cancel are nil.
func (i *Input) GetPropertyObservable(property string) (p *PropertyObservable, cancel *eventual2go.Completer, err error) {
	source, err := i.core.GetProperty(property)
	if err != nil {
		return nil, nil, err
	}
	derived, cancel := source.Derive(toProperty(property))
	return &PropertyObservable{derived}, cancel, nil
}
// UpdateProperty requests an update of the named property's value. It returns
// a PropertyFuture which gets completed when the update has arrived; the
// core's future is chained through propertyFromFuture(property).
// On error from the core, p is nil.
func (i *Input) UpdateProperty(property string) (p *PropertyFuture, err error) {
	pending, err := i.core.UpdateProperty(property)
	if err != nil {
		return nil, err
	}
	converted := pending.Then(propertyFromFuture(property))
	return &PropertyFuture{converted}, nil
}
// StopObservation stops observation of the given property.
func (i *Input) StopObservation(property string) {
	i.core.StopObservation(property)
}
// ListenResults returns a ResultStream on which results of Trigger or
// TriggerAll function calls are received. It wraps the Stream of the core's
// listen stream.
func (i *Input) ListenResults() *ResultStream {
	listener := i.core.ListenStream()
	return &ResultStream{listener.Stream}
}