youyichannel

📌志于道,据于德,依于仁,游于艺!

0%

MQ 如何保证消息不丢失?

在我们日常业务使用 MQ 时,如何保证消息不丢失失一个特别需要关注的点。在某些业务场景下消息的丢失造成的影响可能不大,比如购物车,下单后删除购物车对应商品的消息丢失了还可以容忍,但是在金融场景下,消息丢失了往往代表着资产的损失,这是无法容忍的。

当然,即使在丢失影响不大的场景下,我们也要尽可能的保证消息不丢失,不然业务流程就会不完整。

综述,用到了 MQ,保证消息不丢失就很关键。那么如何保证呢?我们需要分析下 MQ 的整个链路。

消息从生产者发送至 Broker,Broker 存储消息,消费者从 Broker 拉取消息消费。

因此,如果需要保证消息不丢失,从整个链路来看需要保证三大流程中消息都不丢失,即:

  1. 发送:生产者需要保证消息被完整的发送并存储至 Broker 中;
  2. 存储:Broker 需要保证已经存储的消息不丢失,即需要持久化;
  3. 消费:消费者需要保证拉取的消息一定被消费。

生产者发送消息

消息在生产者侧产生,而且消息的生成往往伴随着某个业务,比如【下单后给用户加积分】场景,在代码中,我们会先保存订单,然后发送消息让积分系统给对应的用户加积分,伪代码:

void addOrder(params) {
// do something...
saveOrder(); // 保存订单
sendMsg(); // 发送加积分消息
}

我们需要确保订单保存成功后,积分消息一定要发送成功。此处就涉及到请求确认机制,即 ACK。简单来说,就是生产者与 Broker 交互通过 ACK 来确认消息的成功接收。

当生产者发送消息给 Broker 之后,如果 Broker 接收到这条消息,那么就返回 ACK 给生产者,一旦生产者接收到了 ACK,就表明消息已经被 Broker 成功接收了,因此保证发送阶段消息不回丢失。

如果 Broker 没有返回 ACK,该如何处理?

不管是因为网络还是其他原因,只要此时生产者超时等待没有接收到 ACK,那么就需要重试。重试也是需要一定限制的,不可能无限制重试,这样就会阻塞后续的业务逻辑,在 RocketMQ 中默认重试 3 次失败后就会返回错误或者直接抛出异常,这时候就需要人工介入了。

因此,在使用 MQ 时,我们需要关注发送结果或者异常情况。

void addOrder(params) {
// do something...
saveOrder(); // 保存订单
try {
SendResult sendRes = sendMsg(); // 发送加积分消息
if(sendRes ...) {
recordSend(); // 记录失败消息
}
} catch(Exception e) {
log.error("send msg error, ", e);
recordSend(); // 记录失败消息
}
}

这种情况下消息发送异常,在代码中也不能够影响正常的下单流程,因此记录下错误日志,然后把发送积分的消息保存到 DB 中,后续再通过定时任务等方式来做补偿

原则:不能让非主流程的功能影响主流程的功能。在这里,下单是主流程,加积分是非主流程,

同步发送可以通过 try-catch 来捕获异常,如果是异步发送,需要处理 onException 的逻辑:

// 异步发送消息,发送结果需要通过 callback 通知客户端
producer.send(msg, new SendCallBack() {
@Override
public void onSuccess(SendResult sendReult) {
// do something...
}

@Override
public void onException(Throwable e) {
// 处理发送失败的消息
log.error("send msg error, ", e);
recordMsg();
}
});

Broker 存储流程

当 Broker 返回 ACK 给生产者时,生产者就认为消息已经被存储至 Broker 了,后续不会再发送这条消息了。因此 Broker 返回 ACK 给生产者之前需要确保消息真的已经被成功存储了

RocketMQ 的消息默认是异步刷盘的,也就是说消息先写入 PageCache 中,然后等待 OS 或定时刷盘任务将消息刷盘。也就是说默认情况下,当消息写入到缓存中,Broker 就给生产者返回 ACK 了。如果 Broker 正常运行肯定是没有问题的,但是如果突然断电了,那 PageCache 中没有刷盘的消息就丢失了。因此,如果一定要确保消息不丢失,需要将 Broker 的刷盘配置改成同步刷盘,即 flushDiskType=SYNC_FLUSH

这样生产者收到 ACK 的消息都是已经被刷到磁盘上的。

问题:假设磁盘损坏了呢?

默认情况下如果磁盘损坏了,消息确实丢失了。不过现在有很多磁盘阵列可以保证磁盘上内容的可靠性,简单理解就是有备份。

同理,集群架构需要保证同步复制,这样当 slave 被消费时提供的消息才是完整的。

消费者消费流程

消息是存在消费点位提交的,consumer 需要上报给 Broker 已经消费到的位置,这样即使 consumer 重启后,也可以从 Broker 处获取之前的消费位置,然后往后消费消息。因此我们需要保证,上报给 Broker 的消费点位必须是已经被消费过的消息

【栗子🌰】

消费者直接从 Broker 拉取了 50 条消息,分别是 1~50,当消息到达消费者内存后,消费者直接将消息都提交给线程池进行异步处理,然后直接返回消费成功,即提交消费点位。

consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
final ConsumeConcurrentlyContext context) {
// 将消息提交到线程池
executorPool.execute(xxxx);
// 返回消息消费状态,消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
})

这样实现的话,消费者客户端会认为这些消息已经被消费了,然后给 Broker 提交了点位,已经消费到 50 了,下一次将从 51 开始拉取。

假设此时消费者宕机了,线程池中的任务队列可是内存队列,那么线程池中还未消费的消息就丢失了,重启后也不会拉取前 50 条消息了,而是从 51 开始拉取了,所以之前那些在线程池任务队列排队的还未被消费的消息就丢失了。

因此,我们必须要确保只有在对应消息的业务流程处理完毕后,再给 Broker 返回消息确认,提交点位

总结

可以看出,如果要保证消息不丢失,没有那么简单,需要多方配合,且需要补偿机制。并且可靠性往往意味着损失性能,因为很多地方需要等待执行成功,无法异步。