在我们日常业务使用 MQ 时,如何保证消息不丢失失一个特别需要关注的点。在某些业务场景下消息的丢失造成的影响可能不大,比如购物车,下单后删除购物车对应商品的消息丢失了还可以容忍,但是在金融场景下,消息丢失了往往代表着资产的损失,这是无法容忍的。
当然,即使在丢失影响不大的场景下,我们也要尽可能的保证消息不丢失,不然业务流程就会不完整。
综述,用到了 MQ,保证消息不丢失就很关键。那么如何保证呢?我们需要分析下 MQ 的整个链路。
消息从生产者发送至 Broker,Broker 存储消息,消费者从 Broker 拉取消息消费。
因此,如果需要保证消息不丢失,从整个链路来看需要保证三大流程中消息都不丢失,即:
- 发送:生产者需要保证消息被完整的发送并存储至 Broker 中;
- 存储:Broker 需要保证已经存储的消息不丢失,即需要持久化;
- 消费:消费者需要保证拉取的消息一定被消费。
生产者发送消息
消息在生产者侧产生,而且消息的生成往往伴随着某个业务,比如【下单后给用户加积分】场景,在代码中,我们会先保存订单,然后发送消息让积分系统给对应的用户加积分,伪代码:
void addOrder(params) { |
我们需要确保订单保存成功后,积分消息一定要发送成功。此处就涉及到请求确认机制,即 ACK。简单来说,就是生产者与 Broker 交互通过 ACK 来确认消息的成功接收。
当生产者发送消息给 Broker 之后,如果 Broker 接收到这条消息,那么就返回 ACK 给生产者,一旦生产者接收到了 ACK,就表明消息已经被 Broker 成功接收了,因此保证发送阶段消息不回丢失。
如果 Broker 没有返回 ACK,该如何处理?
不管是因为网络还是其他原因,只要此时生产者超时等待没有接收到 ACK,那么就需要重试。重试也是需要一定限制的,不可能无限制重试,这样就会阻塞后续的业务逻辑,在 RocketMQ 中默认重试 3 次失败后就会返回错误或者直接抛出异常,这时候就需要人工介入了。
因此,在使用 MQ 时,我们需要关注发送结果或者异常情况。
void addOrder(params) { |
这种情况下消息发送异常,在代码中也不能够影响正常的下单流程,因此记录下错误日志,然后把发送积分的消息保存到 DB 中,后续再通过定时任务等方式来做补偿。
原则:不能让非主流程的功能影响主流程的功能。在这里,下单是主流程,加积分是非主流程,
同步发送可以通过 try-catch
来捕获异常,如果是异步发送,需要处理 onException
的逻辑:
// 异步发送消息,发送结果需要通过 callback 通知客户端 |
Broker 存储流程
当 Broker 返回 ACK 给生产者时,生产者就认为消息已经被存储至 Broker 了,后续不会再发送这条消息了。因此 Broker 返回 ACK 给生产者之前需要确保消息真的已经被成功存储了。
RocketMQ 的消息默认是异步刷盘的,也就是说消息先写入 PageCache
中,然后等待 OS
或定时刷盘任务将消息刷盘。也就是说默认情况下,当消息写入到缓存中,Broker
就给生产者返回 ACK 了。如果 Broker
正常运行肯定是没有问题的,但是如果突然断电了,那 PageCache
中没有刷盘的消息就丢失了。因此,如果一定要确保消息不丢失,需要将
Broker 的刷盘配置改成同步刷盘,即
flushDiskType=SYNC_FLUSH
。
这样生产者收到 ACK 的消息都是已经被刷到磁盘上的。
问题:假设磁盘损坏了呢?
默认情况下如果磁盘损坏了,消息确实丢失了。不过现在有很多磁盘阵列可以保证磁盘上内容的可靠性,简单理解就是有备份。
同理,集群架构需要保证同步复制,这样当 slave 被消费时提供的消息才是完整的。
消费者消费流程
消息是存在消费点位提交的,consumer 需要上报给 Broker 已经消费到的位置,这样即使 consumer 重启后,也可以从 Broker 处获取之前的消费位置,然后往后消费消息。因此我们需要保证,上报给 Broker 的消费点位必须是已经被消费过的消息。
【栗子🌰】
消费者直接从 Broker 拉取了 50 条消息,分别是 1~50,当消息到达消费者内存后,消费者直接将消息都提交给线程池进行异步处理,然后直接返回消费成功,即提交消费点位。
consumer.registerMessageListener(new MessageListenerConcurrently() { |
这样实现的话,消费者客户端会认为这些消息已经被消费了,然后给 Broker 提交了点位,已经消费到 50 了,下一次将从 51 开始拉取。
假设此时消费者宕机了,线程池中的任务队列可是内存队列,那么线程池中还未消费的消息就丢失了,重启后也不会拉取前 50 条消息了,而是从 51 开始拉取了,所以之前那些在线程池任务队列排队的还未被消费的消息就丢失了。
因此,我们必须要确保只有在对应消息的业务流程处理完毕后,再给 Broker 返回消息确认,提交点位。
总结
可以看出,如果要保证消息不丢失,没有那么简单,需要多方配合,且需要补偿机制。并且可靠性往往意味着损失性能,因为很多地方需要等待执行成功,无法异步。