-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
274 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,264 @@ | ||
--- | ||
layout: doc | ||
--- | ||
|
||
# 工作队列 | ||
|
||
## 循环分发消息 | ||
|
||
> 默认情况下,`RabbitMQ`将按顺序将每条消息发送给下一个消费者,每个`Consumer`将收到相同数量的消息,这种分发消息的方式称为`循环轮询` | ||
> | ||
> 我们可以同时开启多个协程去消费消息来测试这一特性,消费消息的我们称之为`worker` | ||
> 写一个通用的接收消息的通用函数`consumeMessage`(worker.go),然后开启`3`个协程去消费消息 | ||
> | ||
> 可以看到,每个message被消费后,会等待一段时间,才能去消费下一个消息(消息文案中包含几个`.`,就等待`10-.的个数`秒) | ||
```Go | ||
for i := 1; i <= 3; i++ { | ||
go consumeMessage(ch, queue.Name, i) | ||
} | ||
func consumeMessage(ch *amqp.Channel, queueName string, serialNumber int) { | ||
// 4、consume message | ||
message, err := ch.Consume( | ||
queueName, | ||
"", // consumer | ||
true, // auto-ack | ||
false, // exclusive | ||
false, // no-local | ||
false, // no-wait | ||
nil, // args | ||
) | ||
shared.FailOnError(err, "consume queue message error") | ||
for msg := range message { | ||
fmt.Printf("【NO %d】 Received a message: %s", serialNumber, msg.Body) | ||
dotCount := bytes.Count(msg.Body, []byte(".")) | ||
duration := time.Duration(10 - dotCount) | ||
time.Sleep(duration * time.Second) | ||
fmt.Printf("Done \n") | ||
} | ||
} | ||
``` | ||
|
||
> 而在生产消息时(task.go),一次性发送多条 | ||
```Go | ||
for i := 0; i < 10; i++ { | ||
messageBody := fmt.Sprintf("我是第[%d]条消息%s \n", i+1, strings.Repeat(".", i+1)) | ||
err = ch.PublishWithContext( | ||
withTimeoutCtx, | ||
"", //exchange name | ||
queue.Name, // routing key | ||
false, // mandatory | ||
false, //immediate | ||
amqp.Publishing{ | ||
Body: []byte(messageBody), | ||
ContentType: "text/plain", | ||
}, | ||
) | ||
shared.FailOnError(err, "Publish message error") | ||
fmt.Printf(" [x] Sent %s\n", messageBody) | ||
} | ||
``` | ||
|
||
> 先运行消费者代码`worker.go`,然后执行生产者代码`task.go`,打印如下: | ||
```shell | ||
# workQueues/task.go 打印 | ||
[x] Sent 我是第[1]条消息. | ||
[x] Sent 我是第[2]条消息.. | ||
[x] Sent 我是第[3]条消息... | ||
[x] Sent 我是第[4]条消息.... | ||
[x] Sent 我是第[5]条消息..... | ||
[x] Sent 我是第[6]条消息...... | ||
[x] Sent 我是第[7]条消息....... | ||
[x] Sent 我是第[8]条消息........ | ||
[x] Sent 我是第[9]条消息......... | ||
[x] Sent 我是第[10]条消息.......... | ||
|
||
# go run workQueues/worker.go 打印 | ||
【NO 1】 Received a message: 我是第[3]条消息..., Will wait [7s] | ||
【NO 3】 Received a message: 我是第[1]条消息., Will wait [9s] | ||
【NO 2】 Received a message: 我是第[2]条消息.., Will wait [8s] | ||
【NO 1】Done | ||
【NO 1】 Received a message: 我是第[6]条消息......, Will wait [4s] | ||
【NO 2】Done | ||
【NO 2】 Received a message: 我是第[5]条消息....., Will wait [5s] | ||
【NO 3】Done | ||
【NO 3】 Received a message: 我是第[4]条消息...., Will wait [6s] | ||
【NO 1】Done | ||
【NO 1】 Received a message: 我是第[9]条消息........., Will wait [1s] | ||
【NO 1】Done | ||
【NO 2】Done | ||
【NO 2】 Received a message: 我是第[8]条消息........, Will wait [2s] | ||
【NO 2】Done | ||
【NO 3】Done | ||
【NO 3】 Received a message: 我是第[7]条消息......., Will wait [3s] | ||
【NO 3】Done | ||
【NO 3】 Received a message: 我是第[10]条消息.........., Will wait [0s] | ||
【NO 3】Done | ||
``` | ||
|
||
> 可以看到上面的打印,消息是轮训发放的,就算要等很久才能消费,也会按轮训的顺序分配 | ||
> | ||
> 序号【`NO1`】消费第`3、6、9`条消息 | ||
> | ||
> 序号【`NO2`】消费第`2、5、8`条消息 | ||
> | ||
> 序号【`NO3`】消费第`1、4、7、10`条消息 | ||
> 但是其实`第4条`消息`【NO1】`来消费是比较合适的,因为它等待的时间最短,可以更早地去处理下一条消息 | ||
> | ||
> | ||
> 要想达到这样的效果,可以通过`ch.Qos`方法,将`prefetch`设置为`1`,它表示在消费者确认消息之前,`RabbitMQ`可以`发送给消费者的最大消息数`,也就是告诉`RabbutMQ`一次不要给一个工作者多于`1条`消息,这通常用于确保消费者逐条处理消息,避免消息堆积。 | ||
```Go | ||
err = ch.Qos( | ||
1, // prefetch count | ||
0, // prefetch size | ||
false, // global | ||
) | ||
``` | ||
|
||
- `prefetchCount(int)`: | ||
|
||
> 解释: 预取计数(Prefetch Count)定义了在消费者确认消息之前,RabbitMQ 可以发送给消费者的最大消息数。简单来说,就是`消费者在确认之前最多可以收到多少条消息`。 | ||
> 示例中的值: 1 表示消费者在确认一条消息之前最多只能收到一条消息。这通常用于确保消费者逐条处理消息,避免消息堆积。 | ||
- `prefetchSize(int)`: | ||
|
||
> 解释: 预取大小(Prefetch Size)定义了在消费者确认消息之前,RabbitMQ可以发送给消费者的最大字节数。通常情况下,这个值设置为`0`,表示`不限制大小`。 | ||
> 示例中的值: 0 表示没有预取大小的限制。 | ||
- `global(bool)`: | ||
|
||
> 解释: 全局标志(Global Flag)决定了`QoS`设置是应用于整个通道还是仅应用于当前消费者。 | ||
> 示例中的值: `false`表示`QoS`设置仅应用于`当前消费者` | ||
> | ||
> 如果设置为`true`,则QoS设置应用于`整个channel上所有的消费者` | ||
> 调整后输出如下: | ||
```shell | ||
【NO 2】 Received a message: 我是第[3]条消息..., Will wait [7s] | ||
【NO 3】 Received a message: 我是第[1]条消息., Will wait [9s] | ||
【NO 1】 Received a message: 我是第[2]条消息.., Will wait [8s] | ||
【NO 2】Done | ||
【NO 2】 Received a message: 我是第[4]条消息...., Will wait [6s] | ||
【NO 1】Done | ||
【NO 1】 Received a message: 我是第[5]条消息....., Will wait [5s] | ||
【NO 3】Done | ||
【NO 3】 Received a message: 我是第[6]条消息......, Will wait [4s] | ||
【NO 3】Done | ||
【NO 2】Done | ||
【NO 1】Done | ||
【NO 2】 Received a message: 我是第[8]条消息........, Will wait [2s] | ||
【NO 3】 Received a message: 我是第[7]条消息......., Will wait [3s] | ||
【NO 1】 Received a message: 我是第[9]条消息........., Will wait [1s] | ||
【NO 1】Done | ||
【NO 1】 Received a message: 我是第[10]条消息.........., Will wait [0s] | ||
【NO 1】Done | ||
【NO 2】Done | ||
【NO 3】Done | ||
``` | ||
|
||
> 如上:【NO2】先消费第`3`条消息,如果没有上面的设置那么它完成后会先去消费第`6`条消息,但是通过`Qos`方法设置后,先消费第`4`条消息 | ||
## 消息确认 | ||
|
||
> 现在这个场景下,如果我们启动`worker.go`后,然后启动`task.go`,此时消息如果还没有接收完,就断开`worker.go`,那么此时刚才未处理的消息就会消失,再次 | ||
> 执行`worker.go`也不会接收到任何消息了 | ||
:::tip | ||
- 上面的现象是因为在调用`ch.Consume`函数时,将`第3个参数auto-ack`设置为了`true`,这表明只要`Producer`成功发送消息后,就将该消息标记为`已确认`,默认该消息已经完成处理,不会管`Consumer`是否收到 | ||
::: | ||
|
||
> 要处理上面的问题,需要以下步骤 | ||
> | ||
> 1、将`auto-ack`设置为`false` | ||
> | ||
> 2、消息处理完成后手动调用`Ack`函数确认已收到消息 | ||
> `worker.go`修改如下: | ||
```Go | ||
func consumeMessage(ch *amqp.Channel, queueName string, serialNumber int) { | ||
// 4、consume message | ||
message, err := ch.Consume( | ||
queueName, | ||
"", // consumer | ||
false, // auto-ack 不自动确认消息 | ||
false, // exclusive | ||
false, // no-local | ||
false, // no-wait | ||
nil, // args | ||
) | ||
shared.FailOnError(err, "consume queue message error") | ||
go func() { | ||
for msg := range message { | ||
dotCount := bytes.Count(msg.Body, []byte(".")) | ||
duration := time.Duration(10 - dotCount) | ||
fmt.Printf("【NO %d】 Received a message: %s, Will wait [%s]\n", serialNumber, msg.Body, duration*time.Second) | ||
time.Sleep(duration * time.Second) | ||
fmt.Printf("Done \n") | ||
// Manually confirm that the message has been delivered | ||
msg.Ack(false) | ||
} | ||
}() | ||
} | ||
``` | ||
|
||
## 消息持久化 | ||
|
||
> 现在的代码中,如果在`发送/接收`消息的过程中,`RabbitMQ`服务挂了,或者`出错退出了`,那么消息就会丢失 | ||
> | ||
> 为了解决这个问题,可以设置`消息持久化`,对于`Producer`和`Consumer`都要设置 | ||
> 1、发送方 | ||
> - 调用`ch.PublishWithContext`函数时,设置`消息body(amqp.Publishing)`时,将模式`DeliveryMode`改为持久化 | ||
> - 声明队列调用`ch.QueueDeclare`函数时,将第二个参数`durable`设置为`true` | ||
> `task.go`修改如下: | ||
```Go | ||
// 3. declare a queue,it will only be created if it doesn't exist already | ||
queue, err := ch.QueueDeclare( | ||
"second_queue", // queue name | ||
true, // durable | ||
false, //delete when unused | ||
false, //exclusive | ||
false, // no-wait | ||
nil, // arguments | ||
) | ||
err = ch.PublishWithContext( | ||
withTimeoutCtx, | ||
"", //exchange name | ||
queue.Name, // routing key | ||
false, // mandatory | ||
false, //immediate | ||
amqp.Publishing{ | ||
DeliveryMode: amqp.Persistent, // DeliveryMode uint8 // Transient (0 or 1) or Persistent (2) | ||
Body: []byte(messageBody), | ||
ContentType: "text/plain", | ||
}, | ||
) | ||
``` | ||
|
||
|
||
> 2、接收方 | ||
> - 声明队列调用`ch.QueueDeclare`函数时,将第二个参数`durable`设置为`true` | ||
```Go | ||
// 3. declare a queue,it will only be created if it doesn't exist already | ||
queue, err := ch.QueueDeclare( | ||
"second_queue", // queue name | ||
true, // durable | ||
false, //delete when unused | ||
false, //exclusive | ||
false, // no-wait | ||
nil, // arguments | ||
) | ||
``` |