Spring AMPQ 的SimpleMessageListenerContainer 源码简析

                Spring AMPQ 的SimpleMessageListenerContainer类源码简析

     因为工作中需要动态的订阅queue,完成后取消,印象中之前工作中做过的基本都是queue确定或者只订阅一个queue的,有点没头绪,google了下,发现 spring AMQP 的SimpleMessageListenerContainer类已经实现了这个功能,看了下实现原理,简单记录下。

    首先要明确,一个消费者consume是可以订阅多个queue的,比如我可以同时订阅queue1和queue2,当这两个queue有消息时,都会推送给我;如果一个queue有多个consume,这个queue会把自己存的消息依次轮流分给订阅它的消费者,比如queue1有两个订阅者consumer1和consumer2,queue1里现在有10 条消息,queue1会把这10条消息依次分给consumer1和consumer2,每个人5条,第一条给consumer1,第二条给consumer2,依次轮询。

   SimpleMessageListenerContainer的addQueue方法即是动态的添加queue,

@Override
public void addQueues(Queue… queue) {
  
super.addQueues(queue);
   this
.queuesChanged();
}

其中super.addQueues(queue)方法是把传的参数拷贝到一个list中,源码如下:

public void addQueues(Queue... queues) {

   Assert.notNull(queues, "'queues' cannot be null");

   Assert.noNullElements(queues, "'queues' cannot contain null elements");

   String[] queueNames = new String[queues.length];

   for (int i = 0; i < queues.length; i++) {

      queueNames[i] = queues[i].getName();

   }

   this.addQueueNames(queueNames);

}

其中最后一句this.queueNames.addAll(Arrays.asList(queueNames));就是最后实施拷贝的行为

public void addQueueNames(String... queueNames) {

   Assert.notNull(queueNames, "'queueNames' cannot be null");

   Assert.noNullElements(queueNames, "'queueNames' cannot contain null elements");

   this.queueNames.addAll(Arrays.asList(queueNames));

}

这个queueNames是一个List

private volatile List<String> queueNames = new CopyOnWriteArrayList<String>();

现在super.addQueues(queue)方法就完事了,下面来看重头戏

this.queuesChanged();方法

源码如下:

private void queuesChanged() {

   synchronized (this.consumersMonitor) {

      if (this.consumers != null) {

         int count = 0;

         Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator();

         while (consumerIterator.hasNext()) {

            BlockingQueueConsumer consumer = consumerIterator.next();

            if (logger.isDebugEnabled()) {

               logger.debug("Queues changed; stopping consumer: " + consumer);

            }

            consumer.basicCancel(true);

            consumerIterator.remove();

            count++;

         }

         this.addAndStartConsumers(count);

      }

   }

}

可以看到,用synchronized加了锁,具体逻辑就是consumers进行迭代,然后把每个consumer取消它的订阅,

consumer.basicCancel(true);

然后把它移除,(Iterator可以移除,但是不能修改元素)

consumerIterator.remove();

然后计算有多少个consumer

count++;

迭代完成后,执行下面这句再把consumer  new回来(之前统计过有多少consumer)

this.addAndStartConsumers(count);

 

到现在大概思路应该清晰了,SimpleMessageListenerContainer用一个List<String> 来存储所有queue,当要添加queue时,就往这个List添加,然后调用queuesChanged()方法,把所有的consumer都取消订阅,然后把他们都移除,然后有多少个consumer再重新new相同数量的consumer出来,然后每个consumer都把List中的每个都都订阅,到此就完成了。

 

在此留一个疑问:为什么consumer要先取消订阅然后再重新订阅queue?

 

下面看看this.addAndStartConsumers(count)里面具体做了些什么

 protected void addAndStartConsumers(int delta) {
  
synchronized (this.consumersMonitor) {
     
if (this.consumers != null) {
        
for (int i = 0; i < delta; i++) {
            BlockingQueueConsumer consumer = createBlockingQueueConsumer()
;
            this
.consumers.add(consumer);
           
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            if
(logger.isDebugEnabled()) {
              
logger.debug(“Starting a new consumer: ” + consumer);
           
}
           
this.taskExecutor.execute(processor);
            if
(this.applicationEventPublisher != null) {
              
this.applicationEventPublisher.publishEvent(new AsyncConsumerStartedEvent(this, consumer));
           
}
           
try {
               FatalListenerStartupException startupException = processor.getStartupException()
;
               if
(startupException != null) {
                 
this.consumers.remove(consumer);
                  throw new
AmqpIllegalStateException(“Fatal exception on listener startup”, startupException);
              
}
            }
           
catch (InterruptedException ie) {
               Thread.currentThread().interrupt()
;
           
}
           
catch (Exception e) {
               consumer.stop()
;
              
logger.error(“Error starting new consumer”, e);
               this
.cancellationLock.release(consumer);
               this
.consumers.remove(consumer);
           
}
         }
      }
   }
}

 

这里面首先创建consumer,

BlockingQueueConsumer consumer = createBlockingQueueConsumer();

然后new了一个AsyncMessageProcessingConsumer,这个类继承了Runnable,

然后用线程池来执行任务,

this.taskExecutor.execute(processor);

在AsyncMessageProcessingConsumer的run()方法里有

this.consumer.start();

BlockingQueueConsumer的start()方法里有这几句

for (String queueName : this.queues) {

   if (!this.missingQueues.contains(queueName)) {

      consumeFromQueue(queueName);

consumeFromQueue(queueName)就是在订阅queue进行监听了 

大概的逻辑到此就分析完了。

 

 

    原文作者:Spring Boot
    原文地址: https://blog.csdn.net/yangbo10086/article/details/81322139
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞