2.6 Features
??? ActiveMQ包含了很多功能強大的特性,下面簡要介紹其中的幾個。
2.6.1 Exclusive Consumer
???
Queue中的消息是按照順序被分發到consumers的。然而,當你有多個consumers同時從相同的queue中提取消息時,你將失去這個保
證。因為這些消息是被多個線程并發的處理。有的時候,保證消息按照順序處理是很重要的。例如,你可能不希望在插入訂單操作結束之前執行更新這個訂單的操
作。
??? ActiveMQ從4.x版本起開始支持Exclusive Consumer (或者說Exclusive Queues)。
Broker會從多個consumers中挑選一個consumer來處理queue中所有的消息,從而保證了消息的有序處理。如果這個consumer
失效,那么broker會自動切換到其它的consumer。
??? 可以通過Destination Options 來創建一個Exclusive Consumer,如下:
- queue?=? new ?ActiveMQQueue( "TEST.QUEUE?consumer.exclusive=true" );??
- consumer?=?session.createConsumer(queue);??
- queue?=?new?ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true?&consumer.priority=10");??
?
2.6.2 Message Groups
???
用Apache官方文檔的話說,Message Groups rock!它是Exclusive
Consumer功能的增強。邏輯上,Message Groups 可以看成是一種并發的Exclusive
Consumer。跟所有的消息都由唯一的consumer處理不同,JMS 消息屬性JMSXGroupID 被用來區分message
group。Message Groups特性保證所有具有相同JMSXGroupID
的消息會被分發到相同的consumer(只要這個consumer保持active)。另外一方面,Message
Groups特性也是一種負載均衡的機制。
???
在一個消息被分發到consumer之前,broker首先檢查消息JMSXGroupID屬性。如果存在,那么broker
會檢查是否有某個consumer擁有這個message
group。如果沒有,那么broker會選擇一個consumer,并將它關聯到這個message
group。此后,這個consumer會接收這個message group的所有消息,直到:
- Consumer被關閉。
- Message group被關閉。通過發送一個消息,并設置這個消息的JMSXGroupSeq為0。
?? 從4.1版本開始,ActiveMQ支持一個布爾字段JMSXGroupFirstForConsumer 。當某個message group的第一個消息被發送到consumer的時候,這個字段被設置。如果客戶使用failover transport連接到broker。在由于網絡問題等造成客戶重新連接到broker的時候,相同message group的消息可能會被分發到不同與之前的consumer,因此JMSXGroupFirstForConsumer字段也會被重新設置。?
?? 以下是使用message groups的例子:
- Mesasge?message?=?session.createTextMessage("<foo>hey</foo>");??
- message.setStringProperty("JMSXGroupID",?"IBM_NASDAQ_20/4/05");??
- ...??
- producer.send(message);??
2.6.3 JMS Selectors
??? JMS Selectors用于在訂閱中,基于消息屬性對進行消息的過濾。JMS Selectors由SQL92語法定義。以下是個Selectors的例子:
- consumer?=?session.createConsumer(destination,?"JMSType?=?'car'?AND?weight?>?2500");??
??? LIKE '12%3' ('123' true,'12993' true,'1234' false)
??? LIKE 'l_se' ('lose' true,'loose' false)
??? LIKE '\_%' ESCAPE '\' ('_foo' true,'foo' false)
??? 需要注意的是,JMS Selectors表達式中的日期和時間需要使用標準的long型毫秒值。另外表達式中的屬性不會自動進行類型轉換,例如:
- myMessage.setStringProperty("NumberOfOrders",?"2");??
??? 上一小節介紹的Message Groups雖然可以保證具有相同message group的消息被唯一的consumer順序處理,但是卻不能確定被哪個consumer處理。在某些情況下,Message Groups可以和JMS Selector一起工作,例如:
??? 設想有三個consumers分別是A、B和C。你可以在producer中為消息設置三個message groups分別是"A"、"B"和"C"。然后令consumer A使用"JMXGroupID = 'A'"作為selector。B和C也同理。這樣就可以保證message group A的消息只被consumer A處理。需要注意的是,這種做法有以下缺點:
- producer必須知道當前正在運行的consumers,也就是說producer和consumer被耦合到一起。
- 如果某個consumer失效,那么應該被這個consumer消費的消息將會一直被積壓在broker上。
2.6.4 Pending Message Limit Strategy
???
首先簡要介紹一下prefetch機制。ActiveMQ通過prefetch機制來提高性能,這意味這客戶端的內存里可能會緩存一定數量的消息。緩存消
息的數量由prefetch limit來控制。當某個consumer的prefetch
buffer已經達到上限,那么broker不會再向consumer分發消息,直到consumer向broker發送消息的確認。可以通過在
ActiveMQConnectionFactory或者ActiveMQConnection上設置ActiveMQPrefetchPolicy對象
來配置prefetch policy。也可以通過connection options或者destination options來配置。例如:
??? tcp://localhost:61616?jms.prefetchPolicy.all=50
??? tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
??? queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
??? prefetch size的缺省值如下:
- persistent queues (default value: 1000)
- non-persistent queues (default value: 1000)
- persistent topics (default value: 100)
- non-persistent topics (default value: Short.MAX_VALUE -1)
??? 慢消費者會在非持久的topics上導致問題:一旦消息積壓起來,會導致broker把大量消息保存在內存中,broker也會因此而變慢。未來 ActiveMQ可能會實現磁盤緩存,但是這也還是會存在性能問題。目前ActiveMQ使用Pending Message Limit Strategy來解決這個問題。除了prefetch buffer之外,你還要配置緩存消息的上限,超過這個上限后,新消息到來時會丟棄舊消息。通過在配置文件的destination map中配置PendingMessageLimitStrategy,可以為不用的topic namespace配置不同的策略。目前有以下兩種:
- ConstantPendingMessageLimitStrategy。這個策略使用常量限制。
例如:<constantPendingMessageLimitStrategy limit="50"/> - PrefetchRatePendingMessageLimitStrategy。這個策略使用prefetch size的倍數限制。
例如:<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
?? 在以上兩種方式中,如果設置0意味著除了prefetch之外不再緩存消息;如果設置-1意味著禁止丟棄消息。?
??? 此外,你還可以配置消息的丟棄策略,目前有以下兩種:
- oldestMessageEvictionStrategy。這個策略丟棄最舊的消息。
- oldestMessageWithLowestPriorityEvictionStrategy。這個策略丟棄最舊的,而且具有最低優先級的消息。
?? 以下是個ActiveMQ配置文件的例子:
- <broker?persistent="false"?brokerName="${brokername}"?xmlns="http://activemq.org/config/1.0">??
- ????<destinationPolicy>??
- ??????<policyMap>??
- ????????<policyEntries>??
- ??????????<policyEntry?topic="PRICES.>">??
- ????????????<!--??10?seconds?worth?-->??
- ????????????<subscriptionRecoveryPolicy>??
- ??????????????<timedSubscriptionRecoveryPolicy?recoverDuration="10000"?/>??
- ????????????</subscriptionRecoveryPolicy>??
- ??????????????
- ????????????<!--?lets?force?old?messages?to?be?discarded?for?slow?consumers?-->??
- ????????????<pendingMessageLimitStrategy>??
- ??????????????<constantPendingMessageLimitStrategy?limit="10"/>??
- ????????????</pendingMessageLimitStrategy>??
- ??????????</policyEntry>??
- ????????</policyEntries>??
- ??????</policyMap>??
- ????</destinationPolicy>??
- ????...??
- </broker>??
?
2.6.5 Composite Destinations
???
從1.1版本起, ActiveMQ支持composite destinations。它允許用一個虛擬的destination
代表多個destinations。例如你可以通過composite
destinations在一個操作中同時向12個queue發送消息。在composite
destinations中,多個destination之間采用","分割。例如:
- Queue?queue?=?new?ActiveMQQueue("FOO.A,FOO.B,FOO.C");??
?? 如果你希望使用不同類型的destination,那么需要加上前綴如queue:// 或topic://,例如:?
- Queue?queue?=?new?ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");???
?? 以下是ActiveMQ配置文件進行配置的一個例子:
- <destinationInterceptors>??
- ??<virtualDestinationInterceptor>??
- ????<virtualDestinations>??
- ??????<compositeQueue?name="MY.QUEUE">??
- ????????<forwardTo>??
- ??????????<queue?physicalName="FOO"?/>??
- ??????????<topic?physicalName="BAR"?/>??
- ????????</forwardTo>??
- ??????</compositeQueue>??
- ????</virtualDestinations>??
- ??</virtualDestinationInterceptor>??
- </destinationInterceptors>??
?? 可以在轉發前,先通過JMS Selector判斷一個消息是否需要轉發,例如:
- <destinationInterceptors>??
- ??<virtualDestinationInterceptor>??
- ????<virtualDestinations>??
- ??????<compositeQueue?name="MY.QUEUE">??
- ????????<forwardTo>??
- ??????????<filteredDestination?selector="odd?=?'yes'"?queue="FOO"/>??
- ??????????<filteredDestination?selector="i?=?5"?topic="BAR"/>??
- ????????</forwardTo>??
- ??????</compositeQueue>??
- ????</virtualDestinations>??
- ??</virtualDestinationInterceptor>??
- </destinationInterceptors>??
?
2.6.6 Mirrored Queues
???
每個queue中的消息只能被一個consumer消費。然而,有時候你可能希望能夠監視生產者和消費者之間的消息流。你可以通過使用Virtual
Destinations 來建立一個virtual queue 來把消息轉發到多個queues中。但是
為系統中每個queue都進行如此的配置可能會很麻煩。
??? ActiveMQ支持Mirrored
Queues。Broker會把發送到某個queue的所有消息轉發到一個名稱類似的topic,因此監控程序可以訂閱這個mirrored queue
topic。為了啟用Mirrored
Queues,首先要將BrokerService的useMirroredQueues屬性設置成true,然后可以通過
destinationInterceptors設置其它屬性,如mirror
topic的前綴,缺省是"VirtualTopic.Mirror."。以下是ActiveMQ配置文件的一個例子:
- <broker?xmlns="http://activemq.org/config/1.0"?brokerName="MirroredQueuesBroker1"?useMirroredQueues="true">??
- ??
- ??<transportConnectors>??
- ????<transportConnector?uri="tcp://localhost:61616"/>??
- ??</transportConnectors>??
- ????
- ??<destinationInterceptors>??
- ??????<mirroredQueue?copyMessage?=?"true"?prefix="Mirror.Topic"/>??
- ??</destinationInterceptors>??
- ??...??
- </broker>??