10分钟带你了解开发说的Mq到底是什么?

以下文章来源于懂Java的测试 ,作者懂Java的测试

前言

日常测试工作中,开发提到的Mq到底什么意思?本篇文章就分享一下所谓的Mq究竟是什么技术、有什么好处,以及常见的Mq组件有那些。

Mq的优点有哪些

Mq(Message Queue)是消息队列,主要有三大用途,我们拿一个电商系统的下单举例:
解耦:引入消息队列之前,下单完成之后,需要订单服务去调用库存服务减库存,调用营销服务加营销数据……引入消息队列之后,可以把订单完成的消息丢进队列里,下游服务自己去调用就行了,这样就完成了订单服务和其它服务的解耦合。



异步:订单支付之后,我们要扣减库存、增加积分、发送消息等等,这样一来这个链路就长了,链路一长,响应时间就变长了。引入消息队列,除了更新订单状态,其它的都可以异步去做,这样一来就来,就能降低响应时间。



削峰:消息队列合一用来削峰,例如秒杀系统,平时流量很低,但是要做秒杀活动,秒杀的时候流量疯狂怼进来,我们的服务器,Redis,MySQL各自的承受能力都不一样,直接全部流量照单全收肯定有问题啊,严重点可能直接打挂了。
我们可以把请求扔到队列里面,只放出我们服务能处理的流量,这样就能抗住短时间的大流量了。



解耦、异步、削峰,是消息队列最主要的三大作用。

Mq的缺点有哪些
互联网很多技术都是双刃剑,有利益有弊,同样Mq也是,它有以下几个缺点:

系统可用性降低
本来系统运行好好的,现在你非要加入个消息队列进去,那消息队列挂了,你的系统不是呵呵了。
比如说一个核心链路里面:系统A -> 系统B -> 系统C,然后系统C是通过MQ异步调用系统D的。



看起来很好,你用这个MQ异步化的手段解决了一个核心链路执行性能过差的问题。
但是你有没有考虑另外一个问题,就是万一你依赖的那个MQ中间件突然挂掉了怎么办?这个还真的不是异想天开,MQ、Redis、MySQL这些组件都有可能会挂掉。
一旦你的MQ挂了,就导致你的系统的核心业务流程中断了。本来你要是不引入MQ中间件,那其实就是一些系统之间的调用,但是现在你引入了MQ,就导致你多了一个依赖。一旦多了一个依赖,就会导致你的可用性降低。
因此,一旦引入了MQ中间件,你就必须去考虑这个MQ是如何部署的,如何保证高可用性。
甚至在复杂的高可用的场景下,你还要考虑如果MQ一旦挂了以后,你的系统有没有备用兜底的技术方案,可以保证系统继续运行下去
系统复杂度提高
还是上面那张图,大家再来看一下:



不知道大家有没有发现一个问题,这个链路除了MQ中间件挂掉这个可能存在的隐患之外,可能还有一些其他的技术问题。
比如说,莫名其妙的,系统C发了一个消息到MQ,结果那个消息因为网络故障等问题,就丢失了。这就导致系统D没有收到那条消息。
这可就惨了,这样会导致系统D没完成自己该做的任务,此时可能整个系统会出现业务错乱,数据丢失,严重的bug,用户体验很差等各种问题。
这还只是其中之一,万一说系统C给MQ发送消息,不小心一抽风重复发了一条一模一样的,导致消息重复了,这个时候该怎么办?





可能会导致系统D一下子把一条数据插入了两次,导致数据错误,脏数据的产生,最后一样会导致各种问题。
或者说如果系统D突然宕机了几个小时,导致无法消费消息,结果大量的消息在MQ中间件里积压了很久,这个时候怎么办?
即使系统D恢复了,也需要慢慢的消费数据来进行处理。
所以这就是引入MQ中间件的第二个大问题,系统稳定性可能会下降,故障会增多,各种各样乱七八糟的问题都可能产生。而且一旦产生了一个问题,就会导致系统整体出问题。就需要为了解决各种MQ引发的技术问题,采取很多的技术方案,系统的复杂性就会提升好几个层级。
一致性问题
A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了
所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用,还是得用的。
你们公司项目用的是什么消息中间件?
Mq的组件有哪些?

Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?



Mq的组件有哪些常见的问题?

消息的顺序问题
消息有序指的是可以按照消息的发送顺序来消费。
假如生产者产生了 2 条消息:M1、M2,假定 M1 发送到 S1,M2 发送到 S2,如果要保证 M1 先于 M2 被消费,怎么做?



解决方案:
保证生产者 - MQServer - 消费者是一对一对一的关系



消息的重复问题
造成消息重复的根本原因是:网络不可达。
所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?
消费端处理消息的业务逻辑保持幂等性。只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。利用一张日志表来记录已经处理成功的消息的 ID,如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。
具体可参考之前的文章测试工程师都能看懂的幂等性问题
消息的丢失问题

每个MQ组件都有自己的方案,由于篇幅有限,我就简单聊聊RocketMq(互联网项目常用的Mq技术,经历过阿里双十一高并发的考验)防丢失方案吧。

一般RocketMq会有以下几种场景丢失消息:



1、生产者将消息发送给Rocket MQ的时候,如果出现了网络抖动或者通信异常等问题,消息就有可能会丢失
2、消息需要持久化到磁盘中,这时会有两种情况导致消息丢失
①RocketMQ为了减少磁盘的IO,会先将消息写入到os cache中,而不是直接写入到磁盘中,消费者从os cache中获取消息类似于直接从内存中获取消息,速度更快,过一段时间会由os线程异步的将消息刷入磁盘中,此时才算真正完成了消息的持久化。在这个过程中,如果消息还没有完成异步刷盘,RocketMQ中的Broker宕机的话,就会导致消息丢失





②如果消息已经被刷入了磁盘中,但是数据没有做任何备份,一旦磁盘损坏,那么消息也会丢失
3、消费者成功从RocketMQ中获取到了消息,还没有将消息完全消费完的时候,就通知RocketMQ我已经将消息消费了,然后消费者宕机,但是RocketMQ认为消费者已经成功消费了数据,所以数据依旧丢失了
那么如何保证消息的零丢失呢?



1、生产者保证消息不丢失的方案是使用RocketMQ自带的事务机制来发送消息,大致流程为
①首先生产者发送half消息到RocketMQ中,此时消费者是无法消费half消息的,若half消息就发送失败了,则执行相应的回滚逻辑
②half消息发送成功之后,且RocketMQ返回成功响应,则执行生产者的核心链路
③如果生产者自己的核心链路执行失败,则回滚,并通知RocketMQ删除half消息
④如果生产者的核心链路执行成功,则通知RocketMQ commit half消息,让消费者可以消费这条数据
其中还有一些RocketMQ长时间没有收到生产者是要commit/rollback操作的响应,回调生产者接口的细节,感兴趣的可以参考我的这篇博文 RocketMQ分布式事务原理
在使用了RocketMQ事务将生产者的消息成功发送给RocketMQ,就可以保证在这个阶段消息不会丢失
消息中间件要保证消息不丢失,首先需要将os cache的异步刷盘策略改为同步刷盘,这一步需要修改Broker的配置文件,将flushDiskType改为SYNC_FLUSH同步刷盘策略,默认的是ASYNC_FLUSH异步刷盘。一旦同步刷盘返回成功,那么就一定保证消息已经持久化到磁盘中了;为了保证磁盘损坏不会丢失数据,我们需要对RocketMQ采用主从机构,集群部署,Leader中的数据在多个Follower中都存有备份,防止单点故障。
消费者如何保证不丢失消息呢?消息到达了消费者,RocketMQ在代码中就能保证消息不会丢失。
//注册消息监听器处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){                                          
//对消息进行处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
上面这段代码中,RocketMQ在消费者中注册了一个监听器,当消费者获取到了消息,就会去回调这个监听器函数,去处理里面的消息
当你的消息处理完毕之后,才会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
只有返回了CONSUME_SUCCESS,消费者才会告诉RocketMQ我已经消费完了,此时如果消费者宕机,消息已经处理完了,也就不会丢失消息了
如果消费者还没有返回CONSUME_SUCCESS时就宕机了,那么RocketMQ就会认为你这个消费者节点挂掉了,会自动故障转移,将消息交给消费者组的其他消费者去消费这个消息,保证消息不会丢失。
使用上面一整套的方案就可以在使用RocketMQ时保证消息零丢失,但是性能和吞吐量也将大幅下降
使用事务机制传输消息,会比普通的消息传输多出很多步骤,耗费性能
同步刷盘相比异步刷盘,一个是存储在磁盘中,一个存储在内存中,速度完全不是一个数量级,主从架构的话,需要Leader将数据同步给Follower,消费时无法异步消费,只能等待消费完成再通知RocketMQ消费完成。
消息零丢失是一把双刃剑,要想用好,还是要视具体的业务场景而定,选择合适的方案才是最好的

怎么测试Mq的组件?
最后一点特别关键,也是很多面试官爱问的面试题,你怎么测试mq组件的?主要分为以下几个方面:
Mq异常测试

1、消息重复发送
消息重复发送,幂等性测试,可参考文章测试工程师都能看懂的幂等性问题
2、消息到达顺序不一致
消息到达顺序不一致,导致业务异常。
比如:订单下单后再取消,如果先收到取消的消息,再收到下单消息,就会有问题。
3、消费失败生产者、消费者重试机制

4、Mq性能测试

线上即将投入使用的Mq集群,通常都会做个性能摸底,在Mq的TPS和机器的资源使用率之间取得一个平衡。

举个例子,mq在资源利用率极高的情况下可以到10万TPS,但是机器的内存、cpu、io负载特别高,濒临宕机,这个是极其不安全的,可是当TPS在8万的时候,机器的内存、cpu、io负载较高,但是还在可接受范围内,这批机器就比较安全,不至于宕机。

所以我们的目标就是综合TPS和机器负载,尽量找到一个最好的Tps,并且机器各项负载都在可接受范围内,这才是我们的测试目的。为此要性能压测要进行以下几个步骤。

1、代码中创建几个生产者,投递消息

我们可以在代码里让两个生产者,不停的往集群中发送消息,每个生成者启动多个80个线程(具体参考服务器配置),相当于每台机器有80个线程并发往Mq集群发送消息。每条消息的大小固定。

2、实时查看集群中服务器的cpu、内存负载情况、磁盘io、JVM GC频率、网卡流量等。

可以使用命令,也可以使用监控软件prometheus 等监控服务器设备的负载。

3、当设备负载任何一个指标已经超过安全阈值以后,立即查看Mq管理页面的tps峰值,譬如机器网卡是128M,网卡实际流量有100M,接近千兆网流量,我们mq的Tps峰值是7万左右,我们就认为7万tps是新集群最佳Tps.

其他

Producer(生产者)
所谓的生产者,就是产生消息的应用方,在进行生产者端的测试时,需要注意如何接入对应的MQ,需要哪些信息,可以提前确认(包括但不仅限入接入账号、接入的主题、消息格式等)。
测试注意点有:
1、数据是否真正推送到队列中
2、数据是否推送到正确的topic下
3、如果一次推送的数据过多,前面推送的数据如何处理(超过队列)
4、同时需要注意每个topic下的queue如何分布数据

Consumer(消费者) 在MQ的世界里,消费者(从MQ队列里获取数据的应用方)主要有两种,PUSH和PULL,简单来说,就是主动拉取消息和被动接收消息(还有一种消费方式是广播消息,应用场景较少)。不管哪种获取消息方式,首先都要订阅消息,即先指定需要消费哪个topic下的消息。
测试注意点:
1、确认应用的消费是哪种
2、测试消费者的消费信息源是否正确(能否从正确的topic中拿到正确的消息)
3、测试Topic的消费队列策略是什么
4、数据被消费者使用后,有没有及时的被清除
5、当消息队列过长(消费速度过慢)时,MQ会溢出的数据如何处理
6、是否会越权消费别的 topic中的信息
7、如果是PULL类型的消费者,需要测试拉取的时间间隔,如果是push的类型,需要测试当有生产者生成消息时,消费者是否能及时得到信息并消费
总结

Mq在互联网项目中使用特别广泛,如果想在面试中增加自己的亮点,拿高薪,个人觉得可以在这方面突破。

作者:懂Java的测试


欢迎关注微信公众号 :Python测试社区