-
Notifications
You must be signed in to change notification settings - Fork 5
/
channel.go
113 lines (95 loc) · 2.49 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package rabbitmq
import (
"context"
"errors"
"github.com/google/uuid"
amqp "github.com/rabbitmq/amqp091-go"
)
type rabbitmqChannel struct {
connection *amqp.Connection
rawChannel *amqp.Channel
}
func newRabbitChannel(conn *amqp.Connection) (*rabbitmqChannel, error) {
rabbitCh := &rabbitmqChannel{
connection: conn,
}
if err := rabbitCh.Connect(); err != nil {
return nil, err
}
return rabbitCh, nil
}
func (r *rabbitmqChannel) Connect() (err error) {
r.rawChannel, err = r.connection.Channel()
return
}
func (r *rabbitmqChannel) DeclareExchange(ex ExchangeOptions) error {
return r.rawChannel.ExchangeDeclare(
ex.Name, // name
string(ex.Type), // kind
ex.Durable, // durable
false, // autoDelete
false, // internal
false, // noWait
nil, // args
)
}
// DeclareDurableQueue 持久化队列
// rabbitmq服务重启后,队列数据不会丢失;消费者连接时,队列也不会被删除
func (r *rabbitmqChannel) DeclareDurableQueue(queue string, args amqp.Table) (err error) {
_, err = r.rawChannel.QueueDeclare(
queue, // name
true, // durable
false, // autoDelete
false, // exclusive
false, // noWait
args, // args
)
return
}
// DeclareQueue 非持久化队列
// rabbitmq服务重启后,队列数据会丢失;消费者连接时,队列会自动删除
func (r *rabbitmqChannel) DeclareQueue(queue string, args amqp.Table) (err error) {
_, err = r.rawChannel.QueueDeclare(
queue, // name
false, // durable
true, // autoDelete
false, // exclusive
false, // noWait
args, // args
)
return
}
func (r *rabbitmqChannel) ConsumeQueue(queue string, autoAck bool) (<-chan amqp.Delivery, error) {
id, err := uuid.NewRandom()
if err != nil {
return nil, err
}
return r.rawChannel.Consume(
queue, // queue
id.String(), // consumer
autoAck, // autoAck
false, // exclusive
false, // noLocal
false, // nowait
nil, // args
)
}
func (r *rabbitmqChannel) BindQueue(queue, key, exchange string, args amqp.Table) error {
return r.rawChannel.QueueBind(
queue, // name
key, // key
exchange, // exchange
false, // noWait
args, // args
)
}
func (r *rabbitmqChannel) Publish(ctx context.Context, exchange, key string, msg amqp.Publishing) error {
if r.rawChannel == nil {
return errors.New("rawChannel is nil")
}
err := r.rawChannel.PublishWithContext(ctx, exchange, key, false, false, msg)
if err != nil {
return err
}
return nil
}