-
Notifications
You must be signed in to change notification settings - Fork 127
/
Copy pathgraph_ports.go
158 lines (137 loc) · 4.28 KB
/
graph_ports.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package goflow
import (
"fmt"
"reflect"
)
// port within the network.
type port struct {
addr address // Address of the port in the graph
channel reflect.Value // Actual channel attached
info PortInfo // Runtime info
}
// MapInPort adds an inport to the net and maps it to a contained proc's port.
func (n *Graph) MapInPort(name, procName, procPort string) {
addr := parseAddress(procName, procPort)
n.inPorts[name] = port{addr: addr}
}
// // AnnotateInPort sets optional run-time annotation for the port utilized by
// // runtimes and FBP protocol clients.
// func (n *Graph) AnnotateInPort(name string, info PortInfo) bool {
// port, exists := n.inPorts[name]
// if !exists {
// return false
// }
// port.info = info
// return true
// }
// // UnmapInPort removes an existing inport mapping
// func (n *Graph) UnmapInPort(name string) bool {
// if _, exists := n.inPorts[name]; !exists {
// return false
// }
// delete(n.inPorts, name)
// return true
// }
// MapOutPort adds an outport to the net and maps it to a contained proc's port.
func (n *Graph) MapOutPort(name, procName, procPort string) {
addr := parseAddress(procName, procPort)
n.outPorts[name] = port{addr: addr}
}
// // AnnotateOutPort sets optional run-time annotation for the port utilized by
// // runtimes and FBP protocol clients.
// func (n *Graph) AnnotateOutPort(name string, info PortInfo) bool {
// port, exists := n.outPorts[name]
// if !exists {
// return false
// }
// port.info = info
// return true
// }
// // UnmapOutPort removes an existing outport mapping
// func (n *Graph) UnmapOutPort(name string) bool {
// if _, exists := n.outPorts[name]; !exists {
// return false
// }
// delete(n.outPorts, name)
// return true
// }
// SetInPort assigns a channel to a network's inport to talk to the outer world.
func (n *Graph) SetInPort(name string, channel interface{}) error {
return n.setGraphPort(name, channel, reflect.RecvDir)
}
// SetOutPort assigns a channel to a network's outport to talk to the outer world.
// It returns true on success or false if the outport cannot be set.
func (n *Graph) SetOutPort(name string, channel interface{}) error {
return n.setGraphPort(name, channel, reflect.SendDir)
}
func (n *Graph) setGraphPort(name string, channel interface{}, dir reflect.ChanDir) error {
var (
ports map[string]port
dirDescr string
)
if dir == reflect.SendDir {
ports = n.outPorts
dirDescr = "out"
} else {
ports = n.inPorts
dirDescr = "in"
}
p, ok := ports[name]
if !ok {
return fmt.Errorf("setGraphPort: %s port '%s' not defined", dirDescr, name)
}
// Try to attach it
port, err := n.getProcPort(p.addr.proc, p.addr.port, dir)
if err != nil {
return fmt.Errorf("setGraphPort: cannot set %s port '%s': %w", dirDescr, name, err)
}
if _, err = attachPort(port, p.addr, dir, reflect.ValueOf(channel), n.conf.BufferSize); err != nil {
return fmt.Errorf("setGraphPort: cannot attach %s port '%s': %w", dirDescr, name, err)
}
// Save it in inPorts to be used with IIPs if needed
p.channel = reflect.ValueOf(channel)
ports[name] = p
return nil
}
// // RenameInPort changes graph's inport name
// func (n *Graph) RenameInPort(oldName, newName string) bool {
// if _, exists := n.inPorts[oldName]; !exists {
// return false
// }
// n.inPorts[newName] = n.inPorts[oldName]
// delete(n.inPorts, oldName)
// return true
// }
// // UnsetInPort removes an external inport from the graph
// func (n *Graph) UnsetInPort(name string) bool {
// port, exists := n.inPorts[name]
// if !exists {
// return false
// }
// if proc, ok := n.procs[port.proc]; ok {
// unsetProcPort(proc, port.port, false)
// }
// delete(n.inPorts, name)
// return true
// }
// // RenameOutPort changes graph's outport name
// func (n *Graph) RenameOutPort(oldName, newName string) bool {
// if _, exists := n.outPorts[oldName]; !exists {
// return false
// }
// n.outPorts[newName] = n.outPorts[oldName]
// delete(n.outPorts, oldName)
// return true
// }
// // UnsetOutPort removes an external outport from the graph
// func (n *Graph) UnsetOutPort(name string) bool {
// port, exists := n.outPorts[name]
// if !exists {
// return false
// }
// if proc, ok := n.procs[port.proc]; ok {
// unsetProcPort(proc, port.proc, true)
// }
// delete(n.outPorts, name)
// return true
// }