消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。

我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢。

即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一。

也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。

一个消息 M 发送到了消息中间件,消息投递到了消费程序 A。A 接受到了消息,然后进行消费。

但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。

然而这种可靠的特性会导致消息可能被多次地投递。

还是刚刚这个例子。

程序 A 接受到这个消息 M 并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以它还会继续投递。

这时候对于应用程序 A 来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。

这在 RockectMQ 的场景来看,就是同一个 messageId 的消息重复投递下来了。

基于消息的投递可靠(消息不丢)是优先级更高的,所以消息不重的任务就会转移到应用程序自我实现,这也是为什么 RocketMQ 的文档里强调的,消费逻辑需要自我实现幂等。

背后的逻辑其实就是:不丢和不重是矛盾的(在分布式场景下),但消息重复是有解决方案的,而消息丢失是很麻烦的。

关于 RocketMQ 消息重复的场景,官方文档上给出了这三种情况:

1.发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。

2.投递时消息重复

消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列RocketMQ版的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。

3.负载均衡时消息重复(包括但不限于网络抖动、Broker重启以及消费者应用重启)

当消息队列RocketMQ版的Broker或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到重复消息。

112期汇总,建议收藏

那么,有什么解决方案呢?

简单的消息去重解决方案

假设我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存。

insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';

要实现消息的幂等,我们可能会采取这样的方案:

select * from t_order where order_no = 'order123'
if(order != null) {
    return ;//消息重复,直接返回
}

这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。

并发重复消息

假设这个消费的所有代码加起来需要 1 秒,有重复的消息在这 1 秒内(假设 100 毫秒)内到达。

例如生产者快速重发,Broker 重启等。

那么很可能,上面去重代码里面会发现,数据依然是空的,因为上一条消息还没消费完,还没成功更新订单状态。

具体一点就是两个线程在间隔非常短甚至是同时执行这个逻辑:

select * from t_order where order_no = 'order123'

然后发现都没有查到数据,于是走入到这个逻辑中:

if(order != null) {
    return ;//消息重复,直接返回
}

那么就会穿透掉检查的挡板,最后导致重复的消息消费逻辑进入到非幂等安全的业务代码中,从而引发重复消费的问题,如主键冲突抛出异常、库存被重复扣减而没释放等。

要解决上面并发场景下的消息幂等问题,一个可取的方案是开启事务把 select 改成 select for update 语句,把记录进行锁定:

select * from t_order where order_no = 'THIS_ORDER_NO' for update //开启事务
if(order.status != null) {
    return ;//消息重复,直接返回
}

但这样消费的逻辑会因为引入了事务包裹而导致整个消息消费可能变长,并发度下降。

当然还有其他更高级的解决方案,例如更新订单状态采取乐观锁,更新失败则消息重新消费之类的。

但这需要针对具体业务场景做更复杂和细致的代码开发、库表设计,不在本文讨论的范围。

但无论是select for update, 还是乐观锁这种解决方案,实际上都是基于业务表本身做去重,这无疑增加了业务开发的复杂度。

一个业务系统里面很大部分的请求处理都是依赖 MQ 的,如果每个消费逻辑本身都需要基于业务本身而做去重/幂等的开发的话,这是繁琐的工作量。

本文希望探索出一个通用的消息幂等处理的方法,从而抽象出一定的工具类用以适用各个业务场景。

Exactly Once

在消息中间件里,有一个投递语义的概念。

而这个语义里有一个叫 Exactly Once ,即消息肯定会被成功消费,并且只会被消费一次。

以下是官方文档对 Exactly Once 的解释:

Exactly-Once 是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。

在我们业务消息幂等处理的领域内,可以认为业务消息的代码肯定会被执行,并且只被执行一次,那么我们可以认为是 Exactly Once。

但这在分布式的场景下想找一个通用的方案几乎是不可能的。

不过如果是针对基于数据库事务的消费逻辑,实际上是可行的。

另外,关于 Exactly-Once 再补充一些下。

Exactly-Once 语义是消息系统和流式计算系统中消息流转的最理想状态,但是在业界并没有太多理想的实现。

因为真正意义上的 Exactly-Once 依赖消息系统的服务端、消息系统的客户端和用户消费逻辑这三者状态的协调。

例如,当您的消费端完成一条消息的消费处理后出现异常宕机,而消费端重启后由于消费的位点没有同步到消息系统的服务端,该消息有可能被重复消费。

业界对于 Exactly-Once 投递语义存在很大的争议,很多人会拿出“FLP不可能理论”或者其他一致性定律对此议题进行否定,但事实上,特定场景的Exactly-Once语义实现并不是非常复杂,只是因为通常大家没有精确的描述问题的本质。

如果要实现一条消息的消费结果只能在业务系统中生效一次,需要解决的只是如何保证同一条消息的消费幂等问题。

消息队列 RocketMQ 版的 Exactly-Once 语义就是解决业务中最常见的一条消息的消费结果(消息在消费端计算处理的结果)在数据库系统中有且仅生效一次的问题。

基于关系数据库事务插入消息表

假设我们业务的消息消费逻辑是:更新MySQL数据库的某张订单表的状态。

update t_order
set status =
'SUCCESS' where order_no=
'order123';

要实现 Exaclty Once 即这个消息只被消费一次(并且肯定要保证能消费一次),我们可以这样做。

在这个数据库中增加一个消息消费记录表,把消息插入到这个表,并且把原来的订单更新和这个插入的动作放到同一个事务中一起提交,就能保证消息只会被消费一遍了。

流程看起来像是这样的:

1.开启事务 2.插入消息表(处理好主键冲突的问题) 3.更新订单表(原消费逻辑) 4.提交事务

这时候如果消息消费成功并且事务提交了,那么消息表就插入成功了。

这时候就算 RocketMQ 还没有收到消费位点的更新,从而再次投递,也会插入消息失败而视为已经消费过,后续就直接更新消费位点了。

这保证我们消费代码只会执行一次。

如果事务提交之前服务挂了(例如重启),对于本地事务并没有执行所以订单没有更新,消息表也没插入成功。

而对于RocketMQ服务端来说,消费位点也没更新,所以消息还会继续投递下来,投递下来发现这个消息插入消息表也是成功的,所以可以继续消费。

这保证了消息不丢失。

事实上,阿里云的 RocketMQ 的 EXACTLY-ONCE 语义的实现上,就是类似这个方案基于数据库的事务特性实现的:

https://help.aliyun.com/document_detail/102777.html

基于这种方式,的确这是有能力拓展到不同的应用场景,因为它的实现方案与具体业务本身无关——而是依赖一个消息表。

但是这里有它的局限性:消息的消费逻辑必须是依赖于关系型数据库事务。

如果消费的消费过程中还涉及其他数据的修改,例如 Redis 这种不支持事务特性的数据源,则这些数据是不可回滚的。

还有,数据库的数据必须是在一个库,跨库无法解决。

另外,需要特别注意的是:在业务上,消息表的设计不应该以消息 ID 作为标识,而应该以业务的业务主键作为标识更为合理,以应对生产者的重发。

更复杂的业务场景

如上所述,这种方式 Exactly Once 语义的实现,实际上有很多局限性,这种局限性使得这个方案基本不具备广泛应用的价值。

且由于基于事务,可能导致锁表时间过长等性能问题。

例如我们以一个比较常见的一个订单申请的消息来举例,可能有以下几步:

  • 检查库存(RPC)
  • 锁库存(RPC)
  • 开启事务,插入订单表(MySQL)
  • 调用某些其他下游服务(RPC)
  • 更新订单状态
  • commit 事务(MySQL)

这种情况下,我们如果采取消息表+本地事务的实现方式,消息消费过程中很多子过程是不支持回滚的,也就是说就算我们加了事务,实际上这背后的操作并不是原子性的。

怎么说呢?

就是说有可能第一条消息在经历了第二步锁库存的时候,服务重启了,这时候实际上库存是已经在另外的服务里被锁定了,这并不能被回滚。

当然消息还会再次投递下来,要保证消息能至少消费一遍,换句话说,锁库存的这个RPC接口本身依旧要支持“幂等”。

再者,如果在这个比较耗时的长链条场景下加入事务的包裹,将大大的降低系统的并发。

所以通常情况下,我们处理这种场景的消息去重的方法还是会使用一开始说的业务自己实现去重逻辑的方式,如前面加 select for update,或者使用乐观锁。

那我们有没有方法抽取出一个公共的解决方案,能兼顾去重、通用、高性能呢?

我们先拆解一下消息执行过程。

其中一个思路是把上面的几步,拆解成几个不同的子消息,例如:

  • 库存系统消费A:检查库存并做锁库存,发送消息B给订单服务
  • 订单系统消费消息B:插入订单表(MySQL),发送消息C给自己(下游系统)消费
  • 下游系统消费消息C:处理部分逻辑,发送消息D给订单系统
  • 订单系统消费消息D:更新订单状态

上述步骤需要保证本地事务和消息是一个事务的(至少是最终一致性的),这其中涉及到分布式事务消息相关的话题,不在本文论述。

可以看到这样的处理方法会使得每一步的操作都比较原子,而原子则意味着是小事务,小事务则意味着使用消息表+事务的方案显得可行。

然而,这太复杂了!

这把一个本来连续的代码逻辑割裂成多个系统多次消息交互,那还不如业务代码层面上加锁实现呢。

更通用的解决方案

上面消息表+本地事务的方案之所以有其局限性和并发的短板,究其根本是因为它依赖于关系型数据库的事务,且必须要把事务包裹于整个消息消费的环节。

如果我们能不依赖事务而实现消息的去重,那么方案就能推广到更复杂的场景例如:RPC、跨库等。

例如,我们依旧使用消息表,但是不依赖事务,而是针对消息表增加消费状态,是否可以解决问题呢?

接下来就要祭出基于消息幂等表的非事务方案了。

以上是去事务化后的消息幂等方案的流程,可以看到,此方案是无事务的。

关键在于数据的状态,消息表本身做了状态的区分:消费中、消费完成。

只有消费完成的消息才会被幂等处理掉。

而对于已有消费中的消息,后面重复的消息会触发延迟消费,比如在 RocketMQ 的场景下就是发送到 RETRY TOPIC。

之所以触发延迟消费,是为了控制并发场景下,第二条消息在第一条消息没完成的过程中,去延迟消费,而不是去直接幂等,从而去控制消息不丢。

如果直接幂等了,那么同一个消息 id 或者业务唯一标识,会丢失消息,因为上一条消息如果没有消费完成的时候,第二条消息你已经告诉 broker 成功了,那么第一条消息这时候失败 broker 也不会重新投递了。

这里我们回头看看我们一开始想解决的问题是否解决了:

  • 问题一:消息已经消费成功了,第二条消息将被直接幂等处理掉(消费成功)。
  • 问题二:并发场景下的消息,依旧能满足不会出现消息重复,即穿透幂等挡板的问题。
  • 问题三:支持上游业务生产者重发的业务重复的消息幂等问题。

关于第一个问题已经很明显已经解决了,在此就不讨论了。

关于第二个问题是如何解决的?

主要是依靠插入消息表的这个动作做控制的,假设我们用 MySQL 作为消息表的存储媒介,设置消息的唯一 ID 为主键,那么插入的动作只有一条消息会成功。

后面的消息插入会由于主键冲突而失败,走向延迟消费的分支,然后后面延迟消费的时候就会变成上面第一个场景的问题。

关于第三个问题,只要我们设计去重的消息键让其支持业务的主键(例如订单号、请求流水号等),而不仅仅是 messageId 即可。所以也不是问题。

那么,此方案是否有消息丢失的风险?

如果细心的读者可能会发现这里实际上是有逻辑漏洞的。

问题出在上面聊到的个问题二,就是并发场景的那个。

在并发场景下我们依赖于消息状态是做并发控制使得第 2 条消息重复的消息会不断延迟消费,即重试。

但如果这时候第 1 条消息也由于一些异常原因,例如机器重启了、外部异常导致消费失败,没有消费成功呢?

也就是说这时候延迟消费实际上每次过来看到的都是消费中的状态,最后消费就会被视为消费失败而被投递到死信 Topic 中,比如 RocketMQ 默认可以重复消费 16 次。

对于此,我们解决的方法是,插入的消息表必须要带一个最长消费过期时间,例如 10 分钟。

意思是如果一个消息处于消费中超过 10 分钟,就需要从消息表中删除,这一点需要程序自行实现。

所以最后这个消息的流程会是这样的:

我们这个方案实际上没有事务的,只需要一个存储的中心媒介,那么自然我们可以选择更灵活的存储媒介,例如Redis。

使用Redis有两个好处:

  • 1.性能上损耗更低
  • 2.上面我们讲到的超时时间可以直接利用Redis本身的ttl实现

当然Redis存储的数据可靠性、一致性等方面是不如MySQL的,需要用户自己取舍。

show me code

以上方案针对 RocketMQ 的 Java 实现已经开源放到 Github 中,具体的使用文档可以参考

https://github.com/Jaskey/RocketMQDedupListener

以下仅贴一个 Readme 中利用 Redis 去重的使用样例,用以示意业务中如果使用此工具加入消息去重幂等的是多么简单:

//利用Redis做幂等表
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST-APP1");
consumer.subscribe("TEST-TOPIC", "*");

String appName = consumer.getConsumerGroup();// 大部分情况下可直接使用consumer group名
StringRedisTemplate stringRedisTemplate = null;// 这里省略获取StringRedisTemplate的过程
DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, stringRedisTemplate);
DedupConcurrentListener messageListener = new SampleListener(dedupConfig);

consumer.registerMessageListener(messageListener);
consumer.start();

以上代码大部分是原始 RocketMQ 的必须代码,唯一需要修改的仅仅是创建一个 DedupConcurrentListener 示例,在这个示例中指明你的消费逻辑和去重的业务键,该值默认是messageId。

这种实现是否一劳永逸?

实现到这里,似乎方案挺完美的,所有的消息都能快速的接入去重,且与具体业务实现也完全解耦。

那么这样是否就完美的完成去重的所有任务呢?

很可惜,其实不是的。原因很简单:

因为要保证消息至少被成功消费一遍,那么消息就有机会消费到一半的时候失败触发消息重试的可能。还是以上面的订单流程为例:

  • 步骤1:检查库存(RPC)
  • 步骤2:锁库存(RPC)
  • 步骤3:开启事务,插入订单表(MySQL)
  • 步骤4:调用某些其他下游服务(RPC)
  • 步骤5:更新订单状态
  • 步骤6:commit 事务(MySQL)

当消息消费到步骤 3 的时候,我们假设 MySQL 异常导致失败了,触发消息重试。

因为在重试前我们会删除幂等表的记录,所以消息重试的时候就会重新进入消费代码,那么步骤 1 和步骤 2 就会重新再执行一遍。

如果步骤2本身不是幂等的,那么这个业务消息消费依旧没有做好完整的幂等处理。

本实现方式的价值?

那么既然这个并不能完整的完成消息幂等,还有什么价值呢?

价值可就大了!

虽然这不是解决消息幂等的银弹(事实上,软件工程领域里基本没有银弹),但是他能以便捷的手段解决:

  • 1.各种由于Broker、负载均衡等原因导致的消息重投递的重复问题
  • 2.各种上游生产者导致的业务级别消息重复问题
  • 3.重复消息并发消费的控制窗口问题,就算重复,重复也不可能同一时间进入消费逻辑

一些其他的消息去重的建议

也就是说,使用这个方法能保证正常的消费逻辑场景下(无异常,无异常退出),消息的幂等工作全部都能解决,无论是业务重复,还是 RocketMQ 特性带来的重复。

事实上,这已经能解决 99% 的消息重复问题了,毕竟异常的场景肯定是少数的。

那么如果希望异常场景下也能处理好幂等的问题,可以做以下工作降低问题率:

  • #1.消息消费失败做好回滚处理。如果消息消费失败本身是带回滚机制的,那么消息重试自然就没有副作用了。
  • #2.消费者做好优雅退出处理。这是为了尽可能避免消息消费到一半程序退出导致的消息重试。
  • #3.一些无法做到幂等的操作,至少要做到终止消费并告警。例如锁库存的操作,如果统一的业务流水锁成功了一次库存,再触发锁库存,如果做不到幂等的处理,至少要做到消息消费触发异常(例如主键冲突导致消费异常等)

在 #3 做好的前提下,做好消息的消费监控,发现消息重试不断失败的时候,手动做好 #1 的回滚,使得下次重试消费成功。

作者:jaskey

来源:jaskey.github.io/