-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlifecycle.go
194 lines (162 loc) · 4.76 KB
/
lifecycle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package zkafka
import (
"context"
"errors"
"time"
)
type LifecyclePostReadMeta struct {
Topic string
GroupID string
// Message that was read (will be non nil)
Message *Message
}
type LifecyclePostReadImmediateMeta struct {
// Message that was read (could be nil)
Message *Message
Err error
}
type LifecyclePreProcessingMeta struct {
Topic string
GroupID string
VirtualPartitionIndex int
// Time since the message was sent to the topic
TopicLag time.Duration
// Message containing being processed
Message *Message
}
type LifecyclePostProcessingMeta struct {
Topic string
GroupID string
VirtualPartitionIndex int
// Time taken to process the message
ProcessingTime time.Duration
// Message processed
Msg *Message
// Response code returned by the processor
ResponseErr error
}
type LifecyclePostAckMeta struct {
Topic string
// Time when the message was published to the queue
ProduceTime time.Time
}
type LifecyclePreWriteMeta struct{}
type LifecyclePreWriteResp struct {
Headers map[string][]byte
}
type LifecycleHooks struct {
// Called by work after reading a message (guaranteed non nil), offers the ability to customize the context object (resulting context object passed to work processor)
PostRead func(ctx context.Context, meta LifecyclePostReadMeta) (context.Context, error)
// Called by work immediately after an attempt to read a message. Msg might be nil, if there was an error
// or no available messages.
PostReadImmediate func(ctx context.Context, meta LifecyclePostReadImmediateMeta)
// Called after receiving a message and before processing it.
PreProcessing func(ctx context.Context, meta LifecyclePreProcessingMeta) (context.Context, error)
// Called after processing a message
PostProcessing func(ctx context.Context, meta LifecyclePostProcessingMeta) error
// Called after sending a message to the queue
PostAck func(ctx context.Context, meta LifecyclePostAckMeta) error
// Called prior to executing write operation
PreWrite func(ctx context.Context, meta LifecyclePreWriteMeta) (LifecyclePreWriteResp, error)
// Call after the reader attempts a fanOut call.
PostFanout func(ctx context.Context)
}
// ChainLifecycleHooks chains multiple lifecycle hooks into one. The hooks are
// called in the order they are passed. All hooks are called, even when
// errors occur. Errors are accumulated in a wrapper error and returned to the
// caller.
func ChainLifecycleHooks(hooks ...LifecycleHooks) LifecycleHooks {
if len(hooks) == 0 {
return LifecycleHooks{}
}
if len(hooks) == 1 {
return hooks[0]
}
return LifecycleHooks{
PostRead: func(ctx context.Context, meta LifecyclePostReadMeta) (context.Context, error) {
var allErrs error
hookCtx := ctx
for _, h := range hooks {
if h.PostRead != nil {
var err error
hookCtx, err = h.PostRead(hookCtx, meta)
if err != nil {
allErrs = errors.Join(allErrs, err)
}
}
}
return hookCtx, allErrs
},
PostReadImmediate: func(ctx context.Context, meta LifecyclePostReadImmediateMeta) {
for _, h := range hooks {
if h.PostReadImmediate != nil {
h.PostReadImmediate(ctx, meta)
}
}
},
PreProcessing: func(ctx context.Context, meta LifecyclePreProcessingMeta) (context.Context, error) {
var allErrs error
hookCtx := ctx
for _, h := range hooks {
if h.PreProcessing != nil {
var err error
hookCtx, err = h.PreProcessing(hookCtx, meta)
if err != nil {
allErrs = errors.Join(allErrs, err)
}
}
}
return hookCtx, allErrs
},
PostProcessing: func(ctx context.Context, meta LifecyclePostProcessingMeta) error {
var allErrs error
for _, h := range hooks {
if h.PostProcessing != nil {
err := h.PostProcessing(ctx, meta)
if err != nil {
allErrs = errors.Join(allErrs, err)
}
}
}
return allErrs
},
PostAck: func(ctx context.Context, meta LifecyclePostAckMeta) error {
var allErrs error
for _, h := range hooks {
if h.PostAck != nil {
err := h.PostAck(ctx, meta)
if err != nil {
allErrs = errors.Join(allErrs, err)
}
}
}
return allErrs
},
PreWrite: func(ctx context.Context, meta LifecyclePreWriteMeta) (LifecyclePreWriteResp, error) {
var allErrs error
out := LifecyclePreWriteResp{
Headers: make(map[string][]byte),
}
for _, h := range hooks {
if h.PreWrite != nil {
var err error
resp, err := h.PreWrite(ctx, meta)
if err != nil {
allErrs = errors.Join(allErrs, err)
}
for k, v := range resp.Headers {
out.Headers[k] = v
}
}
}
return out, allErrs
},
PostFanout: func(ctx context.Context) {
for _, h := range hooks {
if h.PostFanout != nil {
h.PostFanout(ctx)
}
}
},
}
}