2.5 Clustering
??? ActiveMQ從多種不同的方面提供了集群的支持。
2.5.1 Queue consumer clusters
???
ActiveMQ支持訂閱同一個(gè)queue的consumers上的集群。如果一個(gè)consumer失效,那么所有未被確認(rèn)
(unacknowledged)的消息都會(huì)被發(fā)送到這個(gè)queue上其它的consumers。如果某個(gè)consumer的處理速度比其它
consumers更快,那么這個(gè)consumer就會(huì)消費(fèi)更多的消息。
??? 需要注意的是,筆者發(fā)現(xiàn)AcitveMQ5.0版本的Queue consumer clusters存在一個(gè)bug:采用AMQ Message Store,運(yùn)行一個(gè)producer,兩個(gè)consumer,并采用如下的配置文件:
- < beans > ??
- ??<broker?xmlns="http://activemq.org/config/1.0"?brokerName="BugBroker1"?useJmx="true">??
- ????
- ????<transportConnectors>??
- ??????<transportConnector?uri="tcp://localhost:61616"/>??
- ????</transportConnectors>??
- ??????
- ????<persistenceAdapter>??
- ??????<amqPersistenceAdapter?directory="activemq-data/BugBroker1"?maxFileLength="32mb"/>??
- ????</persistenceAdapter>??
- ???????
- ??</broker>??
- </ beans > ??
?? 那么經(jīng)過一段時(shí)間后可能會(huì)報(bào)出如下錯(cuò)誤:
ERROR
[ActiveMQ Transport: tcp:///127.0.0.1:1843 -
RecoveryListenerAdapter.java:58 - RecoveryListenerAdapter] Message id
ID:versus-1837-1203915536609-0:2:1:1:419 could not be recovered from the
data store!
??? Apache官方文檔說,此bug已經(jīng)被修正,預(yù)定在5.1.0版本上體現(xiàn)。
?
2.5.2 Broker clusters
???
一個(gè)常見的場景是有多個(gè)JMS
broker,有一個(gè)客戶連接到其中一個(gè)broker。如果這個(gè)broker失效,那么客戶會(huì)自動(dòng)重新連接到其它的broker。在ActiveMQ中使
用failover:// 協(xié)議來實(shí)現(xiàn)這個(gè)功能。ActiveMQ3.x版本的reliable://協(xié)議已經(jīng)變更為failover://。
???
如果某個(gè)網(wǎng)絡(luò)上有多個(gè)brokers而且客戶使用靜態(tài)發(fā)現(xiàn)(使用Static Transport或Failover
Transport)或動(dòng)態(tài)發(fā)現(xiàn)(使用Discovery
Transport),那么客戶可以容易地在某個(gè)broker失效的情況下切換到其它的brokers。然而,stand alone
brokers并不了解其它brokers上的consumers,也就是說如果某個(gè)broker上沒有consumers,那么這個(gè)broker上的消
息可能會(huì)因得不到處理而積壓起來。目前的解決方案是使用Network of
brokers,以便在broker之間存儲(chǔ)轉(zhuǎn)發(fā)消息。ActiveMQ在未來會(huì)有更好的特性,用來在客戶端處理這個(gè)問題。
???
從ActiveMQ1.1版本起,ActiveMQ支持networks of
brokers。它支持分布式的queues和topics。一個(gè)broker會(huì)相同對(duì)待所有的訂閱(subscription):不管他們是來自本地的
客戶連接,還是來自遠(yuǎn)程broker,它都會(huì)遞送有關(guān)的消息拷貝到每個(gè)訂閱。遠(yuǎn)程broker得到這個(gè)消息拷貝后,會(huì)依次把它遞送到其內(nèi)部的本地連接上。
有兩種方式配置Network of brokers,一種是使用static transport,如下:
- < broker ? brokerName = "receiver" ? persistent = "false" ? useJmx = "false" > ??
- ??<transportConnectors>??
- ????<transportConnector?uri="tcp://localhost:62002"/>??
- ??</transportConnectors>??
- ??<networkConnectors>??
- ????<networkConnector?uri="static:(?tcp://localhost:61616,tcp://remotehost:61616)"/>??
- ??</networkConnectors>??
- ??…??
- </ broker > ??
- <broker?name="sender"?persistent="false"?useJmx="false">??
- ??<transportConnectors>??
- ????<transportConnector?uri="tcp://localhost:0"?discoveryUri="multicast://default"/>??
- ??</transportConnectors>??
- ??<networkConnectors>??
- ????<networkConnector?uri="multicast://default"/>??
- ??</networkConnectors>??
- ??...??
- </broker>??
Property | Default Value | Description |
name | bridge | name of the network - for more than one network connector between the same two brokers - use different names |
dynamicOnly | false | if true, only forward messages if a consumer is active on the connected broker |
decreaseNetworkConsumerPriority | false | decrease the priority for dispatching to a Queue consumer the further away it is (in network hops) from the producer |
networkTTL | 1 | the number of brokers in the network that messages and subscriptions can pass through |
conduitSubscriptions | true | multiple consumers subscribing to the same destination are treated as one consumer by the network |
excludedDestinations | empty | destinations matching this list won't be forwarded across the network |
dynamicallyIncludedDestinations | empty | destinations that match this list will be forwarded across the network n.b. an empty list means all destinations not in the excluded list will be forwarded |
staticallyIncludedDestinations | empty | destinations that match will always be passed across the network - even if no consumers have ever registered an interest |
duplex | false | if
true, a network connection will be used to both produce AND Consume
messages. This is useful for hub and spoke scenarios when the hub is
behind a firewall etc. |
??? 關(guān)于conduitSubscriptions屬性,這里稍稍說明一下。設(shè)想有兩個(gè)brokers,分別是brokerA和brokerB,它們之間用 forwarding bridge連接。有一個(gè)consumer連接到brokerA并訂閱queue:Q.TEST。有兩個(gè)consumers連接到brokerB,也是訂 閱queue:Q.TEST。這三個(gè)consumers有相同的優(yōu)先級(jí)。然后啟動(dòng)一個(gè)producer,它發(fā)送了30條消息到brokerA。如果 conduitSubscriptions=true,那么brokerA上的consumer會(huì)得到15條消息, 另外15條消息會(huì)發(fā)送給brokerB。此時(shí)負(fù)載并不均衡,因?yàn)榇藭r(shí)brokerA將brokerB上的兩個(gè)consumers視為一個(gè);如果 conduitSubscriptions=false,那么每個(gè)consumer上都會(huì)收到10條消息。以下是關(guān)于NetworkConnector屬 性的一個(gè)例子:
- <networkConnectors>??
- ??<networkConnector?uri="static://(tcp://localhost:61617)"??
- ?????name="bridge"?dynamicOnly="false"?conduitSubscriptions="true"??
- ?????decreaseNetworkConsumerPriority="false">??
- ?????<excludedDestinations>??
- ???????<queue?physicalName="exclude.test.foo"/>??
- ???????<topic?physicalName="exclude.test.bar"/>??
- ?????</excludedDestinations>??
- ?????<dynamicallyIncludedDestinations>??
- ???????<queue?physicalName="include.test.foo"/>??
- ???????<topic?physicalName="include.test.bar"/>??
- ?????</dynamicallyIncludedDestinations>??
- ?????<staticallyIncludedDestinations>??
- ???????<queue?physicalName="always.include.queue"/>??
- ???????<topic?physicalName="always.include.topic"/>??
- ?????</staticallyIncludedDestinations>??
- ??</networkConnector>??
- </networkConnectors>??
?
2.5.3 Master Slave
???
在一個(gè)網(wǎng)絡(luò)內(nèi)運(yùn)行多個(gè)brokers或者stand alone
brokers時(shí)存在一個(gè)問題,這就是消息在物理上只被一個(gè)broker持有,因此當(dāng)某個(gè)broker失效,那么你只能等待直到它重啟后,這個(gè)
broker上的消息才能夠被繼續(xù)發(fā)送(如果沒有設(shè)置持久化,那么在這種情況下,消息將會(huì)丟失)。Master Slave
背后的想法是,消息被復(fù)制到slave broker,因此即使master broker遇到了像硬件故障之類的錯(cuò)誤,你也可以立即切換到slave
broker而不丟失任何消息。
??? Master Slave是目前ActiveMQ推薦的高可靠性和容錯(cuò)的解決方案。以下是幾種不同的類型:
Master Slave Type | Requirements | Pros | Cons |
Pure Master Slave | None | No central point of failure | Requires manual restart to bring back a failed master and can only support 1 slave |
Shared File System Master Slave | A Shared File system such as a SAN | Run as many slaves as required. Automatic recovery of old masters | Requires shared file system |
JDBC Master Slave | A Shared database | Run as many slaves as required. Automatic recovery of old masters | Requires a shared database. Also relatively slow as it cannot use the high performance journal |
?
2.5.3.1 Pure Master Slave
??? Pure Master Slave的工作方式如下:
- Slave broker消費(fèi)master broker上所有的消息狀態(tài),例如消息、確認(rèn)和事務(wù)狀態(tài)等。只要slave broker連接到了master broker,它不會(huì)(也不被允許)啟動(dòng)任何network connectors或者transport connectors,所以唯一的目的就是復(fù)制master broker的狀態(tài)。
- Master broker只有在消息成功被復(fù)制到slave broker之后才會(huì)響應(yīng)客戶。例如,客戶的commit請(qǐng)求只有在master broker和slave broker都處理完畢commit請(qǐng)求之后才會(huì)結(jié)束。
- 當(dāng)master broker失效的時(shí)候,slave broker有兩種選擇,一種是slave broker啟動(dòng)所有的network connectors和transport connectors,這允許客戶端切換到slave broker;另外一種是slave broker停止。這種情況下,slave broker只是復(fù)制了master broker的狀態(tài)。
- 客戶應(yīng)該使用failover transport并且應(yīng)該首先嘗試連接master broker。例如:
failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false
設(shè)置randomize為false就可以讓客戶總是首先嘗試連接master broker(slave broker并不會(huì)接受任何連接,直到它成為了master broker)。
?? Pure Master Slave具有以下限制:
- 只能有一個(gè)slave broker連接到master broker。
- 在因master broker失效而導(dǎo)致slave broker成為master之后,之前的master broker只有在當(dāng)前的master broker(原slave broker)停止后才能重新生效。
- Master broker失效后而切換到slave broker后,最安全的恢復(fù)master broker的方式是人工處理。首先要停止slave broker(這意味著所有的客戶也要停止)。然后把slave broker的數(shù)據(jù)目錄中所有的數(shù)據(jù)拷貝到master broker的數(shù)據(jù)目錄中。然后重啟master broker和slave broker。
?? Master broker不需要特殊的配置。Slave broker需要進(jìn)行以下配置
- <broker?masterConnectorURI="tcp://masterhost:62001"?shutdownOnMasterFailure="false">??
- ????...??
- ????<transportConnectors>??
- ??????<transportConnector?uri="tcp://slavehost:61616"/>??
- ???</transportConnectors>??
- </broker>??
??? 其中的masterConnectorURI用于指向master broker,shutdownOnMasterFailure用于指定slave broker在master broker失效的時(shí)候是否需要停止。此外,也可以使用如下配置:
- <broker?brokerName="slave"?useJmx="false"??deleteAllMessagesOnStartup="true"??xmlns="http://activemq.org/config/1.0">??
- ??...??
- ??<services>??
- ????<masterConnector?remoteURI=?"tcp://localhost:62001"?userName="user"?password="password"/>??
- ??</services>??
- </broker>??
?? 需要注意的是,筆者認(rèn)為ActiveMQ5.0版本的Pure Master Slave仍然不夠穩(wěn)定。
?
2.5.3.2 Shared File System Master Slave
???
如果你使用SAN或者共享文件系統(tǒng),那么你可以使用Shared File System Master
Slave?;旧?,你可以運(yùn)行多個(gè)broker,這些broker共享數(shù)據(jù)目錄。當(dāng)?shù)谝粋€(gè)broker得到文件上的排他鎖之后,其它的broker便會(huì)
在循環(huán)中等待獲得這把鎖??蛻舳耸褂胒ailover transport來連接到可用的broker。當(dāng)master
broker失效的時(shí)候會(huì)釋放這把鎖,這時(shí)候其中一個(gè)slave broker會(huì)得到這把鎖從而成為master
broker。以下是ActiveMQ配置的一個(gè)例子:
- <broker?useJmx="false"??xmlns="http://activemq.org/config/1.0">??
- ???<persistenceAdapter>??
- ??????<journaledJDBC?dataDirectory="/sharedFileSystem/broker"/>??
- ???</persistenceAdapter>??
- ???…??
- </broker>??
?
2.5.3.3 JDBC Master Slave
??? JDBC Master Slave的工作原理跟Shared File System Master Slave類似,只是采用了數(shù)據(jù)庫作為持久化存儲(chǔ)。以下是ActiveMQ配置的一個(gè)例子:
- <beans>??
- ??<broker?xmlns="http://activemq.org/config/1.0"?brokerName="JdbcMasterBroker">??
- ????...??
- ????<persistenceAdapter>??
- ??????<jdbcPersistenceAdapter?dataSource="#mysql-ds"/>??
- ????</persistenceAdapter>??
- ??????
- ??</broker>??
- ????
- ??<bean?id="mysql-ds"?class="org.apache.commons.dbcp.BasicDataSource"?destroy-method="close">??
- ????<property?name="driverClassName"?value="com.mysql.jdbc.Driver"/>??
- ????<property?name="url"?value="jdbc:mysql://localhost:3306/test?relaxAutoCommit=true"/>??
- ????<property?name="username"?value="username"/>??
- ????<property?name="password"?value="passward"/>??
- ????<property?name="poolPreparedStatements"?value="true"/>??
- ??</bean>???
- </beans>??
?? 需要注意的是,如果你使用MySQL數(shù)據(jù)庫,需要首先執(zhí)行以下三條語句:(Apache官方文檔說,此bug已經(jīng)被修正,預(yù)定在5.1.0版本上體現(xiàn))
- ALTER?TABLE?activemq_acks?ENGINE?=?InnoDB;??
- ALTER?TABLE?activemq_lock?ENGINE?=?InnoDB;??
- ALTER?TABLE?activemq_msgs?ENGINE?=?InnoDB;