spring boot实战(番外篇)整合RabbitMQ

2024-09-08 01:38

本文主要是介绍spring boot实战(番外篇)整合RabbitMQ,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言
最近几篇文章将围绕消息中间件RabbitMQ展开,对于RabbitMQ基本概念这里不阐述,主要讲解RabbitMQ的基本用法、Java客户端API介绍、spring Boot与RabbitMQ整合、

Spring Boot与RabbitMQ整合源码分析。

 

RabbitMQ安装
 

在使用消息中间件RabbitMQ之前就是安装RabbitMQ。

 

安装erlang:yum install erlang 
下载RabbitMQ安装包: https://www.rabbitmq.com/releases/rabbitmq-server/v3.5.6/rabbitmq-server-generic-unix-3.5.6.tar.gz
解压安装包、配置环境变量RABBITMQ_HOME
 
 
参考网址:https://www.rabbitmq.com/install-generic-unix.html
windows:  https://www.rabbitmq.com/install-windows.html
 
RabbitMQ配置
1.安装完成后需要对RabbitMQ进行配置,在etc/rabbitmq目录下创建两个文件:
rabbitmq-env.conf 环境信息配置
RABBITMQ_NODE_IP_ADDRESS=127.0.0.1  
RABBITMQ_NODE_PORT=5672  
RABBITMQ_NODENAME=node01
 

rabbitmq.config 核心配置文件

          [{rabbit, [{loopback_users, []}]}].  
该配置表示是的默认用户guest用户可以远程访问mq(广域网不能访问,内网可以访问)
 

2.启动RabbitMQ 执行命令 rabbitmq-server

           RabbitMQ 3.5.4. Copyright (C) 2007-2015 Pivotal Software, Inc.  
##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/  
##  ##  
##########  Logs: /Users/liaokailin/software/rabbitmq_server-3.5.4/sbin/../var/log/rabbitmq/node01.log  
######  ##        /Users/liaokailin/software/rabbitmq_server-3.5.4/sbin/../var/log/rabbitmq/node01-sasl.log  
##########  
            Starting broker... completed with 0 plugins.  

 

3. RabbitMQ提供WEB-UI管理控制台,使用 rabbitmq-plugins enable rabbitmq_management命令启用,重启后可以看到
 
        Starting broker... completed with 6 plugins.  

表明WEB-UI控制台启动成功,访问:http://localhost:15672/
 
 
登陆进入:
 
通过该控制台可以方便管理RabbitMQ。
 
创建Test用户
RabbitMQ默认使用guest用户,下面讲述如何创建一个test用户,最快捷的做法使用web管理控制台

 
 
这里使用命令创建:
rabbitmqctl add_user test test
rabbitmqctl set_user_tags test  administrator

tag分为四种"management", "policymaker", "monitoring" "administrator" 详见 http://www.rabbitmq.com/management.html

RabbitMQ 其他
 
在实际使用RabbitMQ中还需要涉及到 RabbitMQ的集群、高可用(采用镜像队列实现)以后有机会再详细阐述,有兴趣可参考https://www.rabbitmq.com/documentation.html
 
 
RabbitMQ Java Client
 
RabbitMQ 客户端支持语言种类繁多,官方都一一举例:https://www.rabbitmq.com/getstarted.html
 
这里主要自己开发一个小的demo
 
消息消费者
操作步骤:
创建连接工厂ConnectionFactory
获取连接Connection
通过连接获取通信通道Channel
声明交换机Exchange:交换机类型分为四类:
    FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念

        HeadersExchange :通过添加属性key-value匹配

        DirectExchange:按照routingkey分发到指定队列

        TopicExchange:多关键字匹配

声明队列Queue

将队列和交换机绑定

创建消费者

执行消息的消费


package org.lkl.mq.rabbitmq.test;  
  
import java.io.IOException;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.TimeoutException;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.ConsumerCancelledException;  
import com.rabbitmq.client.QueueingConsumer;  
import com.rabbitmq.client.QueueingConsumer.Delivery;  
import com.rabbitmq.client.ShutdownSignalException;  
  
/**  
 * 客户端01  
 *   
 * @author liaokailin  
 * @version $Id: Receive01.java, v 0.1 2015年11月01日 下午3:47:58 liaokailin Exp $  
 */  
public class Receive01 {  
    public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException,  
                                          ConsumerCancelledException, InterruptedException {  
        ConnectionFactory facotry = new ConnectionFactory();  
        facotry.setUsername("test");  
        facotry.setPassword("test");  
        facotry.setVirtualHost("test");  
        facotry.setHost("localhost");  
  
        Connection conn = facotry.newConnection(); //获取一个链接  
        //通过Channel进行通信  
        Channel channel = conn.createChannel();  
        int prefetchCount = 1;  
        channel.basicQos(prefetchCount); //保证公平分发  
  
        boolean durable = true;  
        //声明交换机  
        channel.exchangeDeclare(Send.EXCHANGE_NAME, "direct", durable); //按照routingKey过滤  
        //声明队列  
        String queueName = channel.queueDeclare("queue-01", true, true, false, null).getQueue();  
        //将队列和交换机绑定  
        String routingKey = "lkl-0";  
        //队列可以多次绑定,绑定不同的交换机或者路由key  
        channel.queueBind(queueName, Send.EXCHANGE_NAME, routingKey);  
  
        //创建消费者  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
          
        //将消费者和队列关联  
        channel.basicConsume(queueName, false, consumer); // 设置为false表面手动确认消息消费  
  
        //获取消息  
  
        System.out.println(" Wait message ....");  
        while (true) {  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());  
            String key = delivery.getEnvelope().getRoutingKey();  
  
            System.out.println("  Received '" + key + "':'" + msg + "'");  
            System.out.println(" Handle message");  
            TimeUnit.SECONDS.sleep(3); //mock handle message  
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //确定该消息已成功消费  
        }  
  
    }  
}  

消息生产者
操作步骤:
创建连接工厂ConnectionFactory
获取连接Connection
通过连接获取通信通道Channel
发送消息
 

package org.lkl.mq.rabbitmq.test;  
  
import java.io.IOException;  
import java.util.concurrent.TimeoutException;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.ConfirmListener;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.MessageProperties;  
  
/**  
 * 消息publish  
 *   
 * @author liaokailin  
 * @version $Id: Send.java, v 0.1 2015年10月22日 下午3:48:09 liaokailin Exp $  
 */  
public class Send {  
    public final static String EXCHANGE_NAME = "test-exchange";  
  
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  
        /**  
         * 配置amqp broker 连接信息  
         */  
        ConnectionFactory facotry = new ConnectionFactory();  
        facotry.setUsername("test");  
        facotry.setPassword("test");  
        facotry.setVirtualHost("test");  
        facotry.setHost("localhost");  
  
        Connection conn = facotry.newConnection(); //获取一个链接  
        //通过Channel进行通信  
        Channel channel = conn.createChannel();  
  
        // channel.exchangeDeclare(Send.EXCHANGE_NAME, "direct", true); //如果消费者已创建,这里可不声明  
        channel.confirmSelect(); //Enables publisher acknowledgements on this channel  
        channel.addConfirmListener(new ConfirmListener() {  
  
            @Override  
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {  
                System.out.println("[handleNack] :" + deliveryTag + "," + multiple);  
  
            }  
  
            @Override  
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {  
                System.out.println("[handleAck] :" + deliveryTag + "," + multiple);  
            }  
        });  
  
        String message = "lkl-";  
        //消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN  
        //发送多条信息,每条消息对应routekey都不一致  
        for (int i = 0; i < 10; i++) {  
            channel.basicPublish(EXCHANGE_NAME, message + (i % 2), MessageProperties.PERSISTENT_TEXT_PLAIN,  
                (message + i).getBytes());  
            System.out.println("[send] msg " + (message + i) + " of routingKey is " + (message + (i % 2)));  
        }  
  
    }  
}  

 

在设置消息被消费的回调前需显示调用
 
channel.confirmSelect()  
 

否则回调函数无法调用
 
先执行消费者,消费者会轮询是否有消息的到来,在web控制也可以观察哦~~,再启动生产者发送消息。

================================
前言
本篇主要讲述spring Boot与RabbitMQ的整合,内容非常简单,纯API的调用操作。 操作之间需要加入依赖Jar

1
2
3
<dependency> 
<groupId>org.springframework.boot</groupId> 
<artifactId>spring-boot-starter-amqp</artifactId>  <br>/dependency>
消息生产者
不论是创建消息消费者或生产者都需要ConnectionFactory
 
ConnectionFactory配置
创建AmqpConfig文件AmqpConfig.java(后期的配置都在该文件中)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration 
public class AmqpConfig { 
   
    public static final String EXCHANGE   = "spring-boot-exchange"; 
    public static final String ROUTINGKEY = "spring-boot-routingKey"; 
   
    @Bean 
    public ConnectionFactory connectionFactory() { 
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); 
        connectionFactory.setAddresses("127.0.0.1:5672"); 
        connectionFactory.setUsername("guest"); 
        connectionFactory.setPassword("guest"); 
        connectionFactory.setVirtualHost("/"); 
        connectionFactory.setPublisherConfirms(true); //必须要设置 
        return connectionFactory; 
    } 
}

这里需要显示调用
connectionFactory.setPublisherConfirms(true);  
才能进行消息的回调。
RabbitTemplate
通过使用RabbitTemplate来对开发者提供API操作

@Bean  
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)  
//必须是prototype类型  
public RabbitTemplate rabbitTemplate() {  
    RabbitTemplate template = new RabbitTemplate(connectionFactory());  
    return template;  
}  

 

这里设置为原型,具体的原因在后面会讲到
 
  在发送消息时通过调用RabbitTemplate中的如下方法
public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)  
exchange:交换机名称
routingKey:路由关键字

object:发送的消息内容

correlationData:消息ID

因此生产者代码详单简洁
Send.java

@Component  
public class Send  {  
  
    private RabbitTemplate rabbitTemplate;  
  
    /**  
     * 构造方法注入  
     */  
    @Autowired  
    public Send(RabbitTemplate rabbitTemplate) {  
        this.rabbitTemplate = rabbitTemplate;  
    }  
  
    public void sendMsg(String content) {  
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());  
        rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);  
    }  
  
       
}  

     

       如果需要在生产者需要消息发送后的回调,需要对rabbitTemplate设置ConfirmCallback对象,由于不同的生产者需要对应不同的ConfirmCallback,如果rabbitTemplate设置为单例bean,则所有的rabbitTemplate

实际的ConfirmCallback为最后一次申明的ConfirmCallback。

下面给出完整的生产者代码:

 

 


package com.lkl.springboot.amqp;  
  
import java.util.UUID;  
  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.amqp.rabbit.support.CorrelationData;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.stereotype.Component;  
  
/**  
 * 消息生产者  
 *   
 * @author liaokailin  
 * @version $Id: Send.java, v 0.1 2015年11月01日 下午4:22:25 liaokailin Exp $  
 */  
@Component  
public class Send implements RabbitTemplate.ConfirmCallback {  
  
    private RabbitTemplate rabbitTemplate;  
  
    /**  
     * 构造方法注入  
     */  
    @Autowired  
    public Send(RabbitTemplate rabbitTemplate) {  
        this.rabbitTemplate = rabbitTemplate;  
        rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容  
    }  
  
    public void sendMsg(String content) {  
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());  
        rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);  
    }  
  
    /**  
     * 回调  
     */  
    @Override  
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {  
        System.out.println(" 回调id:" + correlationData);  
        if (ack) {  
            System.out.println("消息成功消费");  
        } else {  
            System.out.println("消息消费失败:" + cause);  
        }  
    }  
  
}  

 

消息消费者
消费者负责申明交换机(生产者也可以申明)、队列、两者的绑定操作。

交换机

/**  
     * 针对消费者配置  
        FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念  
        HeadersExchange :通过添加属性key-value匹配  
        DirectExchange:按照routingkey分发到指定队列  
        TopicExchange:多关键字匹配  
     */  
    @Bean  
    public DirectExchange defaultExchange() {  
        return new DirectExchange(EXCHANGE);  
    }  

 

在Spring Boot中交换机继承AbstractExchange类
队列
@Bean  
    public Queue queue() {  
        return new Queue("spring-boot-queue", true); //队列持久  
  
    } 
 

绑定
@Bean  
  public Binding binding() {  
      return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);  
  } 
完成以上工作后,在spring boot中通过消息监听容器实现消息的监听,在消息到来时执行回调操作。
 
消息消费

@Bean  
  public SimpleMessageListenerContainer messageContainer() {  
      SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
      container.setQueues(queue());  
      container.setExposeListenerChannel(true);  
      container.setMaxConcurrentConsumers(1);  
      container.setConcurrentConsumers(1);  
      container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认  
      container.setMessageListener(new ChannelAwareMessageListener() {  
  
          @Override  
          public void onMessage(Message message, Channel channel) throws Exception {  
              byte[] body = message.getBody();  
              System.out.println("receive msg : " + new String(body));  
              channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费  
          }  
      });  
      return container;  
  }  

 


下面给出完整的配置文件:

package com.lkl.springboot.amqp;  
  
import org.springframework.amqp.core.AcknowledgeMode;  
import org.springframework.amqp.core.Binding;  
import org.springframework.amqp.core.BindingBuilder;  
import org.springframework.amqp.core.DirectExchange;  
import org.springframework.amqp.core.Message;  
import org.springframework.amqp.core.Queue;  
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  
import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;  
import org.springframework.beans.factory.config.ConfigurableBeanFactory;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.context.annotation.Scope;  
  
import com.rabbitmq.client.Channel;  
  
/**  
 * Qmqp Rabbitmq  
 *   
 * http://docs.spring.io/spring-amqp/docs/1.4.5.RELEASE/reference/html/  
 *   
 * @author lkl  
 * @version $Id: AmqpConfig.java, v 0.1 2015年11月01日 下午2:05:37 lkl Exp $  
 */  
  
@Configuration  
public class AmqpConfig {  
  
    public static final String EXCHANGE   = "spring-boot-exchange";  
    public static final String ROUTINGKEY = "spring-boot-routingKey";  
  
    @Bean  
    public ConnectionFactory connectionFactory() {  
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();  
        connectionFactory.setAddresses("127.0.0.1:5672");  
        connectionFactory.setUsername("guest");  
        connectionFactory.setPassword("guest");  
        connectionFactory.setVirtualHost("/");  
        connectionFactory.setPublisherConfirms(true); //必须要设置  
        return connectionFactory;  
    }  
  
    @Bean  
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)  
    //必须是prototype类型  
    public RabbitTemplate rabbitTemplate() {  
        RabbitTemplate template = new RabbitTemplate(connectionFactory());  
        return template;  
    }  
  
    /**  
     * 针对消费者配置  
     * 1. 设置交换机类型  
     * 2. 将队列绑定到交换机  
     *   
     *   
        FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念  
        HeadersExchange :通过添加属性key-value匹配  
        DirectExchange:按照routingkey分发到指定队列  
        TopicExchange:多关键字匹配  
     */  
    @Bean  
    public DirectExchange defaultExchange() {  
        return new DirectExchange(EXCHANGE);  
    }  
  
    @Bean  
    public Queue queue() {  
        return new Queue("spring-boot-queue", true); //队列持久  
  
    }  
  
    @Bean  
    public Binding binding() {  
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);  
    }  
  
    @Bean  
    public SimpleMessageListenerContainer messageContainer() {  
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
        container.setQueues(queue());  
        container.setExposeListenerChannel(true);  
        container.setMaxConcurrentConsumers(1);  
        container.setConcurrentConsumers(1);  
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认  
        container.setMessageListener(new ChannelAwareMessageListener() {  
  
            @Override  
            public void onMessage(Message message, Channel channel) throws Exception {  
                byte[] body = message.getBody();  
                System.out.println("receive msg : " + new String(body));  
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费  
            }  
        });  
        return container;  
    }  
  
}  

 


以上完成 Spring Boot与RabbitMQ的整合 
 
 
自动配置
在Spring Boot中实现了RabbitMQ的自动配置,在配置文件中添加如下配置信息
spring.rabbitmq.host=localhost  
spring.rabbitmq.port=5672  
spring.rabbitmq.username=test  
spring.rabbitmq.password=test  
spring.rabbitmq.virtualHost=test  
后会自动创建ConnectionFactory以及RabbitTemplate对应Bean,为什么上面我们还需要手动什么呢?
自动创建的ConnectionFactory无法完成事件的回调,即没有设置下面的代码
connectionFactory.setPublisherConfirms(true);  
 
具体分析见后续文章的源码解读.
 

=========================================

 

前言
本篇开始讲述Spring Boot如何整合RabbitMQ(实际上Spring就整合了RabbitMQ)。
 
RabbitAdmin

 

在上篇中遗留AmqpAdmin没有讲解,现在来看下该部分代码
public AmqpAdmin amqpAdmin(CachingConnectionFactory connectionFactory) {  
        return new RabbitAdmin(connectionFactory);  
    }  
 

创建RabbitAdmin实例,调用构造方法
public RabbitAdmin(ConnectionFactory connectionFactory) {  
    this.connectionFactory = connectionFactory;  
    Assert.notNull(connectionFactory, "ConnectionFactory must not be null");  
    this.rabbitTemplate = new RabbitTemplate(connectionFactory);  

 


创建连接工厂、rabbitTemplate,其中ConnectionFactory采用上一篇中自定义bean

public ConnectionFactory connectionFactory() {  
     CachingConnectionFactory connectionFactory = new CachingConnectionFactory();  
     connectionFactory.setAddresses("127.0.0.1:5672");  
     connectionFactory.setUsername("guest");  
     connectionFactory.setPassword("guest");  
     connectionFactory.setPublisherConfirms(true); //必须要设置  
     return connectionFactory;  
 }  

 

为CachingConnectionFactory实例,其缓存模式为通道缓存
private volatile CacheMode cacheMode = CacheMode.CHANNEL;  
 

 
接下来看下RabbitAdmin类定义:
public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, InitializingBean {  
...  
}  
 

实现接口AmqpAdmin(定义若干RabbitMQ操作父接口),这里需要强调的是InitializingBean,实现该接口则会调用afterPropertiesSet方法

public void afterPropertiesSet() {  
  
        synchronized (this.lifecycleMonitor) {  
  
            if (this.running || !this.autoStartup) {  
                return;  
            }  
  
            if (this.connectionFactory instanceof CachingConnectionFactory &&  
                    ((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {  
                logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");  
                return;  
            }  
  
            this.connectionFactory.addConnectionListener(new ConnectionListener() {  
  
                // Prevent stack overflow...  
                private final AtomicBoolean initializing = new AtomicBoolean(false);  
  
                @Override  
                public void onCreate(Connection connection) {  
                    if (!initializing.compareAndSet(false, true)) {  
                        // If we are already initializing, we don't need to do it again...  
                        return;  
                    }  
                    try {  
                           
                        initialize();  
                    }  
                    finally {  
                        initializing.compareAndSet(true, false);  
                    }  
                }  
  
                @Override  
                public void onClose(Connection connection) {  
                }  
  
            });  
  
            this.running = true;  
  
        }  
    }  

 

synchronized (this.lifecycleMonitor)加锁保证同一时间只有一个线程访问该代码,随后调用this.connectionFactory.addConnectionListener添加连接监听,各连接工厂关系:


实际调用为CachingConnectionFactory

public void addConnectionListener(ConnectionListener listener) {  
        super.addConnectionListener(listener);  
        // If the connection is already alive we assume that the new listener wants to be notified  
        if (this.connection != null) {  
            listener.onCreate(this.connection);  
        }  
    }  

 


此时connection为null,无法执行到listener.onCreate(this.connection); 往CompositeConnectionListener connectionListener中添加监听信息,最终保证在集合中
private List<ConnectionListener> delegates = new CopyOnWriteArrayList<ConnectionListener>();  
 

这里添加的监听代码执行,在后面调用时再来讲解。
 
至此~~ RabbitAdmin创建完成。 
 
 
Exchange

接下来继续来看AmqpConfig.java中的代码
@Bean  
  public DirectExchange defaultExchange() {  
      return new DirectExchange(EXCHANGE);  
  }  
 

以上代码创建一个交换机,交换机类型为direct
 

在申明交换机时需要指定交换机名称,默认创建可持久交换机
 
Queue
public Queue queue() {  
       return new Queue("spring-boot-queue", true); //队列持久  
   }  
 
 
默认创建可持久队列
 
Binding
@Bean  
   public Binding binding() {  
       return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);  
   }  
 
BindingBuilder.bind(queue()) 实现为:
public static DestinationConfigurer bind(Queue queue) {  
        return new DestinationConfigurer(queue.getName(), DestinationType.QUEUE);  
    }  
 


DestinationConfigurer通过name、type区分不同配置信息,其to()方法为重载方法,传递参数为四种交换机,分别返回XxxExchangeRoutingKeyConfigurer,其中with方法返回Bingding实例,因此在Binding信息中存储了
队列、交换机、路由key等相关信息

public class Binding extends AbstractDeclarable {  
  
    public static enum DestinationType {  
        QUEUE, EXCHANGE;  
    }  
  
    private final String destination;  
  
    private final String exchange;  
  
    private final String routingKey;  
  
    private final Map<String, Object> arguments;  
  
    private final DestinationType destinationType;  
...  
}  


以上信息理解都非常简单,下面来看比较复杂点的SimpleMessageListenerContainer

SimpleMessageListenerContainer

@Bean  
    public SimpleMessageListenerContainer messageContainer() {  
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
        container.setQueues(queue());  
        container.setExposeListenerChannel(true);  
        container.setMaxConcurrentConsumers(1);  
        container.setConcurrentConsumers(1);  
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认  
        container.setMessageListener(new ChannelAwareMessageListener() {  
  
            @Override  
            public void onMessage(Message message, Channel channel) throws Exception {  
                byte[] body = message.getBody();  
                System.out.println("receive msg : " + new String(body));  
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费  
            }  
        });  
        return container;  
    }  

 

查看其实现的接口,注意SmartLifecycle

 
接下来设置队列信息,在AbstractMessageListenerContainer
 
       private volatile List<String> queueNames = new CopyOnWriteArrayList<String>();  

添加队列信息
    AbstractMessageListenerContainer#exposeListenerChannel设置为true

  
container.setMaxConcurrentConsumers(1);  
container.setConcurrentConsumers(1);  
 

设置并发消费者数量,默认情况为1
 
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认  
 

设置消费者成功消费消息后确认模式,分为两种
自动模式,默认模式,在RabbitMQ Broker消息发送到消费者后自动删除
手动模式,消费者客户端显示编码确认消息消费完成,Broker给生产者发送回调,消息删除
接下来设置消费者端消息监听,为privatevolatile Object messageListener 赋值
 
到这里消息监听容器也创建完成了,但令人纳闷的时,消费者如何去消费消息呢?从这里完全看不出来。那么接下来看下SmartLifecycle接口
 
SmartLifecycle
熟悉Spring都应该知道该接口,其定义为:

 
public interface SmartLifecycle extends Lifecycle, Phased {  
  
    boolean isAutoStartup();  
    void stop(Runnable callback);  
  
}
 

其中的isAutoStartup设置为true时,会自动调用Lifecycle接口中的start方法,既然我们为源码分析,也简单看下这个聪明的声明周期接口是如何实现它的聪明方法的
 

在spring boot实战(第十篇)Spring boot Bean加载源码分析中讲到执行Bean加载时,调用AbstractApplicationContext#refresh(),其中存在一个方法调用finishRefresh()
[html] view plain copy
 
protected void finishRefresh() {  
    // Initialize lifecycle processor for this context.  
    initLifecycleProcessor();  
  
    // Propagate refresh to lifecycle processor first.  
    getLifecycleProcessor().onRefresh();  
  
    // Publish the final event.  
    publishEvent(new ContextRefreshedEvent(this));  
  
    // Participate in LiveBeansView MBean, if active.  
    LiveBeansView.registerApplicationContext(this);  
}  

其中initLifecycleProcessor初始化生命周期处理器,

[html] view plain copy
 
protected void initLifecycleProcessor() {  
    ConfigurableListableBeanFactory beanFactory = getBeanFactory();  
    if (beanFactory.containsLocalBean(LIFECYCLE_PROCESSOR_BEAN_NAME)) {  
        this.lifecycleProcessor =  
                beanFactory.getBean(LIFECYCLE_PROCESSOR_BEAN_NAME, LifecycleProcessor.class);  
        if (logger.isDebugEnabled()) {  
            logger.debug("Using LifecycleProcessor [" + this.lifecycleProcessor + "]");  
        }  
    }  
    else {  
        DefaultLifecycleProcessor defaultProcessor = new DefaultLifecycleProcessor();  
        defaultProcessor.setBeanFactory(beanFactory);  
        this.lifecycleProcessor = defaultProcessor;  
        beanFactory.registerSingleton(LIFECYCLE_PROCESSOR_BEAN_NAME, this.lifecycleProcessor);  
        if (logger.isDebugEnabled()) {  
            logger.debug("Unable to locate LifecycleProcessor with name '" +  
                    LIFECYCLE_PROCESSOR_BEAN_NAME +  
                    "': using default [" + this.lifecycleProcessor + "]");  
        }  
    }  
}  

注册DefaultLifecycleProcessor对应bean
getLifecycleProcessor().onRefresh()调用DefaultLifecycleProcessor中方法onRefresh,调用startBeans(true)

 

[html] view plain copy
 
private void startBeans(boolean autoStartupOnly) {  
    Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();  
    Map<Integer, LifecycleGroup> phases = new HashMap<Integer, LifecycleGroup>();  
    for (Map.Entry<String, ? extends Lifecycle> entry : lifecycleBeans.entrySet()) {  
        Lifecycle bean = entry.getValue();  
        if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {  
            int phase = getPhase(bean);  
            LifecycleGroup group = phases.get(phase);  
            if (group == null) {  
                group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);  
                phases.put(phase, group);  
            }  
            group.add(entry.getKey(), bean);  
        }  
    }  
    if (phases.size() > 0) {  
        List<Integer> keys = new ArrayList<Integer>(phases.keySet());  
        Collections.sort(keys);  
        for (Integer key : keys) {  
            phases.get(key).start();  
        }  
    }  
}  

其中
Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();  

 

获取所有实现Lifecycle接口bean,执行bean instanceof SmartLifecycle && ((SmartLifecycle)bean).isAutoStartup()判断,如果bean同时也为Phased实例,则加入到LifecycleGroup中,随后phases.get(key).start()调用start方法

 

接下来要做的事情就很明显:要了解消费者具体如何实现,查看SimpleMessageListenerContainer中的start是如何实现的。

 

至此~~整合RabbitMQ源码分析准备工作完成,下一篇中正式解读消费者的实现。

 

==============================

 

踩坑记录
近日在用spring boot架构一个微服务框架,服务发现与治理、发布REST接口各种轻松惬意。但是服务当设计MQ入口时,就发现遇到无数地雷,现在整理成下文,供各路大侠围观与嘲笑。

版本
当前使用的spring-boot-starter-amqp版本为2016.5发布的1.3.5.RELEASE

也许若干年后,你们版本都不会有这些问题了。:(

RabbitMQ
当需要用到MQ的时候,我的第一反映就是使用RabbitMQ,猫了一眼spring boot的官方说明,上面说spring boot为rabbit准备了spring-boot-starter-amqp,并且为RabbitTemplate和RabbitMQ提供了自动配置选项。暗自窃喜~~

瞅瞅[官方文档]http://docs.spring.io/spring-boot/docs/current-SNAPSHOT/reference/htmlsingle/#boot-features-rabbitmq和例子,SO EASY,再看一眼GITHUB上的官方例了,也有例子。

心情愉悦的照着例子,开干~~。

踩坑
十五分钟后的代码类似这样:

@Service
@RabbitListener(queues = "merchant")
public class MQReceiver  {
    protected Logger logger = Logger.getLogger(MQReceiver.class
            .getName()); 
  
    @RabbitHandler
    public void process(@Payload UpdateMerchant request) {
        UpdateMerchantResponse response = new UpdateMerchantResponse();
        logger.info(request.getMerchantId() + "->" + response.getReturnCode());
    }
}
消费信息后,应该记录一条日志。
结果得到只有org.springframework.amqp.AmqpException: No method found for class [B 这个异常,并且还无限循环抛出这个异常。。。

记得刚才官方文档好像说了异常什么的,转身去猫一眼,果然有:

If retries are not enabled and the listener throws an exception, by default the delivery will be retried indefinitely. You can modify this behavior in two ways; set the defaultRequeueRejected
 property to false
 and zero re-deliveries will be attempted; or, throw an AmqpRejectAndDontRequeueException
 to signal the message should be rejected. This is the mechanism used when retries are enabled and the maximum delivery attempts are reached.

知道了为啥会无限重试了,下面来看看为啥会抛出这个异常,google搜一下,貌似还有一个倒霉鬼遇到了这个问题。

进去看完问题和大神的解答,豁然开朗。

There are two conversions in the @RabbitListener pipeline.
The first converts from a Spring AMQP Message to a spring-messaging Message.
There is currently no way to change the first converter from SimpleMessageConverter which handles String, Serializable and passes everything else as byte[].
The second converter converts the message payload to the method parameter type (if necessary).
With method-level @RabbitListeners there is a tight binding between the handler and the method.
With class-level @RabbitListener s, the message payload from the first conversion is used to select which method to invoke. Only then, is the argument conversion attempted.
This mechanism works fine with Java Serializable objects since the payload has already been converted before the method is selected.
However, with JSON, the first conversion returns a byte[] and hence we find no matching @RabbitHandler.
We need a mechanism such that the first converter is settable so that the payload is converted early enough in the pipeline to select the appropriate handler method.
A ContentTypeDelegatingMessageConverter is probably most appropriate.
And, as stated in AMQP-574, we need to clearly document the conversion needs for a @RabbitListener, especially when using JSON or a custom conversion.

得嘞,官方示例果然是坑,试试大神的解决方案,手动新增下转换。

  @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
然后在生产和消费信息的地方使用他们:

@RabbitListener(queues = "merchant", containerFactory="rabbitListenerContainerFactory")
public void process(@Payload UpdateMerchant request) { 
     UpdateMerchantResponse response = new UpdateMerchantResponse();
    logger.info(request.getMerchantId() + "->" + response.getReturnCode());
 }
再来一次,果然可以了

c.l.s.m.service.MQReceiver : 00000001->null
--------------------- 
作者:子非鱼yy 
来源:CSDN 
原文:https://blog.csdn.net/ztx114/article/details/78328048 
版权声明:本文为博主原创文章,转载请附上博文链接!

这篇关于spring boot实战(番外篇)整合RabbitMQ的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1146749

相关文章

Golang操作DuckDB实战案例分享

《Golang操作DuckDB实战案例分享》DuckDB是一个嵌入式SQL数据库引擎,它与众所周知的SQLite非常相似,但它是为olap风格的工作负载设计的,DuckDB支持各种数据类型和SQL特性... 目录DuckDB的主要优点环境准备初始化表和数据查询单行或多行错误处理和事务完整代码最后总结Duck

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

JAVA系统中Spring Boot应用程序的配置文件application.yml使用详解

《JAVA系统中SpringBoot应用程序的配置文件application.yml使用详解》:本文主要介绍JAVA系统中SpringBoot应用程序的配置文件application.yml的... 目录文件路径文件内容解释1. Server 配置2. Spring 配置3. Logging 配置4. Ma

Java 字符数组转字符串的常用方法

《Java字符数组转字符串的常用方法》文章总结了在Java中将字符数组转换为字符串的几种常用方法,包括使用String构造函数、String.valueOf()方法、StringBuilder以及A... 目录1. 使用String构造函数1.1 基本转换方法1.2 注意事项2. 使用String.valu

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.

Spring MVC如何设置响应

《SpringMVC如何设置响应》本文介绍了如何在Spring框架中设置响应,并通过不同的注解返回静态页面、HTML片段和JSON数据,此外,还讲解了如何设置响应的状态码和Header... 目录1. 返回静态页面1.1 Spring 默认扫描路径1.2 @RestController2. 返回 html2

Spring常见错误之Web嵌套对象校验失效解决办法

《Spring常见错误之Web嵌套对象校验失效解决办法》:本文主要介绍Spring常见错误之Web嵌套对象校验失效解决的相关资料,通过在Phone对象上添加@Valid注解,问题得以解决,需要的朋... 目录问题复现案例解析问题修正总结  问题复现当开发一个学籍管理系统时,我们会提供了一个 API 接口去

Java操作ElasticSearch的实例详解

《Java操作ElasticSearch的实例详解》Elasticsearch是一个分布式的搜索和分析引擎,广泛用于全文搜索、日志分析等场景,本文将介绍如何在Java应用中使用Elastics... 目录简介环境准备1. 安装 Elasticsearch2. 添加依赖连接 Elasticsearch1. 创

Spring核心思想之浅谈IoC容器与依赖倒置(DI)

《Spring核心思想之浅谈IoC容器与依赖倒置(DI)》文章介绍了Spring的IoC和DI机制,以及MyBatis的动态代理,通过注解和反射,Spring能够自动管理对象的创建和依赖注入,而MyB... 目录一、控制反转 IoC二、依赖倒置 DI1. 详细概念2. Spring 中 DI 的实现原理三、