youyichannel

志于道,据于德,依于仁,游于艺!

0%

啥?RocketMQ事务消息咋实现的?

这篇文章来介绍下 RocketMQ 事务消息的作用以及他的实现原理。

事务消息也是分布式事务的一种解决方案,在生产上也很常用。

事务是什么?

简单来说,事务就是指一系列操作要么全部执行成功,要么全部执行失败,不会出现一些成功一些失败的情形。

【栗子🌰】转账:A 给 B 转账 1000 元,那么 A 账户扣款 1000 和 B 账户进账 1000 这两个操作必须全部成功,或者全部失败,否则就破坏了数据的一致性。

严格来说,事务必须满足 ACID 特性,这里不再赘述。

分布式事务又是什么?

在微服务架构中,因为扩展性和性能原因,很多情况下多个操作涉及的数据都存放在不同的数据库中,这个时候就无法使用数据库自身的事务来保证多个操作数据的一致性了,只能够上分布式事务了。

【栗子🌰】订单服务和商品服务,它们都有各自的数据库。

如果一个用户下单成功,那么就需要同时扣减对应的商品库存,这两个操作需要保证一致性。但是因为订单和商品隶属于不同的数据库,这个时候就能够使用分布式事务来保证数据的一致性了。

事务消息

实现分布式事务的方案有很多,2PC、3PC、TCC、本地消息、事务消息等等。

事务消息的适用场景:异步更新,保证最终一致性。也就是说事务消息的实时性要求不高。

【栗子🌰】点外卖的场景。

每次点外卖需要先选择食物加入到购物车,然后下单、付款。下单之后,之前加入到购物车的数据,是需要被清除掉的。

这个需求最简单的方式就是在下单的时候同步调用购物车清除接口来清除数据,但是这会增加下单操作的RT,而且购物车所属接口可能是另一个服务,这就可能因为网络原因导致购物车清除成功了,但是返回请求超时了,这个时候同步的下单流程就报错了,下单失败,这个时候给用户的体验就是下单失败,购物车里面的东西还被清除了,体验极差。

那么这个场景就很适合异步的事务消息,保证下单成功之后,购物车的数据会被清理并且不增加下单接口的RT。

为什么一定要使用事务消息,使用普通消息不可以吗?

数据库插入订单数据后,发送一条消息让购物车清除数据,从流程上来来看貌似没有问题,但是这里仅仅考虑到正常流程,一旦出现异常的情况,数据就有可能出现不一致。

saveXxx();
saveOrder();
sendMsg2ClearCart();

这是因为数据库的相关操作可以利用数据库自身的实现保证数据库层面的数据一致性。sendXxx()sendOrder()操作的表如果在一个库中,那么可以保证数据库层面的数据一致性。但是发送消息完全不属于数据库的范畴,这时候就无法保证了。

存在一种情况,三个步骤都执行成功了,但是最后提交事务的时候报错了,这个时候数据库的操作回滚了,但是发出去的消息无法回滚,也就是说购物车已经被清除了。

因此普通消息无法满足这个需求,这个时候就需要使用事务消息,来保证下单和发送清除购物车数据的消息要么都成功,要么都失败。

如何实现事务消息

根据前面的描述,主要就是异常的场景才容易导致事务的不一致,因此主要解决的矛盾就是异常的情况。

RocketMQ 事务消息的实现原理:

1)在事务开始时,发送一条半消息 (half message)给 Broker,半消息字面理解就是不完整的消息,这类消息不会被消费者消费到;

2)然后执行本地事务,在此处的场景中就是下单的一系列操作;

3)最后根据本地事务的执行结果来决定是向 Broker 发送提交消息还是发送回滚消息。

  • 发送提交信息,之前发送的半消息就会变得完整,即可以被消费者消费,在这条消息被消费者消费后,整个分布式事务就完整了,保证了最终一致性;
  • 发送回滚消息,那么这条半消息就作废了,不会被消费者消费到,这就跟本地事务结果保持一致了。

如果出现什么意外,导致执行本地事务后,没有进一步发送提交或者回滚消息该如何处理,如果本地事务执行失败了还不会有什么影响,但是如果执行成功了,那么这条半消息不就一直不会被消费者消费了?

针对于这种情况,RocketMQ 设计了一个反查机制,也就是 Broker 会向发送的生产者来反查这个事务是否成功。

具体来说,就是生产者暴露一个接口,通过这个接口 Broker 可以得知事务是否执行成功,没有成功就回滚。当然也就可能事务还在执行中,这个时候可以返回 UNKNOWN,这样一来,Broker 后续会继续查询,可以在接口逻辑上实现多次查询还没有结果再返回回滚。

小结:生产者只需要暴露一个查询订单是否存在的接口,如果存在,说明下单成功,那么就提交事务,如果订单不存在那么可能是还未生成或者生成失败,查询多次无果后回滚即可。

引申下生产者组的概念。

Broker 会反查生产者提供的接口,如果发送的生产者挂了,还有同一个生产者组的其他生产者可以供 Broker 反查。

使用 RocketMQ 事务消息

首先需要实现 RocketMQ 提供的 TransactionListener 接口

public interface TransactionListener {
/**
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

/**
* When no response to prepare(half) message. broker will send check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

源码中提供了示例,使用随机数生成状态模拟事务的成功和失败,然后将结果存储在本地,供反查的时候使用:

public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);

private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}

RocketMQ 实现原理

偷梁换柱

RocketMQ 在发送半消息的时候,发往的不是原先的 Topic,而是发往特定的 Topic: RMQ_SYS_TRANS_HALF_TOPIC

将原先的 Topic 和队列存储在属性中,替换 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC,队列默认是 0。这样一来消息被存储后也不会被消费者消费。

然后等待生产者的提交或者回滚事务的请求:

  • 如果收到提交请求,那么从属性中获取消息原先的 Topic 和队列,将消息发往原 Topic,即往 commitlog 里面存储这条消息,这样消费者就能够消费到了。
  • 如果收到回滚请求,那么就不往 commitlog 里面存储,这样依赖消费者不会消费到,等同于事务回滚。

同时 Broker 起一个定时线程 TransactionalMessageCheckService 服务,它会定时扫描 RMQ_SYS_TRANS_HALF_TOPIC 这个 Topic 下的消息,区请求生产者提供的反查接口查看事务是否成功,如果成功就恢复原先的 Topic 供消费者消费,失败的话就不重新投递。