消息队列:第五章:RabbitMQ的使用
第一步:使用之前先安装好RabbitMQ,建议安装在linux系统下
安装配置RabbitMQ:https://blog.csdn.net/qq_33450681/article/details/85339315
第二步:在配置文件下配置
rabbitmq:
host: 192.168.0.100
port: 5672
virtual-host: /mall
username: mall
password: mall
publisher-confirms: true #如果对异步消息需要回调必须设置为true
浏览器访问http://192.168.0.100:15672/#/
第三步:业务中使用发送消息
@Autowired
private OmsOrderSettingMapper orderSettingMapper;
@Autowired
private AmqpTemplate amqpTemplate;
/**
* 发送检查支付结果的消息队列
* @param orderSn
* @param count
*/
@Override
public void sendDelayPaymentCheck(String orderSn, int count) {
//获取订单超时时间
OmsOrderSetting orderSetting = orderSettingMapper.selectByPrimaryKey(1L);
long delayTimes = orderSetting.getNormalOrderOvertime() * 60 * 1000;
//将需要发送的数据封装到hashmap中
HashMap<Object, Object> hashMap = new HashMap<>();
hashMap.put("out_trade_no",orderSn);
hashMap.put("count",count);
//给延迟队列发送消息
amqpTemplate.convertAndSend(QueueEnum.QUEUE_PAY_CANCEL.getExchange(),
QueueEnum.QUEUE_PAY_CANCEL.getRouteKey(), hashMap, new
MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//给消息设置延迟毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
}
});
}
第四步:定义QueueEnum枚举
/**
* 支付通知队列
*/
QUEUE_PAY_CANCEL("mall.pay.direct","mall.pay.cancel","mall.pay.cancel")
/**
* 交换名称
*/
private String exchange;
/**
* 队列名称
*/
private String name;
/**
* 路由键
*/
private String routeKey;
QueueEnum(String exchange, String name, String routeKey) {
this.exchange = exchange;
this.name = name;
this.routeKey = routeKey;
}
public String getExchange() {
return exchange;
}
public String getName() {
return name;
}
public String getRouteKey() {
return routeKey;
}
第五步:配置
RabbitMQ参数配置:
使用一个RabbitMQ需要配置以下几个重要的参数
1.虚拟主机名称(Virtual host name),这个参数不是真正的IP地址或者域名,它是RabbitMQ内部的一个虚拟主机,就像是电脑安装了N台虚拟机,对外的名称一般是“/xxxx".
2.交换机名(Exchanges name):顾名思义,就是把生产者送来的消息来进行分发给下游的多个消费者,相当一个内部软交换机。交换机的类型有fanout,direct,topic,header,fanout类型类似以太网交换机的广播模式,把送来的消息给每个下游队列。direct类似单播(使用routingkey来指定目的队列),topic交换机类似组播,把消息传递给下面同一主题的队列,header交换机则忽略掉routingkey,使用hash数据结构来进行匹配和转发。
3.routingkey :前面讲过了,交换机在进行消息转发时候,要使用routingkey为关键字进行转发。
4.队列名称:可以为不同的消费者指定不同的队列,可以对消息进行分类到不同的队列进行转发。
配置类
/**
* 消息队列配置
* Created by macro on 2018/9/14.
*/
@Configuration
public class RabbitMqConfig {
/**
* 支付队列
* @return
*/
@Bean
public Queue payQueue() {
return new Queue(QueueEnum.QUEUE_PAY_CANCEL.getName());
}
/**
* 绑定支付交互机
* @return
*/
@Bean
DirectExchange payDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_PAY_CANCEL.getExchange())
.durable(true)
.build();
}
/**
* 将支付队列绑定到支付交互机
* @param payDirect
* @param payQueue
* @return
*/
@Bean
Binding payBinding(DirectExchange payDirect,Queue payQueue){
return BindingBuilder
.bind(payQueue)
.to(payDirect)
.with(QueueEnum.QUEUE_PAY_CANCEL.getRouteKey());
}
第六步:处理支付信息
package com.macro.mall.portal.component;
import com.macro.mall.model.PaymentInfo;
import com.macro.mall.portal.service.PaymentService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
/**
* 支付的处理者
*/
@Component
@RabbitListener(queues = "mall.pay.cancel")
public class PayReceiver {
@Autowired
PaymentService paymentService;
@RabbitHandler
public void handle(HashMap mapMessage){
try {
String outTradeNo = mapMessage.get("out_trade_no").toString();
int count = Integer.parseInt(mapMessage.get("count").toString());
// 如果没有支付成功,再次发送延迟检查队列
if (count > 0) {
// 进行支付状态检查
System.out.println("正在进行第" + (6 - count) + "支付结果次检查");
//调用alipayClient接口,根据out_trade_no查询支付信息
PaymentInfo paymentInfo = paymentService.checkPaymentResult(outTradeNo);
Thread thread = new Thread();
thread.start();
Thread.sleep(10000);
//判断是否已经支付成功
if
(paymentInfo.getPaymentStatus()!=null&&(paymentInfo.getPaymentStatus().equals("TRADE_SUCCESS")
|| paymentInfo.getPaymentStatus().equals("TRADE_FINISHED"))) {
// 交易成功或者失败,记录交易状态
System.out.println("检查交易结果成功,记录交易状态。。。");// 修改支付的状态信息
// 修改支付信息
boolean b = paymentService.checkPaymentStatus(outTradeNo);
if(!b){
//修改为已支付
paymentService.updatePayment(paymentInfo.getCallbackContent(),outTradeNo,paymentInfo.getAlipayTradeNo());
// 发送系统消息,出发并发商品支付业务消息队列
paymentService.sendPaymentSuccess(paymentInfo.getOutTradeNo(),paymentInfo.getPaymentStatus(),paymentInfo.getAlipayTradeNo());
}
} else {//未支付
// 再次进行延迟检查
System.out.println("正在进行第" + (6 - count) + "支付结果次检查,检查用户尚未付款成功,继续巡检");
paymentService.sendDelayPaymentCheck(outTradeNo, count - 1);
}
} else {
System.out.println("支付结果次检查次数耗尽,支付未果。。。");
}
} catch (Exception e) {
}
}
}