-
Notifications
You must be signed in to change notification settings - Fork 0
/
publisher.go
397 lines (355 loc) · 12.5 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
package rmq
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
"github.com/danlock/rmq/internal"
amqp "github.com/rabbitmq/amqp091-go"
)
// PublisherArgs configures an rmq.Publisher. NewPublisher calls setDefaults on it before use.
type PublisherArgs struct {
	Args
	// NotifyReturn will receive amqp.Return's from any amqp.Channel this rmq.Publisher sends on.
	// Recommended to use a buffered channel: a Return that isn't received within dropReturnsAfter (10ms)
	// is dropped instead of blocking the Publisher.
	// Closed by the Publisher's Return-forwarding goroutine once the publisher's context is done.
	NotifyReturn chan<- amqp.Return
	// LogReturns logs amqp.Return's, without their amqp.Return.Body, using Args.Log when true.
	LogReturns bool
	// DontConfirm means the Publisher's amqp.Channel won't be put in Confirm mode.
	// Every method except Publish (PublishUntilConfirmed, PublishUntilAcked, PublishBatchUntilAcked) then returns an error.
	DontConfirm bool
}
// Publisher publishes Publishing's on a single amqp.Channel at a time, replacing the amqp.Channel when it fails.
// Create one with NewPublisher.
type Publisher struct {
	// ctx is the Publisher's lifetime; once it's done the Publisher closes its amqp.Channel and stops publishing.
	ctx context.Context
	// config is the PublisherArgs passed to NewPublisher, after setDefaults.
	config PublisherArgs
	// in hands Publishing's from Publish to the publishing goroutine started by listen.
	// NewPublisher makes it unbuffered, so Publish blocks until that goroutine takes the Publishing.
	in chan *Publishing
}
// NewPublisher creates an rmq.Publisher that publishes messages to AMQP on a single amqp.Channel at a time.
// Whenever that amqp.Channel fails it grabs another one from rmqConn, which redials AMQP if necessary.
// The Publisher shuts down once ctx is finished.
//
// Panics if ctx or rmqConn is nil.
func NewPublisher(ctx context.Context, rmqConn *Connection, config PublisherArgs) *Publisher {
	if rmqConn == nil || ctx == nil {
		panic("rmq.NewPublisher called with nil ctx or rmqConn")
	}
	config.setDefaults()

	publisher := new(Publisher)
	publisher.ctx, publisher.config, publisher.in = ctx, config, make(chan *Publishing)

	// connect runs for the whole lifetime of the Publisher, so it gets its own goroutine.
	go publisher.connect(rmqConn)
	return publisher
}
// connect repeatedly grabs an amqp.Channel from rmqConn via internal.Retry until the Publisher's context finishes.
// Each amqp.Channel is put in Confirm mode (unless PublisherArgs.DontConfirm is set), gets its Returns
// handled, and is then published on by listen until it closes.
//
// A failed attempt reports (0, false) to internal.Retry. An attempt that got as far as listen reports how long
// the amqp.Channel was in use and true; presumably internal.Retry uses that to reset its backoff — confirm.
func (p *Publisher) connect(rmqConn *Connection) {
	logPrefix := "rmq.Publisher.connect"

	useOneChannel := func(delay time.Duration) (time.Duration, bool) {
		mqChan, err := rmqConn.Channel(p.ctx)
		if err != nil {
			p.config.Log(p.ctx, slog.LevelError, logPrefix+" failed to get amqp.Channel, retrying in %s due to %+v", delay.String(), err)
			return 0, false
		}

		wantConfirms := !p.config.DontConfirm
		if wantConfirms {
			if confirmErr := mqChan.Confirm(false); confirmErr != nil {
				p.config.Log(p.ctx, slog.LevelError, logPrefix+" failed to put amqp.Channel in confirm mode, retrying in %s due to %+v", delay.String(), confirmErr)
				return 0, false
			}
		}

		startedAt := time.Now()
		p.handleReturns(mqChan)
		// listen blocks until mqChan closes or the Publisher's context is done.
		p.listen(mqChan)
		return time.Since(startedAt), true
	}

	internal.Retry(p.ctx, p.config.Delay, useOneChannel)
}
// dropReturnsAfter is how long handleReturns waits for PublisherArgs.NotifyReturn to accept an amqp.Return
// before dropping it, so an unread NotifyReturn can't block the Publisher.
const dropReturnsAfter = 10 * time.Millisecond
// handleReturns forwards the amqp.Return's of mqChan to PublisherArgs.NotifyReturn and/or logs them,
// depending on PublisherArgs.NotifyReturn and PublisherArgs.LogReturns. It returns immediately; the
// forwarding happens in a goroutine that runs until mqChan's Return notification channel is closed
// (presumably when mqChan shuts down).
//
// A Return that NotifyReturn doesn't accept within dropReturnsAfter is dropped.
//
// NOTE(review): a new goroutine is started for every amqp.Channel, and each one closes NotifyReturn if the
// Publisher's context is done when it finishes. If the goroutine for a previous amqp.Channel is still draining
// when the context is cancelled, two goroutines could both reach close(p.config.NotifyReturn), which panics.
// Guarding the close with a sync.Once on Publisher would rule this out — confirm.
func (p *Publisher) handleReturns(mqChan *amqp.Channel) {
	logPrefix := "rmq.Publisher.handleReturns"
	// Nobody wants the Returns, so don't even register for them.
	if p.config.NotifyReturn == nil && !p.config.LogReturns {
		return
	}
	notifyReturns := mqChan.NotifyReturn(make(chan amqp.Return))
	go func() {
		// Start with an expired, drained timer so it can be Reset for each Return below.
		dropTimer := time.NewTimer(0)
		<-dropTimer.C
		for r := range notifyReturns {
			if p.config.LogReturns {
				// A Body can be arbitrarily large and/or contain sensitive info. Don't log it by default.
				rBody := r.Body
				r.Body = nil
				p.config.Log(p.ctx, slog.LevelWarn, logPrefix+" got %+v", r)
				r.Body = rBody
			}
			if p.config.NotifyReturn == nil {
				continue
			}
			dropTimer.Reset(dropReturnsAfter)
			// Try not to repeat streadway/amqp's mistake of deadlocking if a client isn't listening to their Notify* channel.
			// (https://github.com/rabbitmq/amqp091-go/issues/18)
			// If they aren't listening to p.config.NotifyReturn, just drop the amqp.Return instead of deadlocking and leaking goroutines
			select {
			case p.config.NotifyReturn <- r:
				// Delivered. If Stop reports the timer already fired, drain its value so the next Reset starts clean.
				if !dropTimer.Stop() {
					<-dropTimer.C
				}
			case <-dropTimer.C:
				// Nobody received r within dropReturnsAfter; drop it.
			}
		}
		// Close when the context is done, since we won't be sending any more Returns.
		if p.config.NotifyReturn != nil && p.ctx.Err() != nil {
			close(p.config.NotifyReturn)
		}
	}()
}
// listen publishes the Publishing's received on p.in using mqChan until either mqChan closes
// or the Publisher's context is done, in which case listen closes mqChan itself.
//
// Publishing happens on a separate goroutine so a slow publish can't stop listen from noticing
// that mqChan closed. That goroutine stops when listen returns.
func (p *Publisher) listen(mqChan *amqp.Channel) {
	logPrefix := "rmq.Publisher.listen"
	// Closed by the publishing goroutine once it has stopped taking Publishing's from p.in.
	finishedPublishing := make(chan struct{})
	ctx, cancel := context.WithCancel(p.ctx)
	defer cancel()

	go func() {
		defer close(finishedPublishing)
		for {
			select {
			case <-ctx.Done():
				return
			case pub := <-p.in:
				pub.publish(mqChan)
			}
		}
	}()

	notifyClose := mqChan.NotifyClose(make(chan *amqp.Error, 2))
	for {
		select {
		case <-p.ctx.Done():
			// Wait for publishing to finish since closing the channel in the middle of another channel request
			// tends to kill the entire connection with a "504 CHANNEL ERROR expected 'channel.open'"
			<-finishedPublishing
			if err := mqChan.Close(); err != nil && !errors.Is(err, amqp.ErrClosed) {
				p.config.Log(p.ctx, slog.LevelError, logPrefix+" got an error while closing channel %v", err)
			}
			// Return whether or not Close succeeded. Looping again would find p.ctx.Done() still ready
			// and call mqChan.Close() over and over until select happened to pick notifyClose.
			return
		case err, ok := <-notifyClose:
			if !ok {
				return
			}
			if err != nil {
				p.config.Log(p.ctx, slog.LevelError, logPrefix+" got an amqp.Channel close err %v", err)
			}
		}
	}
}
// Publishing is a message for rmq.Publisher, together with the routing arguments that are passed through
// to amqp.Channel.PublishWithDeferredConfirmWithContext.
type Publishing struct {
	amqp.Publishing
	// Exchange, RoutingKey, Mandatory and Immediate are passed unchanged to
	// amqp.Channel.PublishWithDeferredConfirmWithContext.
	Exchange   string
	RoutingKey string
	Mandatory  bool
	Immediate  bool
	// req is internal and private, which means it can't be set by callers; Publish fills it in.
	// This means it has the nice side effect of forcing callers to set struct fields when instantiating Publishing
	// (an unkeyed literal would have to include the unexported field, which isn't allowed outside this package).
	req internal.ChanReq[*amqp.DeferredConfirmation]
}
// publish sends the Publishing on mqChan using the context Publish stored in p.req, and reports the
// resulting DeferredConfirmation and error on p.req.RespChan.
func (p *Publishing) publish(mqChan *amqp.Channel) {
	defConf, err := mqChan.PublishWithDeferredConfirmWithContext(
		p.req.Ctx,
		p.Exchange,
		p.RoutingKey,
		p.Mandatory,
		p.Immediate,
		p.Publishing,
	)
	// Publish gives RespChan a buffer of 1, so this send can't block even if Publish stopped waiting.
	p.req.RespChan <- internal.ChanResp[*amqp.DeferredConfirmation]{Val: defConf, Err: err}
}
// Publish sends a Publishing on the rmq.Publisher's current amqp.Channel and waits for the send to finish.
// The returned *amqp.DeferredConfirmation is only meaningful when the rmq.Publisher is in Confirm mode
// (PublisherArgs.DontConfirm is false).
// When the amqp.Channel fails, rmq.Publisher grabs another one from rmq.Connection, which itself redials AMQP
// if necessary. This means simply retrying Publish on errors will send Publishing's even on flaky connections.
// Publish gives up with an error as soon as ctx or the Publisher's context is done.
func (p *Publisher) Publish(ctx context.Context, pub Publishing) (*amqp.DeferredConfirmation, error) {
	// Buffered so the publishing goroutine never blocks on the reply, even if we've stopped waiting for it.
	respChan := make(chan internal.ChanResp[*amqp.DeferredConfirmation], 1)
	pub.req.Ctx = ctx
	pub.req.RespChan = respChan

	// Hand the Publishing to listen's publishing goroutine.
	select {
	case p.in <- &pub:
	case <-p.ctx.Done():
		return nil, fmt.Errorf("rmq.Publisher context done before publish sent %w", context.Cause(p.ctx))
	case <-ctx.Done():
		return nil, fmt.Errorf("rmq.Publisher.Publish context done before publish sent %w", context.Cause(ctx))
	}

	// Wait for the amqp.Channel to finish sending it.
	select {
	case resp := <-pub.req.RespChan:
		return resp.Val, resp.Err
	case <-p.ctx.Done():
		return nil, fmt.Errorf("rmq.Publisher context done before publish completed %w", context.Cause(p.ctx))
	case <-ctx.Done():
		return nil, fmt.Errorf("rmq.Publisher.Publish context done before publish completed %w", context.Cause(ctx))
	}
}
// PublishUntilConfirmed calls Publish and waits for the Publishing to be confirmed.
// It republishes if the Publishing isn't confirmed within confirmTimeout, or if Publish returns an error,
// in which case it waits PublisherArgs' Delay (growing with each consecutive failure) before retrying.
// Returns *amqp.DeferredConfirmation so the caller can check if it's Acked().
// confirmTimeout defaults to 15 seconds when <= 0. Recommended to call with context.WithTimeout.
//
// Returns an error, joined with every failed attempt, when ctx is done, when the Publisher's context is done
// while retrying a failed Publish, or immediately if the rmq.Publisher isn't in Confirm mode.
func (p *Publisher) PublishUntilConfirmed(ctx context.Context, confirmTimeout time.Duration, pub Publishing) (*amqp.DeferredConfirmation, error) {
	logPrefix := "rmq.Publisher.PublishUntilConfirmed"
	if p.config.DontConfirm {
		return nil, errors.New(logPrefix + " called on a rmq.Publisher that's not in Confirm mode")
	}
	if confirmTimeout <= 0 {
		confirmTimeout = 15 * time.Second
	}

	attempt := 0
	var errs []error
	for {
		defConf, err := p.Publish(ctx, pub)
		if err != nil {
			errs = append(errs, err)
			pubDelay := p.config.Delay(attempt)
			attempt++
			select {
			case <-ctx.Done():
				err = fmt.Errorf(logPrefix+" context done before the publish was sent %w", context.Cause(ctx))
				return defConf, errors.Join(append(errs, err)...)
			case <-p.ctx.Done():
				// The Publisher has shut down, so every further Publish would fail as well.
				// Without this case we'd retry until ctx ends, which is never for context.Background().
				err = fmt.Errorf(logPrefix+"'s Publisher context done before the publish was sent %w", context.Cause(p.ctx))
				return defConf, errors.Join(append(errs, err)...)
			case <-time.After(pubDelay):
				continue
			}
		}
		attempt = 0

		// Stopped explicitly instead of deferred: a defer in this loop would pile up once per republish.
		confirmTimer := time.NewTimer(confirmTimeout)
		select {
		case <-confirmTimer.C:
			errs = append(errs, errors.New(logPrefix+" timed out waiting for confirm, republishing"))
		case <-ctx.Done():
			confirmTimer.Stop()
			err = fmt.Errorf(logPrefix+" context done before the publish was confirmed %w", context.Cause(ctx))
			return defConf, errors.Join(append(errs, err)...)
		case <-defConf.Done():
			confirmTimer.Stop()
			return defConf, nil
		}
	}
}
// PublishUntilAcked is like PublishUntilConfirmed, but it also republishes nacks. User discretion is advised.
//
// Nacks can happen for a variety of reasons, ranging from user error (mistyped exchange) to RabbitMQ internal errors.
// As one example, PublishUntilAcked will republish a Mandatory Publishing with a nonexistent exchange forever
// (or until the exchange exists).
//
// An ack does not mean a Publishing reached a queue, since RabbitMQ acks Publishing's it couldn't route too.
// Monitor the NotifyReturn chan to ensure your Publishing's are being delivered.
//
// PublishUntilAcked is intended for ensuring a Publishing with a known destination queue will get acked despite
// flaky connections or temporary RabbitMQ node failures. It returns PublishUntilConfirmed's error, wrapped with
// the number of nacks seen if there were any. Recommended to call with context.WithTimeout.
func (p *Publisher) PublishUntilAcked(ctx context.Context, confirmTimeout time.Duration, pub Publishing) error {
	const logPrefix = "rmq.Publisher.PublishUntilAcked"
	for nacks := 0; ; nacks++ {
		defConf, err := p.PublishUntilConfirmed(ctx, confirmTimeout, pub)
		switch {
		case err != nil && nacks > 0:
			return fmt.Errorf(logPrefix+" resent nacked Publishings %d time(s) and %w", nacks, err)
		case err != nil:
			return err
		case defConf.Acked():
			return nil
		}
		// Confirmed but nacked: go around again.
	}
}
// PublishBatchUntilAcked publishes all of your Publishings at once and then waits for their
// DeferredConfirmations to be acked. It republishes any Publishing that wasn't confirmed within
// confirmTimeout or was nacked, until every one of them is acked.
// confirmTimeout defaults to 1 minute when <= 0. Recommended to call with context.WithTimeout.
//
// Returns nil once every Publishing is acked. Once ctx or the Publisher's context is done, it returns an error
// joining every error seen along the way. Returns an error immediately if the rmq.Publisher isn't in Confirm mode.
func (p *Publisher) PublishBatchUntilAcked(ctx context.Context, confirmTimeout time.Duration, pubs ...Publishing) error {
	logPrefix := "rmq.Publisher.PublishBatchUntilAcked"
	if len(pubs) == 0 {
		return nil
	}
	if p.config.DontConfirm {
		return errors.New(logPrefix + " called on a rmq.Publisher that's not in Confirm mode")
	}
	// <= 0 rather than == 0: a negative timeout would make every round time out immediately.
	if confirmTimeout <= 0 {
		confirmTimeout = time.Minute
	}

	var errs []error
	pendingPubs := make([]*amqp.DeferredConfirmation, len(pubs))
	ackedPubs := make([]bool, len(pubs))
	for {
		select {
		case <-p.ctx.Done():
			err := fmt.Errorf(logPrefix+"'s Publisher timed out because %w", context.Cause(p.ctx))
			return errors.Join(append(errs, err)...)
		case <-ctx.Done():
			err := fmt.Errorf(logPrefix+" timed out because %w", context.Cause(ctx))
			return errors.Join(append(errs, err)...)
		default:
		}

		err := p.publishBatch(ctx, confirmTimeout, pubs, pendingPubs, ackedPubs, errs)
		if err == nil {
			// A nil error alone doesn't mean success: a round can finish with some Publishings nacked.
			nacked := 0
			for _, acked := range ackedPubs {
				if !acked {
					nacked++
				}
			}
			if nacked == 0 {
				return nil
			}
			err = fmt.Errorf(logPrefix+" %d Publishing(s) were nacked, republishing", nacked)
		}
		// Keep each round's failure so the final error explains what went wrong.
		errs = append(errs, err)
		// Forget unresolved confirmations so the next round republishes everything not yet acked.
		clear(pendingPubs)
	}
}
// publishBatch runs one round of PublishBatchUntilAcked. It publishes every Publishing in pubs that is neither
// acked (ackedPubs[i]) nor awaiting a confirmation (pendingPubs[i] != nil), retrying failed Publish calls with
// PublisherArgs' Delay backoff. It then blocks until every outstanding confirmation resolves.
//
// Acked Publishings are marked in ackedPubs. Every resolved confirmation is cleared from pendingPubs.
// Returns nil only if every Publishing ended up acked. Otherwise it returns an error, joined with any Publish
// errors from this round, when:
//   - some Publishings were nacked,
//   - confirmTimeout elapsed before all confirmations arrived, or
//   - ctx or the Publisher's context finished.
//
// The trailing []error parameter is unused. It is kept for signature compatibility: it was only appended to,
// and since it's passed by value those appends never reached the caller.
func (p *Publisher) publishBatch(
	ctx context.Context,
	confirmTimeout time.Duration,
	pubs []Publishing,
	pendingPubs []*amqp.DeferredConfirmation,
	ackedPubs []bool,
	_ []error,
) error {
	logPrefix := "rmq.Publisher.publishBatch"
	// Errors from Publish attempts that were retried, attached to any error this round returns.
	var pubErrs []error
	fail := func(err error) error {
		return errors.Join(append(pubErrs, err)...)
	}

	// Publish phase: keep sweeping until nothing unacked is left without a pending confirmation.
	attempt := 0
	for {
		unsent := 0
		for i := range pubs {
			if ackedPubs[i] || pendingPubs[i] != nil {
				continue
			}
			defConf, err := p.Publish(ctx, pubs[i])
			if err == nil {
				pendingPubs[i] = defConf
				attempt = 0
				continue
			}
			unsent++
			pubErrs = append(pubErrs, err)
			delay := p.config.Delay(attempt)
			attempt++
			select {
			case <-p.ctx.Done():
				return fail(fmt.Errorf(logPrefix+"'s Publisher timed out because %w", context.Cause(p.ctx)))
			case <-ctx.Done():
				return fail(fmt.Errorf(logPrefix+" timed out because %w", context.Cause(ctx)))
			case <-time.After(delay):
			}
		}
		if unsent == 0 {
			break
		}
	}

	// Confirm phase: block on each outstanding confirmation in turn. All of them must resolve before the
	// timer fires, so waiting in order is as fast as waiting on all at once, and unlike a select with a
	// default case in a loop it doesn't spin a CPU core while waiting.
	confirmTimer := time.NewTimer(confirmTimeout)
	defer confirmTimer.Stop()
	nacked := 0
	for i, defConf := range pendingPubs {
		if ackedPubs[i] || defConf == nil {
			continue
		}
		select {
		case <-p.ctx.Done():
			return fail(fmt.Errorf(logPrefix+"'s Publisher timed out because %w", context.Cause(p.ctx)))
		case <-ctx.Done():
			return fail(fmt.Errorf(logPrefix+" timed out because %w", context.Cause(ctx)))
		case <-confirmTimer.C:
			return fail(errors.New(logPrefix + " timed out waiting on confirms"))
		case <-defConf.Done():
			pendingPubs[i] = nil
			if defConf.Acked() {
				ackedPubs[i] = true
			} else {
				nacked++
			}
		}
	}
	if nacked > 0 {
		return fail(fmt.Errorf(logPrefix+" %d Publishing(s) were nacked", nacked))
	}
	return nil
}