-
Notifications
You must be signed in to change notification settings - Fork 18
/
ack_nack.go
66 lines (49 loc) · 1.18 KB
/
ack_nack.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package middleware
import (
"context"
"github.com/makasim/amqpextra/consumer"
"github.com/makasim/amqpextra/logger"
amqp "github.com/rabbitmq/amqp091-go"
)
// Result values a handler can return to tell the AckNack middleware how to
// settle the delivery it has just processed.
const (
	// Ack asks AckNack to acknowledge the delivery.
	Ack = "ack"

	// Nack asks AckNack to negatively acknowledge the delivery without
	// requeueing it.
	// NOTE(review): the value is "nack_requeue" even though this result does
	// not requeue; presumably kept for backward compatibility — confirm
	// before changing.
	Nack = "nack_requeue"

	// Requeue asks AckNack to negatively acknowledge the delivery and
	// requeue it.
	Requeue = "requeue"
)
// AckNack returns a consumer middleware that settles a delivery according to
// the value returned by the next handler.
//
// The result of next.Handle is interpreted as follows:
//   - nil: nothing is done and nil is returned.
//   - Ack: msg.Ack(false) is called.
//   - Nack: msg.Nack(false, false) is called (no requeue).
//   - Requeue: msg.Nack(false, true) is called (requeue).
//   - any other value: returned unchanged to the caller.
//
// For the three recognized results the handler always returns nil; a failed
// ack/nack is only logged, never propagated. The logger is taken from ctx via
// GetLogger, falling back to logger.Discard when the context carries none.
func AckNack() consumer.Middleware {
	return func(next consumer.Handler) consumer.Handler {
		fn := func(ctx context.Context, msg amqp.Delivery) interface{} {
			result := next.Handle(ctx, msg)
			if result == nil {
				return nil
			}

			l, ok := GetLogger(ctx)
			if !ok {
				l = logger.Discard
			}

			switch result {
			case Ack:
				if err := msg.Ack(false); err != nil {
					// The error needs an explicit verb; without one Printf
					// renders it as a "%!(EXTRA ...)" suffix.
					l.Printf("[ERROR] message ack errored: %v", err)
					return nil
				}

				l.Printf("[DEBUG] message acked")
				return nil
			case Nack:
				if err := msg.Nack(false, false); err != nil {
					l.Printf("[ERROR] message nack errored: %v", err)
					return nil
				}

				l.Printf("[DEBUG] message nacked")
				return nil
			case Requeue:
				if err := msg.Nack(false, true); err != nil {
					l.Printf("[ERROR] message nack requeue errored: %v", err)
					return nil
				}

				l.Printf("[DEBUG] message requeue")
				return nil
			}

			// Unrecognized results are passed through so that outer
			// middlewares can interpret them.
			return result
		}

		return consumer.HandlerFunc(fn)
	}
}