Queue Affinity 和 LocalizedQueueConnectionFactory
當在集群中使用HA隊列時,為了獲取最佳性能,可以希望連接到主隊列所在的物理broker. 雖然CachingConnectionFactory
可以配置為使用多個broker 地址; 這會失敗的,client會嘗試按順序來連接. LocalizedQueueConnectionFactory
使用管理插件提供的 REST API來確定包含master隊列的節點.然后,它會創建(或從緩存中獲取)一個只連接那個節點的CachingConnectionFactory
.如果連接失敗了,將會確定一個新的消費者可連接的master節點. LocalizedQueueConnectionFactory
使用默認的連接工廠進行配置,在隊列物理位置不能確定的情況下,它會按照正常情況來連接集群.
LocalizedQueueConnectionFactory
是一個RoutingConnectionFactory
, SimpleMessageListenerContainer
會使用隊列名稱作為其lookup key ,這些已經在上面的 the section called “Routing Connection Factory” 討論過了.
基于這個原因(使用隊列名稱來作查找鍵),LocalizedQueueConnectionFactory
只在容器配置為監聽某個單一隊列時才可使用.
RabbitMQ 管理插件應該在每個節點上開啟.
警告
這種連接工廠用于長連接,如用在SimpleMessageListenerContainer的連接
.它的目的不是用于短連接, 如在 RabbitTemplate中使用,這是因為在連接前,它要調用
REST API. 此外,對于發布操作來說,隊列是未知的,不管如何, 消息會發布到所有集群成員中,因此查找節點的邏輯幾乎沒有什么意義。
這里有一個樣例配置,使用了Spring Boot的RabbitProperties來配置工廠:
@Autowired private RabbitProperties props; private final String[] adminUris = { "http://host1:15672", "http://host2:15672" }; private final String[] nodes = { "rabbit@host1", "rabbit@host2" }; @Bean public ConnectionFactory defaultConnectionFactory() { CachingConnectionFactory cf = new CachingConnectionFactory(); cf.setAddresses(this.props.getAddresses()); cf.setUsername(this.props.getUsername()); cf.setPassword(this.props.getPassword()); cf.setVirtualHost(this.props.getVirtualHost()); return cf; } @Bean public ConnectionFactory queueAffinityCF( @Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) { return new LocalizedQueueConnectionFactory(defaultCF, StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()), this.adminUris, this.nodes, this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(), false, null); }
注意,三個參數是 addresses
, adminUris
和 nodes的數組
. 當一個容器試圖連接一個隊列時,它們是有位置性的,它決定了哪個節點上的隊列是mastered,并以同樣數組位置來連接其地址.
發布者確認和返回
確認和返回消息可通過分別設置CachingConnectionFactory
的 publisherConfirms
和publisherReturns
屬性為ture來完成.
當設置了這些選項時,由工廠創建的通道將包裝在PublisherCallbackChannel中
,這用來方便回調. 當獲取到這樣的通道時,client可在channel上注冊一個 PublisherCallbackChannel.Listener
. PublisherCallbackChannel
實現包含一些邏輯來路由確認/返回給適當的監聽器. 這些特性將在下面的章節中進一步解釋.
對于一些更多的背景信息, 可以參考下面的博客:Introducing Publisher Confirms.
記錄通道關閉事件
1.5版本中引入了允許用戶控制日志級別的機制.
CachingConnectionFactory
使用默認的策略來記錄通道關閉事件:
- 不記錄通道正常關閉事件 (200 OK).
- 如果通道是因為失敗的被動的隊列聲明關閉的,將記錄為debug級別.
- 如果通道關閉是因為basic.consume因專用消費者條件而拒絕引起的,將被記錄為INFO級別.
- 所有其它的事件將記錄為ERROR級別.
要修改此行為,需要在CachingConnectionFactory的closeExceptionLogger屬性中注入一個自定義的ConditionalExceptionLogger.
也可參考the section called “Consumer Failure Events”.
運行時緩存屬性
從1.6版本開始, CachingConnectionFactory
通過getCacheProperties()方法提供了緩存統計. 這些統計數據可用來在生產環境中優化緩存.例如, 最高水位標記可用來確定是否需要加大緩存.如果它等于緩存大小,你也許應該考慮進一步加大.
Table 3.1. CacheMode.CHANNEL的緩存屬性
Property | Meaning |
---|---|
channelCacheSize | 當前配置的允許空閑的最大通道數量. |
localPort | 連接的本地端口(如果可用的話). 在可以在RabbitMQ 管理界面中關聯 connections/channels. |
idleChannelsTx | 當前空閑(緩存的)的事務通道的數目. |
idleChannelsNotTx | 當前空閑(緩存的)的非事務通道的數目. |
idleChannelsTxHighWater | 同時空閑(緩存的)的事務通道的最大數目 |
idleChannelsNotTxHighWater | 同時空閑(緩存的)的非事務通道的最大數目. |
Table 3.2. CacheMode.CONNECTION的緩存屬性
Property | Meaning |
---|---|
openConnections | 表示連接到brokers上連接對象的數目. |
channelCacheSize | 當前允許空閑的最大通道數目 |
connectionCacheSize | 當前允許空閑的最大連接數目. |
idleConnections | 當前空閑的連接數目. |
idleConnectionsHighWater | 目前已經空閑的最大連接數目. |
idleChannelsTx:<localPort> | 在當前連接上目前空閑的事務通道的數目. 屬性名的localPort部分可用來在RabbitMQ 管理界面中關聯connections/channels. |
idleChannelsNotTx:<localPort> | 在當前連接上目前空閑和非事務通道的數目.屬性名的localPort部分可用來在RabbitMQ管理界面中關聯connections/channels |
idleChannelsTxHighWater: <localPort> | 已同時空閑的事務通道的最大數目. 屬性名的 localPort部分可用來在RabbitMQ管理界面中關聯connections/channels. |
idleChannelsNotTxHighWater: <localPort> | 憶同時空閑的非事務通道的最大數目.屬性名的localPort部分可用來RabbitMQ管理界面中關聯connections/channels. |
cacheMode
屬性 (包含CHANNEL
或 CONNECTION
).
Figure 3.1. JVisualVM Example

3.1.3 添加自定義Client 連接屬性
CachingConnectionFactory
現在允許你訪問底層連接工廠,例如, 設置自定義client 屬性:
connectionFactory.getRabbitConnectionFactory().getClientProperties().put("foo", "bar");
當在RabbitMQ管理界面中查看連接時,將會看到這些屬性.
3.1.4 AmqpTemplate
介紹
像其它Spring Framework提供的高級抽象一樣, Spring AMQP 提供了扮演核心角色的模板. 定義了主要操作的接口稱為AmqpTemplate
. 這些操作包含了發送和接收消息的一般行為.換句話說,它們不是針對某個特定實現的,從其名稱"AMQP"就可看出.另一方面,接口的實現會盡量作為AMQP協議的實現.不像JMS,它只是接口級別的API實現, AMQP是一個線路級協議.協議的實現可提供它們自己的client libraries, 因此模板接口的實現都依賴特定的client library.目前,只有一個實現:RabbitTemplate
. 在下面的例子中,你會經常看到"AmqpTemplate",但當你查看配置例子或者任何實例化或調用setter方法的代碼時,你都會看到實現類型(如."RabbitTemplate").
正如上面所提到的, AmqpTemplate
接口定義了所有發送和接收消息的基本操作. 我們將分別在以下兩個部分探索消息發送和接收。
也可參考the section called “AsyncRabbitTemplate”.
添加重試功能
從1.3版本開始, 你可為RabbitTemplate
配置使用 RetryTemplate
來幫助處理broker連接的問題. 參考spring-retry 項目來了解全部信息;下面就是一個例子,它使用指數回退策略(exponential back off policy)和默認的 SimpleRetryPolicy
(向調用者拋出異常前,會做三次嘗試).
使用XML命名空間:
<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval"value="10000" />
</bean>
</property>
</bean>
使用 @Configuration
:
@Bean
public AmqpTemplate rabbitTemplate(); RabbitTemplate template = new RabbitTemplate(connectionFactory()); RetryTemplate retryTemplate = new RetryTemplate(); ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); backOffPolicy.setInitialInterval(500); backOffPolicy.setMultiplier(10.0); backOffPolicy.setMaxInterval(10000); retryTemplate.setBackOffPolicy(backOffPolicy); template.setRetryTemplate(retryTemplate); return template; }
從1.4版本開始,除了retryTemplate
屬性外,RabbitTemplate 上也支持recoveryCallback
選項. 它可用作RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T>recoveryCallback)第二個參數
.
RecoveryCallback
會有一些限制,因為在retry context只包含lastThrowable
字段.在更復雜的情況下,你應該使用外部RetryTemplate,這樣你就可以通過上下文屬性傳遞更多信息給
RecoveryCallback
.
retryTemplate.execute( new RetryCallback<Object, Exception>() { @Override
public Object doWithRetry(RetryContext context) throws Exception { context.setAttribute("message", message); return rabbitTemplate.convertAndSend(exchange, routingKey, message); } }, new RecoveryCallback<Object>() { @Overridepublic Object recover(RetryContext context) throws Exception { Object message = context.getAttribute("message"); Throwable t = context.getLastThrowable(); // Do something with message
return null;
}
});
}
在這種情況下,你不需要在RabbitTemplate中注入RetryTemplate
.
發布者確認和返回
AmqpTemplate的RabbitTemplate
實現支持發布者確認和返回.
對于返回消息,模板的 mandatory
屬性必須設置為true
, 或者對于特定消息,其 mandatory-expression
必須評估為true
.
此功能需要將CachingConnectionFactory
的publisherReturns
屬性設置為true (參考 the section called “Publisher Confirms and Returns”).
返回是通過注冊在RabbitTemplate.ReturnCallback(通過調用setReturnCallback(ReturnCallback callback))來返回給客戶端的
. 回調必須實現下面的方法:
void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey);
每個RabbitTemplate只支持一個ReturnCallback
.也可參考the section called “Reply Timeout”.
對于發布者確認(又名發布者應答), 模板需要將 CachingConnectionFactory
中的publisherConfirms
屬性設置為true.
確認是通過注冊在RabbitTemplate.ConfirmCallback(通過調用setConfirmCallback(ConfirmCallback callback))
發送給client的. 回調必須實現下面的方法:
void confirm(CorrelationData correlationData, boolean ack, String cause);
CorrelationData
對象是在發送原始消息的時候,由client提供的. ack
為true 表示確認,為false時,表示不確認(nack). 對于nack
, cause可能會包含nack的原因(如果生成nack時,它可用的話).
一個例子是當發送消息到一個不存在的交換器時.在那種情況下,broker會關閉通道; 關閉的原因會包含在cause中
. cause
是1.4版本中加入的.
RabbitTemplate中只支持一個ConfirmCallback
.
當rabbit模板完成發送操作時,會關閉通道; 這可以排除當連接工廠緩存滿時(緩存中還有空間,通道沒有物理關閉,返回/確認正常處理)確認和返回的接待問題.
當緩存滿了的時候, 框架會延遲5秒來關閉,以為接收確認/返回消息留有時間.當使用確認時,通道會在收到最后一個確認時關閉.
當使用返回時,通道會保持5秒的打開狀態.一般建議將連接工廠的channelCacheSize
設為足夠大,這樣發布消息的通道就會返回到緩存中,而不是被關閉.
你可以使用RabbitMQ管理插件來監控通道的使用情況;如果你看到通道打開/關閉的非常迅速,那么你必須考慮加大緩存,從而減少服務器的開銷.
Messaging 集成
從1.4版本開始, 構建于RabbitTemplate上的RabbitMessagingTemplate提供了與
Spring Framework消息抽象的集成(如.org.springframework.messaging.Message)
.
This allows you to create the message to send in generic manner.
驗證 User Id
從1.6版本開始,模板支持user-id-expression
(當使用Java配置時,為userIdExpression
). 如果發送消息,user id屬性的值將在評估表達式后進行設置.評價的根對象是要發送的消息。
例子:
<rabbit:template...user-id-expression="'guest'" />
<rabbit:template...user-id-expression="@myConnectionFactory.username" />
第一個示例是一個文本表達式;第二個例子將獲取上下文中連接工廠bean的username
屬性.
3.1.5 發送消息
介紹
當發送消息時,可使用下面的任何一種方法:
void send(Message message) throws AmqpException; void send(String routingKey, Message message) throws AmqpException; void send(String exchange, String routingKey, Message message) throws AmqpException;
我們將使用上面列出的最后一個方法來討論,因為它實際是最清晰的.它允許在運行時提供一個AMQP Exchange 名稱和路由鍵(routing key).最后一個參數是負責初建創建Message實例的回調.使用此方法來發送消息的示例如下:
amqpTemplate.send("marketData.topic", "quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果你打算使用模板實例來多次(或多次)向同一個交換器發送消息時,"exchange" 可設置在模板自已身上.在這種情況中,可以使用上面列出的第二個方法. 下面的例子在功能上等價于前面那個:
amqpTemplate.setExchange("marketData.topic"); amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果在模塊上設置"exchange"和"routingKey"屬性,那么方法就只接受Message
參數:
amqpTemplate.setExchange("marketData.topic"); amqpTemplate.setRoutingKey("quotes.nasdaq.FOO"); amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
關于交換器和路由鍵更好的想法是明確的參數將總是會覆蓋模板默認值.事實上, 即使你不在模板上明確設置這些屬性, 總是有默認值的地方. 在兩種情況中,默認值是空字符串,這是合情合理的.
就路由鍵而言,它并不總是首先需要的 (如. Fanout 交換器). 此外,綁定的交換器上的隊列可能會使用空字符串. 這些在模板的路由鍵中都是合法的.
就交換器名稱而言,空字符串也是常常使用的,因為AMQP規范定義了無名稱的"默認交換器".
由于所有隊列可使用它們的隊列名稱作為路由鍵自動綁定到默認交換器上(它是Direct交換器e) ,上面的第二個方法可通過默認的交換器將簡單的點對點消息傳遞到任何隊列.
只需要簡單的將隊列名稱作為路由鍵-在運行時提供方法參數:
RabbitTemplate template = new RabbitTemplate(); // 使用默認的無名交換器 template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
或者,如果你喜歡創建一個模板用于主要或專門向一個隊列發送消息, 以下是完全合理的:
RabbitTemplate template = new RabbitTemplate(); // 使用默認無名交換器 template.setRoutingKey("queue.helloWorld"); // 但我們總是向此隊列發送消息 template.send(new Message("Hello World".getBytes(), someProperties));Message Builder API
從1.3版本開始,通過
MessageBuilder
和MessagePropertiesBuilder提供了消息構建API
; 它們提供了更加方便地創建消息和消息屬性的方法:Message message = MessageBuilder.withBody("foo".getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setMessageId("123") .setHeader("bar", "baz") .build();或
MessageProperties props = MessagePropertiesBuilder.newInstance() .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setMessageId("123") .setHeader("bar", "baz") .build(); Message message = MessageBuilder.withBody("foo".getBytes()) .andProperties(props) .build();每個MessageProperies上定義的屬性都可以被設置. 其它方法包括
setHeader(String key, String value)
,removeHeader(String key)
,removeHeaders()
, 和copyProperties(MessageProperties properties)
.
每個屬性方法都有一個set*IfAbsent()
變種. 在默認的初始值存在的情況下, 方法名為set*IfAbsentOrDefault()
.提供了五個靜態方法來創建初始message builder:
public static MessageBuilder withBody(byte[] body)public static MessageBuilder withClonedBody(byte[] body)public static MessageBuilder withBody(byte[] body, int from, int to)public static MessageBuilder fromMessage(Message message)public static MessageBuilder fromClonedMessage(Message message)
builder創建的消息body是參數的直接引用.
builder創建的消息body是包含拷貝原字節數組的新數組.
build創建的消息body是包含原字節數組范圍的新數組.查看
Arrays.copyOfRange()
來了解更多信息.builder創建的消息body是原body參數的直接引用. 參數的屬性將拷貝到新
MessageProperties對象中
.builer創建的消息body包含參數body的新數組.參數的屬性將拷貝到新的
MessageProperties
對象中.public static MessagePropertiesBuilder newInstance()public static MessagePropertiesBuilder fromProperties(MessageProperties properties)public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties)
新消息屬性將使用默認值進行初始化
builder會使用提供的properties對象進行初始化,
build()
方法也會返回參數properties對象.參數的屬性會拷貝到新的M
essageProperties對象中
.在AmqpTemplate的
RabbitTemplate
實現中, 每個send()
方法的重載版本都接受一個額外的CorrelationData
對象.
當啟用了發布者確認時,此對象會在3.1.4, “AmqpTemplate”的回調中返回.這允許發送者使用確認(ack或nack)來關聯發送的消息.發布者返回
當模板的
mandatory
屬性為true時,返回消息將由 Section 3.1.4, “AmqpTemplate”描述的回調來返回.從1.4版本開始,
RabbitTemplate
支持 SpELmandatoryExpression
屬性,它將對每個請求消息進行評估,作為根評估對象來解析成布爾值. Bean引用,如"@myBean.isMandatory(#root)"
可用在此表達式中.發布者返回內部也可用于
RabbitTemplate
的發送和接收操作中. 參考the section called “Reply Timeout” 來了解更多信息.批量
從1.4.2版本開始,引入了
BatchingRabbitTemplate
.它是RabbitTemplate
的子類,覆蓋了send
方法,此方法可根據BatchingStrategy來批量發送消息
; 只有當一個批次完成時才會向RabbitMQ發送消息。public interface BatchingStrategy { MessageBatch addToBatch(String exchange, String routingKey, Message message); Date nextRelease(); Collection<MessageBatch> releaseBatches(); }
警告
成批的數據是保存在內存中的,如果出現系統故障,未發送的消息將會丟失.
這里提供了一個 SimpleBatchingStrategy
.它支持將消息發送到單個 exchange/routing key.它有下面的屬性:
batchSize
- 發送前一個批次中消息的數量bufferLimit
- 批量消息的最大大小;如果超過了此值,它會取代batchSize
, 并導致要發送的部分批處理timeout
- 當沒有新的活動添加到消息批處理時之后,將發送部分批處理的時間(a time after which a partial batch will be sent when there is no new activity adding messages to the batch)
SimpleBatchingStrategy
通過在每個消息的前面嵌入4字節二進制長度來格式化批次消息. 這是通過設置springBatchFormat消息屬性為lengthHeader4向接收系統傳達的.
重要
批量消息自動由監聽器容器來分批(de-batched)(使用springBatchFormat
消息頭).拒絕批量消息中的任何一個會將導致拒絕整個批次消息.
3.1.6 接收消息
介紹
Message 接收總是比發送稍顯復雜.有兩種方式來接收Message
. 最簡單的選擇是在輪詢方法調用中一次只接收一個消息. 更復雜的更常見的方法是注冊一個偵聽器,按需異步的接收消息。
在下面兩個子章節中,我們將看到這兩種方法的示例.
Polling Consumer
AmqpTemplate
自身可用來輪詢消息接收.默認情況下,如果沒有可用消息,將會立即返回 null
;它是無阻塞的.
從1.5版本開始,你可以設置receiveTimeout
,以毫秒為單位, receive方法會阻塞設定的時間來等待消息.小于0的值則意味著無限期阻塞 (或者至少要等到與broker的連接丟失).
1.6版本引入了receive
方法的變種,以允許在每個調用上都可設置超時時間.
警告
由于接收操作會為每個消息創建一個新的QueueingConsumer
,這種技術并不適用于大容量環境,可考慮使用異步消費者,或將receiveTimeout
設為0來應對這種情況.
這里有四個簡單可用的receive 方法.同發送方的交換器一樣, 有一種方法需要直接在模板本身上設置的默認隊列屬性, 還有一種方法需要在運行接受隊列參數.
版本1.6 引入了接受timeoutMillis
的變種,基于每個請求重寫了receiveTimeout
方法.
Message receive() throws AmqpException; Message receive(String queueName) throws AmqpException; Message receive(long timeoutMillis) throws AmqpException; Message receive(String queueName, long timeoutMillis) throws AmqpException;
與發送消息的情況類似, AmqpTemplate
有一些便利的方法來接收POJOs 而非Message
實例, 其實現可提供一種方法來定制MessageConverter
以用于創建返回的Object
:
Object receiveAndConvert() throws AmqpException; Object receiveAndConvert(String queueName) throws AmqpException; Message receiveAndConvert(long timeoutMillis) throws AmqpException; Message receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
類似于sendAndReceive
方法,從1.3版本開始, AmqpTemplate
有多個便利的receiveAndReply
方法同步接收,處理,以及回應消息:
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback) throws AmqpException; <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback) throws AmqpException; <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, String replyExchange, String replyRoutingKey) throws AmqpException; <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback, String replyExchange, String replyRoutingKey) throws AmqpException; <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException; <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback, ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
AmqpTemplate
實現會負責receive 和 reply 階段.在大多數情況下,如果有必要,你只需要提供ReceiveAndReplyCallback
的實現來為收到的消息執行某些業務邏輯或為收到的消息構建回應對象.
注意,ReceiveAndReplyCallback
可能返回null
. 在這種情況下,將不會發送回應,receiveAndReply
的工作類似于receive
方法. 這允許相同的隊列用于消息的混合物,其中一些可能不需要答復。
自動消息(請求和應答)轉換只能適應于提供的回調不是ReceiveAndReplyMessageCallback 實例的情況下- 它提供了一個原始的消息交換合同。
ReplyToAddressCallback
只在這種情況中有用,需要根據收到的信息通過自定義邏輯來決定replyTo
地址,并在ReceiveAndReplyCallback中進行回應的情況
. 默認情況下,請求消息中的 replyTo
信息用來路由回復.
下面是一個基于POJO的接收和回復…
boolean received = this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() { public Invoice handle(Order order) { return processOrder(order); } }); if (received) { log.info("We received an order!"); }
異步消費者
@RabbitListener
注解)并提供了一個開放的基礎設施,編程注冊端點。這是目前為止建立一個異步消費者的最方便方式, 參考the section called “Annotation-driven Listener Endpoints”來了解更多詳情.
消息監聽器
對于異步消息接收, 會涉及到一個專用組件(不是AmqpTemplate
).此組件可作為消息消費回調的容器.
稍后,我們會講解這個容器和它的屬性,但首先讓我們來看一下回調,因為這里是你的應用程序代碼與消息系統集成的地方. MessageListener
接口:
public interface MessageListener { void onMessage(Message message); }
如果出于任何理由,你的回調邏輯需要依賴于AMQP Channel實例,那么你可以使用ChannelAwareMessageListener
. 它看起來是很相似的,但多了一個額外的參數:
public interface ChannelAwareMessageListener { void onMessage(Message message, Channel channel) throws Exception; }
MessageListenerAdapter
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo); listener.setDefaultListenerMethod("myMethod");
你也可以繼承適配器,并實現getListenerMethodName()
方法(基于消息來動態選擇不同的方法). 這個方法有兩個參數:originalMessage
和extractedMessage
, 后者是轉換后的結果.默認情況下,需要配置SimpleMessageConverter
;
參考the section called “SimpleMessageConverter” 來了解更多信息以及其它轉換器的信息.
從1.4.2開始,原始消息包含consumerQueue
和 consumerTag
屬性,這些屬性可用來確定消息是從那個隊列中收到的.
從1.5版本開始,你可以配置消費者queue/tag到方法名稱的映射(map)以動態選擇要調用的方法.如果map中無條目,我們將退回到默認監聽器方法.
容器
你已經看過了消息監聽回調上的各種各樣的選項,現在我們將注意力轉向容器. 基本上,容器處理主動(active)的職責,這樣監聽器回調可以保持被動(passive). 容器是“生命周期”組件的一個例子。
它提供了啟動和停止的方法.當配置容器時,你本質上縮短了AMQP Queue和 MessageListener
實例之間的距離.你必須提供一個ConnectionFactory
的引用,隊列名稱或隊列實例.
下面是使用默認實現SimpleMessageListenerContainer
的最基礎的例子:
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(rabbitConnectionFactory); container.setQueueNames("some.queue"); container.setMessageListener(new MessageListenerAdapter(somePojo));
作為一個主動組件, 最常見的是使用bean定義來創建監聽器容器,這樣它就可以簡單地運行于后臺.這可以通過XML來完成:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
或者你可以@Configuration 風格:
@Configuration
public class ExampleAmqpConfiguration { @Bean
public SimpleMessageListenerContainer messageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(rabbitConnectionFactory()); container.setQueueName("some.queue"); container.setMessageListener(exampleListener()); return container; } @Bean
public ConnectionFactory rabbitConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } @Bean
public MessageListener exampleListener() { returnnew MessageListener() { publicvoid onMessage(Message message) { System.out.println("received: " + message); } }; } }
從RabbitMQ Version 3.2開始, broker支持消費者優先級了(參考 Using Consumer Priorities with RabbitMQ).
這可以通過在消費者設置x-priority
參數來啟用.
SimpleMessageListenerContainer
現在支持設置消費者參數:
container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));
為了方便,命名空間在listener元素上提供了priority
屬性:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>
從1.3版本開始,容器監聽的隊列可在運行時進行修改,參考 Section 3.1.18, “Listener Container Queues”.
auto-delete 隊列
當容器配置為監聽auto-delete
隊列或隊列有x-expires
選項或者broker配置了Time-To-Live 策略,隊列將在容器停止時(最后的消費者退出時)由broker進行刪除.
在1.3版本之前,容器會因隊列缺失而不能重啟; 當連接關閉/打開時,RabbitAdmin
只能自動重新聲明隊列.
從1.3版本開始, 在啟動時,容器會使用RabbitAdmin
來重新聲明缺失的隊列.
您也可以使用條件聲明(the section called “Conditional Declaration”) 與auto-startup="false"
來管理隊列的延遲聲明,直到容器啟動.
<rabbit:queue id="otherAnon" declared-by="containerAdmin" />
<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
<rabbit:bindings>
<rabbit:binding queue="otherAnon" key="otherAnon" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container id="container2" auto-startup="false">
<rabbit:listener id="listener2" ref="foo"queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>
<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory" auto-startup="false" />
在這種情況下,隊列和交換器是由 containerAdmin
來聲明的,auto-startup="false"
因此在上下文初始化期間不會聲明元素.同樣,出于同樣原因,容器也不會啟動.當容器隨后啟動時,它會使用containerAdmin引用來聲明元素
.
批量消息
批量消息會自動地通過監聽器容器 (使用springBatchFormat
消息頭)來解批(de-batched). 拒絕批量消息中的任何一個都將導致整批消息被拒絕. 參考the section called “Batching” 來了解更多關于批量消息的詳情.
消費者失敗事件
從1.5版本開始,無論時候,當監聽器(消費者)經歷某種失敗時,SimpleMessageListenerContainer
會發布應用程序事件. 事件ListenerContainerConsumerFailedEvent
有下面的屬性:
container
- 消費者經歷問題的監聽容器.reason
- 失敗的文本原因。fatal
- 一個表示失敗是否是致命的boolean值;對于非致命異常,容器會根據retryInterval值嘗試重新啟動消費者.throwable
-捕捉到的Throwable
.
這些事件能通過實現ApplicationListener<ListenerContainerConsumerFailedEvent>來消費
.
當 concurrentConsumers
大于1時,系統級事件(如連接失敗)將發布到所有消費者.
如果消費者因隊列是專有使用而失敗了,默認情況下,在發布事件的時候,也會發出WARN
日志. 要改變日志行為,需要在SimpleMessageListenerContainer的exclusiveConsumerExceptionLogger屬性中提供自定義的ConditionalExceptionLogger
.
也可參考the section called “Logging Channel Close Events”.