Terry.Li-彬

          虛其心,可解天下之問;專其心,可治天下之學;靜其心,可悟天下之理;恒其心,可成天下之業。

            BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
            143 隨筆 :: 344 文章 :: 130 評論 :: 0 Trackbacks

          2.6 Features
          ??? ActiveMQ包含了很多功能強大的特性,下面簡要介紹其中的幾個。
          2.6.1 Exclusive Consumer
          ??? Queue中的消息是按照順序被分發到consumers的。然而,當你有多個consumers同時從相同的queue中提取消息時,你將失去這個保 證。因為這些消息是被多個線程并發的處理。有的時候,保證消息按照順序處理是很重要的。例如,你可能不希望在插入訂單操作結束之前執行更新這個訂單的操 作。
          ??? ActiveMQ從4.x版本起開始支持Exclusive Consumer (或者說Exclusive Queues)。 Broker會從多個consumers中挑選一個consumer來處理queue中所有的消息,從而保證了消息的有序處理。如果這個consumer 失效,那么broker會自動切換到其它的consumer。
          ??? 可以通過Destination Options 來創建一個Exclusive Consumer,如下:

          Java代碼
          1. queue?=? new ?ActiveMQQueue( "TEST.QUEUE?consumer.exclusive=true" );??
          2. consumer?=?session.createConsumer(queue);??
          ??? 順便說一下,可以給consumer設置優先級,以便針對網絡情況(如network hops)進行優化,如下:
          Java代碼
          1. queue?=?new?ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true?&consumer.priority=10");??

          ?

          2.6.2 Message Groups
          ??? 用Apache官方文檔的話說,Message Groups rock!它是Exclusive Consumer功能的增強。邏輯上,Message Groups 可以看成是一種并發的Exclusive Consumer。跟所有的消息都由唯一的consumer處理不同,JMS 消息屬性JMSXGroupID 被用來區分message group。Message Groups特性保證所有具有相同JMSXGroupID 的消息會被分發到相同的consumer(只要這個consumer保持active)。另外一方面,Message Groups特性也是一種負載均衡的機制。
          ??? 在一個消息被分發到consumer之前,broker首先檢查消息JMSXGroupID屬性。如果存在,那么broker 會檢查是否有某個consumer擁有這個message group。如果沒有,那么broker會選擇一個consumer,并將它關聯到這個message group。此后,這個consumer會接收這個message group的所有消息,直到:

          • Consumer被關閉。
          • Message group被關閉。通過發送一個消息,并設置這個消息的JMSXGroupSeq為0。

          ?? 從4.1版本開始,ActiveMQ支持一個布爾字段JMSXGroupFirstForConsumer 。當某個message group的第一個消息被發送到consumer的時候,這個字段被設置。如果客戶使用failover transport連接到broker。在由于網絡問題等造成客戶重新連接到broker的時候,相同message group的消息可能會被分發到不同與之前的consumer,因此JMSXGroupFirstForConsumer字段也會被重新設置。?

          ?? 以下是使用message groups的例子:

          Java代碼
          1. Mesasge?message?=?session.createTextMessage("<foo>hey</foo>");??
          2. message.setStringProperty("JMSXGroupID",?"IBM_NASDAQ_20/4/05");??
          3. ...??
          4. producer.send(message);??

          2.6.3 JMS Selectors
          ??? JMS Selectors用于在訂閱中,基于消息屬性對進行消息的過濾。JMS Selectors由SQL92語法定義。以下是個Selectors的例子:
          Java代碼
          1. consumer?=?session.createConsumer(destination,?"JMSType?=?'car'?AND?weight?>?2500");??
          ???? 在JMS Selectors表達式中,可以使用IN、NOT IN、LIKE等,例如:
          ??? LIKE '12%3' ('123' true,'12993' true,'1234' false)
          ??? LIKE 'l_se' ('lose' true,'loose' false)
          ??? LIKE '\_%' ESCAPE '\' ('_foo' true,'foo' false)
          ??? 需要注意的是,JMS Selectors表達式中的日期和時間需要使用標準的long型毫秒值。另外表達式中的屬性不會自動進行類型轉換,例如:
          Java代碼
          1. myMessage.setStringProperty("NumberOfOrders",?"2");??
          ??? "NumberOfOrders > 1" 求值結果是false。關于JMS Selectors的詳細文檔請參考javax.jms.Message的javadoc。
          ??? 上一小節介紹的Message Groups雖然可以保證具有相同message group的消息被唯一的consumer順序處理,但是卻不能確定被哪個consumer處理。在某些情況下,Message Groups可以和JMS Selector一起工作,例如:
          ??? 設想有三個consumers分別是A、B和C。你可以在producer中為消息設置三個message groups分別是"A"、"B"和"C"。然后令consumer A使用"JMXGroupID = 'A'"作為selector。B和C也同理。這樣就可以保證message group A的消息只被consumer A處理。需要注意的是,這種做法有以下缺點:
          • producer必須知道當前正在運行的consumers,也就是說producer和consumer被耦合到一起。
          • 如果某個consumer失效,那么應該被這個consumer消費的消息將會一直被積壓在broker上。

          2.6.4 Pending Message Limit Strategy
          ??? 首先簡要介紹一下prefetch機制。ActiveMQ通過prefetch機制來提高性能,這意味這客戶端的內存里可能會緩存一定數量的消息。緩存消 息的數量由prefetch limit來控制。當某個consumer的prefetch buffer已經達到上限,那么broker不會再向consumer分發消息,直到consumer向broker發送消息的確認。可以通過在 ActiveMQConnectionFactory或者ActiveMQConnection上設置ActiveMQPrefetchPolicy對象 來配置prefetch policy。也可以通過connection options或者destination options來配置。例如:
          ??? tcp://localhost:61616?jms.prefetchPolicy.all=50
          ??? tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
          ??? queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
          ??? prefetch size的缺省值如下:

          • persistent queues (default value: 1000)
          • non-persistent queues (default value: 1000)
          • persistent topics (default value: 100)
          • non-persistent topics (default value: Short.MAX_VALUE -1)

          ??? 慢消費者會在非持久的topics上導致問題:一旦消息積壓起來,會導致broker把大量消息保存在內存中,broker也會因此而變慢。未來 ActiveMQ可能會實現磁盤緩存,但是這也還是會存在性能問題。目前ActiveMQ使用Pending Message Limit Strategy來解決這個問題。除了prefetch buffer之外,你還要配置緩存消息的上限,超過這個上限后,新消息到來時會丟棄舊消息。通過在配置文件的destination map中配置PendingMessageLimitStrategy,可以為不用的topic namespace配置不同的策略。目前有以下兩種:

          • ConstantPendingMessageLimitStrategy。這個策略使用常量限制。
            例如:<constantPendingMessageLimitStrategy limit="50"/>
          • PrefetchRatePendingMessageLimitStrategy。這個策略使用prefetch size的倍數限制。
            例如:<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>

          ?? 在以上兩種方式中,如果設置0意味著除了prefetch之外不再緩存消息;如果設置-1意味著禁止丟棄消息。?
          ??? 此外,你還可以配置消息的丟棄策略,目前有以下兩種:

          • oldestMessageEvictionStrategy。這個策略丟棄最舊的消息。
          • oldestMessageWithLowestPriorityEvictionStrategy。這個策略丟棄最舊的,而且具有最低優先級的消息。

          ?? 以下是個ActiveMQ配置文件的例子:

          Xml代碼
          1. <broker?persistent="false"?brokerName="${brokername}"?xmlns="http://activemq.org/config/1.0">??
          2. ????<destinationPolicy>??
          3. ??????<policyMap>??
          4. ????????<policyEntries>??
          5. ??????????<policyEntry?topic="PRICES.>">??
          6. ????????????<!--??10?seconds?worth?-->??
          7. ????????????<subscriptionRecoveryPolicy>??
          8. ??????????????<timedSubscriptionRecoveryPolicy?recoverDuration="10000"?/>??
          9. ????????????</subscriptionRecoveryPolicy>??
          10. ??????????????
          11. ????????????<!--?lets?force?old?messages?to?be?discarded?for?slow?consumers?-->??
          12. ????????????<pendingMessageLimitStrategy>??
          13. ??????????????<constantPendingMessageLimitStrategy?limit="10"/>??
          14. ????????????</pendingMessageLimitStrategy>??
          15. ??????????</policyEntry>??
          16. ????????</policyEntries>??
          17. ??????</policyMap>??
          18. ????</destinationPolicy>??
          19. ????...??
          20. </broker>??

          ?

          2.6.5 Composite Destinations
          ??? 從1.1版本起, ActiveMQ支持composite destinations。它允許用一個虛擬的destination 代表多個destinations。例如你可以通過composite destinations在一個操作中同時向12個queue發送消息。在composite destinations中,多個destination之間采用","分割。例如:

          Java代碼
          1. Queue?queue?=?new?ActiveMQQueue("FOO.A,FOO.B,FOO.C");??

          ?? 如果你希望使用不同類型的destination,那么需要加上前綴如queue:// 或topic://,例如:?

          Java代碼
          1. Queue?queue?=?new?ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");???

          ?? 以下是ActiveMQ配置文件進行配置的一個例子:

          Xml代碼
          1. <destinationInterceptors>??
          2. ??<virtualDestinationInterceptor>??
          3. ????<virtualDestinations>??
          4. ??????<compositeQueue?name="MY.QUEUE">??
          5. ????????<forwardTo>??
          6. ??????????<queue?physicalName="FOO"?/>??
          7. ??????????<topic?physicalName="BAR"?/>??
          8. ????????</forwardTo>??
          9. ??????</compositeQueue>??
          10. ????</virtualDestinations>??
          11. ??</virtualDestinationInterceptor>??
          12. </destinationInterceptors>??

          ?? 可以在轉發前,先通過JMS Selector判斷一個消息是否需要轉發,例如:

          Xml代碼
          1. <destinationInterceptors>??
          2. ??<virtualDestinationInterceptor>??
          3. ????<virtualDestinations>??
          4. ??????<compositeQueue?name="MY.QUEUE">??
          5. ????????<forwardTo>??
          6. ??????????<filteredDestination?selector="odd?=?'yes'"?queue="FOO"/>??
          7. ??????????<filteredDestination?selector="i?=?5"?topic="BAR"/>??
          8. ????????</forwardTo>??
          9. ??????</compositeQueue>??
          10. ????</virtualDestinations>??
          11. ??</virtualDestinationInterceptor>??
          12. </destinationInterceptors>??

          ?

          2.6.6 Mirrored Queues
          ??? 每個queue中的消息只能被一個consumer消費。然而,有時候你可能希望能夠監視生產者和消費者之間的消息流。你可以通過使用Virtual Destinations 來建立一個virtual queue 來把消息轉發到多個queues中。但是 為系統中每個queue都進行如此的配置可能會很麻煩。
          ??? ActiveMQ支持Mirrored Queues。Broker會把發送到某個queue的所有消息轉發到一個名稱類似的topic,因此監控程序可以訂閱這個mirrored queue topic。為了啟用Mirrored Queues,首先要將BrokerService的useMirroredQueues屬性設置成true,然后可以通過 destinationInterceptors設置其它屬性,如mirror topic的前綴,缺省是"VirtualTopic.Mirror."。以下是ActiveMQ配置文件的一個例子:

          Xml代碼
          1. <broker?xmlns="http://activemq.org/config/1.0"?brokerName="MirroredQueuesBroker1"?useMirroredQueues="true">??
          2. ??
          3. ??<transportConnectors>??
          4. ????<transportConnector?uri="tcp://localhost:61616"/>??
          5. ??</transportConnectors>??
          6. ????
          7. ??<destinationInterceptors>??
          8. ??????<mirroredQueue?copyMessage?=?"true"?prefix="Mirror.Topic"/>??
          9. ??</destinationInterceptors>??
          10. ??...??
          11. </broker>??
          ??? 假如某個producer向名為Foo.Bar的queue中發送消息,那么你可以通過訂閱名為Mirror.Topic.Foo.Bar的topic來獲得發送到Foo.Bar中的所有消息。
          posted on 2010-09-01 22:28 禮物 閱讀(1612) 評論(0)  編輯  收藏 所屬分類: ActiveMQ
          主站蜘蛛池模板: 六枝特区| 福海县| 潼关县| 临潭县| 吐鲁番市| 阿尔山市| 墨竹工卡县| 顺昌县| 衡南县| 福贡县| 离岛区| 巴里| 额尔古纳市| 湘潭市| 边坝县| 天柱县| 南投县| 米泉市| 新竹市| 长寿区| 南雄市| 吉安市| 阳原县| 宜良县| 兴安盟| 金昌市| 桓台县| 嘉义县| 武汉市| 汉中市| 闵行区| 雷山县| 城市| 仙居县| 邵阳县| 会宁县| 大同县| 江北区| 余庆县| 眉山市| 宁河县|