spring boot实战(第十四篇)整合RabbitMQ源码分析前言

前言

本篇开始讲述Spring Boot如何整合RabbitMQ(实际上Spring就整合了RabbitMQ)。

RabbitAdmin

在上篇中遗留AmqpAdmin没有讲解,现在来看下该部分代码
[html] 
view plain
 copy

  1. public AmqpAdmin amqpAdmin(CachingConnectionFactory connectionFactory) {  
  2.         return new RabbitAdmin(connectionFactory);  
  3.     }  

创建RabbitAdmin实例,调用构造方法
[html] 
view plain
 copy

  1. public RabbitAdmin(ConnectionFactory connectionFactory) {  
  2.     this.connectionFactory = connectionFactory;  
  3.     Assert.notNull(connectionFactory, “ConnectionFactory must not be null”);  
  4.     this.rabbitTemplate = new RabbitTemplate(connectionFactory);  
  5. }  

创建连接工厂、rabbitTemplate,其中ConnectionFactory采用上一篇中自定义bean
[html] 
view plain
 copy

  1. public ConnectionFactory connectionFactory() {  
  2.      CachingConnectionFactory connectionFactory = new CachingConnectionFactory();  
  3.      connectionFactory.setAddresses(“127.0.0.1:5672”);  
  4.      connectionFactory.setUsername(“guest”);  
  5.      connectionFactory.setPassword(“guest”);  
  6.      connectionFactory.setPublisherConfirms(true); //必须要设置  
  7.      return connectionFactory;  
  8.  }  

为CachingConnectionFactory实例,其缓存模式为通道缓存
[html] 
view plain
 copy

  1. private volatile CacheMode cacheMode = CacheMode.CHANNEL;  

接下来看下RabbitAdmin类定义:
[html] 
view plain
 copy

  1. public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, InitializingBean {  
  2. …  
  3. }  

实现接口AmqpAdmin(定义若干RabbitMQ操作父接口),这里需要强调的是InitializingBean,实现该接口则会调用afterPropertiesSet方法
[html] 
view plain
 copy

  1. public void afterPropertiesSet() {  
  2.   
  3.         synchronized (this.lifecycleMonitor) {  
  4.   
  5.             if (this.running || !this.autoStartup) {  
  6.                 return;  
  7.             }  
  8.   
  9.             if (this.connectionFactory instanceof CachingConnectionFactory &&  
  10.                     ((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {  
  11.                 logger.warn(“RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION”);  
  12.                 return;  
  13.             }  
  14.   
  15.             this.connectionFactory.addConnectionListener(new ConnectionListener() {  
  16.   
  17.                 // Prevent stack overflow…  
  18.                 private final AtomicBoolean initializing = new AtomicBoolean(false);  
  19.   
  20.                 @Override  
  21.                 public void onCreate(Connection connection) {  
  22.                     if (!initializing.compareAndSet(false, true)) {  
  23.                         // If we are already initializing, we don’t need to do it again…  
  24.                         return;  
  25.                     }  
  26.                     try {  
  27.                            
  28.                         initialize();  
  29.                     }  
  30.                     finally {  
  31.                         initializing.compareAndSet(true, false);  
  32.                     }  
  33.                 }  
  34.   
  35.                 @Override  
  36.                 public void onClose(Connection connection) {  
  37.                 }  
  38.   
  39.             });  
  40.   
  41.             this.running = true;  
  42.   
  43.         }  
  44.     }  


synchronized
(
this
.
lifecycleMonitor
)加锁保证同一时间只有一个线程访问该代码,随后调用
this
.
connectionFactory
.addConnectionListener添加连接监听,各连接工厂关系:




实际调用为CachingConnectionFactory

[html] 
view plain
 copy

  1. public void addConnectionListener(ConnectionListener listener) {  
  2.         super.addConnectionListener(listener);  
  3.         // If the connection is already alive we assume that the new listener wants to be notified  
  4.         if (this.connection != null) {  
  5.             listener.onCreate(this.connection);  
  6.         }  
  7.     }  

此时connection为null,无法执行到
listener
.onCreate(
this
.
connection
); 往
CompositeConnectionListener
connectionListener中添加监听信息,最终保证在集合中

[html] 
view plain
 copy

  1. private List<ConnectionListener> delegates = new CopyOnWriteArrayList<ConnectionListener>();  




这里添加的监听代码执行,在后面调用时再来讲解。

至此~~ RabbitAdmin创建完成。 

Exchange

接下来继续来看AmqpConfig.java中的代码
[html] 
view plain
 copy

  1. @Bean  
  2.   public DirectExchange defaultExchange() {  
  3.       return new DirectExchange(EXCHANGE);  
  4.   }  

以上代码创建一个交换机,交换机类型为direct

在申明交换机时需要指定交换机名称,默认创建可持久交换机

Queue


[html] 
view plain
 copy

  1. public Queue queue() {  
  2.        return new Queue(“spring-boot-queue”, true); //队列持久  
  3.    }  

默认创建可持久队列

Binding


[html] 
view plain
 copy

  1. @Bean  
  2.    public Binding binding() {  
  3.        return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);  
  4.    }  


BindingBuilder.bind(queue()) 实现为:

[html] 
view plain
 copy

  1. public static DestinationConfigurer bind(Queue queue) {  
  2.         return new DestinationConfigurer(queue.getName(), DestinationType.QUEUE);  
  3.     }  


DestinationConfigurer通过name、type区分不同配置信息,其to()方法为重载方法,传递参数为四种交换机,分别返回Xxx
ExchangeRoutingKeyConfigurer,其中with方法返回Bingding实例,因此在Binding信息中存储了
队列、交换机、路由key等相关信息

[html] 
view plain
 copy

  1. public class Binding extends AbstractDeclarable {  
  2.   
  3.     public static enum DestinationType {  
  4.         QUEUE, EXCHANGE;  
  5.     }  
  6.   
  7.     private final String destination;  
  8.   
  9.     private final String exchange;  
  10.   
  11.     private final String routingKey;  
  12.   
  13.     private final Map<String, Object> arguments;  
  14.   
  15.     private final DestinationType destinationType;  
  16. …  
  17. }  

以上信息理解都非常简单,下面来看比较复杂点的
SimpleMessageListenerContainer


SimpleMessageListenerContainer


[html] 
view plain
 copy

  1. @Bean  
  2.     public SimpleMessageListenerContainer messageContainer() {  
  3.         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
  4.         container.setQueues(queue());  
  5.         container.setExposeListenerChannel(true);  
  6.         container.setMaxConcurrentConsumers(1);  
  7.         container.setConcurrentConsumers(1);  
  8.         container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认  
  9.         container.setMessageListener(new ChannelAwareMessageListener() {  
  10.   
  11.             @Override  
  12.             public void onMessage(Message message, Channel channel) throws Exception {  
  13.                 byte[] body = message.getBody();  
  14.                 System.out.println(“receive msg : ” + new String(body));  
  15.                 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费  
  16.             }  
  17.         });  
  18.         return container;  
  19.     }  

查看其实现的接口,注意SmartLifecycle

接下来设置队列信息,在AbstractMessageListenerContainer
[html] 
view plain
 copy

  1. private volatile List<String> queueNames = new CopyOnWriteArrayList<String>();  

添加队列信息
AbstractMessageListenerContainer#
exposeListenerChannel设置为true


[html] 
view plain
 copy

  1. container.setMaxConcurrentConsumers(1);  
  2. container.setConcurrentConsumers(1);  

设置并发消费者数量,默认情况为1

[html] 
view plain
 copy

  1. container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认  

设置消费者成功消费消息后确认模式,分为两种

  • 自动模式,默认模式,在RabbitMQ Broker消息发送到消费者后自动删除
  • 手动模式,消费者客户端显示编码确认消息消费完成,Broker给生产者发送回调,消息删除

接下来设置消费者端消息监听,为private
volatile
 Object
messageListener 赋值

到这里消息监听容器也创建完成了,但令人纳闷的时,消费者如何去消费消息呢?从这里完全看不出来。那么接下来看下SmartLifecycle接口

SmartLifecycle

熟悉Spring都应该知道该接口,其定义为:
[html] 
view plain
 copy

  1. public interface SmartLifecycle extends Lifecycle, Phased {  
  2.   
  3.     boolean isAutoStartup();  
  4.     void stop(Runnable callback);  
  5.   
  6. }  

其中的isAutoStartup设置为true时,会自动调用Lifecycle接口中的start方法,既然我们为源码分析,也简单看下这个聪明的声明周期接口是如何实现它的聪明方法的


spring boot实战(第十篇)Spring boot Bean加载源码分析中讲到执行Bean加载时,调用AbstractApplicationContext#refresh(),其中存在一个方法调用finishRefresh()
[html] 
view plain
 copy

  1. protected void finishRefresh() {  
  2.     // Initialize lifecycle processor for this context.  
  3.     initLifecycleProcessor();  
  4.   
  5.     // Propagate refresh to lifecycle processor first.  
  6.     getLifecycleProcessor().onRefresh();  
  7.   
  8.     // Publish the final event.  
  9.     publishEvent(new ContextRefreshedEvent(this));  
  10.   
  11.     // Participate in LiveBeansView MBean, if active.  
  12.     LiveBeansView.registerApplicationContext(this);  
  13. }  

其中initLifecycleProcessor初始化生命周期处理器,

[html] 
view plain
 copy

  1. protected void initLifecycleProcessor() {  
  2.     ConfigurableListableBeanFactory beanFactory = getBeanFactory();  
  3.     if (beanFactory.containsLocalBean(LIFECYCLE_PROCESSOR_BEAN_NAME)) {  
  4.         this.lifecycleProcessor =  
  5.                 beanFactory.getBean(LIFECYCLE_PROCESSOR_BEAN_NAME, LifecycleProcessor.class);  
  6.         if (logger.isDebugEnabled()) {  
  7.             logger.debug(“Using LifecycleProcessor [” + this.lifecycleProcessor + “]”);  
  8.         }  
  9.     }  
  10.     else {  
  11.         DefaultLifecycleProcessor defaultProcessor = new DefaultLifecycleProcessor();  
  12.         defaultProcessor.setBeanFactory(beanFactory);  
  13.         this.lifecycleProcessor = defaultProcessor;  
  14.         beanFactory.registerSingleton(LIFECYCLE_PROCESSOR_BEAN_NAME, this.lifecycleProcessor);  
  15.         if (logger.isDebugEnabled()) {  
  16.             logger.debug(“Unable to locate LifecycleProcessor with name ‘” +  
  17.                     LIFECYCLE_PROCESSOR_BEAN_NAME +  
  18.                     “‘: using default [” + this.lifecycleProcessor + “]”);  
  19.         }  
  20.     }  
  21. }  

注册DefaultLifecycleProcessor对应bean

getLifecycleProcessor().onRefresh()调用DefaultLifecycleProcessor中方法onRefresh,调用startBeans(true)

[html] 
view plain
 copy

  1. private void startBeans(boolean autoStartupOnly) {  
  2.     Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();  
  3.     Map<Integer, LifecycleGroup> phases = new HashMap<Integer, LifecycleGroup>();  
  4.     for (Map.Entry<String, ? extends Lifecycle> entry : lifecycleBeans.entrySet()) {  
  5.         Lifecycle bean = entry.getValue();  
  6.         if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {  
  7.             int phase = getPhase(bean);  
  8.             LifecycleGroup group = phases.get(phase);  
  9.             if (group == null) {  
  10.                 group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);  
  11.                 phases.put(phase, group);  
  12.             }  
  13.             group.add(entry.getKey(), bean);  
  14.         }  
  15.     }  
  16.     if (phases.size() > 0) {  
  17.         List<Integer> keys = new ArrayList<Integer>(phases.keySet());  
  18.         Collections.sort(keys);  
  19.         for (Integer key : keys) {  
  20.             phases.get(key).start();  
  21.         }  
  22.     }  
  23. }  

其中

[html] 
view plain
 copy

  1. Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();  

获取所有实现Lifecycle接口bean,执行bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup()判断,如果bean同时也为Phased实例,则加入到LifecycleGroup中,随后phases.get(key).start()调用start方法

接下来要做的事情就很明显:要了解消费者具体如何实现,查看SimpleMessageListenerContainer中的start是如何实现的。

至此~~整合RabbitMQ源码分析准备工作完成,下一篇中正式解读消费者的实现。



本文转自http://blog.csdn.net/liaokailin/article/details/49559951

    原文作者:Spring Boot
    原文地址: https://blog.csdn.net/tongtong_use/article/details/78651796
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞