Terry.Li-彬

          虛其心,可解天下之問;專其心,可治天下之學;靜其心,可悟天下之理;恒其心,可成天下之業。

            BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
            143 隨筆 :: 344 文章 :: 130 評論 :: 0 Trackbacks

          2.2 Transport
          ??? ActiveMQ目前支持的transport有:VM Transport、TCP Transport、SSL Transport、Peer Transport、UDP Transport、Multicast Transport、HTTP and HTTPS Transport、Failover Transport、Fanout Transport、Discovery Transport、ZeroConf Transport等。以下簡單介紹其中的幾種,更多請參考Apache官方文檔。

          ?

          2.2.1 VM Transport
          ??? VM transport允許在VM內部通信,從而避免了網絡傳輸的開銷。這時候采用的連接不是socket連接,而是直接地方法調用。 第一個創建VM 連接的客戶會啟動一個embed VM broker,接下來所有使用相同的broker name的VM連接都會使用這個broker。當這個broker上所有的連接都關閉的時候,這個broker也會自動關閉。
          ??? 以下是配置語法:

          ?? vm://brokerName?transportOptions

          ?? 例如:vm://broker1?marshal=false&broker.persistent=false

          ?? Transport Options的可選值如下:

          Option Name Default Value Description
          Marshal false If true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat
          wireFormat default The name of the WireFormat to use
          wireFormat.* ? All the properties with this prefix are used to configure the wireFormat
          create true If the broker should be created on demand if it does not allready exist. Only supported in ActiveMQ 4.1
          broker.* ? All the properties with this prefix are used to configure the broker. See Configuring Wire Formats for more information

          ?

          ?? 以下是高級配置語法:

          ?? vm:(broker:(tcp://localhost)?brokerOptions)?transportOptions

          ?? vm:broker:(tcp://localhost)?brokerOptions

          ??? 例如:vm:(broker:(tcp://localhost:6000)?persistent=false)?marshal=false

          ??? Transport Options的可選值如下:

          Option Name Default Value Description
          marshal false If true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat
          wireFormat default The name of the WireFormat to use
          wireFormat.* ? All the properties with this prefix are used to configure the wireFormat

          ?

          ?? 使用配置文件的配置語法:??
          ??? vm://localhost?brokerConfig=xbean:activemq.xml
          ??? 例如:vm:// localhost?brokerConfig=xbean:com/test/activemq.xml

          ?

          ?? 使用Spring的配置:

          Xml代碼
          1. < bean ? id = "broker" ? class = "org.apache.activemq.xbean.BrokerFactoryBean" > ??
          2. ??<property?name="config"?value="classpath:org/apache/activemq/xbean/activemq.xml"?/>??
          3. ??<property?name="start"?value="true"?/>??
          4. </ bean > ??
          5. ??
          6. < bean ? id = "connectionFactory" ? class = "org.apache.activemq.ActiveMQConnectionFactory" ? depends-on = "broker" > ??
          7. ??<property?name="brokerURL"?value="vm://localhost"/>??
          8. </ bean > ??

          ?? 如果persistent是true,那么ActiveMQ會在當前目錄下創建一個缺省值是activemq-data的目錄用于持久化保存數據。需要注 意的是,如果程序中啟動了多個不同名字的VM broker,那么可能會有如下警告:Failed to start jmx connector: Cannot bind to URL [rmi://localhost:1099/jmxrmi]: javax.naming.NameAlreadyBoundException…可以通過在transportOptions中追加 broker.useJmx=false來禁用JMX來避免這個警告。

          ?

          2.2.2 TCP Transport
          ??? TCP transport 允許客戶端通過TCP socket連接到遠程的broker。以下是配置語法:
          ??? tcp://hostname:port?transportOptions
          ??? Transport Options的可選值如下:

          Option Name Default Value Description
          minmumWireFormatVersion 0 The minimum version wireformat that is allowed
          trace false Causes all commands that are sent over the transport to be logged
          useLocalHost true When true, it causes the local machines name to resolve to "localhost".
          socketBufferSize 64 * 1024 Sets the socket buffer size in bytes
          soTimeout 0 sets the socket timeout in milliseconds
          connectionTimeout 30000 A non-zero value specifies the connection timeout in milliseconds. A zero value means wait forever for the connection to be established. Negative values are ignored.
          wireFormat default The name of the WireFormat to use
          wireFormat.* ? All the properties with this prefix are used to configure the wireFormat. See Configuring Wire Formats for more information

          ?? 例如:tcp://localhost:61616?trace=false

          ?

          2.2.3 Failover Transport
          ??? Failover Transport是一種重新連接的機制,它工作于其它transport的上層,用于建立可靠的傳輸。它的配置語法允許制定任意多個復合的URI。 Failover transport會自動選擇其中的一個URI來嘗試建立連接。如果沒有成功,那么會選擇一個其它的URI來建立一個新的連接。以下是配置語法:
          ??? failover:(uri1,...,uriN)?transportOptions
          ??? failover:uri1,...,uriN
          ??? Transport Options的可選值如下:

          Option Name D efault Value Description
          initialReconnectDelay 10 How long to wait before the first reconnect attempt (in ms)
          maxReconnectDelay 30000 The maximum amount of time we ever wait between reconnect attempts (in ms)
          useExponentialBackOff true Should an exponential backoff be used between reconnect attempts
          backOffMultiplier 2 The exponent used in the exponential backoff attempts
          maxReconnectAttempts 0 If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client
          randomize true use a random algorithm to choose the URI to use for reconnect from the list provided
          backup false initialize and hold a second transport connection - to enable fast failover

          ?? 例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100

          ?

          2.2.4 Discovery transport
          ??? Discovery transport是可靠的tranport。它使用Discovery transport來定位用來連接的URI列表。以下是配置語法:
          ??? discovery:(discoveryAgentURI)?transportOptions
          ??? discovery:discoveryAgentURI
          ??? Transport Options的可選值如下:

          Option Name Default Value Description
          initialReconnectDelay 10 How long to wait before the first reconnect attempt
          maxReconnectDelay 30000 The maximum amount of time we ever wait between reconnect attempts
          useExponentialBackOff true Should an exponential backoff be used btween reconnect attempts
          backOffMultiplier 2 The exponent used in the exponential backoff attempts
          maxReconnectAttempts 0 If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client

          ?? 例如:discovery:(multicast://default)?initialReconnectDelay=100??
          ??? 為了使用Discovery來發現broker,需要為broker啟用discovery agent。 以下是XML配置文件中的一個例子:

          Xml代碼
          1. < broker ? name = "foo" > ??
          2. ???<transportConnectors>??
          3. ??????<transportConnector?uri="tcp://localhost:0"?discoveryUri="multicast://default"/>??
          4. ????</transportConnectors>??
          5. ????...??
          6. </ broker > ??

          ?? 在使用Failover Transport或Discovery transport等能夠自動重連的transport的時候,需要注意的是:設想有兩個broker,它們都啟用AMQ Message Store作為持久化存儲,有一個producer和一個consumer連接到某個queue。當因其中一個broker失效時而切換到另一個 broker的時候,如果失效的broker的queue中還有未被consumer消費的消息,那么這個queue里的消息仍然滯留在失效broker 的中,直到失效的broker被修復并重新切換回這個被修復的broker后,之前被保留的消息才會被consumer消費掉。如果被處理的消息有時序限 制,那么應用程序就需要處理這個問題。另外也可以通過ActiveMQ集群來解決這個問題。

          ?? 在transport重連的時候,可以在connection上注冊TransportListener來獲得回調,例如:

          Java代碼
          1. (ActiveMQConnection)connection).addTransportListener( new ?TransportListener()?{??
          2. ????public?void?onCommand(Object?cmd)?{??
          3. ????}??
          4. ??
          5. ????public?void?onException(IOException?exp)?{??
          6. ????}??
          7. ??
          8. ????public?void?transportInterupted()?{??
          9. ????????//?The?transport?has?suffered?an?interruption?from?which?it?hopes?to?recover.??
          10. ????}??
          11. ??
          12. ????public?void?transportResumed()?{??
          13. ????????//?The?transport?has?resumed?after?an?interruption.??
          14. ????}??
          15. });
          posted on 2010-09-01 22:26 禮物 閱讀(709) 評論(0)  編輯  收藏 所屬分類: ActiveMQ
          主站蜘蛛池模板: 濮阳市| 长顺县| 常山县| 平果县| 英德市| 舞钢市| 米泉市| 富宁县| 正阳县| 金川县| 尉犁县| 塔河县| 高淳县| 太仆寺旗| 登封市| 边坝县| 南溪县| 海南省| 金坛市| 双江| 融水| 周至县| 阿城市| 黄大仙区| 锦屏县| 调兵山市| 宜君县| 怀仁县| 西城区| 镇平县| 日喀则市| 调兵山市| 莱芜市| 凭祥市| 两当县| 高陵县| 江口县| 拜城县| 新兴县| 自治县| 清镇市|