1. 前言
Spring AMQP項目將其核心Spring概念應(yīng)用于基于AMQP消息解決方案的開發(fā)中.我們提供了一個發(fā)送和接收消息的高級抽象模板.同時,我們也提供了消息驅(qū)動POJO的支持.這些包有助于AMQP資源的管理,從而提升依賴注入和聲明式配置的使用. 在所有這些情況中,你會發(fā)現(xiàn)與Spring Framework對JMS支持是相似的. 對于其它項目相關(guān)的信息,可訪問Spring AMQP項目主頁
2. 介紹
本參考文檔的第一部分是對Spring AMQP的高層次以及底層的概念的講述,還有一些可使你盡快上手運行的代碼片斷.
2.1 快速瀏覽
2.1.1 介紹
5分鐘開啟Spring AMQP的旅行.
前提:
安裝和運行RabbitMQ broker (http://www.rabbitmq.com/download.html). 然后獲取spring-rabbit JAR 以及其依賴包 - 最簡單的方式是在你的構(gòu)建工具中聲明依賴,如. 對于Maven來說:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.6.0.RELEASE</version></dependency>
對于gradle:
compile 'org.springframework.amqp:spring-rabbit:1.6.0.RELEASE'
兼容性
由于默認的Spring Framework 版本依賴是4.2.x, Spring AMQP 一般來說可兼容早期版本的Spring Framework.但基于注解的監(jiān)聽器和RabbitMessagingTemplate
需要Spring Framework 4.1+.
類似地,默認的amqp-client
版本是3.6.x,但framework 兼容3.4.0+.但依賴于新客戶端版本的功能將無法使用.注意,這里指的是java client library; 一般來說,對于老版本的broker也可以工作.
非常非常快速
使用純java來發(fā)送和接收消息:ConnectionFactory connectionFactory = new CachingConnectionFactory();
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));
AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));
AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
注意,在原生的Java Rabbit client中也有一個ConnectionFactory
. 在上面的代碼中,我們使用的是Spring 抽象. 我們依賴的是broker中的默認交換器 (因為在發(fā)送操作中沒有指定交換器),以及默認綁定(通過其名稱作為路由鍵將所有隊列綁定到默認交換器中).那些行為是由AMQP規(guī)范來定義的.
使用 XML配置
同上面的例子一樣,只不過將外部資源配置到了XML:
ApplicationContext context =new GenericXmlApplicationContext("classpath:/rabbit-context.xml");
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd"><rabbit:connection-factory id="connectionFactory"/><rabbit:template id="amqpTemplate"connection-factory="connectionFactory"/><rabbit:admin connection-factory="connectionFactory"/><rabbit:queue name="myqueue"/></beans>
<rabbit:admin/>
聲明會默認自動查找類型為Queue
,Exchange
和Binding的bean,
并宣稱他們代表的broker的用戶, 因此在簡單的Java driver中沒有必要明確的使用那個bean.
有大量的選項來配置XML schema 中的組件屬性- 你可以使用xml編輯的自動完成功能來探索它們并查看它們的相關(guān)文檔.
使用 Java 配置
同樣的例子可使用java來外部配置:ApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
........
@Configuration
public class RabbitConfiguration {
@Bean
public ConnectionFactory connectionFactory() {
returnnew CachingConnectionFactory("localhost");
}
@Bean
public AmqpAdmin amqpAdmin() {
returnnew RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
returnnew RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue() {
returnnew Queue("myqueue");
}
}
2.2 新特性
2.2.1從1.5到1.6的變化
測試支持
提供了一個新的測試支持包. 參考Section 3.4, “Testing Support” 來了解更多信息.
提供了一個新的測試支持包. 參考Section 3.4, “Testing Support” 來了解更多信息.
Builder
現(xiàn)在Builders提供了更方便的API來配置Queue
和 Exchange
對象. 參考the section called “Builder API for Queues and Exchanges” 來了解更多信息.
現(xiàn)在Builders提供了更方便的API來配置Queue
和 Exchange
對象. 參考the section called “Builder API for Queues and Exchanges” 來了解更多信息.
命名空間變化
連接工廠
為連接工廠bean聲明增加了thread-factory
, 例如,以amqp-client包創(chuàng)建的線程命名.參考Section 3.1.2, “Connection and Resource Management” 來了解更多信息.
當(dāng)使用CacheMode.CONNECTION
時,現(xiàn)在你可以限制允許連接的總數(shù)了. 參考Section 3.1.2, “Connection and Resource Management” 來了解更多信息.
為連接工廠bean聲明增加了thread-factory
, 例如,以amqp-client包創(chuàng)建的線程命名.參考Section 3.1.2, “Connection and Resource Management” 來了解更多信息.
當(dāng)使用CacheMode.CONNECTION
時,現(xiàn)在你可以限制允許連接的總數(shù)了. 參考Section 3.1.2, “Connection and Resource Management” 來了解更多信息.
Queue 定義
現(xiàn)在為匿名隊列提供了命名策略; 參考 the section called “AnonymousQueue” 來了解更多信息.
現(xiàn)在為匿名隊列提供了命名策略; 參考 the section called “AnonymousQueue” 來了解更多信息.
監(jiān)聽器容器變化
空閑消息監(jiān)聽器檢測
當(dāng)空閑時,現(xiàn)在可以配置監(jiān)聽器容器來發(fā)布ApplicationEvent
事件了. 參考the section called “Detecting Idle Asynchronous Consumers”來了解更多信息.
當(dāng)空閑時,現(xiàn)在可以配置監(jiān)聽器容器來發(fā)布ApplicationEvent
事件了. 參考the section called “Detecting Idle Asynchronous Consumers”來了解更多信息.
不匹配的隊列檢測
默認情況下,當(dāng)監(jiān)聽器容器啟動時,如果探測到了隊列的mismatched屬性或參數(shù),容器會記錄異常但會繼續(xù)監(jiān)聽.現(xiàn)在容器有了一個mismatchedQueuesFatal
屬性,當(dāng)啟動時存在問題的話,這可以阻止容器(以及上下文)啟動.如果隨后檢測到問題,它也會停止容器,如從連接故障中恢復(fù). 參考Section 3.1.15, “Message Listener Container Configuration” 來了解更多信息.
默認情況下,當(dāng)監(jiān)聽器容器啟動時,如果探測到了隊列的mismatched屬性或參數(shù),容器會記錄異常但會繼續(xù)監(jiān)聽.現(xiàn)在容器有了一個mismatchedQueuesFatal
屬性,當(dāng)啟動時存在問題的話,這可以阻止容器(以及上下文)啟動.如果隨后檢測到問題,它也會停止容器,如從連接故障中恢復(fù). 參考Section 3.1.15, “Message Listener Container Configuration” 來了解更多信息.
監(jiān)聽器容器日志
現(xiàn)在監(jiān)聽器容器提供了它的beanName
給內(nèi)部 SimpleAsyncTaskExecutor
作為threadNamePrefix
.對于日志分析非常有用.
現(xiàn)在監(jiān)聽器容器提供了它的beanName
給內(nèi)部 SimpleAsyncTaskExecutor
作為threadNamePrefix
.對于日志分析非常有用.
默認錯誤處理器
默認錯誤處理器(ConditionalRejectingErrorHandler
) 現(xiàn)在認為無可挽救的@RabbitListener
異常是致命的. 參考Section 3.1.13, “Exception Handling” 來了解更多信息.
默認錯誤處理器(ConditionalRejectingErrorHandler
) 現(xiàn)在認為無可挽救的@RabbitListener
異常是致命的. 參考Section 3.1.13, “Exception Handling” 來了解更多信息.
AutoDeclare 與 RabbitAdmins
參考 Section 3.1.15, “Message Listener Container Configuration” (autoDeclare
) 來查看在應(yīng)用程序上下文使用RabbitAdmin
s的語法變化.
參考 Section 3.1.15, “Message Listener Container Configuration” (autoDeclare
) 來查看在應(yīng)用程序上下文使用RabbitAdmin
s的語法變化.
AmqpTemplate: receive 與 timeout
AmqpTemplate
和它的RabbitTemplate
實現(xiàn)引入了
許多新的帶有timeout的receive()
方法. 參考the section called “Polling Consumer” 來了解更多信息.
AmqpTemplate
和它的RabbitTemplate
實現(xiàn)引入了
許多新的帶有timeout的receive()
方法. 參考the section called “Polling Consumer” 來了解更多信息.
AsyncRabbitTemplate
引入了新的AsyncRabbitTemplate
. 此模塊提供了許多發(fā)送和接收的方法,其返回值為ListenableFuture
,此后可通過它來同步或異步地獲取結(jié)果. 參考 the section called “AsyncRabbitTemplate” 來了解更多信息
引入了新的AsyncRabbitTemplate
. 此模塊提供了許多發(fā)送和接收的方法,其返回值為ListenableFuture
,此后可通過它來同步或異步地獲取結(jié)果. 參考 the section called “AsyncRabbitTemplate” 來了解更多信息
RabbitTemplate 變化
1.4.1 在broker支持時,引入了Direct reply-to 的能力;它比使用臨時隊列來回應(yīng)更高效. 這個版本允許你覆蓋這種默認行為,通常設(shè)置useTemporaryReplyQueues屬性為true來使用臨時隊列. 參考 the section called “RabbitMQ Direct reply-to” 來了解更多信息.
RabbitTemplate
現(xiàn)在支持user-id-expression
(userIdExpression
當(dāng)使用Java配置時). 參考See Validated User-ID RabbitMQ documentationand the section called “Validated User Id” 來了解更多信息.
1.4.1 在broker支持時,引入了Direct reply-to 的能力;它比使用臨時隊列來回應(yīng)更高效. 這個版本允許你覆蓋這種默認行為,通常設(shè)置useTemporaryReplyQueues屬性為true來使用臨時隊列. 參考 the section called “RabbitMQ Direct reply-to” 來了解更多信息.
RabbitTemplate
現(xiàn)在支持user-id-expression
(userIdExpression
當(dāng)使用Java配置時). 參考See Validated User-ID RabbitMQ documentationand the section called “Validated User Id” 來了解更多信息.
消息屬性
CorrelationId
correlationId
消息屬性現(xiàn)在可以是字符串了.參考the section called “Message Properties Converters” 來了解更多信息.
correlationId
消息屬性現(xiàn)在可以是字符串了.參考the section called “Message Properties Converters” 來了解更多信息.
長字符串頭
以前,DefaultMessagePropertiesConverter
會將頭轉(zhuǎn)換成(其頭長度不超過長字符串限制(默認為1024) )成一個DataInputStream
(實際上它只是引用LongString的
DataInputStream
).
在輸出時,這個頭不會進行轉(zhuǎn)換(除了字符串, 例如. 在java.io.DataInputStream@1d057a39
上調(diào)用toString()方法)
.
在這個版本中, Long LongString
默認作為 LongString
s ;你可以通過其getBytes[]
, toString()
, 或 getStream()
方法來訪問它的內(nèi)容. 更大的輸入LongString
現(xiàn)在也會在輸出上正確轉(zhuǎn)換了.
參考 the section called “Message Properties Converters” 來了解更多信息.
以前,DefaultMessagePropertiesConverter
會將頭轉(zhuǎn)換成(其頭長度不超過長字符串限制(默認為1024) )成一個DataInputStream
(實際上它只是引用LongString的
DataInputStream
).
在輸出時,這個頭不會進行轉(zhuǎn)換(除了字符串, 例如. 在java.io.DataInputStream@1d057a39
上調(diào)用toString()方法)
.
在這個版本中, Long LongString
默認作為 LongString
s ;你可以通過其getBytes[]
, toString()
, 或 getStream()
方法來訪問它的內(nèi)容. 更大的輸入LongString
現(xiàn)在也會在輸出上正確轉(zhuǎn)換了.
參考 the section called “Message Properties Converters” 來了解更多信息.
Inbound Delivery Mode
deliveryMode
屬性不再映射MessageProperties.deliveryMode
;這是為了避免意外傳播,如果同一個MessageProperties
對象用來發(fā)送出站消息.
相反, 入站deliveryMode
頭映射為MessageProperties.receivedDeliveryMode
.
參考 the section called “Message Properties Converters” 來了解更多信息.
當(dāng)使用注解endpoints時,頭將在名為AmqpHeaders.RECEIVED_DELIVERY_MODE
的頭中提供
.
參考the section called “Annotated Endpoint Method Signature” 來了解更多信息.
deliveryMode
屬性不再映射MessageProperties.deliveryMode
;這是為了避免意外傳播,如果同一個MessageProperties
對象用來發(fā)送出站消息.
相反, 入站deliveryMode
頭映射為MessageProperties.receivedDeliveryMode
.
參考 the section called “Message Properties Converters” 來了解更多信息.
當(dāng)使用注解endpoints時,頭將在名為AmqpHeaders.RECEIVED_DELIVERY_MODE
的頭中提供
.
參考the section called “Annotated Endpoint Method Signature” 來了解更多信息.
Inbound User ID
user_id
屬性不再映射MessageProperties.userId
; 這是為了避免意外傳播,如果同一個MessageProperties
對象用來發(fā)送出站消息.相反, 入站userId
頭會映射到MessageProperties.receivedUserId
.
參考 the section called “Message Properties Converters” 來了解更多信息.
當(dāng)使用注解endpoints時,頭將在名為AmqpHeaders.RECEIVED_USER_ID的頭中提供
.
參考 the section called “Annotated Endpoint Method Signature” 來了解更多信息.
user_id
屬性不再映射MessageProperties.userId
; 這是為了避免意外傳播,如果同一個MessageProperties
對象用來發(fā)送出站消息.相反, 入站userId
頭會映射到MessageProperties.receivedUserId
.
參考 the section called “Message Properties Converters” 來了解更多信息.
當(dāng)使用注解endpoints時,頭將在名為AmqpHeaders.RECEIVED_USER_ID的頭中提供
.
參考 the section called “Annotated Endpoint Method Signature” 來了解更多信息.
RabbitAdmin 變化
Declaration Failures
之間,ignoreDeclarationFailures
標(biāo)志只在channel上發(fā)生IOException
才會生效(如未匹配參數(shù)). 現(xiàn)在它對于任何異常都可以生效(如:TimeoutException
).
此外,無論何時聲明失敗,都會發(fā)布DeclarationExceptionEvent
.RabbitAdmin
最后聲明事件將作為屬性lastDeclarationExceptionEvent也是可用的.
參考 Section 3.1.10, “Configuring the broker” 來了解更多信息.
之間,ignoreDeclarationFailures
標(biāo)志只在channel上發(fā)生IOException
才會生效(如未匹配參數(shù)). 現(xiàn)在它對于任何異常都可以生效(如:TimeoutException
).
此外,無論何時聲明失敗,都會發(fā)布DeclarationExceptionEvent
.RabbitAdmin
最后聲明事件將作為屬性lastDeclarationExceptionEvent也是可用的.
參考 Section 3.1.10, “Configuring the broker” 來了解更多信息.
@RabbitListener Changes
Multiple Containers per Bean
當(dāng)使用Java 8+,可以添加多個@RabbitListener
注解到 @Bean
類或它們的方法上.當(dāng)使用Java 7-,你可以使用 @RabbitListeners
容器注解來提供同樣的功能.
參考the section called “@Repeatable @RabbitListener” 來了解更多信息.
當(dāng)使用Java 8+,可以添加多個@RabbitListener
注解到 @Bean
類或它們的方法上.當(dāng)使用Java 7-,你可以使用 @RabbitListeners
容器注解來提供同樣的功能.
參考the section called “@Repeatable @RabbitListener” 來了解更多信息.
@SendTo SpEL Expressions
@SendTo
for routing replies with no replyTo
property can now be SpEL expressions evaluated against the request/reply.
參考 the section called “Reply Management”來了解更多信息.
@SendTo
for routing replies with no replyTo
property can now be SpEL expressions evaluated against the request/reply.
參考 the section called “Reply Management”來了解更多信息.
@QueueBinding Improvements
現(xiàn)在你可以在@QueueBinding
注解中為queues, exchanges 和bindings 指定參數(shù).
Header交換器可通過@QueueBinding來支持
.
參考the section called “Annotation-driven Listener Endpoints” 來了解更多信息.
現(xiàn)在你可以在@QueueBinding
注解中為queues, exchanges 和bindings 指定參數(shù).
Header交換器可通過@QueueBinding來支持
.
參考the section called “Annotation-driven Listener Endpoints” 來了解更多信息.
延遲消息交換器(Delayed Message Exchange)
Spring AMQP 現(xiàn)在有了第一個支持RabbitMQ Delayed Message Exchange 插件. 參考Section 3.1.11, “Delayed Message Exchange” 來了解更多信息.
Spring AMQP 現(xiàn)在有了第一個支持RabbitMQ Delayed Message Exchange 插件. 參考Section 3.1.11, “Delayed Message Exchange” 來了解更多信息.
交換器內(nèi)部標(biāo)志(Exchange internal flag)
任何Exchange
定義現(xiàn)在都可標(biāo)記為internal
,當(dāng)聲明交換器時,RabbitAdmin
會將值傳遞給broker.
參考 Section 3.1.10, “Configuring the broker”來了解更多信息.
任何Exchange
定義現(xiàn)在都可標(biāo)記為internal
,當(dāng)聲明交換器時,RabbitAdmin
會將值傳遞給broker.
參考 Section 3.1.10, “Configuring the broker”來了解更多信息.
CachingConnectionFactory 變化
CachingConnectionFactory Cache Statistics
CachingConnectionFactory
現(xiàn)在通過運行時和JMX提供了cache屬性.參考the section called “Runtime Cache Properties” 來了解更多信息.
CachingConnectionFactory
現(xiàn)在通過運行時和JMX提供了cache屬性.參考the section called “Runtime Cache Properties” 來了解更多信息.
訪問底層RabbitMQ連接工廠
添加了一個新的getter方法來獲取底層factory.這是很有用的,例如,可以添加自定義屬性. 參考 Section 3.1.3, “Adding Custom Client Connection Properties” 來了解更多信息.
添加了一個新的getter方法來獲取底層factory.這是很有用的,例如,可以添加自定義屬性. 參考 Section 3.1.3, “Adding Custom Client Connection Properties” 來了解更多信息.
Channel Cache
默認的channel 緩存大小已從1增加到了25. 參考Section 3.1.2, “Connection and Resource Management”來了解更多信息.
此外, SimpleMessageListenerContainer
不再設(shè)置緩存大小至少要與concurrentConsumers的數(shù)目一樣大
- 這是多余的,因為容器消費者channels是不會緩存的.
默認的channel 緩存大小已從1增加到了25. 參考Section 3.1.2, “Connection and Resource Management”來了解更多信息.
此外, SimpleMessageListenerContainer
不再設(shè)置緩存大小至少要與concurrentConsumers的數(shù)目一樣大
- 這是多余的,因為容器消費者channels是不會緩存的.
RabbitConnectionFactoryBean
factory bean現(xiàn)在暴露了一個屬性來將client連接屬性添加到由工廠產(chǎn)生的連接中.
factory bean現(xiàn)在暴露了一個屬性來將client連接屬性添加到由工廠產(chǎn)生的連接中.
Java 反序列化(Deserialization)
當(dāng)使用Java反序列化時,現(xiàn)在允許配置類的白名單. 如果你從不可信來源接收消息,考慮創(chuàng)建白名單是很重要的. 參考 the section called “Java Deserialization” 來了解更多信息.
當(dāng)使用Java反序列化時,現(xiàn)在允許配置類的白名單. 如果你從不可信來源接收消息,考慮創(chuàng)建白名單是很重要的. 參考 the section called “Java Deserialization” 來了解更多信息.
JSON MessageConverter
改善了JSON消息轉(zhuǎn)換器,現(xiàn)在允許消費消息在消息頭中可以沒有類型(type)信息. 參考the section called “Message Conversion for Annotated Methods” 和 the section called “Jackson2JsonMessageConverter”來了解更多信息.
改善了JSON消息轉(zhuǎn)換器,現(xiàn)在允許消費消息在消息頭中可以沒有類型(type)信息. 參考the section called “Message Conversion for Annotated Methods” 和 the section called “Jackson2JsonMessageConverter”來了解更多信息.
Logging Appenders
Log4j2
加入了log4j2 appender,此appenders現(xiàn)在可配置addresses
屬性來連接broker集群.
加入了log4j2 appender,此appenders現(xiàn)在可配置addresses
屬性來連接broker集群.
Client 連接屬性
現(xiàn)在你可以向RabbitMQ連接中加入自定義連接屬性了.
現(xiàn)在你可以向RabbitMQ連接中加入自定義連接屬性了.
2.2.2 早期版本
參考Section A.2, “Previous Releases” 來了解早期版本的變化.
參考Section A.2, “Previous Releases” 來了解早期版本的變化.