-
Notifications
You must be signed in to change notification settings - Fork 127
/
graph_iip.go
109 lines (87 loc) · 2.64 KB
/
graph_iip.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package goflow
import (
"fmt"
"reflect"
)
// iip is the Initial Information Packet.
// IIPs are delivered to process input ports on the network start.
type iip struct {
data interface{}
addr address
}
// AddIIP adds an Initial Information packet to the network.
func (n *Graph) AddIIP(processName, portName string, data interface{}) error {
addr := parseAddress(processName, portName)
if _, exists := n.procs[processName]; exists {
n.iips = append(n.iips, iip{data: data, addr: addr})
return nil
}
return fmt.Errorf("AddIIP: could not find '%s'", addr)
}
// RemoveIIP detaches an IIP from specific process and port.
func (n *Graph) RemoveIIP(processName, portName string) error {
addr := parseAddress(processName, portName)
for i := range n.iips {
if n.iips[i].addr == addr {
// Remove item from the slice
n.iips[len(n.iips)-1], n.iips[i], n.iips = iip{}, n.iips[len(n.iips)-1], n.iips[:len(n.iips)-1]
return nil
}
}
return fmt.Errorf("RemoveIIP: could not find IIP for '%s'", addr)
}
// sendIIPs sends Initial Information Packets upon network start.
func (n *Graph) sendIIPs() error {
// Send initial IPs
for i := range n.iips {
ip := n.iips[i]
// Get the receiver port channel
channel, found := n.channelByInPortAddr(ip.addr)
if !found {
channel, found = n.channelByConnectionAddr(ip.addr)
}
if !found {
// Try to find a proc and attach a new channel to it
recvPort, err := n.getProcPort(ip.addr.proc, ip.addr.port, reflect.RecvDir)
if err != nil {
return err
}
channel, err = attachPort(recvPort, ip.addr, reflect.RecvDir, reflect.ValueOf(nil), n.conf.BufferSize)
if err != nil {
return err
}
found = true
}
if !found {
return fmt.Errorf("IIP target not found: '%s'", ip.addr)
}
// Increase reference count for the channel
n.incChanListenersCount(channel)
// Send data to the port
go func(channel, data reflect.Value) {
channel.Send(data)
if n.decChanListenersCount(channel) {
channel.Close()
}
}(channel, reflect.ValueOf(ip.data))
}
return nil
}
// channelByInPortAddr returns a channel by address from the network inports.
func (n *Graph) channelByInPortAddr(addr address) (channel reflect.Value, found bool) {
for i := range n.inPorts {
if n.inPorts[i].addr == addr {
return n.inPorts[i].channel, true
}
}
return reflect.Value{}, false
}
// channelByConnectionAddr returns a channel by address from connections.
func (n *Graph) channelByConnectionAddr(addr address) (channel reflect.Value, found bool) {
for i := range n.connections {
if n.connections[i].tgt == addr {
return n.connections[i].channel, true
}
}
return reflect.Value{}, false
}