四种基于MQ的分布式事务解决方案

在微服务的时代,分布式事务是绕不开的话题,尽管在大多数场景下,我们并不需要使用分布式事务,但是 不需要使用 不代表 可以不会使用,万一哪天真需要用到了呢?分布式事务是一个比较大的话题,今天我们来看看基于MQ的分布式事务解决方案。

在实际开发中,为了简化分布式事务,我们和其他服务交互,经常会采用MQ的方式,我们先来看下如果采用MQ的方式和其他服务进行交互,应该怎么做。

采用MQ的方式和其他服务进行交互

基于RocketMQ 事务消息+最大努力通知

RocketMQ提供了事务的消息的功能,我们来看下事务消息的原理:



Producer发送一个半消息到Broker;

Broker收到半消息后,响应Producer,Broker会将半消息存储到一个特殊的Topic,此时Consumer是不能消费此消息的;

Producer执行本地事务;

Producer根据本地事务的执行情况,告知Broker Commit或者Rollback,如果Commit,Broker会将消息投递到正常的Topic,此时Consumer可以正常消费此消息。

如果因为某种原因,Broker未收到Producer的Commit或者Rollback,Broker会发起回查,Producer收到回查请求后,根据本地事务状态,重新响应Broker Commit或者Rollback。

对于Producer而言,本地事务和发送消息是一致的,要么都成功,要么都失败,但是无法保证Consumer的一致性,有不少人称之为“单端事务”,但是RocketMQ还为Consumer提供了重试的功能,只要Consumer不返回消息消费成功,Consumer还有16次机会可以重新消费此消息,执行自身的业务操作,相当于最大努力通知。

这个方案可以说是最常用的分布式事务解决方案了,不管是实现,还是原理都比较简单,但是仔细想想此方案还是有缺点的:

和RocketMQ强绑定,因为只有RocketMQ才提供了完善的事务消息的功能;

降低了可用性,如果Producer发送半消息失败,流程就终止了;

代码侵入性强,Producer需要提供执行本地事务、回调两个方法。

基于本地消息表

此方案在RocketMQ事务消息推出之前,是采用较多的一个分布式事务解决方案,需要在库中新建一张本地消息表,此表有如下核心字段:

topic:消息需要发送到哪个topic;

state:消息状态,有三种状态:1.未发送 2.发送失败 3.发送成功;

retry_num:重试次数;

time:消息产生的时间;

last_retry_time:最近重试时间;

message:消息内容。

作为Producer来说,原本是执行完本地业务后,直接将消息投递到MQ,现在需要将消息保存至本地消息表,然后由定时任务读取本地消息表,将需要推送的消息投递到MQ,具体做法如下:

本地业务+插入本地消息表 组成一个大的本地事务,以此保证两者的原子性、一致性,本地业务+插入本地消息表两个操作要么同时成功,要么同时失败;

新增定时任务,不断的扫描本地消息表,将未成功投递的消息进行投递,投递成功,修改本地消息表的状态字段,投递失败,修改本地消息表中的状态,并且重试次数+1,等待下一次重新投递。

这个方案实现也非常简单,缺点也显而易见:

严重依赖Job;

及时性比较差,如果Job每10分钟运行一次,那可能就有10分钟的延迟。如果Job每5分钟运行一次,那可能就有5分钟的延迟。

不断的扫描本地消息表,对数据库也是一种压力;

需要定期清理本地消息表。

基于内存队列+本地消息表

本地消息表这个方案还是不错的,有没有办法改善它的缺点呢?当然有,这个方案就是对传统的本地消息表方案进行了改造,据说部分二三线互联网公司就是采用的此种方案,具体看图(如果图片看不清,可以将图片下载到本地 或者 在新标签页中查看图片):



 虽然此方案需要自己编码实现,但是整体来说,编码难度不大。

不管是基于RocketMQ事务消息的分布式事务解决方案,还是基于本地消息表的分布式事务解决方案,还是基于内存队列+本地消息表的分布式事务解决方案,都有消息表的概念,只是消息表的具体存在形式不同,一个是以数据表的形式存在(存在了数据库),一个是以Topic的形式存在(存在了Broker)。

以上三种方案都有一个局限性:和其他服务进行交互,必须比较采用MQ的方式。

不采用MQ的方式和其他服务进行交互

基于RocketMQ的延迟消息检查方案

上述三个方案,都有一个局限性:和其他服务进行交互,必须采用MQ的方式,如果不满足这个条件,如何采用MQ的方式来实现分布式事务呢:采用基于RocketMQ的延迟消息检查方案。

Producer发送延迟消息;

其他服务需要提供检查接口,重试或者回滚接口;

Producer收到延迟消息后,检查自身的业务操作是否执行成功,根据具体情况,判断是否需要重试或回滚,然后调用其他服务提供的检查接口,检查其他服务的业务操作是否执行成功,再根据具体的情况判断是否调用其他服务提供的重试、回滚接口。

此方案实现也比较简单,且容易理解,但是也有缺点:

和RocketMQ强绑定,因为只有RocketMQ才提供了完善的延迟消息的功能;

降低了可用性,如果Producer发送延迟消息失败,流程就终止了。

作者:码出宇宙


欢迎关注微信公众号 :码出宇宙