技术干货实战(1)- RabbitMQ死信与延迟队列的区别与实现

作者: 修罗debug
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。





摘要:对于消息中间件RabbitMQ,想必各位小伙伴并不陌生,其广泛应用程度不言而喻,此前我们也在许多课程以及诸多专栏文章中介绍了它的应用,其应用场景也是相当广泛的,像什么消息异步通信、服务模块解耦、高并发流量削峰、订单超时未支付自动失效等等都是实际项目中最为常见的场景。本文我们将重点介绍并实现RabbitMQ的死信与延时队列,并将两者做一个简单的对比!


内容:对于RabbitMQ的死信队列,此前我们在“Java秒杀系统”这一技术专栏中已经有重点介绍过了,在那里我们是将其应用于 “订单超时未支付自动失效”这一业务场景中,简而言之,“死信队列”是一种特殊的“队列”,跟普通的队列相比,具有“延迟处理任务”的特性。


而在消息中间件RabbitMQ的架构组件中,也存在着跟“死信队列”在功能特性方面几乎相同的组件,那就是“延迟队列/延时队列”,同样也具有“延迟、延时处理任务”的功效!

当然啦,这两者还是有一丢丢区别的,最直观的当然是名字上啦,从名字上你就可以看出来两者的“处事风格”是不一样的,具体体现在:


一、创建上的差异:

(1)RabbitMQ的死信队列DeadQueue是由“死信交换机DLX”+“死信路由DLK”组成的,当然,可能还会有“TTL”,而DLX和DLK又可以绑定指向真正的队列RealQueue,这个队列RealQueue便是“消费者”真正监听的对象.

(2)而RabbitMQ的延迟/延时队列DelayedQueue 则是由普通的队列来创建即可,唯一不同的地方在于其绑定的交换机为自定义的交换机,即“CustomExchange”,在创建该交换机时只需要指定其消息的类型为 “x-delayed-message”即可.“消费者”真正监听的队列也是它本人,即DelayedQueue

画外音:从这一点上看,延迟/延时队列的创建相对而言简单一些!)


二、功能特性上的差异:

(1)死信队列在实际应用时虽然可以实现“延时、延迟处理任务”的功效,但进入死信中的消息却依然保留了队列的特性,即“FIFO” ~ 先进先出,而不管先后进入队列中消息的TTL的值. 即假设先后进入死信的消息为A、B、C,各自的TTL分别为:10s、3s、5s,理论上TTL先后到达的顺序是:B、C、A,然后从死信出来,最终被路由到真正的队列中,即消息被消费的先后顺序应该为:B、C、A,然而现实却是残酷的,其最终消费的消息的顺序为:A、B、C,即“消息是怎么进去的,就怎么出来”,保留了所谓的FIFO特性.

(2)或许是因为死信有这种缺陷,所以RabbitMQ提供了另一种组件,即“延迟队列”,它可以很完美的解决上面死信出现的问题,即最终消费的消息的顺序为:B、C、A,我们将在下面用实际的代码进行实战实现与演练.


三、插件安装上的差异:

(1)死信不需要额外的插件

(2)但是延迟队列在实际项目使用时却需要在Mq Server中安装一个插件,它的名字叫做:“rabbitmq_delayed_message_exchange”,其安装过程可以参考链接:https://www.cnblogs.com/isunsine/p/11572457.html  里面就提供了Windows环境和Linux环境下的插件的安装过程(很简单,只需要不到3步的步骤.)


四、代码的实战实现~RabbitMQ的死信队列

       说了这么多,想必有些小伙伴有点不耐烦了,下面我将采用实际的代码对上面所介绍的几点区别进行实现与演练(代码都是基于Spring Boot2.0搭建的项目环境实现与测试的)

(1)首先,我们需要创建死信队列以及真正的队列,并实现相关的绑定:   

   //构建订单超时未支付的死信队列消息模型
@Bean
public Queue successKillDeadQueue(){
Map<String, Object> argsMap= Maps.newHashMap();
argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange"));
argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true,false,false,argsMap);
}

//基本交换机
@Bean
public TopicExchange successKillDeadProdExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false);
}

//创建基本交换机+基本路由 -> 死信队列 的绑定
@Bean
public Binding successKillDeadProdBinding(){
return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
}

//真正的队列
@Bean
public Queue successKillRealQueue(){
return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true);
}

//死信交换机
@Bean
public TopicExchange successKillDeadExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false);
}

//死信交换机+死信路由->真正队列 的绑定
@Bean
public Binding successKillDeadBinding(){
return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
}


(2)将项目运行起来,登录RabbitMQ的后端控制台,可以看到成功创建了相应的死信队列和真正的队列等组件,如下图所示:



(3)紧接着,我们在Controller中建立一个请求方法,用于接收前端请求过来的消息,并将该消息附以TTL值,塞入死信队列中,如下所示:

    //死信队列-生产者
@RequestMapping(value = "dead/msg/send",method = RequestMethod.GET)
@ResponseBody
public BaseResponse sendDQMsg(@RequestParam String msg,@RequestParam Long ttl){
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
Message realMsg=MessageBuilder.withBody(msg.getBytes("UTF-8")).build();

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.convertAndSend(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"), env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"), realMsg, message -> {
MessageProperties mp=message.getMessageProperties();
mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

//TODO:动态设置TTL
mp.setExpiration(String.valueOf(ttl));

log.info("死信队列生产者-发出消息:{} TTL:{}",msg,ttl);
return message;
});
}catch (Exception e){
response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
}
return response;
}


(4)最后是写一个Spring Bean类充当消费者,在其中监听“实际队列”的消息:  

@RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer")
public void consumeExpireOrder(@Payload byte[] msg){
try {
log.info("死信队列-监听者-接收消息:{}",new String(msg,"UTF-8"));

}catch (Exception e){
log.error("死信队列-监听者-发生异常:",e.fillInStackTrace());
}
}


最后,我们进入测试环节,打开Postman,前后输入3次不同的请求信息,其中各自的TTL也不尽相同,即消息A的TTL=50s,消息B的TTL=20s,消息C的TTL=10s,最终在Console控制台等待,你会发现消费者监听的消息的顺序为:A、B、C,而不是C、B、A,如下图所示:




五、代码的实战实现~RabbitMQ的延迟/延时队列

很明显,由于死信存在的这个缺陷,故而其在上面的应用场景中是不太适用的!即死信队列在 消息的TTL不一致,且后入死信的消息TTL小于前入的消息TTL的应用场景中是不适用的,而像“订单超时未支付”的应用场景,因为大家都一样,都是固定的30min或者 1h,故而这种场景,死信是相当适合的

因此,为了解决实际项目中“TTL不一致且不固定”的应用场景,我们需要搬上“延迟/延时队列”(当然啦,Redisson的延迟/延迟队列也是可以实现的!),下面我们用代码加以实现!

(1)首先是创建“延迟/延时队列”等相关的组件,如下所示:

    //TODO:RabbitMQ延迟队列
@Bean
public Queue delayQueue(){
return QueueBuilder.durable(env.getProperty("mq.kill.delay.queue")).build();
}

@Bean
public CustomExchange delayExchange(){
Map<String,Object> map=Maps.newHashMap();
map.put("x-delayed-type","direct");
return new CustomExchange(env.getProperty("mq.kill.delay.exchange"),"x-delayed-message",true,false,map);
}

@Bean
public Binding delayBinding(){
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(env.getProperty("mq.kill.delay.routingKey")).noargs();
}

(2)其生产者发送消息的代码我们仍然是放在一个Controller的请求方法中,如下所示:  

//延迟队列-生产者
@RequestMapping(value = "delay/msg/send",method = RequestMethod.GET)
@ResponseBody
public BaseResponse sendDelayMsg(@RequestParam String msg,@RequestParam Long ttl){
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
String info=msg;

Message realMsg=MessageBuilder.withBody(info.getBytes("UTF-8")).build();
rabbitTemplate.convertAndSend(env.getProperty("mq.kill.delay.exchange"),env.getProperty("mq.kill.delay.routingKey"),
realMsg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties mp=message.getMessageProperties();
mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
mp.setHeader("x-delay",ttl);

log.info("延迟队列生产者-发出消息:{} TTL:{}",msg,ttl);
return message;
}
});

}catch (Exception e){
response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage());
}
return response;
}

(3)最后是用于监听延迟队列中消息的消费者的代码,如下所示:  

/**
* 延时队列-消息监听器-消费者
* @Author:debug (SteadyJack)
* @Link: weixin-> debug0868 qq-> 1948831260
**/
@Component
public class DelayQueueMqListener {
private static final Logger log= LoggerFactory.getLogger(DelayQueueMqListener.class);

//消息监听
@RabbitListener(queues = {"${mq.kill.delay.queue}"})
public void consumeMsg(@Payload byte[] msg){
try {
String info=new String(msg,"UTF-8");

log.info("延时队列监听到消息:{} ",info);
}catch (Exception e){
log.error("延时队列-消息监听器-消费者-消息监听-发生异常:",e.fillInStackTrace());
}
}
}

(4)将项目跑起来,可以看到RabbitMQ的后端控制台已经建立了该队列,如下图所示:


(5)最后,我们打开postman,前后输入3次不同的请求信息,其中各自的TTL也不尽相同,即消息A的TTL=50s,消息B的TTL=20s,消息C的TTL=10s,最终在Console控制台等待,你会发现消费者监听的消息的顺序为:C、B、A,而不是A、B、C,,即按照消息的TTL来决定消费的先后顺序,如下图所示:


从该运行结果上看,会发现这才是我们真正想要的结果,即按照时间TTL的大小来决定消息被消费的先后顺序,而且,你可以看出消费时的时间跟发出的时间刚好差 TTL !

在文章的最后的,我们简单总结一下本文所讲的内容,即主要介绍、对比并实战了RabbitMQ中两款具有“延时、延迟处理任务”功效的组件,即“死信队列”和“延迟队列”,其差异性主要体现在:创建上的不同、功能特性的不同、插件安装上的不同等方面。

总体来说,如果是想追求消息传输的稳定性、可靠性且TTL是固定的话,那么建议选择“死信队列”,因为消息从一开始就在队列中待着,等到TTL一到才被路由到真正的队列!而“延迟队列”则不同,即发送出去的消息需要等待 TTL 的时间才进入“延迟队列”,如果在等待的期间,Mq Server 宕机了,那很可能消息就丢失了…..

好了,本文我们就介绍到这里了,最后打个小广告,Debug最近上新了一门新课:Java分布式中间件大汇聚实战第一季 (SpringBoot2.0+点赞系统+面试),课程所介绍的内容正是基于企业级项目真实的应用案例为出发点,来实战各种典型的主流技术栈,目前课程还处于优惠期,原价是129,目前仍然是59.9而已哦(下个月就要涨价喽…)


课程观看:https://www.fightjava.com/web/index/course/detail/15

其他相关的技术,感兴趣的小伙伴可以关注底部Debug的技术公众号,或者加Debug的微信,拉你进“微信版”的真正技术交流群!一起学习、共同成长!


补充

1、若想学习其他的技术干货,可以前往Debug自建的技术社区进行学习观看,包括技术专栏、博客和课程哦:https://www.fightjava.com/

2、关注一下Debug的技术微信公众号,最新的技术文章、课程以及技术专栏将会第一时间在公众号发布哦!