32. 消息传递
Spring框架为与消息传递系统集成提供了广泛的支持,从使用JmsTemplate
简化的JMS API到使用完整的基础设施异步接收消息,Spring AMQP为高级消息队列协议提供了类似的特性集。Spring Boot还为RabbitTemplate
和RabbitMQ提供自动配置选项,Spring WebSocket原生包括对STOMP消息的支持,Spring Boot通过启动器和少量的自动配置支持这一点,Spring Boot还支持Apache Kafka。
32.1 JMS
javax.jms.ConnectionFactory
接口提供了创建javax.jms.Connection
与JMS代理交互的标准方法,尽管Spring需要一个ConnectionFactory
来处理JMS,你通常不需要自己直接使用它,而是可以依赖于更高级别的消息传递抽象。(详见Spring Framework参考文档的相关部分。)Spring Boot还自动配置发送和接收消息所需的基础设施。
32.1.1 ActiveMQ支持
当ActiveMQ在类路径上可用时,Spring Boot还可以配置ConnectionFactory
,如果代理存在,则会自动启动和配置嵌入式代理(只要没有通过配置指定代理URL)。
如果你使用
spring-boot-starter-activemq
,那么将提供连接或嵌入ActiveMQ实例所需的依赖项,与JMS集成的Spring基础设施也是一样。
ActiveMQ配置由在spring.activemq.*
中的外部配置属性控制,例如,你可以在application.properties
中声明以下部分:
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
你还可以通过向org.apache.activemq:activemq-pool
添加一个依赖项来共享JMS资源并相应地配置PooledConnectionFactory
,如下例所示:
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
有关更多支持的选项,请参见
ActiveMQProperties,你还可以注册一个任意数量的实现
activemqconnectionfactorycustomzer
的bean进行更高级的自定义。
默认情况下,ActiveMQ创建一个目的地,如果它还不存在,那么目的地将根据它们提供的名称解析。
32.1.2 Artemis支持
Spring Boot可以自动配置ConnectionFactory
,当它检测到在类路径上可用的Artemis时,如果代理存在,则自动启动和配置嵌入式代理(除非模式属性已显式设置),所支持的模式是embedded
(要明确地说明需要一个嵌入式代理,如果代理在类路径上不可用,就会出现错误)和native
(使用netty
传输协议连接到代理),在配置后者时,Spring Boot使用默认设置配置连接到运行在本地机器上的代理的ConnectionFactory
。
如果你使用
spring-boot-starter-artemis
,则提供了连接到现有的Artemis实例的必要依赖项,以及与JMS集成的Spring基础设施,添加
org.apache.activemq:artemis-jms-server
到你的应用程序以允许你使用嵌入式模式。
Artemis配置由spring.artemis.*
的外部配置属性控制,例如,你可以在application.properties
中声明以下部分:
spring.artemis.mode=native
spring.artemis.host=192.168.1.210
spring.artemis.port=9876
spring.artemis.user=admin
spring.artemis.password=secret
在嵌入代理时,你可以选择是否启用持久性,并列出应该可用的目的地,可以将它们指定为逗号分隔的列表,以使用默认选项创建它们,或者可以定义类型为org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration
或org.apache.activemq.artemis.jms.server.config.TopicConfiguration
的bean,分别用于高级队列和主题配置。
有关更多受支持的选项,请参阅ArtemisProperties。
不涉及JNDI查找,目的地根据它们的名称进行解析,使用Artemis配置中的name
属性或通过配置提供的名称。
32.1.3 使用JNDI ConnectionFactory
如果你正在应用服务器中运行应用程序,Spring Boot试图使用JNDI定位JMS ConnectionFactory
,默认情况下,检查java:/JmsXA
和java:/XAConnectionFactory
位置,如果需要指定替代位置,可以使用spring.jms.jndi-name
属性,如下例所示:
spring.jms.jndi-name=java:/MyConnectionFactory
32.1.4 发送消息
Spring的JmsTemplate是自动配置的,你可以将其自动连接到你自己的bean中,如下面的示例所示:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final JmsTemplate jmsTemplate;
@Autowired
public MyBean(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
// ...
}
JmsMessagingTemplate可以以类似的方式注入,如果定义了
DestinationResolver
或
MessageConverter
bean,则它将自动关联到自动配置的
JmsTemplate
。
32.1.5 接收消息
当出现JMS基础设施时,可以使用@JmsListener
注解任何bean,以创建监听器端点,如果没有定义JmsListenerContainerFactory
,则会自动配置默认工厂,如果定义了DestinationResolver
或MessageConverter
bean,则它将自动关联到默认工厂。
默认情况下,默认工厂是事务性的,如果你运行的基础设施中存在JtaTransactionManager
,那么它默认与侦听器容器相关联,如果没有,则启用sessionTransacted
标志。在后一个场景中,你可以通过在监听器方法(或委托)上添加@Transactional
,将本地数据存储事务与接收消息的处理相关联,这确保在本地事务完成之后,传入消息得到确认,这还包括发送在相同JMS会话上执行的响应消息。
以下组件在someQueue
目的地上创建监听器端点:
@Component
public class MyBean {
@JmsListener(destination = "someQueue")
public void processMessage(String content) {
// ...
}
}
有关更多细节,请参见
@EnableJms的Javadoc。
如果你需要创建更多的JmsListenerContainerFactory
实例,或者希望重写默认的实例,Spring Boot提供了一个DefaultJmsListenerContainerFactoryConfigurer
,你可以使用它来初始化一个DefaultJmsListenerContainerFactory
,其设置与自动配置的工厂相同。
例如,下面的示例公开了另一个使用特定MessageConverter
的工厂:
@Configuration
static class JmsConfiguration {
@Bean
public DefaultJmsListenerContainerFactory myFactory(
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory());
factory.setMessageConverter(myMessageConverter());
return factory;
}
}
然后你可以在任何@JmsListener
注解的方法中使用工厂,如下所示:
32.2 AMQP
高级消息队列协议(AMQP)是面向消息的中间件的一种平台无关的、有线级别的协议。Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发,Spring Boot为通过RabbitMQ使用AMQP提供了一些方便,包括spring-boot-starter-amqp
“启动器”。
32.2.1 RabbitMQ支持
RabbitMQ是一个轻量级的、可靠的、可伸缩的、可移植的消息代理,基于AMQP协议,Spring使用RabbitMQ
通过AMQP协议进行通信。
RabbitMQ配置由spring.rabbitmq.*
的外部配置属性控制,例如,你可以在application.properties
中声明以下部分:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
如果上下文中存在ConnectionNameStrategy
bean,那么它将自动用于命名由自动配置的ConnectionFactory
创建的连接。有关更多受支持的选项,请参阅RabbitProperties。
有关详细信息,请参阅
RabbitMQ使用的协议AMQP
32.2.2 发送消息
Spring的AmqpTemplate
和AmqpAdmin
是自动配置的,你可以将它们自动连接到你自己的bean中,如下面的示例所示:
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
@Autowired
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
// ...
}
RabbitMessagingTemplate可以以类似的方式注入,如果定义了
MessageConverter
bean,它会自动关联到自动配置的
AmqpTemplate
。
如果有必要,任何定义为bean的org.springframework.amqp.core.Queue
自动用于在RabbitMQ实例上声明相应的队列。
重试操作,可以对AmqpTemplate
启用重试(例如,如果代理连接丢失了),默认情况下禁用重试。
32.2.3 接收消息
当Rabbit基础设施存在时,可以使用@RabbitListener
对任何bean进行注解,以创建监听器端点,如果没有定义RabbitListenerContainerFactory
,则会自动配置默认的SimpleRabbitListenerContainerFactory
,你可以使用spring.rabbitmq.listener.type
属性切换到直接容器。如果定义了MessageConverter
或MessageRecoverer
bean,则它将自动与默认工厂相关联。
以下示例组件在someQueue
队列上创建监听器端点:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
有关更多细节,请参见
@EnableRabbit的Javadoc。
如果你需要创建更多的RabbitListenerContainerFactory
实例,或者你想要覆盖缺省值,Spring Boot提供了一个SimpleRabbitListenerContainerFactoryConfigurer
和一个DirectRabbitListenerContainerFactoryConfigurer
,你可以使用它们初始化一个SimpleRabbitListenerContainerFactory
,以及一个DirectRabbitListenerContainerFactory
,其设置与自动配置使用的工厂相同。
选择哪种容器类型并不重要,这两个bean通过自动配置公开。
例如,下面的configuration类公开另一个使用特定MessageConverter
的工厂:
@Configuration
static class RabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(myMessageConverter());
return factory;
}
}
然后你可以在任何@RabbitListener
注解的方法中使用工厂,如下所示:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory="myFactory")
public void processMessage(String content) {
// ...
}
}
你可以启用重试来处理监听器抛出异常的情况,默认情况下,使用RejectAndDontRequeueRecoverer
,但是可以定义自己的MessageRecoverer
,当重试结束时,消息将被拒绝,如果将代理配置为这样做,则消息将被删除或路由到死信交换,默认情况下,重试被禁用。
重要
默认情况下,如果重试被禁用并且监听器抛出异常,该递送被无限期地重试,你可以用两种方式修改此行为:将
defaultRequeueRejected
属性设置为
false
,以便尝试零重复发送,或者抛出
AmqpRejectAndDontRequeueException
来通知消息应该被拒绝,后者是在启用重试并达到最大提交尝试次数时使用的机制。
32.3 Apache Kafka支持
通过提供spring-kafka
项目的自动配置来支持Apache Kafka。
Kafka配置由spring.kafka.*
中的外部配置属性控制,例如,你可以在application.properties
中声明以下部分:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
要在启动时创建主题,请添加一个类型
NewTopic
的bean,如果主题已经存在,则忽略bean。
有关更多受支持的选项,请参阅KafkaProperties。
32.3.1 发送消息
Spring的KafkaTemplate
是自动配置的,你可以直接在你自己的bean中自动连接它,如下面的示例所示:
@Component
public class MyBean {
private final KafkaTemplate kafkaTemplate;
@Autowired
public MyBean(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// ...
}
如果定义了一个
RecordMessageConverter
bean,它将自动关联到自动配置的
KafkaTemplate
32.3.2 接收消息
当存在Apache Kafka基础设施时,任何bean都可以使用@KafkaListener
进行注解,以创建监听器端点,如果没有定义KafkaListenerContainerFactory
,默认设置为使用spring.kafka.listener.*
中定义的键,此外,如果定义了一个RecordMessageConverter
bean,它将自动关联到默认的工厂。
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
32.3.3 附加的Kafka属性
自动配置所支持的属性显示在附录A中,通用的应用程序属性。注意,在大多数情况下,这些属性(连字符或驼峰式大小写)直接映射到Apache Kafka *属性,有关详细信息,请参阅Apache Kafka文档。
前几个属性同时适用于生产者和消费者,但是如果你希望对每个属性使用不同的值,可以在生产者或消费者级别指定,Apache Kafka设计具有高、中或低重要性的属性,Spring Boot自动配置支持所有重要属性、一些选定的中属性和低属性,以及任何没有默认值的属性。
Kafka支持的属性中只有一部分是可以通过KafkaProperties
类获得的,如果你希望为生产者或消费者配置不受直接支持的其他属性,请使用以下属性:
spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
这将设置通用的prop.one
Kafka属性为first
(适用于生产者、消费者和管理员),prop.two
管理员属性为second
,prop.three
消费者属性为third
并且prop.four
生产者属性为fourth
。
你还可以配置Spring Kafka JsonDeserializer
,如下所示:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme
同样,可以禁用JsonSerializer
在header中发送类型信息的默认行为:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
以这种方式设置的属性将覆盖Spring Boot显式支持的任何配置项。