-
Notifications
You must be signed in to change notification settings - Fork 8
/
emitter_stack.go
178 lines (152 loc) · 4.58 KB
/
emitter_stack.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package cff
import (
"context"
"time"
)
// emitterStack is an Emitter made of a list of underlying Emitters.
// Its TaskInit, FlowInit, ParallelInit, and SchedulerInit methods each build
// the matching per-kind stack by calling the same method on every element.
type emitterStack []Emitter
// EmitterStack merges any number of emitters into a single Emitter.
//
// With no arguments it returns NopEmitter(); with exactly one argument it
// returns that emitter unchanged. Otherwise it returns a stack in which any
// argument that is itself a stack has had its elements spliced in directly.
//
// The order in which the combined emitters receive events is unspecified,
// and emitters should not rely on it.
func EmitterStack(emitters ...Emitter) Emitter {
	if len(emitters) == 0 {
		return NopEmitter()
	}
	if len(emitters) == 1 {
		return emitters[0]
	}

	var combined emitterStack
	for _, em := range emitters {
		nested, isStack := em.(emitterStack)
		if !isStack {
			combined = append(combined, em)
			continue
		}
		// Flatten: add the nested stack's elements rather than the stack itself.
		combined = append(combined, nested...)
	}
	return combined
}
// taskEmitterStack is a TaskEmitter that forwards each task event to every
// TaskEmitter it holds.
type taskEmitterStack []TaskEmitter
// TaskInit calls TaskInit on every emitter in the stack with the given task
// and directive info, and returns the results combined into one TaskEmitter.
// The returned TaskEmitter could be memoized based on task name.
func (es emitterStack) TaskInit(taskInfo *TaskInfo, dInfo *DirectiveInfo) TaskEmitter {
	stack := make(taskEmitterStack, len(es))
	for i := range es {
		stack[i] = es[i].TaskInit(taskInfo, dInfo)
	}
	return stack
}
// TaskSuccess reports a successful task run to every TaskEmitter in the
// stack, passing along the same ctx.
func (ts taskEmitterStack) TaskSuccess(ctx context.Context) {
	for i := range ts {
		ts[i].TaskSuccess(ctx)
	}
}
// TaskError reports a task failure caused by a task error to every
// TaskEmitter in the stack, passing along the same ctx and err.
func (ts taskEmitterStack) TaskError(ctx context.Context, err error) {
	for i := range ts {
		ts[i].TaskError(ctx, err)
	}
}
// TaskErrorRecovered forwards a recovered task error to every TaskEmitter in
// the stack: each underlying emitter's TaskErrorRecovered is invoked with the
// same ctx and err.
//
// NOTE(review): the recovery is presumably performed by a RecoverWith
// annotation; confirm against the TaskEmitter interface documentation.
func (ts taskEmitterStack) TaskErrorRecovered(ctx context.Context, err error) {
	for _, emitter := range ts {
		emitter.TaskErrorRecovered(ctx, err)
	}
}
// TaskSkipped reports to every TaskEmitter in the stack that a task was
// skipped, either because of a predicate or because an earlier task failed.
// The same ctx and err are passed to each emitter.
func (ts taskEmitterStack) TaskSkipped(ctx context.Context, err error) {
	for i := range ts {
		ts[i].TaskSkipped(ctx, err)
	}
}
// TaskPanic reports a task panic to every TaskEmitter in the stack, passing
// along the same ctx and panic value pv.
func (ts taskEmitterStack) TaskPanic(ctx context.Context, pv interface{}) {
	for i := range ts {
		ts[i].TaskPanic(ctx, pv)
	}
}
// TaskPanicRecovered is called when a task panics but the panic was
// recovered by a RecoverWith annotation. It forwards the event, with the same
// ctx and panic value pv, to every TaskEmitter in the stack.
func (ts taskEmitterStack) TaskPanicRecovered(ctx context.Context, pv interface{}) {
	for _, emitter := range ts {
		emitter.TaskPanicRecovered(ctx, pv)
	}
}
// TaskDone reports that a task has finished to every TaskEmitter in the
// stack, passing along the same ctx and duration d.
func (ts taskEmitterStack) TaskDone(ctx context.Context, d time.Duration) {
	for i := range ts {
		ts[i].TaskDone(ctx, d)
	}
}
// flowEmitterStack is a FlowEmitter that forwards each flow event to every
// FlowEmitter it holds.
type flowEmitterStack []FlowEmitter
// FlowInit calls FlowInit on every emitter in the stack with the given flow
// info, and returns the results combined into one FlowEmitter.
// The returned FlowEmitter could be memoized based on flow name.
func (es emitterStack) FlowInit(info *FlowInfo) FlowEmitter {
	stack := make(flowEmitterStack, len(es))
	for i := range es {
		stack[i] = es[i].FlowInit(info)
	}
	return stack
}
// FlowSuccess reports a successful flow run to every FlowEmitter in the
// stack, passing along the same ctx.
func (fs flowEmitterStack) FlowSuccess(ctx context.Context) {
	for i := range fs {
		fs[i].FlowSuccess(ctx)
	}
}
// FlowError reports a flow failure caused by a task error to every
// FlowEmitter in the stack, passing along the same ctx and err.
func (fs flowEmitterStack) FlowError(ctx context.Context, err error) {
	for i := range fs {
		fs[i].FlowError(ctx, err)
	}
}
// FlowDone reports that a flow has finished to every FlowEmitter in the
// stack, passing along the same ctx and duration d.
func (fs flowEmitterStack) FlowDone(ctx context.Context, d time.Duration) {
	for i := range fs {
		fs[i].FlowDone(ctx, d)
	}
}
// parallelEmitterStack is a ParallelEmitter that forwards each parallel
// event to every ParallelEmitter it holds.
type parallelEmitterStack []ParallelEmitter
// ParallelInit calls ParallelInit on every emitter in the stack with the
// given parallel info, and returns the results combined into one
// ParallelEmitter. The returned ParallelEmitter could be memoized based on
// parallel name.
func (es emitterStack) ParallelInit(info *ParallelInfo) ParallelEmitter {
	stack := make(parallelEmitterStack, len(es))
	for i := range es {
		stack[i] = es[i].ParallelInit(info)
	}
	return stack
}
// ParallelSuccess reports a successful parallel run to every ParallelEmitter
// in the stack, passing along the same ctx.
func (ps parallelEmitterStack) ParallelSuccess(ctx context.Context) {
	for i := range ps {
		ps[i].ParallelSuccess(ctx)
	}
}
// ParallelError reports a parallel failure caused by a task error to every
// ParallelEmitter in the stack, passing along the same ctx and err.
func (ps parallelEmitterStack) ParallelError(ctx context.Context, err error) {
	for i := range ps {
		ps[i].ParallelError(ctx, err)
	}
}
// ParallelDone reports that a parallel has finished to every ParallelEmitter
// in the stack, passing along the same ctx and duration d.
func (ps parallelEmitterStack) ParallelDone(ctx context.Context, d time.Duration) {
	for i := range ps {
		ps[i].ParallelDone(ctx, d)
	}
}
// SchedulerInit calls SchedulerInit on every emitter in the stack with the
// given scheduler info, and returns the results combined into one
// SchedulerEmitter.
func (es emitterStack) SchedulerInit(info *SchedulerInfo) SchedulerEmitter {
	stack := make(schedulerEmitterStack, 0, len(es))
	for _, em := range es {
		stack = append(stack, em.SchedulerInit(info))
	}
	return stack
}
// schedulerEmitterStack is a SchedulerEmitter that forwards scheduler state
// to every SchedulerEmitter it holds.
type schedulerEmitterStack []SchedulerEmitter
// EmitScheduler passes the given cff scheduler state s to every
// SchedulerEmitter in the stack.
func (ses schedulerEmitterStack) EmitScheduler(s SchedulerState) {
	for i := range ses {
		ses[i].EmitScheduler(s)
	}
}