Spring AMPQ 的SimpleMessageListenerContainer类源码简析
因为工作中需要动态的订阅queue,完成后取消,印象中之前工作中做过的基本都是queue确定或者只订阅一个queue的,有点没头绪,google了下,发现 spring AMQP 的SimpleMessageListenerContainer类已经实现了这个功能,看了下实现原理,简单记录下。
首先要明确,一个消费者consume是可以订阅多个queue的,比如我可以同时订阅queue1和queue2,当这两个queue有消息时,都会推送给我;如果一个queue有多个consume,这个queue会把自己存的消息依次轮流分给订阅它的消费者,比如queue1有两个订阅者consumer1和consumer2,queue1里现在有10 条消息,queue1会把这10条消息依次分给consumer1和consumer2,每个人5条,第一条给consumer1,第二条给consumer2,依次轮询。
SimpleMessageListenerContainer的addQueue方法即是动态的添加queue,
@Override
public void addQueues(Queue… queue) {
super.addQueues(queue);
this.queuesChanged();
}
其中super.addQueues(queue)方法是把传的参数拷贝到一个list中,源码如下:
public void addQueues(Queue... queues) { Assert.notNull(queues, "'queues' cannot be null"); Assert.noNullElements(queues, "'queues' cannot contain null elements"); String[] queueNames = new String[queues.length]; for (int i = 0; i < queues.length; i++) { queueNames[i] = queues[i].getName(); } this.addQueueNames(queueNames); }
其中最后一句this.queueNames.addAll(Arrays.asList(queueNames));就是最后实施拷贝的行为
public void addQueueNames(String... queueNames) { Assert.notNull(queueNames, "'queueNames' cannot be null"); Assert.noNullElements(queueNames, "'queueNames' cannot contain null elements"); this.queueNames.addAll(Arrays.asList(queueNames)); }
这个queueNames是一个List
private volatile List<String> queueNames = new CopyOnWriteArrayList<String>();
现在super.addQueues(queue)方法就完事了,下面来看重头戏
this.queuesChanged();方法
源码如下:
private void queuesChanged() { synchronized (this.consumersMonitor) { if (this.consumers != null) { int count = 0; Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator(); while (consumerIterator.hasNext()) { BlockingQueueConsumer consumer = consumerIterator.next(); if (logger.isDebugEnabled()) { logger.debug("Queues changed; stopping consumer: " + consumer); } consumer.basicCancel(true); consumerIterator.remove(); count++; } this.addAndStartConsumers(count); } } }
可以看到,用synchronized加了锁,具体逻辑就是consumers进行迭代,然后把每个consumer取消它的订阅,
consumer.basicCancel(true);
然后把它移除,(Iterator可以移除,但是不能修改元素)
consumerIterator.remove();
然后计算有多少个consumer
count++;
迭代完成后,执行下面这句再把consumer new回来(之前统计过有多少consumer)
this.addAndStartConsumers(count);
到现在大概思路应该清晰了,SimpleMessageListenerContainer用一个List<String> 来存储所有queue,当要添加queue时,就往这个List添加,然后调用queuesChanged()方法,把所有的consumer都取消订阅,然后把他们都移除,然后有多少个consumer再重新new相同数量的consumer出来,然后每个consumer都把List中的每个都都订阅,到此就完成了。
在此留一个疑问:为什么consumer要先取消订阅然后再重新订阅queue?
下面看看this.addAndStartConsumers(count)里面具体做了些什么
protected void addAndStartConsumers(int delta) {
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
for (int i = 0; i < delta; i++) {
BlockingQueueConsumer consumer = createBlockingQueueConsumer();
this.consumers.add(consumer);
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
if (logger.isDebugEnabled()) {
logger.debug(“Starting a new consumer: ” + consumer);
}
this.taskExecutor.execute(processor);
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
try {
FatalListenerStartupException startupException = processor.getStartupException();
if (startupException != null) {
this.consumers.remove(consumer);
throw new AmqpIllegalStateException(“Fatal exception on listener startup”, startupException);
}
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
catch (Exception e) {
consumer.stop();
logger.error(“Error starting new consumer”, e);
this.cancellationLock.release(consumer);
this.consumers.remove(consumer);
}
}
}
}
}
这里面首先创建consumer,
BlockingQueueConsumer consumer = createBlockingQueueConsumer();
然后new了一个AsyncMessageProcessingConsumer,这个类继承了Runnable,
然后用线程池来执行任务,
this.taskExecutor.execute(processor);
在AsyncMessageProcessingConsumer的run()方法里有
this.consumer.start();
BlockingQueueConsumer的start()方法里有这几句
for (String queueName : this.queues) { if (!this.missingQueues.contains(queueName)) { consumeFromQueue(queueName);
consumeFromQueue(queueName)就是在订阅queue进行监听了
大概的逻辑到此就分析完了。