youyichannel

志于道,据于德,依于仁,游于艺!

0%

MQ 如何处理消息堆积问题?

MQ 问题三剑客:消息不丢失、不重复、消息堆积问题!这篇文章来分析下最后一个问题。

消息堆积的原因和解决方案

消息堆积的本质是:消息生产的速度大于消息消费的速度

常见的消息堆积的原因:

  1. 瞬时流量
  2. 消费者性能问题
  3. 机器不够
  4. BUG导致
  5. 其他功能影响

瞬时流量

这个点非常好理解,比如一个运营活动上线,瞬时大量的用户涌进系统参与活动,请求数的激增使得对应的业务消息也会激增。持续的大流量使得消息量突增,消费者的消费能力没有加强,那么消息自然就堆积了。

但是,这时候的堆积不一定会产生问题,如果流量的突增只是瞬时性的,比如持续几分钟就结束了,那么此时消息存储在 Broker 中,由 Broker 持久化保存这些消息,消费者根据自己的消费速度按部就班的从 Broker 拉取消费消息,这就是 MQ 想要达到的目的:消峰填谷

但是如果持续的时间比较长,且消息的量级太大,那么堆积产生的影响就不一样了,会产生什么样的问题了?

比如阿里云的 RocketMQ 默认消息存储时间为 3 天,也就是说消息存储 3 天后还没有被消费那么这个消息就会被删除了,这样做的目的是为了控制存储成本。所以那些堆积时间过长被删除的消息就丢失了,需要生产者再发送一遍,同时还需要做历史数据的清洗,这就很麻烦了。

消息队列长时间的堆积影响正常业务消息的消费。对于消费端来说,后续来的正常业务数据需要等待前面的历史数据被清理完成之后才能够被消费,这就会影响到线上功能了。

解决方案:降低消息生产的频率,即不持续性地发送消息,而是发一批等待一会儿,从而使得消息发送的速率略低于消费速率,给正常的业务消息预留空间,也不会造成消息的堆积导致存储时间过长,降低消息丢失的风险。

消费者性能问题

上述的第一个问题是类似于大促活动导致的瞬时大流量的场景,这种情况不会经常发生,而「消费者性能问题」指的是平日里正常的流量也导致消息堆积的问题。

如果说是正常的业务处理也会出现消息堆积的情况,这种情况下就需要认真审视下消费逻辑了,优化消费逻辑,比如单次改批量,同步改异步,又或者是一些数据的获取可以通过缓存的手段来提高获取速度等等。

机器不够

如果说消费逻辑优化好了还是出现消息堆积,这个时候就需要加强消费者端的能力了,也就是往消费者组中加消费者,加机器,做水平扩容

注意,在 RocketMQ 场景,如果需要增加消费者,一定需要考虑队列数是否够用,在 Kafka 场景,需要检查 Partition 数量是否够用等等。

正常情况下,如果队列数大于消费者数,此时增加消费者是没有问题的,MQ 内部会进行 Rebalance 机制,给新加入的消费者分配队列数,此时消费的压力就会被分担出去。但是如果队列数已经和消费者数持平了,此时再往消费者组内增加消费者是没有任何意义的,那么这个时候就需要增加 Topic 下的队列数,再进行水平扩容

BUG 导致

现在假设消费者端消费逻辑有 BUG ,导致每条消息都消费失败,此时也会导致消息的堆积。因为消息消费失败需要重试,RocketMQ 默认重试 16 次才会进入死信队列,这等于流量会被放大 16 倍,也就是说此时 MQ 的资源都被拿来重试了,重试还是失败的。如果是顺序消息,前面的消息消费失败了,后续的消息根本无法继续进行,此时就会堆积消息。

解决方案:做好监控,及时修复 BUG

其他功能影响

还有就是需要留意消费逻辑是否会跟别的业务功能争夺共享资源。比如消息消费和正常业务都需要扣减数据,如果只用数据库来实现库存扣减的话,那么对于同一个商品的处理,它们之间就会争抢同一个数据库的行锁,自然就会相互影响消费速率;再比如其他业务执行长事务导致消费逻辑的等待等等。

总结

消息堆积的场景在业务上还是比较常见的,根本原因是消费速率跟不上生产速率,跟不上的原因有很多,比如生产速率激增、消费速率受到影响等等。

能做的一些工作:

1)在上线前尽量考虑发生堆积的处理手段,比如冗余一些队列数方便水平扩容、提前评估消息的量级、生产速率以及消费速率预先做好处理;

2)需要有批处理的思想,条件允许的话,生产的消息可以批量发送,同理消费的时候就能够批量消费,如果业务上不支持批量发送,那么在消费的时候能批量就批量处理;

3)系统内部能用上缓存的地方就用好缓存,但是也要考虑缓存的不可靠性;

4)对于突发情况可以提前预设好 planB,比如瞬时的大流量涌进,我们无法控制发送速率,且如果业务上不允许消息长时间大量堆积,那么就可以在代码中设置一个开关,将这些消息先落库存储,不处理任何业务逻辑,直接返回消息消费完成,然后后台起定时任务慢慢处理这些消息。

原则:实际生产上遇到问题的时候,不要先想着分析问题,而是及时解决问题,处理堆积最快速的方式就是水平扩容(消息失败 BUG 除外),等消息堆积解决了,后续再来分析原因,优化代码。