Queue Affinity 和 LocalizedQueueConnectionFactory
當(dāng)在集群中使用HA隊列時,為了獲取最佳性能,可以希望連接到主隊列所在的物理broker. 雖然CachingConnectionFactory
可以配置為使用多個broker 地址; 這會失敗的,client會嘗試按順序來連接. LocalizedQueueConnectionFactory
使用管理插件提供的 REST API來確定包含master隊列的節(jié)點.然后,它會創(chuàng)建(或從緩存中獲取)一個只連接那個節(jié)點的CachingConnectionFactory
.如果連接失敗了,將會確定一個新的消費者可連接的master節(jié)點. LocalizedQueueConnectionFactory
使用默認的連接工廠進行配置,在隊列物理位置不能確定的情況下,它會按照正常情況來連接集群.
LocalizedQueueConnectionFactory
是一個RoutingConnectionFactory
, SimpleMessageListenerContainer
會使用隊列名稱作為其lookup key ,這些已經(jīng)在上面的 the section called “Routing Connection Factory” 討論過了.
基于這個原因(使用隊列名稱來作查找鍵),LocalizedQueueConnectionFactory
只在容器配置為監(jiān)聽某個單一隊列時才可使用.
RabbitMQ 管理插件應(yīng)該在每個節(jié)點上開啟.
警告
這種連接工廠用于長連接,如用在SimpleMessageListenerContainer的連接
.它的目的不是用于短連接, 如在 RabbitTemplate中使用,這是因為在連接前,它要調(diào)用
REST API. 此外,對于發(fā)布操作來說,隊列是未知的,不管如何, 消息會發(fā)布到所有集群成員中,因此查找節(jié)點的邏輯幾乎沒有什么意義。
這里有一個樣例配置,使用了Spring Boot的RabbitProperties來配置工廠:
@Autowired private RabbitProperties props; private final String[] adminUris = { "http://host1:15672", "http://host2:15672" }; private final String[] nodes = { "rabbit@host1", "rabbit@host2" }; @Bean public ConnectionFactory defaultConnectionFactory() { CachingConnectionFactory cf = new CachingConnectionFactory(); cf.setAddresses(this.props.getAddresses()); cf.setUsername(this.props.getUsername()); cf.setPassword(this.props.getPassword()); cf.setVirtualHost(this.props.getVirtualHost()); return cf; } @Bean public ConnectionFactory queueAffinityCF( @Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) { return new LocalizedQueueConnectionFactory(defaultCF, StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()), this.adminUris, this.nodes, this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(), false, null); }
注意,三個參數(shù)是 addresses
, adminUris
和 nodes的數(shù)組
. 當(dāng)一個容器試圖連接一個隊列時,它們是有位置性的,它決定了哪個節(jié)點上的隊列是mastered,并以同樣數(shù)組位置來連接其地址.
發(fā)布者確認和返回
確認和返回消息可通過分別設(shè)置CachingConnectionFactory
的 publisherConfirms
和publisherReturns
屬性為ture來完成.
當(dāng)設(shè)置了這些選項時,由工廠創(chuàng)建的通道將包裝在PublisherCallbackChannel中
,這用來方便回調(diào). 當(dāng)獲取到這樣的通道時,client可在channel上注冊一個 PublisherCallbackChannel.Listener
. PublisherCallbackChannel
實現(xiàn)包含一些邏輯來路由確認/返回給適當(dāng)?shù)谋O(jiān)聽器. 這些特性將在下面的章節(jié)中進一步解釋.
對于一些更多的背景信息, 可以參考下面的博客:Introducing Publisher Confirms.
記錄通道關(guān)閉事件
1.5版本中引入了允許用戶控制日志級別的機制.
CachingConnectionFactory
使用默認的策略來記錄通道關(guān)閉事件:
- 不記錄通道正常關(guān)閉事件 (200 OK).
- 如果通道是因為失敗的被動的隊列聲明關(guān)閉的,將記錄為debug級別.
- 如果通道關(guān)閉是因為basic.consume因專用消費者條件而拒絕引起的,將被記錄為INFO級別.
- 所有其它的事件將記錄為ERROR級別.
要修改此行為,需要在CachingConnectionFactory的closeExceptionLogger屬性中注入一個自定義的ConditionalExceptionLogger.
也可參考the section called “Consumer Failure Events”.
運行時緩存屬性
從1.6版本開始, CachingConnectionFactory
通過getCacheProperties()方法提供了緩存統(tǒng)計. 這些統(tǒng)計數(shù)據(jù)可用來在生產(chǎn)環(huán)境中優(yōu)化緩存.例如, 最高水位標記可用來確定是否需要加大緩存.如果它等于緩存大小,你也許應(yīng)該考慮進一步加大.
Table 3.1. CacheMode.CHANNEL的緩存屬性
Property | Meaning |
---|---|
channelCacheSize | 當(dāng)前配置的允許空閑的最大通道數(shù)量. |
localPort | 連接的本地端口(如果可用的話). 在可以在RabbitMQ 管理界面中關(guān)聯(lián) connections/channels. |
idleChannelsTx | 當(dāng)前空閑(緩存的)的事務(wù)通道的數(shù)目. |
idleChannelsNotTx | 當(dāng)前空閑(緩存的)的非事務(wù)通道的數(shù)目. |
idleChannelsTxHighWater | 同時空閑(緩存的)的事務(wù)通道的最大數(shù)目 |
idleChannelsNotTxHighWater | 同時空閑(緩存的)的非事務(wù)通道的最大數(shù)目. |
Table 3.2. CacheMode.CONNECTION的緩存屬性
Property | Meaning |
---|---|
openConnections | 表示連接到brokers上連接對象的數(shù)目. |
channelCacheSize | 當(dāng)前允許空閑的最大通道數(shù)目 |
connectionCacheSize | 當(dāng)前允許空閑的最大連接數(shù)目. |
idleConnections | 當(dāng)前空閑的連接數(shù)目. |
idleConnectionsHighWater | 目前已經(jīng)空閑的最大連接數(shù)目. |
idleChannelsTx:<localPort> | 在當(dāng)前連接上目前空閑的事務(wù)通道的數(shù)目. 屬性名的localPort部分可用來在RabbitMQ 管理界面中關(guān)聯(lián)connections/channels. |
idleChannelsNotTx:<localPort> | 在當(dāng)前連接上目前空閑和非事務(wù)通道的數(shù)目.屬性名的localPort部分可用來在RabbitMQ管理界面中關(guān)聯(lián)connections/channels |
idleChannelsTxHighWater: <localPort> | 已同時空閑的事務(wù)通道的最大數(shù)目. 屬性名的 localPort部分可用來在RabbitMQ管理界面中關(guān)聯(lián)connections/channels. |
idleChannelsNotTxHighWater: <localPort> | 憶同時空閑的非事務(wù)通道的最大數(shù)目.屬性名的localPort部分可用來RabbitMQ管理界面中關(guān)聯(lián)connections/channels. |
cacheMode
屬性 (包含CHANNEL
或 CONNECTION
).
Figure 3.1. JVisualVM Example

3.1.3 添加自定義Client 連接屬性
CachingConnectionFactory
現(xiàn)在允許你訪問底層連接工廠,例如, 設(shè)置自定義client 屬性:
connectionFactory.getRabbitConnectionFactory().getClientProperties().put("foo", "bar");
當(dāng)在RabbitMQ管理界面中查看連接時,將會看到這些屬性.
3.1.4 AmqpTemplate
介紹
像其它Spring Framework提供的高級抽象一樣, Spring AMQP 提供了扮演核心角色的模板. 定義了主要操作的接口稱為AmqpTemplate
. 這些操作包含了發(fā)送和接收消息的一般行為.換句話說,它們不是針對某個特定實現(xiàn)的,從其名稱"AMQP"就可看出.另一方面,接口的實現(xiàn)會盡量作為AMQP協(xié)議的實現(xiàn).不像JMS,它只是接口級別的API實現(xiàn), AMQP是一個線路級協(xié)議.協(xié)議的實現(xiàn)可提供它們自己的client libraries, 因此模板接口的實現(xiàn)都依賴特定的client library.目前,只有一個實現(xiàn):RabbitTemplate
. 在下面的例子中,你會經(jīng)常看到"AmqpTemplate",但當(dāng)你查看配置例子或者任何實例化或調(diào)用setter方法的代碼時,你都會看到實現(xiàn)類型(如."RabbitTemplate").
正如上面所提到的, AmqpTemplate
接口定義了所有發(fā)送和接收消息的基本操作. 我們將分別在以下兩個部分探索消息發(fā)送和接收。
也可參考the section called “AsyncRabbitTemplate”.
添加重試功能
從1.3版本開始, 你可為RabbitTemplate
配置使用 RetryTemplate
來幫助處理broker連接的問題. 參考spring-retry 項目來了解全部信息;下面就是一個例子,它使用指數(shù)回退策略(exponential back off policy)和默認的 SimpleRetryPolicy
(向調(diào)用者拋出異常前,會做三次嘗試).
使用XML命名空間:
<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval"value="10000" />
</bean>
</property>
</bean>
使用 @Configuration
:
@Bean
public AmqpTemplate rabbitTemplate(); RabbitTemplate template = new RabbitTemplate(connectionFactory()); RetryTemplate retryTemplate = new RetryTemplate(); ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); backOffPolicy.setInitialInterval(500); backOffPolicy.setMultiplier(10.0); backOffPolicy.setMaxInterval(10000); retryTemplate.setBackOffPolicy(backOffPolicy); template.setRetryTemplate(retryTemplate); return template; }
從1.4版本開始,除了retryTemplate
屬性外,RabbitTemplate 上也支持recoveryCallback
選項. 它可用作RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T>recoveryCallback)第二個參數(shù)
.
RecoveryCallback
會有一些限制,因為在retry context只包含lastThrowable
字段.在更復(fù)雜的情況下,你應(yīng)該使用外部RetryTemplate,這樣你就可以通過上下文屬性傳遞更多信息給
RecoveryCallback
.
retryTemplate.execute( new RetryCallback<Object, Exception>() { @Override
public Object doWithRetry(RetryContext context) throws Exception { context.setAttribute("message", message); return rabbitTemplate.convertAndSend(exchange, routingKey, message); } }, new RecoveryCallback<Object>() { @Overridepublic Object recover(RetryContext context) throws Exception { Object message = context.getAttribute("message"); Throwable t = context.getLastThrowable(); // Do something with message
return null;
}
});
}
在這種情況下,你不需要在RabbitTemplate中注入RetryTemplate
.
發(fā)布者確認和返回
AmqpTemplate的RabbitTemplate
實現(xiàn)支持發(fā)布者確認和返回.
對于返回消息,模板的 mandatory
屬性必須設(shè)置為true
, 或者對于特定消息,其 mandatory-expression
必須評估為true
.
此功能需要將CachingConnectionFactory
的publisherReturns
屬性設(shè)置為true (參考 the section called “Publisher Confirms and Returns”).
返回是通過注冊在RabbitTemplate.ReturnCallback(通過調(diào)用setReturnCallback(ReturnCallback callback))來返回給客戶端的
. 回調(diào)必須實現(xiàn)下面的方法:
void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey);
每個RabbitTemplate只支持一個ReturnCallback
.也可參考the section called “Reply Timeout”.
對于發(fā)布者確認(又名發(fā)布者應(yīng)答), 模板需要將 CachingConnectionFactory
中的publisherConfirms
屬性設(shè)置為true.
確認是通過注冊在RabbitTemplate.ConfirmCallback(通過調(diào)用setConfirmCallback(ConfirmCallback callback))
發(fā)送給client的. 回調(diào)必須實現(xiàn)下面的方法:
void confirm(CorrelationData correlationData, boolean ack, String cause);
CorrelationData
對象是在發(fā)送原始消息的時候,由client提供的. ack
為true 表示確認,為false時,表示不確認(nack). 對于nack
, cause可能會包含nack的原因(如果生成nack時,它可用的話).
一個例子是當(dāng)發(fā)送消息到一個不存在的交換器時.在那種情況下,broker會關(guān)閉通道; 關(guān)閉的原因會包含在cause中
. cause
是1.4版本中加入的.
RabbitTemplate中只支持一個ConfirmCallback
.
當(dāng)rabbit模板完成發(fā)送操作時,會關(guān)閉通道; 這可以排除當(dāng)連接工廠緩存滿時(緩存中還有空間,通道沒有物理關(guān)閉,返回/確認正常處理)確認和返回的接待問題.
當(dāng)緩存滿了的時候, 框架會延遲5秒來關(guān)閉,以為接收確認/返回消息留有時間.當(dāng)使用確認時,通道會在收到最后一個確認時關(guān)閉.
當(dāng)使用返回時,通道會保持5秒的打開狀態(tài).一般建議將連接工廠的channelCacheSize
設(shè)為足夠大,這樣發(fā)布消息的通道就會返回到緩存中,而不是被關(guān)閉.
你可以使用RabbitMQ管理插件來監(jiān)控通道的使用情況;如果你看到通道打開/關(guān)閉的非常迅速,那么你必須考慮加大緩存,從而減少服務(wù)器的開銷.
Messaging 集成
從1.4版本開始, 構(gòu)建于RabbitTemplate上的RabbitMessagingTemplate提供了與
Spring Framework消息抽象的集成(如.org.springframework.messaging.Message)
.
This allows you to create the message to send in generic manner.
驗證 User Id
從1.6版本開始,模板支持user-id-expression
(當(dāng)使用Java配置時,為userIdExpression
). 如果發(fā)送消息,user id屬性的值將在評估表達式后進行設(shè)置.評價的根對象是要發(fā)送的消息。
例子:
<rabbit:template...user-id-expression="'guest'" />
<rabbit:template...user-id-expression="@myConnectionFactory.username" />
第一個示例是一個文本表達式;第二個例子將獲取上下文中連接工廠bean的username
屬性.
3.1.5 發(fā)送消息
介紹
當(dāng)發(fā)送消息時,可使用下面的任何一種方法:
void send(Message message) throws AmqpException; void send(String routingKey, Message message) throws AmqpException; void send(String exchange, String routingKey, Message message) throws AmqpException;
我們將使用上面列出的最后一個方法來討論,因為它實際是最清晰的.它允許在運行時提供一個AMQP Exchange 名稱和路由鍵(routing key).最后一個參數(shù)是負責(zé)初建創(chuàng)建Message實例的回調(diào).使用此方法來發(fā)送消息的示例如下:
amqpTemplate.send("marketData.topic", "quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果你打算使用模板實例來多次(或多次)向同一個交換器發(fā)送消息時,"exchange" 可設(shè)置在模板自已身上.在這種情況中,可以使用上面列出的第二個方法. 下面的例子在功能上等價于前面那個:
amqpTemplate.setExchange("marketData.topic"); amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果在模塊上設(shè)置"exchange"和"routingKey"屬性,那么方法就只接受Message
參數(shù):
amqpTemplate.setExchange("marketData.topic"); amqpTemplate.setRoutingKey("quotes.nasdaq.FOO"); amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
關(guān)于交換器和路由鍵更好的想法是明確的參數(shù)將總是會覆蓋模板默認值.事實上, 即使你不在模板上明確設(shè)置這些屬性, 總是有默認值的地方. 在兩種情況中,默認值是空字符串,這是合情合理的.
就路由鍵而言,它并不總是首先需要的 (如. Fanout 交換器). 此外,綁定的交換器上的隊列可能會使用空字符串. 這些在模板的路由鍵中都是合法的.
就交換器名稱而言,空字符串也是常常使用的,因為AMQP規(guī)范定義了無名稱的"默認交換器".
由于所有隊列可使用它們的隊列名稱作為路由鍵自動綁定到默認交換器上(它是Direct交換器e) ,上面的第二個方法可通過默認的交換器將簡單的點對點消息傳遞到任何隊列.
只需要簡單的將隊列名稱作為路由鍵-在運行時提供方法參數(shù):
RabbitTemplate template = new RabbitTemplate(); // 使用默認的無名交換器 template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
或者,如果你喜歡創(chuàng)建一個模板用于主要或?qū)iT向一個隊列發(fā)送消息, 以下是完全合理的:
RabbitTemplate template = new RabbitTemplate(); // 使用默認無名交換器 template.setRoutingKey("queue.helloWorld"); // 但我們總是向此隊列發(fā)送消息 template.send(new Message("Hello World".getBytes(), someProperties));Message Builder API
從1.3版本開始,通過
MessageBuilder
和MessagePropertiesBuilder提供了消息構(gòu)建API
; 它們提供了更加方便地創(chuàng)建消息和消息屬性的方法:Message message = MessageBuilder.withBody("foo".getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setMessageId("123") .setHeader("bar", "baz") .build();或
MessageProperties props = MessagePropertiesBuilder.newInstance() .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setMessageId("123") .setHeader("bar", "baz") .build(); Message message = MessageBuilder.withBody("foo".getBytes()) .andProperties(props) .build();每個MessageProperies上定義的屬性都可以被設(shè)置. 其它方法包括
setHeader(String key, String value)
,removeHeader(String key)
,removeHeaders()
, 和copyProperties(MessageProperties properties)
.
每個屬性方法都有一個set*IfAbsent()
變種. 在默認的初始值存在的情況下, 方法名為set*IfAbsentOrDefault()
.提供了五個靜態(tài)方法來創(chuàng)建初始message builder:
public static MessageBuilder withBody(byte[] body)public static MessageBuilder withClonedBody(byte[] body)public static MessageBuilder withBody(byte[] body, int from, int to)public static MessageBuilder fromMessage(Message message)public static MessageBuilder fromClonedMessage(Message message)
builder創(chuàng)建的消息body是參數(shù)的直接引用.
builder創(chuàng)建的消息body是包含拷貝原字節(jié)數(shù)組的新數(shù)組.
build創(chuàng)建的消息body是包含原字節(jié)數(shù)組范圍的新數(shù)組.查看
Arrays.copyOfRange()
來了解更多信息.builder創(chuàng)建的消息body是原body參數(shù)的直接引用. 參數(shù)的屬性將拷貝到新
MessageProperties對象中
.builer創(chuàng)建的消息body包含參數(shù)body的新數(shù)組.參數(shù)的屬性將拷貝到新的
MessageProperties
對象中.public static MessagePropertiesBuilder newInstance()public static MessagePropertiesBuilder fromProperties(MessageProperties properties)public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties)
新消息屬性將使用默認值進行初始化
builder會使用提供的properties對象進行初始化,
build()
方法也會返回參數(shù)properties對象.參數(shù)的屬性會拷貝到新的M
essageProperties對象中
.在AmqpTemplate的
RabbitTemplate
實現(xiàn)中, 每個send()
方法的重載版本都接受一個額外的CorrelationData
對象.
當(dāng)啟用了發(fā)布者確認時,此對象會在3.1.4, “AmqpTemplate”的回調(diào)中返回.這允許發(fā)送者使用確認(ack或nack)來關(guān)聯(lián)發(fā)送的消息.發(fā)布者返回
當(dāng)模板的
mandatory
屬性為true時,返回消息將由 Section 3.1.4, “AmqpTemplate”描述的回調(diào)來返回.從1.4版本開始,
RabbitTemplate
支持 SpELmandatoryExpression
屬性,它將對每個請求消息進行評估,作為根評估對象來解析成布爾值. Bean引用,如"@myBean.isMandatory(#root)"
可用在此表達式中.發(fā)布者返回內(nèi)部也可用于
RabbitTemplate
的發(fā)送和接收操作中. 參考the section called “Reply Timeout” 來了解更多信息.批量
從1.4.2版本開始,引入了
BatchingRabbitTemplate
.它是RabbitTemplate
的子類,覆蓋了send
方法,此方法可根據(jù)BatchingStrategy來批量發(fā)送消息
; 只有當(dāng)一個批次完成時才會向RabbitMQ發(fā)送消息。public interface BatchingStrategy { MessageBatch addToBatch(String exchange, String routingKey, Message message); Date nextRelease(); Collection<MessageBatch> releaseBatches(); }
警告
成批的數(shù)據(jù)是保存在內(nèi)存中的,如果出現(xiàn)系統(tǒng)故障,未發(fā)送的消息將會丟失.
這里提供了一個 SimpleBatchingStrategy
.它支持將消息發(fā)送到單個 exchange/routing key.它有下面的屬性:
batchSize
- 發(fā)送前一個批次中消息的數(shù)量bufferLimit
- 批量消息的最大大小;如果超過了此值,它會取代batchSize
, 并導(dǎo)致要發(fā)送的部分批處理timeout
- 當(dāng)沒有新的活動添加到消息批處理時之后,將發(fā)送部分批處理的時間(a time after which a partial batch will be sent when there is no new activity adding messages to the batch)
SimpleBatchingStrategy
通過在每個消息的前面嵌入4字節(jié)二進制長度來格式化批次消息. 這是通過設(shè)置springBatchFormat消息屬性為lengthHeader4向接收系統(tǒng)傳達的.
重要
批量消息自動由監(jiān)聽器容器來分批(de-batched)(使用springBatchFormat
消息頭).拒絕批量消息中的任何一個會將導(dǎo)致拒絕整個批次消息.
3.1.6 接收消息
介紹
Message 接收總是比發(fā)送稍顯復(fù)雜.有兩種方式來接收Message
. 最簡單的選擇是在輪詢方法調(diào)用中一次只接收一個消息. 更復(fù)雜的更常見的方法是注冊一個偵聽器,按需異步的接收消息。
在下面兩個子章節(jié)中,我們將看到這兩種方法的示例.
Polling Consumer
AmqpTemplate
自身可用來輪詢消息接收.默認情況下,如果沒有可用消息,將會立即返回 null
;它是無阻塞的.
從1.5版本開始,你可以設(shè)置receiveTimeout
,以毫秒為單位, receive方法會阻塞設(shè)定的時間來等待消息.小于0的值則意味著無限期阻塞 (或者至少要等到與broker的連接丟失).
1.6版本引入了receive
方法的變種,以允許在每個調(diào)用上都可設(shè)置超時時間.
警告
由于接收操作會為每個消息創(chuàng)建一個新的QueueingConsumer
,這種技術(shù)并不適用于大容量環(huán)境,可考慮使用異步消費者,或?qū)?/span>receiveTimeout
設(shè)為0來應(yīng)對這種情況.
這里有四個簡單可用的receive 方法.同發(fā)送方的交換器一樣, 有一種方法需要直接在模板本身上設(shè)置的默認隊列屬性, 還有一種方法需要在運行接受隊列參數(shù).
版本1.6 引入了接受timeoutMillis
的變種,基于每個請求重寫了receiveTimeout
方法.
Message receive() throws AmqpException; Message receive(String queueName) throws AmqpException; Message receive(long timeoutMillis) throws AmqpException; Message receive(String queueName, long timeoutMillis) throws AmqpException;
與發(fā)送消息的情況類似, AmqpTemplate
有一些便利的方法來接收POJOs 而非Message
實例, 其實現(xiàn)可提供一種方法來定制MessageConverter
以用于創(chuàng)建返回的Object
:
Object receiveAndConvert() throws AmqpException; Object receiveAndConvert(String queueName) throws AmqpException; Message receiveAndConvert(long timeoutMillis) throws AmqpException; Message receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
類似于sendAndReceive
方法,從1.3版本開始, AmqpTemplate
有多個便利的receiveAndReply
方法同步接收,處理,以及回應(yīng)消息:
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback) throws AmqpException; <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback) throws AmqpException; <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, String replyExchange, String replyRoutingKey) throws AmqpException; <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback, String replyExchange, String replyRoutingKey) throws AmqpException; <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException; <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback, ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
AmqpTemplate
實現(xiàn)會負責(zé)receive 和 reply 階段.在大多數(shù)情況下,如果有必要,你只需要提供ReceiveAndReplyCallback
的實現(xiàn)來為收到的消息執(zhí)行某些業(yè)務(wù)邏輯或為收到的消息構(gòu)建回應(yīng)對象.
注意,ReceiveAndReplyCallback
可能返回null
. 在這種情況下,將不會發(fā)送回應(yīng),receiveAndReply
的工作類似于receive
方法. 這允許相同的隊列用于消息的混合物,其中一些可能不需要答復(fù)。
自動消息(請求和應(yīng)答)轉(zhuǎn)換只能適應(yīng)于提供的回調(diào)不是ReceiveAndReplyMessageCallback 實例的情況下- 它提供了一個原始的消息交換合同。
ReplyToAddressCallback
只在這種情況中有用,需要根據(jù)收到的信息通過自定義邏輯來決定replyTo
地址,并在ReceiveAndReplyCallback中進行回應(yīng)的情況
. 默認情況下,請求消息中的 replyTo
信息用來路由回復(fù).
下面是一個基于POJO的接收和回復(fù)…
boolean received = this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() { public Invoice handle(Order order) { return processOrder(order); } }); if (received) { log.info("We received an order!"); }
異步消費者
@RabbitListener
注解)并提供了一個開放的基礎(chǔ)設(shè)施,編程注冊端點。這是目前為止建立一個異步消費者的最方便方式, 參考the section called “Annotation-driven Listener Endpoints”來了解更多詳情.
消息監(jiān)聽器
對于異步消息接收, 會涉及到一個專用組件(不是AmqpTemplate
).此組件可作為消息消費回調(diào)的容器.
稍后,我們會講解這個容器和它的屬性,但首先讓我們來看一下回調(diào),因為這里是你的應(yīng)用程序代碼與消息系統(tǒng)集成的地方. MessageListener
接口:
public interface MessageListener { void onMessage(Message message); }
如果出于任何理由,你的回調(diào)邏輯需要依賴于AMQP Channel實例,那么你可以使用ChannelAwareMessageListener
. 它看起來是很相似的,但多了一個額外的參數(shù):
public interface ChannelAwareMessageListener { void onMessage(Message message, Channel channel) throws Exception; }
MessageListenerAdapter
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo); listener.setDefaultListenerMethod("myMethod");
你也可以繼承適配器,并實現(xiàn)getListenerMethodName()
方法(基于消息來動態(tài)選擇不同的方法). 這個方法有兩個參數(shù):originalMessage
和extractedMessage
, 后者是轉(zhuǎn)換后的結(jié)果.默認情況下,需要配置SimpleMessageConverter
;
參考the section called “SimpleMessageConverter” 來了解更多信息以及其它轉(zhuǎn)換器的信息.
從1.4.2開始,原始消息包含consumerQueue
和 consumerTag
屬性,這些屬性可用來確定消息是從那個隊列中收到的.
從1.5版本開始,你可以配置消費者queue/tag到方法名稱的映射(map)以動態(tài)選擇要調(diào)用的方法.如果map中無條目,我們將退回到默認監(jiān)聽器方法.
容器
你已經(jīng)看過了消息監(jiān)聽回調(diào)上的各種各樣的選項,現(xiàn)在我們將注意力轉(zhuǎn)向容器. 基本上,容器處理主動(active)的職責(zé),這樣監(jiān)聽器回調(diào)可以保持被動(passive). 容器是“生命周期”組件的一個例子。
它提供了啟動和停止的方法.當(dāng)配置容器時,你本質(zhì)上縮短了AMQP Queue和 MessageListener
實例之間的距離.你必須提供一個ConnectionFactory
的引用,隊列名稱或隊列實例.
下面是使用默認實現(xiàn)SimpleMessageListenerContainer
的最基礎(chǔ)的例子:
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(rabbitConnectionFactory); container.setQueueNames("some.queue"); container.setMessageListener(new MessageListenerAdapter(somePojo));
作為一個主動組件, 最常見的是使用bean定義來創(chuàng)建監(jiān)聽器容器,這樣它就可以簡單地運行于后臺.這可以通過XML來完成:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
或者你可以@Configuration 風(fēng)格:
@Configuration
public class ExampleAmqpConfiguration { @Bean
public SimpleMessageListenerContainer messageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(rabbitConnectionFactory()); container.setQueueName("some.queue"); container.setMessageListener(exampleListener()); return container; } @Bean
public ConnectionFactory rabbitConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } @Bean
public MessageListener exampleListener() { returnnew MessageListener() { publicvoid onMessage(Message message) { System.out.println("received: " + message); } }; } }
從RabbitMQ Version 3.2開始, broker支持消費者優(yōu)先級了(參考 Using Consumer Priorities with RabbitMQ).
這可以通過在消費者設(shè)置x-priority
參數(shù)來啟用.
SimpleMessageListenerContainer
現(xiàn)在支持設(shè)置消費者參數(shù):
container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));
為了方便,命名空間在listener元素上提供了priority
屬性:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>
從1.3版本開始,容器監(jiān)聽的隊列可在運行時進行修改,參考 Section 3.1.18, “Listener Container Queues”.
auto-delete 隊列
當(dāng)容器配置為監(jiān)聽auto-delete
隊列或隊列有x-expires
選項或者broker配置了Time-To-Live 策略,隊列將在容器停止時(最后的消費者退出時)由broker進行刪除.
在1.3版本之前,容器會因隊列缺失而不能重啟; 當(dāng)連接關(guān)閉/打開時,RabbitAdmin
只能自動重新聲明隊列.
從1.3版本開始, 在啟動時,容器會使用RabbitAdmin
來重新聲明缺失的隊列.
您也可以使用條件聲明(the section called “Conditional Declaration”) 與auto-startup="false"
來管理隊列的延遲聲明,直到容器啟動.
<rabbit:queue id="otherAnon" declared-by="containerAdmin" />
<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
<rabbit:bindings>
<rabbit:binding queue="otherAnon" key="otherAnon" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container id="container2" auto-startup="false">
<rabbit:listener id="listener2" ref="foo"queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>
<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory" auto-startup="false" />
在這種情況下,隊列和交換器是由 containerAdmin
來聲明的,auto-startup="false"
因此在上下文初始化期間不會聲明元素.同樣,出于同樣原因,容器也不會啟動.當(dāng)容器隨后啟動時,它會使用containerAdmin引用來聲明元素
.
批量消息
批量消息會自動地通過監(jiān)聽器容器 (使用springBatchFormat
消息頭)來解批(de-batched). 拒絕批量消息中的任何一個都將導(dǎo)致整批消息被拒絕. 參考the section called “Batching” 來了解更多關(guān)于批量消息的詳情.
消費者失敗事件
從1.5版本開始,無論時候,當(dāng)監(jiān)聽器(消費者)經(jīng)歷某種失敗時,SimpleMessageListenerContainer
會發(fā)布應(yīng)用程序事件. 事件ListenerContainerConsumerFailedEvent
有下面的屬性:
container
- 消費者經(jīng)歷問題的監(jiān)聽容器.reason
- 失敗的文本原因。fatal
- 一個表示失敗是否是致命的boolean值;對于非致命異常,容器會根據(jù)retryInterval值嘗試重新啟動消費者.throwable
-捕捉到的Throwable
.
這些事件能通過實現(xiàn)ApplicationListener<ListenerContainerConsumerFailedEvent>來消費
.
當(dāng) concurrentConsumers
大于1時,系統(tǒng)級事件(如連接失敗)將發(fā)布到所有消費者.
如果消費者因隊列是專有使用而失敗了,默認情況下,在發(fā)布事件的時候,也會發(fā)出WARN
日志. 要改變?nèi)罩拘袨?需要在SimpleMessageListenerContainer的exclusiveConsumerExceptionLogger屬性中提供自定義的ConditionalExceptionLogger
.
也可參考the section called “Logging Channel Close Events”.