| |||||||||
日 | 一 | 二 | 三 | 四 | 五 | 六 | |||
---|---|---|---|---|---|---|---|---|---|
25 | 26 | 27 | 28 | 29 | 30 | 31 | |||
1 | 2 | 3 | 4 | 5 | 6 | 7 | |||
8 | 9 | 10 | 11 | 12 | 13 | 14 | |||
15 | 16 | 17 | 18 | 19 | 20 | 21 | |||
22 | 23 | 24 | 25 | 26 | 27 | 28 | |||
29 | 30 | 1 | 2 | 3 | 4 | 5 |
RabbitMQ的目標是盡可能廣泛地支持大部分平臺.RabbitMQ 可運行在任何支持Erlang的平臺上, 包括內嵌系統,多核集群,云服務器.
下面的平臺支持Erlang,因此也可以運行RabbitMQ:
RabbitMQ的開源版本大部分都部署在下面的平臺上:
RabbitMQ可運行Windows XP及其后續版本中(Server 2003, Vista, Windows 7, Windows 8, Windows 10, Server 2008 and Server 2012). 盡管沒有測試,但應該可以運行在Windows NT ,Windows 2000 上.
64位的Windows Erlang VM從R15版本開始可用.建議使用最新的64位Erlang版本來運行。參考Erlang version compatibility page.
雖沒有官方支持,Erlang 和 RabbitMQ 能運行在含有POSIX layer including Solaris, FreeBSD, NetBSD, OpenBSD的操作系統上.
RabbitMQ 可運行物理或虛擬硬件上. 這可以允許不支持的平臺通過仿真來運行RabbitMQ.
參考EC2 guide 來了解RabbitMQ如何運行在Amazon EC2上的更多信息.
rabbitmq-server — 啟動RabbitMQ AMQP server
rabbitmq-server [-detached]
RabbitMQ是AMQP的實現, 后者是高性能企業消息通信的新興標準. RabbitMQ server是AMQP 中間件的健壯,可擴展實現.
前端運行rabbitmq-server,它會顯示橫幅消息,會報告啟動時的過程信息,最后會顯示"broker running",以表明RabbitMQ中間件已經成功啟動。
要關閉server,只需要終止過程或使用rabbitmqctl(1)(即:rabbitmqctl stop).
默認是 /var/lib/rabbitmq/mnesia. 用于設置Mnesia 數據庫文件存放的目錄.
日志目錄 ,server生成的/var/log/rabbitmq. Log 日志文志會放置在文件會放置在此目錄.(如:window10下默認安裝時,日志目錄為:C:\Users\Administrator\AppData\Roaming\RabbitMQ\log)
默認是rabbit. 當你想在一臺機器上運行多個節點時,此配置是相當有用的, RABBITMQ_NODENAME在每個erlang-node和機器的組合中應該唯一。
參考clustering on a single machine guide 來更多細節.
默認情況下,RabbitMQ會綁定到所有網絡接口上,如果只想綁定某個網絡接口,可修改此設置。
默認是5672.
以后端的方式來啟動進程 ,注意,這會導致pid無法寫入到pid文件中.例如:
rabbitmq-server -detached
以后端方式來啟動RabbitMQ AMQP server.
rabbitmq-env.conf(5) rabbitmqctl(1)
1、下載推薦的安裝包
2、安裝
安裝依賴包
yum
install
unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl
#rpm -ivh esl-erlang_18.3-1~centos~7_amd64.rpm
Error: Package: rabbitmq-server-3.6.1-1.noarch (/rabbitmq-server-3.6.1-1.noarch)
Requires: erlang >= R16B-3
You could try using --skip-broken to work around the problem
You could try running: rpm -Va --nofiles --nodigest
這是由于erlang的版本問題,其實是沒有影響的,你可以使用下面的命令進行安裝:
#rpm -ivh --nodeps rabbitmq-server-3.6.1-1.noarch.rpm
啟動
#service rabbitmq-server start --后臺方式運行
#service rabbitmq-server stop --停止運行
#service rabbitmq-server status --查看狀態
#rabbitmq-server start
可以看到使用的日志文件
日志目錄
/var/log/rabbitmq
#cat /var/log/rabbitmq/rabbit@iZ94nxslz66Z.log 可以看到下面的日志記錄
...................................................................................................................................................................................................................................................
卸載
管理
Rabbitmq服務器的主要通過rabbitmqctl和rabbimq-plugins兩個工具來管理,以下是一些常用功能。
1). 服務器啟動與關閉
啟動: rabbitmq-server –detached
關閉:rabbitmqctl stop
若單機有多個實例,則在rabbitmqctlh后加–n 指定名稱
2). 插件管理
開啟某個插件:rabbitmq-pluginsenable xxx
關閉某個插件:rabbitmq-pluginsdisablexxx
注意:重啟服務器后生效。
3).virtual_host管理
新建virtual_host: rabbitmqctladd_vhost xxx
撤銷virtual_host:rabbitmqctl delete_vhost xxx
4). 用戶管理
新建用戶:rabbitmqctl add_user xxxpwd
刪除用戶: rabbitmqctl delete_user xxx
改密碼: rabbimqctlchange_password {username} {newpassword}
設置用戶角色:rabbitmqctlset_user_tags {username} {tag ...}
Tag可以為 administrator,monitoring, management
5). 權限管理
權限設置:set_permissions [-pvhostpath] {user} {conf} {write} {read}
Vhostpath
Vhost路徑
user
用戶名
Conf
一個正則表達式match哪些配置資源能夠被該用戶訪問。
Write
一個正則表達式match哪些配置資源能夠被該用戶讀。
Read
一個正則表達式match哪些配置資源能夠被該用戶訪問。
6). 獲取服務器狀態信息
服務器狀態:rabbitmqctl status
隊列信息:rabbitmqctl list_queues[-p vhostpath] [queueinfoitem ...]
Queueinfoitem可以為:name,durable,auto_delete,arguments,messages_ready,
messages_unacknowledged,messages,consumers,memory
Exchange信息:rabbitmqctllist_exchanges[-p vhostpath] [exchangeinfoitem ...]
Exchangeinfoitem有:name,type,durable,auto_delete,internal,arguments.
Binding信息:rabbitmqctllist_bindings[-p vhostpath] [bindinginfoitem ...]
Bindinginfoitem有:source_name,source_kind,destination_name,destination_kind,routing_key,arguments
Connection信息:rabbitmqctllist_connections [connectioninfoitem ...]
Connectioninfoitem有:recv_oct,recv_cnt,send_oct,send_cnt,send_pend等。
Channel信息:rabbitmqctl list_channels[channelinfoitem ...]
Channelinfoitem有consumer_count,messages_unacknowledged,messages_uncommitted,acks_uncommitted,messages_unconfirmed,prefetch_count,client_flow_blocked
常用命令:
查看所有隊列信息
# rabbitmqctl list_queues
關閉應用
# rabbitmqctl stop_app
啟動應用,和上述關閉命令配合使用,達到清空隊列的目的
# rabbitmqctl start_app
清除所有隊列
# rabbitmqctl reset
更多用法及參數,可以執行如下命令查看
# rabbitmqctl
rabbitmq-server start 或者 service rabbitmq-server start #啟動rabbitmq
rabbitmqctl list_exchanges
rabbitmqctl list_bindings
rabbitmqctl list_queues #分別查看當前系統種存在的Exchange和Exchange上綁定的Queue信息。
rabbitmqctl status #查看運行信息
rabbitmqctl stop #停止運行rabbitmq
rabbitmq-plugins enable rabbitmq_management
#啟動rabbitmq的圖形管理界面,這個操作必須重啟rabbitmq, 然后在web中 http://127.0.0.1:15672 用戶名和密碼都是guest guest。如果局域網無法訪問設置防火墻過濾規則或關閉防火墻。
RabbitMQ broker是一個或多個Erlang節點的邏輯分組,多個運行的RabbitMQ應用程序可共享用戶,虛擬主機,隊列,交換機,綁定以及運行時參數。有時我們將多個節點的集合稱為集群。
RabbitMQ broker操作所需的所有數據/狀態都可以在多個節點間復制. 例外是消息隊列,默認情況下它駐留在一個節點, 盡管它們對所有節點來說,是可見的,可達的.要在集群中跨節點復制隊列,可參考high availability 文檔(注意,你仍然先需要一個工作集群).
RabbitMQ節點彼此之間使用域名,要么是簡短的,要么是全限定的(FQDNs). 因此,集群中所有成員的主機名都必須是可解析的,也可用于機器上的命令行工具,如rabbitmqctl.
主機名解析可使用任何一種標準的操作系統提供方法:
要使用FQDNs, 參考RABBITMQ_USE_LONGNAME in the Configuration guide.
集群可以通過多種方式來構建:
一個集群的構成可以動態修改. 所有RabbitMQ brokers開始都是以單個節點來運行的. 這些節點可以加入到集群中, 隨后也可以脫離集群再次成為單一節點。
RabbitMQ brokers 可以容忍個別節點故障. 節點可以隨意地啟動和關閉,只要在已知關閉的時間內能夠聯系到集群節點.
RabbitMQ 集群有多種模式來處理網絡分化, 主要是一致性方向. 集群是在LAN中使用的,不推薦在WAN中運行集群. Shovel 或 Federation 插件對于跨WAN連接brokers ,有更好的解決方案. 注意 Shovel 和 Federation 不等同于集群.
節點可以是磁盤節點,也可以是內存節點。多數情況下,你希望所有的節點都是磁盤節點,但RAM節點是一種特殊情況,它可以提高集群中隊列和,交換機,綁定的性能. 當有疑問時,最好只使用磁盤節點。
下面是通過三臺機器-rabbit1, rabbit2, rabbit3來設置和操作RabbitMQ集群的文字記錄.
我們假設用戶已經登錄到這三臺機器上,并且都已經在機器上安裝了RabbitMQ,以及rabbitmq-server 和rabbitmqctl 腳本都已經在用戶的PATH環境變量中.
This transcript can be modified to run on a single host, as explained more details below.
RabbitMQ 節點和CLI 工具(如rabbitmqctl) 使用cookie來確定每個節點之間是否可以通信. 兩個節點之間要能通信,它們必須要有相同的共享密鑰Erlang cookie. cookie只是具有字母數字特征的字符串。只要你喜歡,它可長可短. 每個集群節點必須有相同的cookie.
當RabbitMQ 服務器啟動時,Erlang VM 會自動地創建一個隨機的cookie文件. 最簡單的處理方式是允許一個節點來創建文件,然后再將這個文件拷貝到集群的其它節點中。
在 Unix 系統中, cookie的通常位于/var/lib/rabbitmq/.erlang.cookie 或$HOME/.erlang.cookie.
在Windows中, 其位置在C:\Users\Current User\.erlang.cookie(%HOMEDRIVE% + %HOMEPATH%\.erlang.cookie) 或C:\Documents and Settings\Current User\.erlang.cookie, 對于RabbitMQ Windows service其位置在C:\Windows\.erlang.cookie。如果使用了Windows service , cookie可被放于這兩個位置中.
作為替代方案,你可以在 rabbitmq-server 和 rabbitmqctl 腳本中調用erl時,插入"-setcookie cookie"選項.
當cookie未配置時 (例如,不相同), RabbitMQ 會記錄這樣的錯誤"Connection attempt from disallowed node" and "Could not auto-cluster".
集群可通過重新配置,而將現有RabbitMQ 節點加入到集群配置中. 因此第一步是以正常的方式在所有節點上啟動RabbitMQ:
rabbit1$ rabbitmq-server -detached
rabbit2$ rabbitmq-server -detached
rabbit3$ rabbitmq-server -detached
這會創建三個獨立的RabbitMQ brokers, 每個節點一個,可通過cluster_status命令來驗證:
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1]}]},{running_nodes,[rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit2]}]},{running_nodes,[rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}] ...done.
以rabbitmq-server shell腳本來啟動RabbitMQ broker的節點名稱是rabbit@shorthostname,在這里,短節點名稱是小寫的(如上面的rabbit@rabbit1). 如果在windows上,你使用rabbitmq-server.bat批處理文件來啟動,短節點名稱是大寫的(如:rabbit@RABBIT1). 當你輸入節點名稱時,不論是大寫還是小寫的,這些字符串都必須精確匹配。
為了把這三個節點構建到一個集群中,我們可以告訴其中的兩個節點, 假設為rabbit@rabbit2 和 rabbit@rabbit3, 將加入到第三個節點的集群中,這第三個節點假設為rabbit@rabbit1.
首先我們將rabbit@rabbit2加入到rabbit@rabbit1的集群中. 要做到這一點,我們必須在rabbit@rabbit2 上停止RabbitMQ應用程序,并將其加入到rabbit@rabbit1 集群中, 然后再重啟RabbitMQ 應用程序.
注意:加入集群會隱式地重置節點, 因此這會刪除此節點上先前存在的所有資源和數據.(如何備份數據)
rabbit2$ rabbitmqctl stop_app
Stopping node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl join_cluster rabbit@rabbit1
Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
rabbit2$ rabbitmqctl start_app
Starting node rabbit@rabbit2 ...done.
在每個節點上通過運行cluster_status 命令,我們可以看到兩個節點已經加入了集群:
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.
現在我們將rabbit@rabbit3節點加入到同一個集群中. 操作步驟同上面的一致,除了這次我們選擇rabbit2來加入集群,但這并不重要:
rabbit3$ rabbitmqctl stop_app
Stopping node rabbit@rabbit3 ...done.
rabbit3$ rabbitmqctl join_cluster rabbit@rabbit2
Clustering node rabbit@rabbit3 with rabbit@rabbit2 ...done.
rabbit3$ rabbitmqctl start_app
Starting node rabbit@rabbit3 ...done.
在任何一個節點上通過運行cluster_status命令,我們可以看到三個節點已經加入了集群:
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit3,rabbit@rabbit2,rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit3,rabbit@rabbit1,rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3,rabbit@rabbit2,rabbit@rabbit1]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}] ...done.
通過上面的步驟,當集群運行的時候,我們可以在任何時候將新的節點加入到集群中.
注意,加入到集群中的節點可在任何時候停止, 對于崩潰來說也沒有問題. 在這兩種情況下,集群剩余的節點將不受影響地繼續操作,當它們重啟的時候,這些崩潰的節點會再次自動追趕上其它的集群節點。
我們關閉了節點rabbit@rabbit1和rabbit@rabbit3,并在每步觀察集群的狀態:
rabbit1$ rabbitmqctl stop
Stopping and halting node rabbit@rabbit1 ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit3,rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit3]}] ...done.
rabbit3$ rabbitmqctl stop
Stopping and halting node rabbit@rabbit3 ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2]}] ...done.
譯者注:關閉了rabbit1節點后,運行的節點已經沒有rabbit1節點了
現在我們再次啟動節點,并檢查集群狀態:
rabbit1$ rabbitmq-server -detached
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmq-server -detached
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}] ...done.
這里有一些重要的警告:
當節點不再是集群的一部分時,可以明確地將其從集群中刪除. 首先我們將節點rabbit@rabbit3從集群中刪除, 以使其回歸獨立操作.要做到這一點,需要在rabbit@rabbit3節點上停止RabbitMQ 應用程序,重設節點,并重啟RabbitMQ應用程序.
rabbit3$ rabbitmqctl stop_app
Stopping node rabbit@rabbit3 ...done.
rabbit3$ rabbitmqctl reset
Resetting node rabbit@rabbit3 ...done.
rabbit3$ rabbitmqctl start_app
Starting node rabbit@rabbit3 ...done.
在節點上運行cluster_status 命令來確認rabbit@rabbit3節點現在已不再是集群的一部分,并且會獨自操作:
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}] ...done.
我們也可以遠程地刪除節點,這是相當有用的,舉例來說,當處理無反應的節點時.舉例來說,我們可以從 rabbit@rabbit2中刪除rabbit@rabbi1.
rabbit1$ rabbitmqctl stop_app
Stopping node rabbit@rabbit1 ...done.
rabbit2$ rabbitmqctl forget_cluster_node rabbit@rabbit1
Removing node rabbit@rabbit1 from cluster ... ...done.
注意,rabbit1仍然認為它與rabbit2處在一個集群中,但嘗試啟動時會出現一個錯誤.這時,我們需要對其進行重置以使其能再次啟動.
rabbit1$ rabbitmqctl start_app
Starting node rabbit@rabbit1 ... Error: inconsistent_cluster: Node rabbit@rabbit1 thinks it's clustered with node rabbit@rabbit2, but rabbit@rabbit2 disagrees
rabbit1$ rabbitmqctl reset
Resetting node rabbit@rabbit1 ...done.
rabbit1$ rabbitmqctl start_app Starting node rabbit@mcnulty ... ...done.
現在, cluster_status 命令會顯示三個節點都是獨立節點,并且操作是獨立的:
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1]}]},{running_nodes,[rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit2]}]},{running_nodes,[rabbit@rabbit2]}] ...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}] ...done.
注意:rabbit@rabbit2節點仍然殘留有集群的狀態(譯者注:怎么看出來的呢?), 但是 rabbit@rabbit1 和rabbit@rabbit3 節點是新鮮的RabbitMQ brokers.如果我們想重新初始化rabbit@rabbit2節點,我們可以按其它節點的步驟來操作:
rabbit2$ rabbitmqctl stop_app
Stopping node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl reset
Resetting node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl start_app
Starting node rabbit@rabbit2 ...done.
當從主版本或小版本進行升級時 (如:從3.0.x 到3.1.x,或從2.x.x 到3.x.x),或者是升級Erlang時, 整個集群在升級時必須記下來(taken down) (因為集群不會像這樣來運行多個混合的版本). 當從補丁版本升級到另一個時(如:從3.0.x 到3.0.y)時,這種情況是不會出現的;這些版本在集群中是可以混合使用的(例外是3.0.0不能與 3.0.x 系列后的版本混合).
在主版本與小版本之間升級時,RabbitMQ有必要的話會自動更新其持久化數據. 在集群中,此任務是由第一個磁盤節點來啟動的("upgrader"節點). 因此在升級RabbitMQ集群時,你不需要嘗試先啟動RAM節點,任何啟動的RAM節點都會發生錯誤,并且不能啟動.
雖然不是嚴格必須的,但使用磁盤節點來作為升級節點通常是好的主意,最后停止那個節點。
自動升級只適用于2.1.1及其之后的版本,如果你有更早的集群 ,你必須重新構建升級.
在某些情況下,在一臺機器上運行RabbitMQ節點的集群是有用的(試驗性質).
要在一臺機器上運行多個RabbitMQ節點,必須確保節點含有不同的節點名稱,數據存儲路徑,日志文件位置,綁定到不同的端口,并包含那些插件使用的端口等等 .參考配置指南中的RABBITMQ_NODENAME, RABBITMQ_NODE_PORT, 和 RABBITMQ_DIST_PORT文檔 ,以及 File and Directory Locations guide指南中的 RABBITMQ_MNESIA_DIR, RABBITMQ_CONFIG_FILE, and RABBITMQ_LOG_BASE。
你可以在同一個主機上通過重復調用rabbitmq-server(rabbitmq-server.bat on Windows)來手動地啟動多個節點 . 例如:
$ RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit rabbitmq-server -detached
$ RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=hare rabbitmq-server -detached
$ rabbitmqctl -n hare stop_app
$ rabbitmqctl -n hare join_cluster rabbit@`hostname -s`
$ rabbitmqctl -n hare start_app
這會設置兩個節點的集群,這兩個節點都是磁盤節點. 注意,如果你想打開非AMQP的其它端口,你需要通過命令行進行配置:
$ RABBITMQ_NODE_PORT=5672 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15672}]" RABBITMQ_NODENAME=rabbit rabbitmq-server -detached
$ RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=hare rabbitmq-server -detached
RabbitMQ節點使用主機名來相互通信.因此,所有節點名稱都集群中的節點應該都能被解析.對于像 rabbitmqctl這樣的工具來說,也是如此.
除此之外,默認情況下,RabbitMQ使用當前系統的主機名稱來命名數據庫目錄.如果主機名變了,將會創建一個空的數據庫.為避免數據丟失,應該總是設置一個固定的,可解析的主機名稱。無論何時,只要主機名變化了,你就必須要重啟RabbitMQ:
$ /etc/init.d/rabbitmq-server restart
類似的效果可通過使用 rabbit@localhost作為broker節點名稱來達到。這個解決方案的影響是集群將不會工作,因為選中的主機名不能被遠程主機所解析。當從遠程主機調用時,類似地rabbitmqctl命令也會失敗. 免遭此缺點的復雜方案是使用DNS,如:如果運行EC2,則使用 Amazon Route 53 。如果你想使用節點名稱的全限定主機名(RabbitMQ 默認使用短名稱),那么可使用DNS解析, 可設置環境變量 RABBITMQ_USE_LONGNAME=true.
當在一個數據中心或可靠網絡時,帶防火墻的集群節點是存在的,但這些節點通常被防火墻隔離。再一次聲明,當各節點之間的網絡連接不穩定時,集群不建議在WAN在使用。
在多數配置中,你需要打開4369和25672端口以使用集群正常工作.
Erlang 使用Port Mapper Daemon (epmd) 來解析集群中的節點名稱. 默認epmd端口是4369,但它可以通過ERL_EPMD_PORT環境變量進行修改.所有的節點都必須使用同一個端口。詳細信息可參考Erlang epmd manpage.
一旦分布式Erlang節點通過empd解析后,其它節點將會嘗試直接通信。默認地通信端口比RABBITMQ_NODE_PORT (即,默認是25672)高了20000. 這可以通過RABBITMQ_DIST_PORT 環境變量修改
客戶端可以正常連接到集群中的任意節點,如果那個節點發生故障了 ,只要有剩余集群節點幸存,當客戶端發現在關閉的連接時,它就能夠重新連接到剩余幸存的集群節點上。一般來說,將節點主機名稱或IP地址放到客戶端程序是極其不明智的,這會導致缺乏靈活性,并需要客戶端程序重新編輯,編譯,重新配置以適應集群配置變化或者集群節點變化。相反,我們建議采用更抽象的方法: 如有簡短TTL配置的動態DNS服務或普通的TCP負載均衡器. 一般來說,這方面的管理集群內連接節點是超出了RabbitMQ本身的范圍,我們建議使用其他技術專門設計來解決這些問題。
內存節點只在內存中保存其元數據。它不會像磁盤節點將元數據寫入到磁盤中,但它們擁有更好的性能。 然而,也應該注意到,由于持久化隊列數據總是存儲在磁盤上的,其性能提升只會影響資源管理(如: 添加/刪除隊列,交換機,或虛擬主機), 但不會影響發布或消費的速度.
內存節點是一個高級使用例子;當設置你的第一個集群時,你應該不使用它們。你應該用足夠的磁盤節點來處理冗余需求,然后如果有必要,再用內存節點進行擴展.
集群中只含有內存節點是相當脆弱的,如果集群停止了,你將不能再次啟動,并且會導致數據丟失。RabbitMQ在許多情況下,會阻止創建只包含內存節點的集群,但不能完全阻止。
(譯者注:在集群構建中,最好有兩個或以上的磁盤節點,然后再考慮使用內存節點進行擴展)
當節點加入集群時,我們可將其聲明為內存節點. 我們可以通過使用像先前rabbitmqctl join_cluster命令再加--ram標志來達到目的:
rabbit2$ rabbitmqctl stop_app
Stopping node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl join_cluster --ram rabbit@rabbit1
Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
rabbit2$ rabbitmqctl start_app Starting node rabbit@rabbit2 ...done.
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1]},{ram,[rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1]},{ram,[rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.
我們可以將節點的類型從磁盤修改為內存,反之亦然. 假設我們想反轉rabbit@rabbit2 和 rabbit@rabbit1的節點類型,即先將內存節點轉換為磁盤節點,隨后再將其從磁盤節點轉換為內存節點.要做到這點,我們可以使用change_cluster_node_type命令. 首先節點必須先停止.
rabbit2$ rabbitmqctl stop_app
Stopping node rabbit@rabbit2 ...done. rabbit2$
rabbitmqctl change_cluster_node_type disc
Turning rabbit@rabbit2 into a disc node ... ...done. Starting node rabbit@rabbit2 ...done.
rabbit1$ rabbitmqctl stop_app
Stopping node rabbit@rabbit1 ...done.
rabbit1$ rabbitmqctl change_cluster_node_type ram
Turning rabbit@rabbit1 into a ram node ...
rabbit1$ rabbitmqctl start_app
Starting node rabbit@rabbit1 ...done.
RabbitMQ Java client 將com.rabbitmq.client作為其頂層包. 關鍵類和接口有:
核心API類是Connection和Channel, 它們代表對應AMQP 0-9-1 connection 和 channel. 在使用前,可像下面這樣來導入:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
下面的代碼會使用給定的參數連接到AMQP broker:
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();
也可以使用URIs 來設置連接參數:
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();
Connection 接口可用來打開一個channel:
Channel channel = conn.createChannel();
channel現在可用來發送和接收消息,正如后續章節中描述的一樣.
要斷開連接,只需要簡單地關閉channel和connection:
channel.close(); conn.close();
關閉channel被認為是最佳實踐,但在這里不是嚴格必須的 - 當底層連接關閉的時候,channel也會自動關閉.
繼續上面的例子,下面的代碼聲明了一個交換器和一個隊列,然后再將它們進行綁定.
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);
這實際上會聲明下面的對象,它們兩者都可以可選參數來定制. 在這里,它們兩個都沒有特定參數。
上面的函數然后使用給定的路由鍵來綁定隊列和交換器.
注意,當只有一個客戶端時,這是一種典型聲明隊列的方式:它不需要一個已知的名稱,其它的客戶端也不會使用它(exclusive),并會被自動清除(autodelete).
如果多個客戶端想共享帶有名稱的隊列,下面的代碼應該更適合:
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
這實際上會聲明:
注意,Channel API 的方法都是重載的。這些 exchangeDeclare, queueDeclare 和queueBind 都使用的是預設行為.
這里也有更多參數的長形式,它們允許你按需覆蓋默認行為,允許你完全控制。
要向交換器中發布消息,可按下面這樣來使用Channel.basicPublish方法:
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
為了更好的控制,你可以使用重載方法來指定mandatory標志,或使用預先設置的消息屬性來發送消息:
channel.basicPublish(exchangeName, routingKey, mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes);
這會使用分發模式2(持久化)來發送消息, 優先級為1,且content-type 為"text/plain".你可以使用Builder類來構建你自己的消息屬性對象:
channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(1).userId("bob").build()),messageBodyBytes);
下面的例子使用自定義的headers來發布消息:
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude", 51.5252949);
headers.put("longitude", -0.0905493);
channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().headers(headers).build()),messageBodyBytes);
下面的例子使用expiration來發布消息:
channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().expiration("60000").build()),messageBodyBytes);
BasicProperties is an inner class of the autogenerated holder class AMQP.
Invocations of Channel#basicPublish will eventually block if a resource-driven alarm is in effect.
Channel 實例不能在多個線程間共享。應用程序必須在每個線程中使用不同的channel實例,而不能將同個channel實例在多個線程間共享。 有些channl上的操作是線程安全的,有些則不是,這會導致傳輸時出現錯誤的幀交叉。
在多個線程共享channels也會干擾Publisher Confirms.
import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;
接收消息最高效的方式是用Consumer接口來訂閱。當消息到達時,它們會自動地進行分發,而不需要顯示地請求。
當在調用Consumers的相關方法時, 個別訂閱總是通過它們的consumer tags來確定的, consumer tags可通過客戶端或服務端來生成,參考 the AMQP specification document.
同一個channel上的消費者必須有不同的consumer tags.
實現Consumer的最簡單方式是繼承便利類DefaultConsumer.子類可通過在設置訂閱時,將其傳遞給basicConsume調用:
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",new DefaultConsumer(channel) {
@Override
publicvoid handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
在這里,由于我們指定了autoAck = false,因此消費者有必要應答分發的消息,最便利的方式是在handleDelivery 方法中處理.
更復雜的消費者可能需要覆蓋更多的方法,實踐中,handleShutdownSignal會在channels和connections關閉時調用,handleConsumeOk 會在其它消費者之前
消費者可實現handleCancelOk 和 handleCancel方法來接收顯示和隱式取消操作通知。
你可以使用Channel.basicCancel來顯示地取消某個特定的消費者:
channel.basicCancel(consumerTag);
passing the consumer tag.
消費者回調是在單獨線程上處理的,這意味著消費者可以安全地在Connection或Channel, 如queueDeclare, txCommit, basicCancel或basicPublish上調用阻塞方法。
每個Channel都有其自己的dispatch線程.對于一個消費者一個channel的大部分情況來說,這意味著消費者不會阻擋其它的消費者。如果在一個channel上多個消費者,則必須意識到長時間運行的消費者可能阻擋此channel上其它消費者回調調度.
要顯示地獲取一個消息,可使用Channel.basicGet.返回值是一個GetResponse實例, 在它之中,header信息(屬性) 和消息body都可以提取:
boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
// No message retrieved.
} else {
AMQP.BasicProperties props = response.getProps();
byte[] body = response.getBody();
long deliveryTag = response.getEnvelope().getDeliveryTag(); ...
因為autoAck = false,你必須調用Channel.basicAck來應答你已經成功地接收了消息:
channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message }
如果發布消息時,設置了"mandatory"標志,但如果消息不能路由的話,broker會將其返回到發送客戶端 (通過 AMQP.Basic.Return 命令).
要收到這種返回的通知, clients可實現ReturnListener接口,并調用Channel.setReturnListener.如果channel沒有配置return listener,那么返回的消息會默默地丟棄。
channel.setReturnListener(new ReturnListener() {
publicvoid handleBasicReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body) throws IOException {
...
}
});
return listener將被調用,例如,如果client使用"mandatory"標志向未綁定隊列的direct類型交換器發送了消息.
AMQP 0-9-1 connection和channel 使用相同的方法來管理網絡故障,內部故障,以及顯示本地關閉.
AMQP 0-9-1 connection 和 channel 有如下的生命周期狀態:
這些對象總是以closed狀態結束的,不管基于什么原因引發的關閉,比如:應用程序請求,內部client library故障, 遠程網絡請求或網絡故障.
AMQP connection 和channel 對象會持有下面與關閉相關的方法:
可以像這樣來簡單使用監聽器:
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;
connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause) { ... } }
);
可通過顯示調用getCloseReason()方法或通過使用ShutdownListener類中的業務方法的cause參數來從ShutdownSignalException中獲取關閉原因的有用信息.
ShutdownSignalException 類提供方法來分析關閉的原因.通過調用isHardError()方法,我們可以知道是connection錯誤還是channel錯誤.getReason()會返回相關cause的相關信息,這些引起cause的方法形式-要么是AMQP.Channel.Close方法,要么是AMQP.Connection.Close (或者是null,如果是library中引發的異常,如網絡通信故障,在這種情況下,可通過getCause()方法來獲取信息).
public void shutdownCompleted(ShutdownSignalException cause) { if (cause.isHardError()) {
Connection conn = (Connection)cause.getReference();
if (!cause.isInitiatedByApplication()) {
Method reason = cause.getReason(); ... } ... }
else { Channel ch = (Channel)cause.getReference(); ... } }
channel和connection對象的isOpen()方法不建議在在生產代碼中使用,因為此方法的返回值依賴于shutdown cause的存在性. 下面的代碼演示了竟爭條件的可能性:
public void brokenMethod(Channel channel) { if (channel.isOpen()) { // The following code depends on the channel being in open state. // However there is a possibility of the change in the channel state // between isOpen() and basicQos(1) call ... channel.basicQos(1); } }
相反,我們應該忽略這種檢查,并簡單地嘗試這種操作.如果代碼執行期間,connection的channel關閉了,那么將拋出ShutdownSignalException,這就表明對象處于一種無效狀態了.當broker意外關閉連接時,我們也應該捕獲由SocketException引發的IOException,或者當broker清理關閉時,捕獲ShutdownSignalException.
public void validMethod(Channel channel) { try { ... channel.basicQos(1); } catch (ShutdownSignalException sse) { // possibly check if channel was closed // by the time we started action and reasons for // closing it ... } catch (IOException ioe) { // check why connection was closed ... } }
Consumer 線程默認是通過一個新的ExecutorService線程池來自動分配的(參考下面的Receiving).如果需要在newConnection() 方法中更好地控制ExecutorService,可以使用定制的線程池.下面的示例展示了一個比正常分配稍大的線程池:
ExecutorService es = Executors.newFixedThreadPool(20); Connection conn = factory.newConnection(es);Executors 和 ExecutorService 都是java.util.concurrent包中的類.
當連接關閉時,默認的ExecutorService將會被shutdown(), 但用戶自定義的ExecutorService (如上面所示)將不會被shutdown(). 提供自定義ExecutorService的Clients必須確保最終它能被關閉(通過調用它的shutdown() 方法), 否則池中的線程可能會阻止JVM終止.
同一個executor service,可在多個連接之間共享,或者連續地在重新連接上重用,但在shutdown()后,則不能再使用.
使用這種特性時,唯一需要考慮的是:在消費者回調的處理過程中,是否有證據證明有嚴重的瓶頸. 如果沒有消費者執行回調,或很少,默認的配置是綽綽有余. 開銷最初是很小的,分配的全部線程資源也是有界限的,即使偶爾可能出現一陣消費活動.
可以傳遞一個Address數組給newConnection(). Address只是 com.rabbitmq.client 包中包含host和port組件的簡單便利類. 例如:
Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1) , new Address(hostname2, portnumber2)}; Connection conn = factory.newConnection(addrArr);將會嘗試連接hostname1:portnumber1, 如果不能連接,則會連接hostname2:portnumber2,然后會返回數組中第一個成功連接(不會拋出IOException)上broker的連接.這完全等價于在factory上重復調用factory.newConnection()方法來設置host和port, 直到有一個成功返回.
如果提供了ExecutorService(在factory.newConnection(es, addrArr)中使用),那么線程池將與第一個成功連接相關聯.
參考Heartbeats guide 來了解更多關于心跳及其在Java client中如何配置的更多信息.
像Google App Engine (GAE)這樣的環境會限制線程直接實例化. 在這樣的環境中使用RabbitMQ Java client, 需要配置一個定制的ThreadFactory,即使用合適的方法來實例化線程,如: GAE's ThreadManager. 下面是Google App Engine的相關代碼.
import com.google.appengine.api.ThreadManager; ConnectionFactory cf = new ConnectionFactory(); cf.setThreadFactory(ThreadManager.backgroundThreadFactory());
clients和RabbitMQ節點之間的連接可發生故障. RabbitMQ Java client 支持連接和拓撲(queues, exchanges, bindings, 和consumers)的自動恢復. 大多數應用程序的連接自動恢復過程會遵循下面的步驟:
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(true); // connection that will recover automatically Connection conn = factory.newConnection();如果恢復因異常失敗(如. RabbitMQ節點仍然不可達),它會在固定時間間隔后進行重試(默認是5秒). 時間間隔可以進行配置:
ConnectionFactory factory = new ConnectionFactory(); // attempt recovery every 10 seconds factory.setNetworkRecoveryInterval(10000);
當提供了address列表時,將會在所有address上逐個進行嘗試:ConnectionFactory factory = new ConnectionFactory(); Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")}; factory.newConnection(addresses);
可在可恢復連接和通道上注冊一個或多個恢復監聽器. 當啟用了連接恢復時,ConnectionFactory#newConnection 和 Connection#createChannel 的連接已實現了com.rabbitmq.client.Recoverable,并提供了兩個方法:
當連接失敗時,使用Channel.basicPublish方法發送的消息將會丟失. client不會保證在連接恢復后,消息會得到分發.要確保發布的消息到達了RabbitMQ,應用程序必須使用Publisher Confirms
拓撲恢復
拓撲恢復涉及交換器,隊列,綁定以及消費者恢復.默認是啟用的,但也可以禁用:
ConnectionFactory factory = new ConnectionFactory(); Connection conn = factory.newConnection(); factory.setAutomaticRecoveryEnabled(true); factory.setTopologyRecoveryEnabled(false);
當使用手動應答時,在消息分發與應答之間可能存在網絡連接中斷. 在連接恢復后,RabbitMQ會在所有通道上重設delivery標記. 也就是說,使用舊delivery標記的basic.ack, basic.nack, 以及basic.reject將會引發channel exception. 為了避免發生這種情況, RabbitMQ Java client可以跟蹤,更新,以使它們在恢復期間單調地增長. Channel.basicAck, Channel.basicNack, 以及Channel.basicReject 然后可以轉換這些 delivery tags,并且不再發送過期的delivery tags. 使用手動應答和自動恢復的應用程序必須負責處理重新分發.
關于connection, channel, recovery, 和consumer lifecycle 的異常將會委派給exception handler,Exception handler是實現了ExceptionHandler接口的任何對象. 默認情況下,將會使用DefaultExceptionHandler實例,它會將異常細節輸出到標準輸出上.
可使用ConnectionFactory#setExceptionHandler來覆蓋原始handler,它將被用于由此factory創建的所有連接:
ConnectionFactory factory = new ConnectionFactory(); cf.setExceptionHandler(customHandler);
Exception handlers 應該用于異常日志.在Google App Engine (GAE) 上使用RabbitMQ Java client,必須使用一個自定義的線程工廠來初始化線程,如使用GAE's ThreadManager. 此外,還需要設置一個較小的心跳間隔(4-5 seconds) 來避免InputStream 上讀超時:
ConnectionFactory factory = new ConnectionFactory(); cf.setRequestedHeartbeat(5);
為了能使拓撲恢復, RabbitMQ Java client 維持了聲明隊列,交換器,綁定的緩存. 緩存是基于每個連接的.某些RabbitMQ的特性使得客戶端不能觀察到拓撲的變化,如,當隊列因TTL被刪除時. RabbitMQ Java client 會嘗試在下面的情況中使用緩存實體失效:
為了便于編程, Java client API提供了一個使用臨時回復隊列的RpcClient類來提供簡單的RPC-style communication.
此類不會在RPC參數和返回值上強加任何特定格式. 它只是簡單地提供一種機制來向帶特定路由鍵的交換器發送消息,并在回復隊列上等待響應.
import com.rabbitmq.client.RpcClient;
RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);
(其實現細節為:請求消息使用basic.correlation_id唯一值字段來發送消息,并使用basic.reply_to來設置響應隊列的名稱.)
一旦你創建此類的實例,你可以使用下面的任意一個方法來發送RPC請求:
byte[] primitiveCall(byte[] message);
String stringCall(String message) Map mapCall(Map message) Map mapCall(Object[] keyValuePairs)
primitiveCall 方法會將原始byte數組轉換為請求和響應的消息體. stringCall只是一個primitiveCall的簡單包裝,將消息體視為帶有默認字符集編碼的String實例.
mapCall 變種稍為有些復雜: 它會將原始java值包含在java.util.Map中,并將其編碼為AMQP 0-9-1二進制表示形式,并以同樣的方式來解碼response. (注意:在這里,對一些值對象類型有所限制,具體可參考javadoc.)
所有的編組/解組便利方法都使用primitiveCall來作為傳輸機制,其它方法只是在它上面的做了一個封裝.