-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathoutput.go
executable file
·128 lines (109 loc) · 3.57 KB
/
output.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package thingiverseio
import (
"github.com/ThingiverseIO/thingiverseio/config"
"github.com/ThingiverseIO/thingiverseio/core"
"github.com/ThingiverseIO/thingiverseio/descriptor"
"github.com/ThingiverseIO/uuid"
"github.com/joernweissenborn/eventual2go/typedevents"
)
// Output is a ThingiverseIO node which exports functionality to the ThingiverseIO network.
type Output struct {
core core.OutputCore
}
// NewOutput creates a new Output instance for a given service descriptor. Configuration is automatically determined by the thingiversio/config package.
func NewOutput(desc string) (o *Output, err error) {
o, err = NewOutputFromConfig(desc, config.Configure())
return
}
// NewOutputFromConfig creates a new Output instance for a given configuration.
func NewOutputFromConfig(desc string, cfg *config.UserConfig) (o *Output, err error) {
var d descriptor.Descriptor
if d, err = descriptor.Parse(desc); err != nil {
return
}
tracker, provider := core.DefaultBackends()
core, err := core.NewOutputCore(d, cfg, tracker, provider...)
o = &Output{
core: core,
}
return
}
// UUID returns the UUID of a Output instance.
func (o *Output) UUID() uuid.UUID {
return o.core.UUID()
}
// Remove shuts down the Output.
func (o *Output) Remove() {
o.core.Shutdown()
}
// Run starts the Output creating all connections and starting service discovery.
func (o *Output) Run() {
o.core.Run()
}
// Connected returns true if at least 1 Input is connected.
func (o *Output) Connected() bool {
return o.core.Connected()
}
// ConnectedObservable returns a eventual2go/typedevents.BoolObservable which represents the connection state.
func (o *Output) ConnectedObservable() *typedevents.BoolObservable {
return o.core.ConnectedObservable()
}
// WaitUntilConnected waits until the output is connected.
func (o *Output) WaitUntilConnected() {
f := o.ConnectedObservable().Stream().First()
if o.Connected() {
return
}
f.WaitUntilComplete()
}
// Reply reponds the given output parameter to all interested Inputs of a given request.
func (o *Output) Reply(request *Request, parameter interface{}) (err error) {
data, err := encode(parameter)
if err != nil {
return
}
o.core.Reply(request, data)
return
}
// Emit acts like a ThingiverseIO Trigger, which is initiated by the Output.
func (o *Output) Emit(function string, inparams interface{}, outparams interface{}) (err error) {
inp, err := encode(inparams)
if err != nil {
return
}
outp, err := encode(outparams)
if err != nil {
return
}
err = o.core.Emit(function, inp, outp)
return
}
// Requests returns a RequestStream, which delivers incoming requests. Although multiple listeners can be registered, multiple replies to one request can lead to undefined behaviour.
func (o *Output) Requests() *RequestStream {
return &RequestStream{o.core.RequestStream().Stream}
}
// RequestsWhereFunction returns a RequestStream only for the given function.
func (o *Output) RequestsWhereFunction(function string) *RequestStream {
return (&RequestStream{o.core.RequestStream().Stream}).Where(filterRequests(function))
}
func filterRequests(function string) RequestFilter {
return func(r *Request) bool {return r.Function==function}
}
// SetProperty sets the value of a property.
func (o *Output) SetProperty(property string, value interface{}) (err error) {
v, err := encode(value)
if err != nil {
return
}
err = o.core.SetProperty(property, v)
return
}
// AddStream adds a value on a stream.
func (o *Output) AddStream(stream string, value interface{}) (err error) {
v, err := encode(value)
if err != nil {
return
}
err = o.core.AddStream(stream, v)
return
}