###可靠性投递的解决方案
-
消息落库, 对消息状态进行标记
-
消息入库
-
消息发送
-
消费端消息确认
-
更新库中消息状态为已确认
-
定时任务读取数据库中未确认的消息
-
未收到确认结果的消息重新发送
-
如果重试几次之后仍然失败, 则将消息状态更改为投递失败的终态, 后面需要人工介入
-
-
消息的延迟投递, 做二次确认, 回调检查
-
第一次消息发送, 必须业务数据落库之后才能进行消息发送
-
第二次消息延迟发送, 设定延迟一段时间发送第二次check消息
-
消费端监听Broker, 进行消息消费
-
更新库中消息状态为已确认
-
消费成功之后, 发送确认消息到确认消息队列
-
Callback Service监听4中的确认消息队列, 维护消息状态, 是否消费成功等状态
-
Callback Service监听2发送的Delay Check的消息队列, 检测内部的消息状态, 如果消息是发送成功状态, 则流程结束, 如果消息是失败状态, 或者查不到当前消息状态时, 会通知生产者, 进行消息重发, 重新上述步骤
-
###幂等性保障
幂等性 : 多次执行, 结果保持一致
-
唯一ID + 指纹码机制, 利用数据库主键去重
-
好处 : 实现简单
-
坏处 : 高并发下有数据库写入的性能瓶颈
-
解决方案 : 根据ID进行分库分表进行算法路由
-
-
利用Redis的原子性实现
-
是否要进行数据落库, 如果落库的话, 数据库和缓存如何做到原子性
-
如果不落库, 数据都存储到缓存中, 如何设置定时同步的策略
-
###Confirm确认消息
-
消息的确认, 是指生产者投递消息后, 如果Broker收到消息, 则会给我们产生一个应答
-
生产者进行接收应答, 用来确定这条消息是否正常发送到Broker, 这种方式也是消息的可靠性投递的核心保障
-
如何实现:
-
在channel上开启确认模式 : channel.confirmSelect()
-
在channel上添加监听 : addConfirmListener, 监听成功和失败的返回结果, 根据具体的结果对消息进行重新发送, 或记录日志等后续处理
-
###Return消息机制
-
Return Listener用于处理一些不可路由的消息
-
正常情况下消息生产者通过指定一个Exchange和RoutingKey, 把消息送到某一个队列中去, 然后消费者监听队列, 进行消费
-
但在某些情况下, 如果在发送消息的时候, 当前的exchange不存在或者指定的路由key路由不到, 这个时候如果我们需要监听这种不可达的消息, 就要使用Return Listener
-
在基础API中有一个关键的配置项Mandatory : 如果为true, 则监听器会接收到路由不可达的消息, 然后进行后续处理, 如果为false, 那么broker端自动删除该消息
###消费端限流
-
消息队列中囤积了大量的消息, 或者某些时刻生产的消息远远大于消费者处理能力的时候, 这个时候如果消费者一次取出大量的消息, 但是客户端又无法处理, 就会出现问题, 甚至可能导致服务崩溃, 所以需要对消费端进行限流
-
RabbitMQ提供了一种qos(服务质量保证)功能, 即在非自动确认消息的前提下, 如果一定数目的消息(通过consumer或者channel设置qos的值)未被确认前, 不进行消费新的消息
###消费端ACK与重回队列
-
消费端ACK
-
消费端的手工ACK和NACK, ACK是确认成功消费, NACK表示消息处理失败, 会重发消息
-
消费端进行消费的时候, 如果由于业务异常我们可以进行日志的记录, 然后进行补偿
-
如果由于服务器宕机等严重问题, 就需要手工进行ACK保障消费端消费成功
-
-
重回队列
-
消费端重回队列是为了对没有处理成功的消息, 把消息重新回递给Broker
-
一般在实际应用中, 都会关闭重回队列, 也就是设置为False
-
###TTL队列/消息
-
TTL是Time To Live的缩写, 也就是生存时间
-
RabbitMQ支持消息的过期时间, 在消息发送时可以进行
-
RabbitMQ支持队列的过期时间, 从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除
-
消费者中设置队列超时时间为10秒, 启动之后关闭消费者
-
生产者发送两条消息, 一条消息不设置超时时间, 一条消息设置5秒后超时
-
启动生产者之后, 监控RabbitMQ控制台, 发现5秒后设置了消息超时时间的消息先超时清除, 然后10秒后另外一条消息也超时清除
-
###死信队列(DLX)
-
Dead-Letter-Exchange
-
利用DLX, 当消息在一个队列中变成死信(dead message)之后, 它能被重新publish到另一个Exchange, 这个Exchange就是DLX
-
DLX也是一个正常的Exchange, 和一般的Exchange没有区别, 它能在任何队列上被指定, 实际上就是设置某个队列的属性为死信队列
-
当这个队列中有死信时, RabbitMQ就会自动将这个消息重新发布到设置的Exchange上去, 进而被路由到另一个队列
-
可以监听这个队列中消息做相应的处理, 这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能
####消息变成死信有以下几种情况 :
消息被拒绝(basic.reject/basic.nack) 并且requeue重回队列设置成false
消息TTL过期
队列达到最大长度
####死信队列的设置 :
-
首先要设置死信队列的exchange和queue, 然后进行绑定
- Exchange : dlx.exchange
- Queue : dlx.queue
- RoutingKey : #
-
然后正常声明交换机, 队列, 绑定, 只不过需要在队列加上一个扩展参数即可 : arguments.put(“x-dead-letter-exchange”, “dlx.exchange”)
-
这样消息在过期, reject或nack(requeue要设置成false), 队列在达到最大长度时, 消息就可以直接路由到死信队列
示例:
1. 消费者中设置死信队列和正常队列, 启动之后关闭消费
2. 生产者生产的消息一个超时时间, 使消息超时之后变为死信