消息队列:第三章:在Java中使用消息队列

在项目中导入依赖坐标

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
 
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

[点击并拖拽以移动]

使用队列queue的情况

producer端

    /**
     * Queue producer: sends one persistent text message to the "boss drink" queue
     * inside a JMS transaction and then closes the connection.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connect = new ActiveMQConnectionFactory("tcp://192.168.0.100:61616");
        Connection connection = null;
        try {
            // Create and start the connection.
            connection = connect.createConnection();
            connection.start();
            // First argument: whether the session is transacted. When it is true the
            // second argument is treated as SESSION_TRANSACTED (0).
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            // Point-to-point destination.
            Queue testqueue = session.createQueue("boss drink");
            // Producer bound to that queue.
            MessageProducer producer = session.createProducer(testqueue);
            TextMessage textMessage = new ActiveMQTextMessage();
            textMessage.setText("我渴了,我要喝水!");
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            producer.send(textMessage);
            // Transacted send: the message only becomes visible after commit.
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Close on every path so a failed send does not leak the connection.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    }

consumer端1

    /**
     * Queue consumer 1: listens on the "boss drink" queue in a transacted session,
     * prints every text message and commits it; a failed message is rolled back.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Connect with the default ActiveMQ credentials.
        ConnectionFactory connect = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://192.168.0.100:61616");
        Connection connection = null;
        try {
            connection = connect.createConnection();
            // First argument: whether the session is transacted. When it is true the
            // acknowledge mode is ignored, so SESSION_TRANSACTED is passed to make
            // that explicit (the original passed AUTO_ACKNOWLEDGE here).
            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination testqueue = session.createQueue("boss drink");

            MessageConsumer consumer = session.createConsumer(testqueue);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        try {
                            String text = ((TextMessage) message).getText();
                            System.out.println(text+",员工1马上拿起杯子打水。。。");

                            session.commit();
                        } catch (JMSException e) {
                            e.printStackTrace();
                            // Undo the failed receive so the broker can redeliver it.
                            try {
                                session.rollback();
                            } catch (JMSException rollbackFailure) {
                                rollbackFailure.printStackTrace();
                            }
                        }
                    }
                }
            });
            // Begin delivery only after the listener is registered.
            connection.start();
            // The connection stays open on purpose: the listener keeps receiving.
        } catch (Exception e) {
            e.printStackTrace();
            // Setup failed: release the connection instead of leaking it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    }

consumer端2

    /**
     * Queue consumer 2: listens on the "boss drink" queue in a transacted session,
     * prints every text message and commits it; a failed message is rolled back.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Connect with the default ActiveMQ credentials.
        ConnectionFactory connect = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://192.168.0.100:61616");
        Connection connection = null;
        try {
            connection = connect.createConnection();
            // First argument: whether the session is transacted. When it is true the
            // acknowledge mode is ignored, so SESSION_TRANSACTED is passed to make
            // that explicit (the original passed AUTO_ACKNOWLEDGE here).
            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination testqueue = session.createQueue("boss drink");

            MessageConsumer consumer = session.createConsumer(testqueue);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        try {
                            String text = ((TextMessage) message).getText();
                            System.out.println(text+",员工2马上拿起杯子打水。。。");

                            session.commit();
                        } catch (JMSException e) {
                            e.printStackTrace();
                            // Undo the failed receive so the broker can redeliver it.
                            try {
                                session.rollback();
                            } catch (JMSException rollbackFailure) {
                                rollbackFailure.printStackTrace();
                            }
                        }
                    }
                }
            });
            // Begin delivery only after the listener is registered.
            connection.start();
            // The connection stays open on purpose: the listener keeps receiving.
        } catch (Exception e) {
            e.printStackTrace();
            // Setup failed: release the connection instead of leaking it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    }

使用topic话题的情况:

producer端:

    /**
     * Topic producer: publishes one persistent text message to the "boss speak"
     * topic inside a JMS transaction and then closes the connection.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConnectionFactory connect = new ActiveMQConnectionFactory("tcp://192.168.0.100:61616");
        Connection connection = null;
        try {
            // Create and start the connection.
            connection = connect.createConnection();
            connection.start();
            // First argument: whether the session is transacted. When it is true the
            // second argument is treated as SESSION_TRANSACTED (0).
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            // Publish/subscribe destination.
            Topic topic = session.createTopic("boss speak");
            // Producer bound to that topic.
            MessageProducer producer = session.createProducer(topic);
            TextMessage textMessage = new ActiveMQTextMessage();
            textMessage.setText("我们要为中国的伟大复兴而努力奋斗!");
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            producer.send(textMessage);
            // Transacted send: the message only becomes visible after commit.
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Close on every path so a failed publish does not leak the connection.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    }

consumer端1:

    /**
     * Topic consumer 1: durable subscriber "userOne" on the "boss speak" topic.
     * Prints every text message and commits it; a failed message is rolled back.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Connect with the default ActiveMQ credentials.
        ConnectionFactory connect = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://192.168.0.100:61616");
        Connection connection = null;
        try {
            connection = connect.createConnection();
            // Client id of this connection; set before the connection is used.
            connection.setClientID("userOne");

            // First argument: whether the session is transacted. When it is true the
            // acknowledge mode is ignored, so SESSION_TRANSACTED is passed to make
            // that explicit (the original passed AUTO_ACKNOWLEDGE here).
            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Topic topic = session.createTopic("boss speak");
            // createDurableSubscriber(topic, name) registers a durable subscription:
            //   topic - must match the topic the producer publishes to
            //   name  - identifier of this subscriber
            // session.createConsumer(topic) would instead create a non-durable one.
            MessageConsumer consumer = session.createDurableSubscriber(topic, "userOne");
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        try {
                            String text = ((TextMessage) message).getText();
                            System.out.println(text+",员工1这个月工资不要了。。。");

                            session.commit();
                        } catch (JMSException e) {
                            e.printStackTrace();
                            // Undo the failed receive so the broker can redeliver it.
                            try {
                                session.rollback();
                            } catch (JMSException rollbackFailure) {
                                rollbackFailure.printStackTrace();
                            }
                        }
                    }
                }
            });
            // Begin delivery only after the listener is registered.
            connection.start();
            // The connection stays open on purpose: the listener keeps receiving.
        } catch (Exception e) {
            e.printStackTrace();
            // Setup failed: release the connection instead of leaking it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    }

consumer端2:

    /**
     * Topic consumer 2: non-durable subscriber on the "boss speak" topic.
     * Prints every text message and commits it; a failed message is rolled back.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Connect with the default ActiveMQ credentials.
        ConnectionFactory connect = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://192.168.0.100:61616");
        Connection connection = null;
        try {
            connection = connect.createConnection();
            // First argument: whether the session is transacted. When it is true the
            // acknowledge mode is ignored, so SESSION_TRANSACTED is passed to make
            // that explicit (the original passed AUTO_ACKNOWLEDGE here).
            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            // Destination: where messages are read from.
            Destination topic = session.createTopic("boss speak");

            MessageConsumer consumer = session.createConsumer(topic);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        try {
                            String text = ((TextMessage) message).getText();
                            System.out.println(text+",员工2周末主动来加班。。。");

                            session.commit();
                        } catch (JMSException e) {
                            e.printStackTrace();
                            // Undo the failed receive so the broker can redeliver it.
                            try {
                                session.rollback();
                            } catch (JMSException rollbackFailure) {
                                rollbackFailure.printStackTrace();
                            }
                        }
                    }
                }
            });
            // Begin delivery only after the listener is registered.
            connection.start();
            // The connection stays open on purpose: the listener keeps receiving.
        } catch (Exception e) {
            e.printStackTrace();
            // Setup failed: release the connection instead of leaking it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    }

关于事务控制

(原文此处有两张说明事务控制的配图,图片未能保留。)

持久化与非持久化

通过producer.setDeliveryMode(DeliveryMode.PERSISTENT) 进行设置

持久化的好处是:当 ActiveMQ 宕机时,消息队列中的消息不会丢失;非持久化的消息则会丢失。不过持久化会消耗一定的性能。
与springboot整合

在application.properties文件中加入spring.activemq.broker-url=tcp://mq.server.com:61616;如果需要启用下面配置类中的监听器连接工厂(jmsQueueListener),还需要加入activemq.listener.enable=true
配置类ActiveMQConfig:项目启动的时候加载并执行里面所有的方法

/**
 * Spring configuration for ActiveMQ: exposes the pooled-connection helper, a
 * queue listener container factory and the raw ActiveMQ connection factory.
 *
 * <p>Both properties fall back to the literal "disabled" when they are not set.
 */
@Configuration
public class ActiveMQConfig {

    /** Placeholder value the properties below default to when unset. */
    private static final String DISABLED = "disabled";

    @Value("${spring.activemq.broker-url:disabled}")
    String brokerURL ;

    @Value("${activemq.listener.enable:disabled}")
    String listenerEnable;

    /**
     * Builds the {@link ActiveMQUtil} helper for the configured broker.
     *
     * @return the initialised helper, or {@code null} when no broker URL is configured
     * @throws JMSException declared for callers; kept from the original signature
     */
    @Bean
    public ActiveMQUtil getActiveMQUtil() throws JMSException {
        if (DISABLED.equals(brokerURL)) {
            return null;
        }
        ActiveMQUtil activeMQUtil = new ActiveMQUtil();
        activeMQUtil.init(brokerURL);
        return activeMQUtil;
    }

    /**
     * Listener container factory for point-to-point (queue) listeners.
     *
     * @param activeMQConnectionFactory connection factory the listeners use
     * @return the configured factory, or {@code null} unless
     *         {@code activemq.listener.enable} is exactly "true"
     */
    @Bean(name = "jmsQueueListener")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory ) {
        // Check the switch first so nothing is allocated when listeners are off.
        if (!"true".equals(listenerEnable)) {
            return null;
        }

        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMQConnectionFactory);
        // Concurrency setting for the listener containers.
        factory.setConcurrency("5");

        // Interval between reconnection attempts.
        factory.setRecoveryInterval(5000L);
        // Non-transacted sessions with client acknowledgement.
        factory.setSessionTransacted(false);
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;
    }

    /**
     * Raw ActiveMQ connection factory for the configured broker URL.
     *
     * <p>NOTE(review): when the broker URL is unset this is built with the
     * "disabled" placeholder - confirm that is intended.
     *
     * @return a new connection factory
     */
    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory ( ){
        return new ActiveMQConnectionFactory(brokerURL);
    }

}

工具类ActiveMQUtil

/**
 * Small holder around a pooled ActiveMQ connection factory. Call
 * {@link #init(String)} once, then fetch the pool via {@link #getConnectionFactory()}.
 */
public class ActiveMQUtil {
    /** Pool created by {@link #init(String)}; {@code null} until then. */
    PooledConnectionFactory pooledConnectionFactory;

    /**
     * Creates the connection pool for the given broker and stores it in this helper.
     *
     * @param brokerUrl broker URL handed to the underlying ActiveMQ connection factory
     * @return the pooled connection factory that was just created
     */
    public ConnectionFactory init(String brokerUrl) {
        // Wrap a plain ActiveMQ factory in a connection pool.
        PooledConnectionFactory pool =
                new PooledConnectionFactory(new ActiveMQConnectionFactory(brokerUrl));
        this.pooledConnectionFactory = pool;

        // Reconnect when a pooled connection reports an exception.
        pool.setReconnectOnException(true);
        // Pool limits; the expiry timeout is presumably in milliseconds - TODO confirm.
        pool.setMaxConnections(5);
        pool.setExpiryTimeout(10000);
        return pool;
    }

    /**
     * @return the pool created by {@link #init(String)}, or {@code null} if it has
     *         not been called yet
     */
    public ConnectionFactory getConnectionFactory() {
        return this.pooledConnectionFactory;
    }
}

案例:

controller:

/**
 * Synchronous return-page callback from Alipay.
 *
 * <p>Verifies the callback signature, records the payment as paid and then
 * publishes the payment-success message. A callback whose signature cannot be
 * verified is rejected before anything is written.
 *
 * @param request   the callback request
 * @param paramsMap all request parameters, used for the signature check
 * @return the view name "finish"
 * @throws IllegalArgumentException if the signature is invalid or cannot be checked
 */
@RequestMapping("/alipay/callback/return")
    public String callBackReturn(HttpServletRequest request,
            @org.springframework.web.bind.annotation.RequestParam Map<String,String> paramsMap){
        // Without @RequestParam, Spring MVC binds a bare Map argument to the model
        // rather than to the request parameters, so the check had nothing to verify.
        boolean signatureValid;
        try {
            // Verify the signature Alipay attached to the callback.
            signatureValid = AlipaySignature.rsaCheckV1(paramsMap,AlipayConfig.alipay_public_key,AlipayConfig.charset,AlipayConfig.sign_type);
        } catch (AlipayApiException e) {
            throw new IllegalArgumentException("Alipay callback signature could not be verified", e);
        }
        if (!signatureValid) {
            // Never mark a payment as paid on the strength of an unverified callback.
            throw new IllegalArgumentException("Alipay callback signature verification failed");
        }

        String out_trade_no = request.getParameter("out_trade_no");
        String trade_no = request.getParameter("trade_no");

        // Build the payment update.
        PaymentInfo paymentInfo = new PaymentInfo();
        paymentInfo.setPaymentStatus("已支付");
        paymentInfo.setCallbackContent(request.getQueryString());
        paymentInfo.setOutTradeNo(out_trade_no);
        paymentInfo.setAlipayTradeNo(trade_no);
        paymentInfo.setCallbackTime(new Date());

        // Persist first, so the success message is never published for a payment
        // whose record failed to update.
        paymentService.updatePayment(paymentInfo);

        // Publish the payment-success message (a queue is used here).
        paymentService.sendPaymentSuccess(paymentInfo.getOutTradeNo(),paymentInfo.getPaymentStatus(),trade_no);

        return "finish";
    }

serviceImpl:

@Override
    public void sendPaymentSuccess(String outTradeNo, String paymentStatus,String trackingNo) {
        try {
            // 连接消息服务器
            ConnectionFactory connect = activeMQUtil.getConnectionFactory();
            Connection connection = connect.createConnection();
            connection.start();
            //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            // 发送消息
            Queue testqueue = session.createQueue("PAYMENT_SUCCESS_QUEUE");
 
            MessageProducer producer = session.createProducer(testqueue);
            MapMessage mapMessage=new ActiveMQMapMessage();
            mapMessage.setString("out_trade_no",outTradeNo);
            mapMessage.setString("payment_status",paymentStatus);
            mapMessage.setString("tracking_no",trackingNo);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            producer.send(mapMessage);
            session.commit();// 事务型消息,必须提交后才生效
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }