Terry.Li-彬

          虛其心,可解天下之問;專其心,可治天下之學;靜其心,可悟天下之理;恒其心,可成天下之業(yè)。

            BlogJava :: 首頁 :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
            143 隨筆 :: 344 文章 :: 130 評論 :: 0 Trackbacks

          2.6.7 Wildcards
          ??? Wildcards用來支持聯(lián)合的名字分層體系(federated name hierarchies)。它不是JMS規(guī)范的一部分,而是ActiveMQ的擴展。ActiveMQ支持以下三種wildcards:

          • "." 用于作為路徑上名字間的分隔符。
          • "*" 用于匹配路徑上的任何名字。
          • ">" 用于遞歸地匹配任何以這個名字開始的destination。

          ?? 作為一種組織事件和訂閱感興趣那部分信息的一種方法,這個概念在金融市場領域已經(jīng)流行了一段時間了。設想你有以下兩個destination:

          • PRICE.STOCK.NASDAQ.IBM (IBM在NASDAQ的股價)
          • PRICE.STOCK.NYSE.SUNW (SUN在紐約證券交易所的股價)

          ?? 訂閱者可以明確地指定destination的名字來訂閱消息,或者它也可以使用wildcards來定義一個分層的模式來匹配它希望訂閱的destination。例如:

          Subscription Meaning
          PRICE.> Any price for any product on any exchange
          PRICE.STOCK.> Any price for a stock on any exchange
          PRICE.STOCK.NASDAQ.* Any stock price on NASDAQ
          PRICE.STOCK.*.IBM Any IBM stock price on any exchange

          ?

          2.6.8 Async Sends
          ??? ActiveMQ支持以同步(sync)方式或者異步(async)方式向broker發(fā)送消息。 使用何種方式對send方法的延遲有巨大的影響。對于生產(chǎn)者來說,既然延遲是決定吞吐量的重要因素,那么使用異步發(fā)送方式會極大地提高系統(tǒng)的性能。
          ??? ActiveMQ缺省使用異步傳輸方式。但是按照JMS規(guī)范,當在事務外發(fā)送持久化消息的時候,ActiveMQ會強制使用同步發(fā)送方式。在這種情況下, 每一次發(fā)送都是同步的,而且阻塞到收到broker的應答。這個應答保證了broker已經(jīng)成功地將消息持久化,而且不會丟失。但是這樣作也嚴重地影響了 性能。
          ??? 如果你的系統(tǒng)可以容忍少量的消息丟失,那么可以在事務外發(fā)送持久消息的時候,選擇使用異步方式。以下是幾種不同的配置方式:

          Java代碼
          1. cf?=? new ?ActiveMQConnectionFactory( "tcp://locahost:61616?jms.useAsyncSend=true" );??
          2. ((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);??
          3. ((ActiveMQConnection)connection).setUseAsyncSend(true);??

          ?

          2.6.9 Dispatch Policies
          2.6.9.1 Round Robin Dispatch Policy
          ??? 在2.6.4小節(jié)介紹過ActiveMQ的prefetch機制,ActiveMQ的缺省參數(shù)是針對處理大量消息時的高性能和高吞吐量而設置的。所以缺省 的prefetch參數(shù)比較大,而且缺省的dispatch policies會嘗試盡可能快的填滿prefetch緩沖。然而在有些情況下,例如只有少量的消息而且單個消息的處理時間比較長,那么在缺省的 prefetch和dispatch policies下,這些少量的消息總是傾向于被分發(fā)到個別的consumer上。這樣就會因為負載的不均衡分配而導致處理時間的增加。
          ??? Round robin dispatch policy會嘗試平均分發(fā)消息,以下是ActiveMQ配置文件的一個例子:

          Xml代碼
          1. < destinationPolicy > ??
          2. ??<policyMap>??
          3. ????<policyEntries>??
          4. ??????<policyEntry?topic="FOO.>">??
          5. ????????<dispatchPolicy>??
          6. ??????????<roundRobinDispatchPolicy?/>??
          7. ????????</dispatchPolicy>??
          8. ??????</policyEntry>??
          9. ????</policyEntries>??
          10. ??</policyMap>??
          11. </ destinationPolicy > ??

          ?

          2.6.9.2 Strict Order Dispatch Policy
          ??? 有時候需要保證不同的topic consumer以相同的順序接收消息。通常ActiveMQ會保證topic consumer以相同的順序接收來自同一個producer的消息。然而,由于多線程和異步處理,不同的topic consumer可能會以不同的順序接收來自不同producer的消息。例如有兩個producer,分別是P和Q。差不多是同一時間內,P發(fā)送了 P1、P2和P3三個消息;Q發(fā)送了Q1和Q2兩個消息。兩個不同的consumer可能會以以下順序接收到消息:

          ?? consumer1: P1 P2 Q1 P3 Q2
          ??? consumer2: P1 Q1 Q2 P2 P3
          ??? Strict order dispatch policy 會保證每個topic consumer會以相同的順序接收消息,代價是性能上的損失。以下是采用了strict order dispatch policy后,兩個不同的consumer可能以以下的順序接收消息:
          ??? consumer1: P1 P2 Q1 P3 Q2
          ??? consumer2: P1 P2 Q1 P3 Q2

          ?? 以下是ActiveMQ配置文件的一個例子:

          Xml代碼
          1. < destinationPolicy > ??
          2. ??<policyMap>??
          3. ????<policyEntries>??
          4. ??????<policyEntry?topic=""FOO.>">??
          5. ????????<dispatchPolicy>??
          6. ??????????<strictOrderDispatchPolicy?/>??
          7. ????????</dispatchPolicy>??
          8. ??????</policyEntry>??
          9. ????</policyEntries>??
          10. ??</policyMap>??
          11. </ destinationPolicy > ??

          ?

          2.6.10 Message Cursors
          ??? 當producer發(fā)送的持久化消息到達broker之后,broker首先會把它保存在持久存儲中。接下來,如果發(fā)現(xiàn)當前有活躍的consumer,而 且這個consumer消費消息的速度能跟上producer生產(chǎn)消息的速度,那么ActiveMQ會直接把消息傳遞給broker內部跟這個 consumer關聯(lián)的dispatch queue;如果當前沒有活躍的consumer或者consumer消費消息的速度跟不上producer生產(chǎn)消息的速度,那么ActiveMQ會使用 Pending Message Cursors保存對消息的引用。在需要的時候,Pending Message Cursors把消息引用傳遞給broker內部跟這個consumer關聯(lián)的dispatch queue。以下是兩種Pending Message Cursors:

          • VM Cursor。在內存中保存消息的引用。
          • File Cursor。首先在內存中保存消息的引用,如果內存使用量達到上限,那么會把消息引用保存到臨時文件中。

          ?? 在缺省情況下,ActiveMQ 5.0根據(jù)使用的Message Store來決定使用何種類型的Message Cursors,但是你可以根據(jù)destination來配置Message Cursors。

          ??? 對于topic,可以使用的pendingSubscriberPolicy 有vmCursor和fileCursor。可以使用的PendingDurableSubscriberMessageStoragePolicy有 vmDurableCursor 和 fileDurableSubscriberCursor。以下是ActiveMQ配置文件的一個例子:
          Xml代碼
          1. <destinationPolicy>??
          2. ??<policyMap>??
          3. ????<policyEntries>??
          4. ??????<policyEntry?topic="org.apache.>">??
          5. ????????<pendingSubscriberPolicy>??
          6. ??????????<vmCursor?/>??
          7. ????????</pendingSubscriberPolicy>??
          8. ????????<PendingDurableSubscriberMessageStoragePolicy>??
          9. ??????????<vmDurableCursor/>??
          10. ????????</PendingDurableSubscriberMessageStoragePolicy>??
          11. ??????</policyEntry>??
          12. ????</policyEntries>??
          13. ??</policyMap>??
          14. </destinationPolicy>??

          ?? 對于queue,可以使用的pendingQueuePolicy有vmQueueCursor 和 fileQueueCursor。以下是ActiveMQ配置文件的一個例子:

          Xml代碼
          1. <destinationPolicy>??
          2. ??<policyMap>??
          3. ????<policyEntries>??
          4. ??????<policyEntry?queue="org.apache.>">??
          5. ????????<pendingQueuePolicy>??
          6. ??????????<vmQueueCursor?/>??
          7. ????????</pendingQueuePolicy>??
          8. ??????</policyEntry>??
          9. ????</policyEntries>??
          10. ??</policyMap>??
          11. </destinationPolicy>??

          ?

          2.6.11 Optimized Acknowledgement
          ??? ActiveMQ缺省支持批量確認消息。由于批量確認會提高性能,因此這是缺省的確認方式。如果希望在應用程序中禁止經(jīng)過優(yōu)化的確認方式,那么可以采用如下方法:

          Java代碼
          1. cf?=?new?ActiveMQConnectionFactory?("tcp://locahost:61616?jms.optimizeAcknowledge=false");??
          2. ((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(false);??
          3. ((ActiveMQConnection)connection).setOptimizeAcknowledge(false);??

          ?

          2.6.12 Producer Flow Control
          ??? 同步發(fā)送消息的producer會自動使用producer flow control ;對于異步發(fā)送消息的producer,要使用producer flow control,你先要為connection配置一個ProducerWindowSize參數(shù),如下:

          Java代碼
          1. ((ActiveMQConnectionFactory)cf).setProducerWindowSize(1024000);??
          ??? ProducerWindowSize是producer在發(fā)送消息的過程中,收到broker對于之前發(fā)送消息的確認之前, 能夠發(fā)送消息的最大字節(jié)數(shù)。你也可以禁用producer flow control,以下是ActiveMQ配置文件的一個例子:
          Java代碼
          1. <destinationPolicy>??
          2. ??<policyMap>??
          3. ????<policyEntries>??
          4. ??????<policyEntry?topic="FOO.>"?producerFlowControl="false">??
          5. ????????<dispatchPolicy>??
          6. ??????????<strictOrderDispatchPolicy/>??
          7. ????????</dispatchPolicy>??
          8. ??????</policyEntry>??
          9. ????</policyEntries>??
          10. ??</policyMap>??
          11. </destinationPolicy>??

          ?

          2.6.13 Message Transformation
          ??? 有時候需要在JMS provider內部進行message的轉換。從4.2版本起,ActiveMQ 提供了一個MessageTransformer 接口用于進行消息轉換,如下:

          Java代碼
          1. public?interface?MessageTransformer?{??
          2. ????Message?producerTransform(Session?session,?MessageProducer?producer,?Message?message)?throws?JMSException;??
          3. ????Message?consumerTransform(Session?session,?MessageConsumer?consumer,?Message?message)throws?JMSException;??
          4. }??
          ??? 通過在以下對象上通過調用setTransformer方法來設置MessageTransformer:
          • ActiveMQConnectionFactory
          • ActiveMQConnection
          • ActiveMQSession
          • ActiveMQMessageConsumer
          • ActiveMQMessageProducer

          ?? MessageTransformer接口支持:

          • 在消息被發(fā)送到JMS provider的消息總線前進行轉換。通過producerTransform方法。
          • 在消息到達消息總線后,但是在consumer接收到消息前進行轉換。通過consumerTransform方法。

          ?? 以下是個簡單的例子:??

          ?

          Java代碼
          1. public?class?SimpleMessage?implements?Serializable?{??
          2. ????//??
          3. ????private?static?final?long?serialVersionUID?=?2251041841871975105L;??
          4. ??????
          5. ????//??
          6. ????private?String?id;??
          7. ????private?String?text;??
          8. ??????
          9. ????public?String?getId()?{??
          10. ????????return?id;??
          11. ????}??
          12. ????public?void?setId(String?id)?{??
          13. ????????this.id?=?id;??
          14. ????}??
          15. ????public?String?getText()?{??
          16. ????????return?text;??
          17. ????}??
          18. ????public?void?setText(String?text)?{??
          19. ????????this.text?=?text;??
          20. ????}??
          21. }??
          ??? 在producer內發(fā)送ObjectMessage,如下:
          Java代碼
          1. SimpleMessage?sm?=?new?SimpleMessage();??
          2. sm.setId("1");??
          3. sm.setText("this?is?a?sample?message");??
          4. ObjectMessage?message?=?session.createObjectMessage();??
          5. message.setObject(sm);??
          6. producer.send(message);??

          ?? 在consumer的session上設置一個MessageTransformer用于將ObjectMessage轉換成TextMessage,如下:

          Java代碼
          1. ((ActiveMQSession)session).setTransformer(new?MessageTransformer()?{??
          2. public?Message?consumerTransform(Session?session,?MessageConsumer?consumer,?Message?message)?throws?JMSException?{??
          3. ????ObjectMessage?om?=?(ObjectMessage)message;??
          4. ????XStream?xstream?=?new?XStream();??
          5. ????xstream.alias("simple?message",?SimpleMessage.class);??
          6. ????String?xml?=?xstream.toXML(om.getObject());??
          7. ????return?session.createTextMessage(xml);??
          8. }??
          9. ??
          10. public?Message?producerTransform(Session?session,?MessageProducer?consumer,?Message?message)?throws?JMSException?{??
          11. ????return?null;??
          12. }??
          13. });
          posted on 2010-09-01 22:29 禮物 閱讀(1048) 評論(0)  編輯  收藏 所屬分類: ActiveMQ
          主站蜘蛛池模板: 平邑县| 华坪县| 双牌县| 平和县| 电白县| 连山| 怀集县| 根河市| 宾川县| 丰城市| 滦南县| 丹凤县| 平山县| 麻城市| 迭部县| 屯留县| 鄂尔多斯市| 博白县| 洞口县| 桂东县| 定南县| 佳木斯市| 靖宇县| 卢湾区| 赤水市| 海阳市| 吕梁市| 涪陵区| 榆树市| 双桥区| 大同县| 秭归县| 阿鲁科尔沁旗| 开原市| 忻城县| 金堂县| 镇远县| 德令哈市| 武邑县| 淮滨县| 万州区|