-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathworkflow.go
130 lines (105 loc) · 3.18 KB
/
workflow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package process
import (
"fmt"
"github.com/buchanae/cwl"
)
// scope is a slash-separated namespace over a shared table of links.
// Scopes derived from one another through child share the same links map,
// so a link recorded through any of them is visible through all of them.
type scope struct {
	// prefix is prepended (followed by "/") to every name resolved in
	// this scope.
	prefix string
	// links maps a fully qualified key to the keys it is linked to, in
	// the order the links were recorded.
	links map[string][]string
}

// child returns a scope nested under s. The nested scope's prefix is
// s's prefix joined to the given prefix with "/"; the links map is shared.
func (s scope) child(prefix string) scope {
	nested := s
	nested.prefix = s.key(prefix)
	return nested
}

// link records val as a link target of name, where name is resolved
// relative to s. Targets accumulate in call order.
func (s scope) link(name string, val string) {
	k := s.key(name)
	targets := s.links[k]
	s.links[k] = append(targets, val)
}

// key returns the fully qualified key for name within s.
func (s scope) key(name string) string {
	return s.prefix + "/" + name
}
// DebugWorkflow is a temporary placeholder for workflow processing code.
//
// It builds a link table for wf: every name in vals is linked from the
// root scope to the "inputs" scope, the workflow itself is linked under
// the root scope, and every workflow output is linked from the "outputs"
// scope to the scope returned by linkWorkflow. It then prints the chain of
// links reachable from the hard-coded key "/outputs/count_output".
func DebugWorkflow(wf *cwl.Workflow, vals cwl.Values) {
	table := map[string][]string{}
	root := scope{links: table}

	// Bind each provided value name in the root scope to its slot under
	// "inputs".
	inputs := root.child("inputs")
	for name := range vals {
		root.link(name, inputs.key(name))
	}

	exported := linkWorkflow(wf, root)

	outputs := root.child("outputs")
	for _, o := range wf.Outputs {
		outputs.link(o.ID, exported.key(o.ID))
	}

	// NOTE: the starting key is fixed; only this one output is traced.
	start := []string{"/outputs/count_output"}
	walk(table, start)
}
// walk prints each key in keys, one per line, followed depth-first by
// every key reachable from it through links.
//
// A key that is already being expanded further up the current chain is
// skipped, so a cyclic link table (for example a key linked to itself)
// terminates instead of recursing until the stack overflows. A key that is
// reached along separate, non-cyclic chains is still printed once per
// chain, so output for acyclic tables is unchanged.
func walk(links map[string][]string, keys []string) {
	// onPath holds the keys on the chain currently being expanded.
	onPath := map[string]bool{}

	var visit func(keys []string)
	visit = func(keys []string) {
		for _, key := range keys {
			if onPath[key] {
				// Following this link would re-enter a key we are
				// still expanding: a cycle.
				continue
			}
			fmt.Println(key)

			onPath[key] = true
			visit(links[key])
			delete(onPath, key)
		}
	}
	visit(keys)
}
// linkWorkflow records the links for workflow wf nested inside parent and
// returns the scope that holds the workflow's exported outputs.
//
// All keys are created under the "workflow" child of parent:
//   - each workflow input links to the key with the same ID in parent;
//   - each step input, under "step/<step ID>", links to each of its
//     sources resolved in the workflow scope;
//   - each step output, keyed "<step ID>/<output ID>", links to the key
//     with the output's ID in the scope linkDoc returns for the step's
//     document;
//   - each workflow output, under the "exports" child, links to each of
//     its output sources resolved in the workflow scope.
func linkWorkflow(wf *cwl.Workflow, parent scope) scope {
	inner := parent.child("workflow")

	for _, input := range wf.Inputs {
		inner.link(input.ID, parent.key(input.ID))
	}

	for _, step := range wf.Steps {
		stepNS := inner.child("step/" + step.ID)
		for _, input := range step.In {
			for _, source := range input.Source {
				stepNS.link(input.ID, inner.key(source))
			}
		}

		// Link the document the step runs, then expose its outputs in
		// the workflow scope as "<step ID>/<output ID>".
		produced := linkDoc(step.Run, stepNS)
		for _, output := range step.Out {
			inner.link(step.ID+"/"+output.ID, produced.key(output.ID))
		}
	}

	exports := inner.child("exports")
	for _, output := range wf.Outputs {
		for _, source := range output.OutputSource {
			exports.link(output.ID, inner.key(source))
		}
	}
	return exports
}
// linkTool records the links for a tool nested inside parent and returns
// the scope that holds the tool's exported outputs.
//
// Under the "tool" child of parent, each input links to the key with the
// same ID in parent, and the synthetic "toolexec" key links to every
// input. Under the tool's "exports" child, each output links to the
// "toolexec" key.
//
// NOTE: an input whose ID is "toolexec" shares its key with the synthetic
// node and therefore ends up linked to itself.
func linkTool(in []cwl.CommandInput, out []cwl.CommandOutput, parent scope) scope {
	const exec = "toolexec"

	tool := parent.child("tool")
	for _, input := range in {
		id := input.ID
		tool.link(id, parent.key(id))
		tool.link(exec, tool.key(id))
	}

	exports := tool.child("exports")
	execKey := tool.key(exec)
	for _, output := range out {
		exports.link(output.ID, execKey)
	}
	return exports
}
// linkDoc links the document doc into parent and returns the scope that
// holds the document's exported outputs.
//
// Workflows are linked by linkWorkflow; tools and expression tools are
// linked by linkTool. Any other document, including a nil Document,
// records no links. In that case the result is an empty "unknown/exports"
// scope nested under parent rather than the zero scope: keys built from
// the zero scope have the form "/<name>" and would alias keys of the root
// scope, and its nil links map would panic on the first link call.
func linkDoc(doc cwl.Document, parent scope) scope {
	switch z := doc.(type) {
	case *cwl.Workflow:
		return linkWorkflow(z, parent)
	case *cwl.Tool:
		return linkTool(z.Inputs, z.Outputs, parent)
	case *cwl.ExpressionTool:
		return linkTool(z.Inputs, z.Outputs, parent)
	default:
		// Keep the returned keys inside parent's namespace so that they
		// resolve to nothing instead of to an unrelated root-level key.
		return parent.child("unknown").child("exports")
	}
}
/*
TODO goals
- validate that links are correct, not missing any links, etc
- have (un)marshal-able workflow state
- validate value bindings, mid workflow
- resolve inputs to step in nested workflow, mid workflow
- encode links between steps directly, without intermediate layers
implementation thoughts:
- major element of name translation over many layers
- end result is link between two Process objects and/or
Step objects.
- possibly want global Start/End steps, or maybe only End;
End.Done() is true when the workflow is done. need to
also have link between workflow outputs and last steps
- want to query a value by name at any layer?
e.g. query for workflow.step0.count_output mid workflow
*/