隨筆 - 41  文章 - 7  trackbacks - 0
          <2025年6月>
          25262728293031
          1234567
          891011121314
          15161718192021
          22232425262728
          293012345

          常用鏈接

          留言簿

          隨筆分類

          隨筆檔案

          搜索

          •  

          最新評論

          閱讀排行榜

          評論排行榜

               摘要: 在本章節中我們將覆蓋:創建一個本地服務器集群創建一個簡單集群自動添加一個RabbitMQ 集群引入消息負載均衡器創建集群客戶端介紹RabbitMQ提供了各種各樣的特性以及集群功能.通過使用集群,一組適當配置的主機的行為與單個broker實例一樣,但集群帶有下面的目的:高可用性: 如果一個節點宕機了,分布式broker仍然能接受和處理消息.這方面內容會在Chapter 7,Developi...  閱讀全文
          posted @ 2016-06-15 20:54 胡小軍 閱讀(1952) | 評論 (2)編輯 收藏
               摘要: 在本章中我們將覆蓋:使用Spring來開發web監控程序使用Spring來開發異步web搜索使用STOMP來開發web監控程序介紹RabbitMQ可以像客戶端一樣使用在服務端。當前,RabbitMQ覆蓋了大部分使用的語言和技術來構建web程序,如PHP,Node.js, Python, Ruby, 以及其它.你可以在http://www.rabbitmq.com/devtools.html找到全部...  閱讀全文
          posted @ 2016-06-14 22:07 胡小軍 閱讀(2288) | 評論 (0)編輯 收藏
               摘要: 在本章中我們將覆蓋:使用.NET client通過MQTT綁定iPhone應用與RabbitMQ在Andriod上使用消息來更新Google Maps通過Andriod后端來發布消息使用Qpid來交換RabbitMQ消息使用Mosquitto來交換RabbitMQ消息使用.NET clients來綁定WCF程序介紹在前面的章節中,我們已經介紹了基本概念。現在,我們要使用這些概念來創建真實的應用程序...  閱讀全文
          posted @ 2016-06-13 20:25 胡小軍 閱讀(1776) | 評論 (2)編輯 收藏

          RabbitMQ的目標是盡可能廣泛地支持大部分平臺.RabbitMQ 可運行在任何支持Erlang的平臺上, 包括內嵌系統,多核集群,云服務器.

          下面的平臺支持Erlang,因此也可以運行RabbitMQ:

          • Linux
          • Windows, NT through 10
          • Windows Server 2003/2008/2012
          • Mac OS X
          • Solaris
          • FreeBSD
          • TRU64
          • VxWorks

          RabbitMQ的開源版本大部分都部署在下面的平臺上:

          • Ubuntu and Debian-based Linux distributions
          • Fedora, CentOS and RPM-based Linux distributions
          • openSUSE and derived distributions (including SLES and SLERT)
          • Mac OS X
          • Windows XP and later

          Windows

          RabbitMQ可運行Windows XP及其后續版本中(Server 2003, Vista, Windows 7, Windows 8, Windows 10, Server 2008 and Server 2012). 盡管沒有測試,但應該可以運行在Windows NT ,Windows 2000 上.

          64位的Windows Erlang VM從R15版本開始可用.建議使用最新的64位Erlang版本來運行。參考Erlang version compatibility page.

          通用UNIX

          雖沒有官方支持,Erlang 和 RabbitMQ 能運行在含有POSIX layer including Solaris, FreeBSD, NetBSD, OpenBSD的操作系統上.

          虛擬平臺

          RabbitMQ 可運行物理或虛擬硬件上. 這可以允許不支持的平臺通過仿真來運行RabbitMQ.
          參考EC2 guide 來了解RabbitMQ如何運行在Amazon EC2上的更多信息.

          posted @ 2016-06-06 00:09 胡小軍 閱讀(1245) | 評論 (0)編輯 收藏

          名稱

          rabbitmq-server — 啟動RabbitMQ AMQP server

          語法

          rabbitmq-server [-detached]

          描述

          RabbitMQ是AMQP的實現, 后者是高性能企業消息通信的新興標準. RabbitMQ server是AMQP 中間件的健壯,可擴展實現.

          前端運行rabbitmq-server,它會顯示橫幅消息,會報告啟動時的過程信息,最后會顯示"broker running",以表明RabbitMQ中間件已經成功啟動。

          要關閉server,只需要終止過程或使用rabbitmqctl(1)(即:rabbitmqctl stop).

          環境變量

          RABBITMQ_MNESIA_BASE

          默認是 /var/lib/rabbitmq/mnesia. 用于設置Mnesia 數據庫文件存放的目錄.

          RABBITMQ_LOG_BASE

          日志目錄 ,server生成的/var/log/rabbitmq. Log 日志文志會放置在文件會放置在此目錄.(如:window10下默認安裝時,日志目錄為:C:\Users\Administrator\AppData\Roaming\RabbitMQ\log

          RABBITMQ_NODENAME

          默認是rabbit. 當你想在一臺機器上運行多個節點時,此配置是相當有用的, RABBITMQ_NODENAME在每個erlang-node和機器的組合中應該唯一。

          參考clustering on a single machine guide 來更多細節.

          RABBITMQ_NODE_IP_ADDRESS

          默認情況下,RabbitMQ會綁定到所有網絡接口上,如果只想綁定某個網絡接口,可修改此設置。

          RABBITMQ_NODE_PORT

          默認是5672.

          選項

          -detached

          以后端的方式來啟動進程 ,注意,這會導致pid無法寫入到pid文件中.例如:

          rabbitmq-server -detached

          以后端方式來啟動RabbitMQ AMQP server.

          也可參考

          rabbitmq-env.conf(5) rabbitmqctl(1)

          posted @ 2016-06-06 00:06 胡小軍 閱讀(1185) | 評論 (0)編輯 收藏
               摘要: 本章我們將覆蓋:使用虛擬主機配置用戶使用SSL實現客戶端證書從瀏覽器中管理RabbitMQ配置RabbitMQ參數開Python程序來監控RabbitMQ自己開發web程序來監控RabbitMQ介紹一旦安裝后,RabbitMQ不需要任何配置就可以工作. 然而,RabbitMQ有許多的配置選項,這些配置選項使得它更為靈活,能工作于多種不同環境中.在本章中,我們將看到如何改變配置來滿足應用程序的需求。...  閱讀全文
          posted @ 2016-06-05 20:10 胡小軍 閱讀(1608) | 評論 (0)編輯 收藏

          一.安裝Erlang

          1、下載推薦的安裝包

          2、安裝

          安裝依賴包

          yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl

          #rpm -ivh esl-erlang_18.3-1~centos~7_amd64.rpm

          二.安裝RabbitMQ
          下載RabbitMQ
          # wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-3.6.1-1.noarch.rpm
          # rpm -ivh rabbitmq-server-3.6.1-1.noarch.rpm

          安裝rabbitmq-server的過程中遇到了一個問題:

          Error: Package: rabbitmq-server-3.6.1-1.noarch (/rabbitmq-server-3.6.1-1.noarch) 
          Requires: erlang >= R16B-3 
          You could try using --skip-broken to work around the problem 
          You could try running: rpm -Va --nofiles --nodigest

          這是由于erlang的版本問題,其實是沒有影響的,你可以使用下面的命令進行安裝:

          #rpm -ivh --nodeps rabbitmq-server-3.6.1-1.noarch.rpm


          啟動

          #service rabbitmq-server start --后臺方式運行

          #service rabbitmq-server stop  --停止運行

          #service rabbitmq-server status --查看狀態

          #rabbitmq-server start 

          可以看到使用的日志文件

          日志目錄

          /var/log/rabbitmq

          #cat /var/log/rabbitmq/rabbit@iZ94nxslz66Z.log 可以看到下面的日志記錄

          ...................................................................................................................................................................................................................................................

          =INFO REPORT==== 28-Apr-2016::04:20:10 ===
          node           : rabbit@iZ94nxslz66Z
          home dir       : /var/lib/rabbitmq
          config file(s) : /etc/rabbitmq/rabbitmq.config (not found)
          cookie hash    : fisYwC976M1LblhTfYslpg==
          log            : /var/log/rabbitmq/rabbit@iZ94nxslz66Z.log
          sasl log       : /var/log/rabbitmq/rabbit@iZ94nxslz66Z-sasl.log
          database dir   : /var/lib/rabbitmq/mnesia/rabbit@iZ94nxslz66Z
          =INFO REPORT==== 28-Apr-2016::04:20:11 ===
          Memory limit set to 397MB of 992MB total.
          =INFO REPORT==== 28-Apr-2016::04:20:11 ===
          Disk free limit set to 50MB
          =INFO REPORT==== 28-Apr-2016::04:20:11 ===
          Limiting to approx 65435 file handles (58889 sockets)
          =INFO REPORT==== 28-Apr-2016::04:20:11 ===
          FHC read buffering:  OFF
          FHC write buffering: ON
          =INFO REPORT==== 28-Apr-2016::04:20:11 ===
          Database directory at /var/lib/rabbitmq/mnesia/rabbit@iZ94nxslz66Z is empty. Initialising from scratch...
          =INFO REPORT==== 28-Apr-2016::04:20:11 ===
          Priority queues enabled, real BQ is rabbit_variable_queue
          =INFO REPORT==== 28-Apr-2016::04:20:11 ===
          Adding vhost '/'
          =INFO REPORT==== 28-Apr-2016::04:20:11 ===
          Creating user 'guest'
          =INFO REPORT==== 28-Apr-2016::04:20:11 ===
          Setting user tags for user 'guest' to [administrator]
          =INFO REPORT==== 28-Apr-2016::04:20:11 ===
          Setting permissions for 'guest' in '/' to '.*', '.*', '.*'
          =INFO REPORT==== 28-Apr-2016::04:20:11 ===
          msg_store_transient: using rabbit_msg_store_ets_index to provide index
          =INFO REPORT==== 28-Apr-2016::04:20:11 ===
          msg_store_persistent: using rabbit_msg_store_ets_index to provide index
          =WARNING REPORT==== 28-Apr-2016::04:20:11 ===
          msg_store_persistent: rebuilding indices from scratch
          =INFO REPORT==== 28-Apr-2016::04:20:11 ===
          started TCP Listener on [::]:5672
          =INFO REPORT==== 28-Apr-2016::04:20:11 ===
          Server startup complete; 0 plugins started.
          =INFO REPORT==== 28-Apr-2016::04:21:52 ===
          Management plugin: using rates mode 'basic'
          =INFO REPORT==== 28-Apr-2016::04:21:52 ===
          Management plugin started. Port: 15672
          =INFO REPORT==== 28-Apr-2016::04:21:52 ===
          Statistics database started.
          =INFO REPORT==== 28-Apr-2016::04:21:52 ===
          Plugins changed; enabled [mochiweb,webmachine,rabbitmq_web_dispatch,
                                    amqp_client,rabbitmq_management_agent,
                                    rabbitmq_management], disabled []
          =INFO REPORT==== 28-Apr-2016::04:23:01 ===
          Stopping RabbitMQ
          =INFO REPORT==== 28-Apr-2016::04:23:01 ===
          stopped TCP Listener on [::]:5672
          =INFO REPORT==== 28-Apr-2016::04:23:01 ===
          Stopped RabbitMQ application
          =INFO REPORT==== 28-Apr-2016::04:23:01 ===
          Halting Erlang VM
          =INFO REPORT==== 28-Apr-2016::04:23:29 ===
          Starting RabbitMQ 3.6.1 on Erlang 18.3
          Copyright (C) 2007-2016 Pivotal Software, Inc.
          Licensed under the MPL.  See http://www.rabbitmq.com/
          ...................................................................................................................................................................................................................................................


          卸載

          #rpm -qa|grep rabbitmq
          rabbitmq-server-3.6.1-1.noarch
          #rpm -e --nodeps rabbitmq-server-3.6.1-1.noarch
          #rpm -qa|grep erlang
          esl-erlang-18.3-1.x86_64
          #rpm -e --nodeps esl-erlang-18.3-1.x86_64

          管理

          Rabbitmq服務器的主要通過rabbitmqctl和rabbimq-plugins兩個工具來管理,以下是一些常用功能。

          1). 服務器啟動與關閉

                啟動: rabbitmq-server –detached

                關閉:rabbitmqctl stop

                若單機有多個實例,則在rabbitmqctlh后加–n 指定名稱

          2). 插件管理

                開啟某個插件:rabbitmq-pluginsenable xxx

                關閉某個插件:rabbitmq-pluginsdisablexxx

                注意:重啟服務器后生效。

          3).virtual_host管理

                新建virtual_host: rabbitmqctladd_vhost  xxx

                撤銷virtual_host:rabbitmqctl  delete_vhost xxx

          4). 用戶管理

                新建用戶:rabbitmqctl add_user xxxpwd

                刪除用戶:   rabbitmqctl delete_user xxx

                改密碼: rabbimqctlchange_password {username} {newpassword}

                設置用戶角色:rabbitmqctlset_user_tags {username} {tag ...}

                        Tag可以為 administrator,monitoring, management

          5). 權限管理

                權限設置:set_permissions [-pvhostpath] {user} {conf} {write} {read}

                         Vhostpath

                         Vhost路徑

                         user

                用戶名

                        Conf

                一個正則表達式match哪些配置資源能夠被該用戶訪問。

                        Write

                一個正則表達式match哪些配置資源能夠被該用戶讀。

                         Read

                一個正則表達式match哪些配置資源能夠被該用戶訪問。

          6). 獲取服務器狀態信息

                 服務器狀態:rabbitmqctl status

                 隊列信息:rabbitmqctl list_queues[-p vhostpath] [queueinfoitem ...]

                          Queueinfoitem可以為:name,durable,auto_delete,arguments,messages_ready,

                          messages_unacknowledged,messages,consumers,memory

                 Exchange信息:rabbitmqctllist_exchanges[-p vhostpath] [exchangeinfoitem ...]

                           Exchangeinfoitem有:name,type,durable,auto_delete,internal,arguments.

                 Binding信息:rabbitmqctllist_bindings[-p vhostpath] [bindinginfoitem ...]       

                           Bindinginfoitem有:source_name,source_kind,destination_name,destination_kind,routing_key,arguments

                 Connection信息:rabbitmqctllist_connections [connectioninfoitem ...]

                 Connectioninfoitem有:recv_oct,recv_cnt,send_oct,send_cnt,send_pend等。

                 Channel信息:rabbitmqctl  list_channels[channelinfoitem ...]

                Channelinfoitem有consumer_count,messages_unacknowledged,messages_uncommitted,acks_uncommitted,messages_unconfirmed,prefetch_count,client_flow_blocked

           

           

          常用命令:

          查看所有隊列信息

          # rabbitmqctl list_queues

          關閉應用

          # rabbitmqctl stop_app

          啟動應用,和上述關閉命令配合使用,達到清空隊列的目的

          # rabbitmqctl start_app

          清除所有隊列

          # rabbitmqctl reset

          更多用法及參數,可以執行如下命令查看

          # rabbitmqctl

           

           

          rabbitmq常用命令

          rabbitmq-server start  或者   service rabbitmq-server start     #啟動rabbitmq

          rabbitmqctl list_exchanges 

          rabbitmqctl list_bindings

          rabbitmqctl list_queues #分別查看當前系統種存在的Exchange和Exchange上綁定的Queue信息。

          rabbitmqctl status  #查看運行信息

          rabbitmqctl stop     #停止運行rabbitmq

          rabbitmq-plugins enable rabbitmq_management  

          #啟動rabbitmq的圖形管理界面,這個操作必須重啟rabbitmq, 然后在web中 http://127.0.0.1:15672 用戶名和密碼都是guest guest。如果局域網無法訪問設置防火墻過濾規則或關閉防火墻。


          posted @ 2016-06-05 20:08 胡小軍 閱讀(3192) | 評論 (0)編輯 收藏

          概述

          RabbitMQ broker是一個或多個Erlang節點的邏輯分組,多個運行的RabbitMQ應用程序可共享用戶,虛擬主機,隊列,交換機,綁定以及運行時參數。有時我們將多個節點的集合稱為集群。

          什么是復制?

          RabbitMQ broker操作所需的所有數據/狀態都可以在多個節點間復制. 例外是消息隊列,默認情況下它駐留在一個節點, 盡管它們對所有節點來說,是可見的,可達的.要在集群中跨節點復制隊列,可參考high availability 文檔(注意,你仍然先需要一個工作集群).

          主機名解析需求

          RabbitMQ節點彼此之間使用域名,要么是簡短的,要么是全限定的(FQDNs). 因此,集群中所有成員的主機名都必須是可解析的,也可用于機器上的命令行工具,如rabbitmqctl.

          主機名解析可使用任何一種標準的操作系統提供方法:

          • DNS 記錄
          • 本地主機文件(e.g. /etc/hosts)
          在更加嚴格的環境中,DNS記錄或主機文件修改是受限的,不可能的或不受歡迎的, Erlang VM可通過使用替代主機名解析方法來配置, 如一個替代的DNS服務器,一個本地文件,一個非標準的主機文件位置或一個混合方法. 這些方法可以與標準操作主機名解析方法一起協同工作。

          要使用FQDNs, 參考RABBITMQ_USE_LONGNAME in the Configuration guide.

          集群構成

          集群可以通過多種方式來構建:

          一個集群的構成可以動態修改. 所有RabbitMQ brokers開始都是以單個節點來運行的. 這些節點可以加入到集群中, 隨后也可以脫離集群再次成為單一節點。

          故障處理

          RabbitMQ brokers 可以容忍個別節點故障. 節點可以隨意地啟動和關閉,只要在已知關閉的時間內能夠聯系到集群節點.

          RabbitMQ 集群有多種模式來處理網絡分化, 主要是一致性方向. 集群是在LAN中使用的,不推薦在WAN中運行集群. Shovel 或 Federation 插件對于跨WAN連接brokers ,有更好的解決方案. 注意 Shovel 和 Federation 不等同于集群.

          磁盤和內存節點

          節點可以是磁盤節點,也可以是內存節點。多數情況下,你希望所有的節點都是磁盤節點,但RAM節點是一種特殊情況,它可以提高集群中隊列和,交換機,綁定的性能. 當有疑問時,最好只使用磁盤節點。

          集群文字記錄(Transcript)

          下面是通過三臺機器-rabbit1rabbit2rabbit3來設置和操作RabbitMQ集群的文字記錄.

          我們假設用戶已經登錄到這三臺機器上,并且都已經在機器上安裝了RabbitMQ,以及rabbitmq-server 和rabbitmqctl 腳本都已經在用戶的PATH環境變量中.

          This transcript can be modified to run on a single host, as explained more details below.

          節點(以及CLI工具)之間如何來認證: Erlang Cookie

          RabbitMQ 節點和CLI 工具(如rabbitmqctl) 使用cookie來確定每個節點之間是否可以通信. 兩個節點之間要能通信,它們必須要有相同的共享密鑰Erlang cookie. cookie只是具有字母數字特征的字符串。只要你喜歡,它可長可短. 每個集群節點必須有相同的cookie.

          當RabbitMQ 服務器啟動時,Erlang VM 會自動地創建一個隨機的cookie文件. 最簡單的處理方式是允許一個節點來創建文件,然后再將這個文件拷貝到集群的其它節點中。

          在 Unix 系統中, cookie的通常位于/var/lib/rabbitmq/.erlang.cookie 或$HOME/.erlang.cookie.

          在Windows中, 其位置在C:\Users\Current User\.erlang.cookie(%HOMEDRIVE% + %HOMEPATH%\.erlang.cookie) 或C:\Documents and Settings\Current User\.erlang.cookie, 對于RabbitMQ Windows service其位置在C:\Windows\.erlang.cookie。如果使用了Windows service ,  cookie可被放于這兩個位置中.

          作為替代方案,你可以在 rabbitmq-server 和 rabbitmqctl 腳本中調用erl時,插入"-setcookie cookie"選項.

          當cookie未配置時 (例如,不相同), RabbitMQ 會記錄這樣的錯誤"Connection attempt from disallowed node" and "Could not auto-cluster".

          啟動獨立節點

          集群可通過重新配置,而將現有RabbitMQ 節點加入到集群配置中. 因此第一步是以正常的方式在所有節點上啟動RabbitMQ:

          rabbit1$ rabbitmq-server -detached 
          rabbit2$ rabbitmq-server -detached
          rabbit3$ rabbitmq-server -detached

          這會創建三個獨立的RabbitMQ brokers, 每個節點一個,可通過cluster_status命令來驗證:

          rabbit1$ rabbitmqctl cluster_status 
          Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1]}]},{running_nodes,[rabbit@rabbit1]}] ...done.
          rabbit2$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit2]}]},{running_nodes,[rabbit@rabbit2]}] ...done.
          rabbit3$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}] ...done.

          rabbitmq-server shell腳本來啟動RabbitMQ broker的節點名稱是rabbit@shorthostname,在這里,短節點名稱是小寫的(如上面的rabbit@rabbit1). 如果在windows上,你使用rabbitmq-server.bat批處理文件來啟動,短節點名稱是大寫的(如:rabbit@RABBIT1). 當你輸入節點名稱時,不論是大寫還是小寫的,這些字符串都必須精確匹配。

          創建集群

          為了把這三個節點構建到一個集群中,我們可以告訴其中的兩個節點, 假設為rabbit@rabbit2 和 rabbit@rabbit3, 將加入到第三個節點的集群中,這第三個節點假設為rabbit@rabbit1.

          首先我們將rabbit@rabbit2加入到rabbit@rabbit1的集群中. 要做到這一點,我們必須在rabbit@rabbit2 上停止RabbitMQ應用程序,并將其加入到rabbit@rabbit1 集群中, 然后再重啟RabbitMQ 應用程序. 

          注意:加入集群會隱式地重置節點, 因此這會刪除此節點上先前存在的所有資源和數據.(如何備份數據)

          rabbit2$ rabbitmqctl stop_app 
          Stopping node rabbit@rabbit2 ...done.
          rabbit2$ rabbitmqctl join_cluster rabbit@rabbit1
          Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
          rabbit2$ rabbitmqctl start_app
          Starting node rabbit@rabbit2 ...done.

          在每個節點上通過運行cluster_status 命令,我們可以看到兩個節點已經加入了集群:

          rabbit1$ rabbitmqctl cluster_status 
          Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
          rabbit2$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.

          現在我們將rabbit@rabbit3節點加入到同一個集群中. 操作步驟同上面的一致,除了這次我們選擇rabbit2來加入集群,但這并不重要:

          rabbit3$ rabbitmqctl stop_app 
          Stopping node rabbit@rabbit3 ...done.
          rabbit3$ rabbitmqctl join_cluster rabbit@rabbit2
          Clustering node rabbit@rabbit3 with rabbit@rabbit2 ...done.
          rabbit3$ rabbitmqctl start_app
          Starting node rabbit@rabbit3 ...done.

          在任何一個節點上通過運行cluster_status命令,我們可以看到三個節點已經加入了集群:

          rabbit1$ rabbitmqctl cluster_status 
          Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit3,rabbit@rabbit2,rabbit@rabbit1]}] ...done.
          rabbit2$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit3,rabbit@rabbit1,rabbit@rabbit2]}] ...done.
          rabbit3$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3,rabbit@rabbit2,rabbit@rabbit1]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}] ...done.

          通過上面的步驟,當集群運行的時候,我們可以在任何時候將新的節點加入到集群中.

          重啟集群節點

          注意,加入到集群中的節點可在任何時候停止, 對于崩潰來說也沒有問題. 在這兩種情況下,集群剩余的節點將不受影響地繼續操作,當它們重啟的時候,這些崩潰的節點會再次自動追趕上其它的集群節點。

          我們關閉了節點rabbit@rabbit1和rabbit@rabbit3,并在每步觀察集群的狀態:

          rabbit1$ rabbitmqctl stop 
          Stopping and halting node rabbit@rabbit1 ...done.
          rabbit2$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit3,rabbit@rabbit2]}] ...done.
          rabbit3$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit3]}] ...done.
          rabbit3$ rabbitmqctl stop
          Stopping and halting node rabbit@rabbit3 ...done.
          rabbit2$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2]}] ...done.

          譯者注:關閉了rabbit1節點后,運行的節點已經沒有rabbit1節點了

          現在我們再次啟動節點,并檢查集群狀態:

          rabbit1$ rabbitmq-server -detached 
          rabbit1$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
          rabbit2$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.
          rabbit3$ rabbitmq-server -detached
          rabbit1$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}] ...done.
          rabbit2$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}] ...done.
          rabbit3$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}] ...done.

          這里有一些重要的警告:

          • 當整個集群崩潰的時候, 最后一個崩潰的節點必須第一個上線.如果不是這樣,節點將會等待最后一個磁盤節點30秒以確認其重新上線,否則就會失敗. 如果最后一個下線的節點,不能再重新上線,那么它可能會使用forget_cluster_node命令來從集群中刪除 - 查閱 rabbitmqctl頁面來了解更多信息.
          • 如果所有集群節點都在同一個時間內停止且不受控制(如斷電)。在這種情況下,你可以在某個節點上使用force_boot命令使其再次成為可啟動的-查閱 rabbitmqctl頁面來了解更多信息.

          脫離集群

          當節點不再是集群的一部分時,可以明確地將其從集群中刪除. 首先我們將節點rabbit@rabbit3從集群中刪除, 以使其回歸獨立操作.要做到這一點,需要在rabbit@rabbit3節點上停止RabbitMQ 應用程序,重設節點,并重啟RabbitMQ應用程序.

          rabbit3$ rabbitmqctl stop_app 
          Stopping node rabbit@rabbit3 ...done.
          rabbit3$ rabbitmqctl reset
          Resetting node rabbit@rabbit3 ...done.
          rabbit3$ rabbitmqctl start_app
          Starting node rabbit@rabbit3 ...done.

          在節點上運行cluster_status 命令來確認rabbit@rabbit3節點現在已不再是集群的一部分,并且會獨自操作:

          rabbit1$ rabbitmqctl cluster_status 
          Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
          rabbit2$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.
          rabbit3$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}] ...done.

          我們也可以遠程地刪除節點,這是相當有用的,舉例來說,當處理無反應的節點時.舉例來說,我們可以從 rabbit@rabbit2中刪除rabbit@rabbi1.

          rabbit1$ rabbitmqctl stop_app 
          Stopping node rabbit@rabbit1 ...done.
          rabbit2$ rabbitmqctl forget_cluster_node rabbit@rabbit1
          Removing node rabbit@rabbit1 from cluster ... ...done.

          注意,rabbit1仍然認為它與rabbit2處在一個集群中,但嘗試啟動時會出現一個錯誤.這時,我們需要對其進行重置以使其能再次啟動.

          rabbit1$ rabbitmqctl start_app 
          Starting node rabbit@rabbit1 ... Error: inconsistent_cluster: Node rabbit@rabbit1 thinks it's clustered with node rabbit@rabbit2, but rabbit@rabbit2 disagrees
          rabbit1$ rabbitmqctl reset
          Resetting node rabbit@rabbit1 ...done.
          rabbit1$ rabbitmqctl start_app Starting node rabbit@mcnulty ... ...done.

          現在, cluster_status 命令會顯示三個節點都是獨立節點,并且操作是獨立的:

          rabbit1$ rabbitmqctl cluster_status 
          Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1]}]},{running_nodes,[rabbit@rabbit1]}] ...done.
          rabbit2$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit2]}]},{running_nodes,[rabbit@rabbit2]}] ...done.
          rabbit3$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit3 ... [{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}] ...done.

          注意:rabbit@rabbit2節點仍然殘留有集群的狀態(譯者注:怎么看出來的呢?), 但是 rabbit@rabbit1 和rabbit@rabbit3 節點是新鮮的RabbitMQ brokers.如果我們想重新初始化rabbit@rabbit2節點,我們可以按其它節點的步驟來操作:

          rabbit2$ rabbitmqctl stop_app 
          Stopping node rabbit@rabbit2 ...done.
          rabbit2$ rabbitmqctl reset
          Resetting node rabbit@rabbit2 ...done.
          rabbit2$ rabbitmqctl start_app
          Starting node rabbit@rabbit2 ...done.

          升級集群

          當從主版本或小版本進行升級時 (如:從3.0.x 到3.1.x,或從2.x.x 到3.x.x),或者是升級Erlang時, 整個集群在升級時必須記下來(taken down) (因為集群不會像這樣來運行多個混合的版本). 當從補丁版本升級到另一個時(如:從3.0.x 到3.0.y)時,這種情況是不會出現的;這些版本在集群中是可以混合使用的(例外是3.0.0不能與 3.0.x 系列后的版本混合).

          在主版本與小版本之間升級時,RabbitMQ有必要的話會自動更新其持久化數據. 在集群中,此任務是由第一個磁盤節點來啟動的("upgrader"節點). 因此在升級RabbitMQ集群時,你不需要嘗試先啟動RAM節點,任何啟動的RAM節點都會發生錯誤,并且不能啟動.

          雖然不是嚴格必須的,但使用磁盤節點來作為升級節點通常是好的主意,最后停止那個節點。

          自動升級只適用于2.1.1及其之后的版本,如果你有更早的集群 ,你必須重新構建升級.

          單臺機器上的集群

          在某些情況下,在一臺機器上運行RabbitMQ節點的集群是有用的(試驗性質). 

          要在一臺機器上運行多個RabbitMQ節點,必須確保節點含有不同的節點名稱,數據存儲路徑,日志文件位置,綁定到不同的端口,并包含那些插件使用的端口等等 .參考配置指南中的RABBITMQ_NODENAMERABBITMQ_NODE_PORT, 和 RABBITMQ_DIST_PORT文檔 ,以及 File and Directory Locations guide指南中的 RABBITMQ_MNESIA_DIRRABBITMQ_CONFIG_FILE, and RABBITMQ_LOG_BASE。

          你可以在同一個主機上通過重復調用rabbitmq-server(rabbitmq-server.bat on Windows)來手動地啟動多個節點 . 例如:

          $ RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit rabbitmq-server -detached 
          $ RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=hare rabbitmq-server -detached
          $ rabbitmqctl -n hare stop_app
          $ rabbitmqctl -n hare join_cluster rabbit@`hostname -s`
          $ rabbitmqctl -n hare start_app

          這會設置兩個節點的集群,這兩個節點都是磁盤節點. 注意,如果你想打開非AMQP的其它端口,你需要通過命令行進行配置

          $ RABBITMQ_NODE_PORT=5672 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15672}]" RABBITMQ_NODENAME=rabbit rabbitmq-server -detached 
          $ RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=hare rabbitmq-server -detached

          主機名稱變更

          RabbitMQ節點使用主機名來相互通信.因此,所有節點名稱都集群中的節點應該都能被解析.對于像 rabbitmqctl這樣的工具來說,也是如此.

          除此之外,默認情況下,RabbitMQ使用當前系統的主機名稱來命名數據庫目錄.如果主機名變了,將會創建一個空的數據庫.為避免數據丟失,應該總是設置一個固定的,可解析的主機名稱。無論何時,只要主機名變化了,你就必須要重啟RabbitMQ:

          $ /etc/init.d/rabbitmq-server restart

          類似的效果可通過使用 rabbit@localhost作為broker節點名稱來達到。這個解決方案的影響是集群將不會工作,因為選中的主機名不能被遠程主機所解析。當從遠程主機調用時,類似地rabbitmqctl命令也會失敗. 免遭此缺點的復雜方案是使用DNS,如:如果運行EC2,則使用 Amazon Route 53 。如果你想使用節點名稱的全限定主機名(RabbitMQ 默認使用短名稱),那么可使用DNS解析, 可設置環境變量 RABBITMQ_USE_LONGNAME=true.


          防火墻節點

          當在一個數據中心或可靠網絡時,帶防火墻的集群節點是存在的,但這些節點通常被防火墻隔離。再一次聲明,當各節點之間的網絡連接不穩定時,集群不建議在WAN在使用

          在多數配置中,你需要打開4369和25672端口以使用集群正常工作.

          Erlang 使用Port Mapper Daemon (epmd) 來解析集群中的節點名稱. 默認epmd端口是4369,但它可以通過ERL_EPMD_PORT環境變量進行修改.所有的節點都必須使用同一個端口。詳細信息可參考Erlang epmd manpage.

          一旦分布式Erlang節點通過empd解析后,其它節點將會嘗試直接通信。默認地通信端口比RABBITMQ_NODE_PORT (即,默認是25672)高了20000. 這可以通過RABBITMQ_DIST_PORT 環境變量修改

          跨集群Erlang版本

          集群中所有節點必須運行相同版本的Erlang.

          從客戶端連接集群

          客戶端可以正常連接到集群中的任意節點,如果那個節點發生故障了 ,只要有剩余集群節點幸存,當客戶端發現在關閉的連接時,它就能夠重新連接到剩余幸存的集群節點上。一般來說,將節點主機名稱或IP地址放到客戶端程序是極其不明智的,這會導致缺乏靈活性,并需要客戶端程序重新編輯,編譯,重新配置以適應集群配置變化或者集群節點變化。相反,我們建議采用更抽象的方法: 如有簡短TTL配置的動態DNS服務或普通的TCP負載均衡器. 一般來說,這方面的管理集群內連接節點是超出了RabbitMQ本身的范圍,我們建議使用其他技術專門設計來解決這些問題。

          內存節點集群

          內存節點只在內存中保存其元數據。它不會像磁盤節點將元數據寫入到磁盤中,但它們擁有更好的性能。 然而,也應該注意到,由于持久化隊列數據總是存儲在磁盤上的,其性能提升只會影響資源管理(如: 添加/刪除隊列,交換機,或虛擬主機), 但不會影響發布或消費的速度.

          內存節點是一個高級使用例子;當設置你的第一個集群時,你應該不使用它們。你應該用足夠的磁盤節點來處理冗余需求,然后如果有必要,再用內存節點進行擴展.

          集群中只含有內存節點是相當脆弱的,如果集群停止了,你將不能再次啟動,并且會導致數據丟失。RabbitMQ在許多情況下,會阻止創建只包含內存節點的集群,但不能完全阻止。

          (譯者注:在集群構建中,最好有兩個或以上的磁盤節點,然后再考慮使用內存節點進行擴展)

          創建內存節點

          當節點加入集群時,我們可將其聲明為內存節點. 我們可以通過使用像先前rabbitmqctl join_cluster命令再加--ram標志來達到目的:

          rabbit2$ rabbitmqctl stop_app 
          Stopping node rabbit@rabbit2 ...done.
          rabbit2$ rabbitmqctl join_cluster --ram rabbit@rabbit1
          Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
          rabbit2$ rabbitmqctl start_app Starting node rabbit@rabbit2 ...done.


          rabbit1$ rabbitmqctl cluster_status 
          Cluster status of node rabbit@rabbit1 ... [{nodes,[{disc,[rabbit@rabbit1]},{ram,[rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}] ...done.
          rabbit2$ rabbitmqctl cluster_status
          Cluster status of node rabbit@rabbit2 ... [{nodes,[{disc,[rabbit@rabbit1]},{ram,[rabbit@rabbit2]}]}, {running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}] ...done.

          改變節點類型

          我們可以將節點的類型從磁盤修改為內存,反之亦然. 假設我們想反轉rabbit@rabbit2 和 rabbit@rabbit1的節點類型,即先將內存節點轉換為磁盤節點,隨后再將其從磁盤節點轉換為內存節點.要做到這點,我們可以使用change_cluster_node_type命令. 首先節點必須先停止.

          rabbit2$ rabbitmqctl stop_app 
          Stopping node rabbit@rabbit2 ...done. rabbit2$
          rabbitmqctl change_cluster_node_type disc
          Turning rabbit@rabbit2 into a disc node ... ...done. Starting node rabbit@rabbit2 ...done.
          rabbit1$
          rabbitmqctl stop_app
          Stopping node rabbit@rabbit1 ...done.
          rabbit1$
          rabbitmqctl change_cluster_node_type ram
          Turning rabbit@rabbit1 into a ram node ...
          rabbit1$
          rabbitmqctl start_app
          Starting node rabbit@rabbit1 ...done.
          posted @ 2016-06-05 19:53 胡小軍 閱讀(3955) | 評論 (0)編輯 收藏
          本章我們將覆蓋:
          1. 如何使用消息過期
          2. 如何使指定隊列上的消息過期
          3. 如何讓隊列過期
          4. 管理駁回的(rejected)或過期的消息
          5. 理解其它備用交換器擴展
          6. 理解有效user-ID擴展
          7. 通知隊列消息者失敗
          8. 理解交換器到交換器擴展
          9. 在消息中嵌入消息目的地
          介紹
          在本章中,我們將展示關于RabbitMQ擴展上的一些食譜.這些擴展不是AMQP 0-9-1標準的一部分,使用它們會破壞其它AMQPbroker的兼容性。
          另一方面, 在AMQP 0-10 (http://www.amqp.org/specification/0-10/amqp-org-download)中也出現了輕微的變化,這是一個簡單通往那里的路徑.最后, 它們通常是優化問題的有效解決方案。

          本章中的例子將更為真實,例如,配置參數,如列表和交換器, 以及路由鍵名稱將定義在Constants接口中。事實上,一個真正的應用程序會遵循這樣的準則從配置文件中讀取配置文件,以在不同應用程序中共享。
          然而,在下面的例子中,為了更簡短和較好的可讀性,我們并沒有指定Constants的命名空間。

          如何讓消息過期
          在本食譜中,我們將展示如何讓消息過期.食譜的資源可在Chapter02/Recipe01/Java/src/rmqexample中找到,如:
          1. Producer.java
          2. Consumer.java
          3. GetOne.java
          準備
          為了使用本食譜,我們需要設置Java開發環境,如第1章節(使用AMQP)介紹章節中說明的一樣。
          如何做
          本示例的核心是Producer.java文件.為了產生在給定生存時間(TTL)后過期的消息,我們需要執行下面的步驟:
          1. 創建或聲明一個用來發送消息的交換器, 并將其綁定到隊列上,就像第1章使用AMQP看到的一樣:
          channel.exchangeDeclare(exchange, "direct", false);
          channel.queueDeclare(queue, false, false, false, null);
          channel.queueBind(queue, exchange, routingKey);
          2. 像下面這樣初始化可選消息屬性TTL:
          BasicPropertiesmsgProperties = new BasicProperties.Builder().expiration("20000").build();
          3. 使用下面的代碼來發布消息:
          channel.basicPublish(exchange, routingKey, msgProperties,statMsg.getBytes());
          如何工作
          在這個例子中,生產者創建了一個交換器,一個命名隊列,并將它們進行了綁定,當隊列上沒有附著任何消費者,過期消息就顯得非常有意義了。
          設置過期時間TTL (以毫秒設置),會促使RabbitMQ在消息過期時,如果消息沒有被客戶端及時消費,立即刪除消息.
          在我們的例子中,我們假設應用程序發布了JVM資源統計信息到給定隊列,如果存在消費者,那么會像平時一樣,獲取到實時數據,反之,如果不存在這樣的消費者,那么消息會給定生存時間后立即過期。通過這種方式,可以避免我們收集大量的數據。一旦消息者綁定到了隊列中,它會得到先前的消息(未過期)。進一步的試驗,你可以用GetOne.java文件來替換Consumer.java文件運行.
          在調用 channel.basicGet() 時,會使你一次只能消費一個消息。
          TIP
          可使用channel.basicGet()方法來檢查未消費消息的隊列.也可以通過為第二參數傳遞false來調用,即autoAck標志.

          在這里我們可以通過調用rabbitmqctl list_queues來監控RabbitMQ隊列的狀態。  

          也可參考
          默認情況下,過期消息會丟失,但它們可以路由到其它地方。可參考管理拒絕消息或過期消息食譜來了解更多信息.

          如何讓指定隊列上的消息過期
          在本食譜中,我們將展示指定消息TTL的第二種方式.這次,我們不再通過消息屬性來指定,而是通過緩存消息的隊列來進行指定。在這種情況下,生產者只是簡單地發布消息到交換器中,因此,在交換器上綁定標準隊列和過期消息隊列是可行的。
          要在這方面進行備注,須存在一個創建自定義的隊列的消費者。生產者是相當標準的.
          像前面的食譜一樣,你可以在Chapter02/Recipe02/Java/src/rmqexample找到這三個源碼。
           
          準備
          為了使用本食譜,我們需要設置Java開發環境,如第1章節(使用AMQP)介紹章節中說明的一樣。
          如何做
          現在我們將展示創建特定消息TTL隊列的必要步驟。在我們的例子中,需要在Consumer.java文件中執行下面的步驟:
          1. 按下面來聲明交換器:
          channel.exchangeDeclare(exchange, "direct", false);
          2. 創建或聲明隊列,像下在這樣為x-message-ttl可選參數指定10,000毫秒的超時時間:
          Map<String, Object> arguments = new HashMap<String, Object>();
          arguments.put("x-message-ttl", 10000);
          channel.queueDeclare(queue, false, false, false, arguments);
          3. 綁定隊列到交換器上:
          channel.queueBind(queue, exchange, routingKey);
          如何工作
          在這個例子中,為了最終分析,我們再次假設生產者發送了JVM統計數據給RabbitMQ。最終因為Producer.java文件將其發到一個交換機,如果無消費者連接的話,消息最終會丟失。
          想要監控或分析這些統計數據的消費有下面三種選擇:
          1. 綁定到一個臨時隊列,即調用無參的channel.queueDeclare()方法
          2. 綁定到一個非自動刪除的命名隊列
          3. 綁定到一個非自動刪除的命名隊列,并且指定x-message-ttl ,如步驟2中展示的一樣.
          在第一種情況中,消費者將獲取實時統計數據,但當它掉線期間,它將不能在數據上執行分析。
          在第二種情況中,為了能讓它掉線期間,能獲取到發送的消息,可以使用一個命名隊列(最終是持久化的).但在掉線較長時間后,再重啟時,它將有巨大的backlog來進行恢復,因此在隊列中可能存在大部分舊消息的垃圾。
          在第三種情況中,舊消息垃圾會通過RabbitMQ自己來執行,以使我們從消費者和broker中獲益。
          更多
          當設置per-queue TTL, 就像本食譜中看到的一樣,只要未到超時時間,消息就不會被丟棄,此時消費者還可以嘗試消費它們。
          當使用queue TTL時, 這里有一個細微的變化,但使用per-message TTL時,在broker隊列中可能會存在過期消息.
          在這種情況下,這些過期消息仍然會占據資源(內存),同時broker統計數據中仍然會計數,直到它們不會到隊列頭部時。
          也中參考
          在這種情況下,過期消息也會恢復。參考管理駁回或過期消息食譜.
          如何讓隊列過期
          在第三種情況中,TTL不關聯任何消息,只關聯對列。這種情況對于服務器重啟和更新,是一個完美的選擇。一旦TTL超時,在最后一個消費者停止消費后,RabbitMQ會丟棄隊列.
          前面TTL相關食譜,你可在Chapter02/Recipe03/Java/src/rmqexample 中找到 Producer.java ,  Consumer.java ,and  GetOne.java 相關文件。
          準備
          為了使用本食譜,我們需要設置Java開發環境,如第1章節(使用AMQP)介紹章節中說明的一樣。
          如何做
          在前面的例子中,擴展只需要關注Consumer.java :
          1. 使用下面的代碼來創建或聲明交換器:
          channel.exchangeDeclare(exchange, "direct", false);
          2. 創建或聲明隊列,并為x-expires可選參數指定30,000毫秒的超時時間:
          Map<String, Object> arguments = new HashMap<String,Object>();
          arguments.put("x-expires", 30000);
          channel.queueDeclare(queue, false, false, false,arguments);
          3. 將隊列綁定到交換器上:
          channel.queueBind(queue, exchange, routingKey);
          如何工作
          當我們運行Consumer.java或 GetOne.java 文件的時候, 超時隊列已經創建好了,在消費者附著到隊列上或調用channel.basicGet()時,它將持續存在.
          只有當我們停止這兩個操作超過30秒時,隊列才會被刪除,并且隊列包含的消息也會清除。
          TIP
          無論生產者是否向其發送了消息,隊列事實上都是獨立刪除的。

          在這個試驗課程中,我們可通過 rabbitmqctl list_queues 命令來監控RabbitMQ 隊列狀態.
          因此,我們可以想像一種場景,有一個統計分析程序需要重啟來更新其代碼。由于命名隊列有較長的超時時間,因此重啟時,不會丟失任何消息。如果我們停止,隊列會在超過TTL后被刪除,無價值的消息將不再存儲。
          管理駁回或過期消息
          在這個例子中,我們將展示如何使用死信交換器來管理過期或駁回的消息. 死信交換器是一種正常的交換器,死消息會在這里重定向,如果沒有指定,死消息會被broker丟棄。
          你可以在Chapter02/Recipe04/Java/src/rmqexample中找到源碼文件:
          1. Producer.java
          2. Consumer.java
          要嘗試過期消息,你可以使用第一個代碼來發送帶TTL的消息,就如如何使指定隊列上消息過期食譜中描述的一樣.
          一旦啟動了,消費者不允許消息過期,但可以可以駁回消息,最終導致成為死消息。
          準備
          為了使用本食譜,我們需要設置Java開發環境,如第1章節(使用AMQP)介紹章節中說明的一樣。
          如何做
          下面的步驟展示了使用死信交換器來管理過期或駁回消息:
          1. 創建一個工作交換品節和死信交換器:
          channel.exchangeDeclare(Constants.exchange, "direct", false);
          channel.exchangeDeclare(Constants.exchange_dead_letter,"direct", false);
          2. 創建使用使用死信交換器和 x-message-ttle參數的隊列:
          arguments.put("x-message-ttl", 10000);
          arguments.put("x-dead-letter-exchange",exchange_dead_letter);
          channel.queueDeclare(queue, false, false, false,arguments);
          3. 然后像下面這樣綁定隊列:
          channel.queueBind(queue, exchange, "");
          4. 最后使用channel.basicPublish()來向交換器發送消息 .
          5. 要嘗試駁回消息,我們需要配置一個消費者,就像前面例子中看到的一樣,并使用下面的代碼來駁回消息:
          basicReject(envelope.getDeliveryTag(), false);
          如何工作
          我們先從第一個場景開始(單獨使用producer): the expired messages. 在步驟中,我們創建兩個交換器,工作交換器和死信交換器。在步驟2中,我們使用下面兩個可選參數來創建隊列:
          1. 使用arguments.put("x-message-ttl", 10000)來設置消息TTL ,正如如何使指定隊列上消息過期食譜中描述的一樣.
          2. 使用arguments.put("x-dead-letter-exchange", exchange_dead_letter)來設置死信交換器名稱;
          正如你所看到的,我們只是在配置中添加了可選的隊列參數。因此,當生產者發送消息到交換器時,它會隊列參數來路由。消息會在10秒后過期,之后它會重定向到exchange_dead_letter 
          TIP
          死信交換器是一個標準的交換器,因此你可以基于任何目的來使用.
          對于第二種場景,食譜的消費者會駁回消息.當消費者得到消息后, 它會使用basicReject()方法來發回一個否定應答(nack),當broker收到nack時,它會將消息重定向到exchange_dead_letter. 通過在死信交換器上綁定隊列,你可以管理這些消息。
          當消息重定向到死信隊列時,broker會修改header消息,并在x-dead鍵中增加下面的值:
          1. reason : 表示隊列是否過期的或駁回的(requeue =false )
          2. queue : 表示隊列源,例如stat_queue_02/05
          3. time : 表示消息變為死信的日期和時間
          4. exchange : 表示交換器,如monitor_exchange_02/05
          5. routing-keys : 表示發送消息時原先使用的路由鍵
          要在實踐中查看這些值,你可使用GetOneDeadLetterQ 類.這可以創建queue_dead_letter隊列并會綁定到exchange_dead_letter 
          更多
          你也可以使用arguments.put("x-dead-letter-routing-key", "myroutingkey")來指定死信路由鍵 ,它將會代替原來的路由鍵.這也就意味著你可以用不同的路由鍵來將不同消息路由到同一個隊列中。相當棒。
          理解交替交換器擴展
          目前,在第1章使用 AMQP中我們已經展示了如何來處理未路由消息(消息發布到了交換器,但未能達到隊列). AMQP讓生產者通過此條件進行應答,并最終決定是否有需要再次將消息分發到不同的目的地。通過這種擴展,我們可在broker中指定一個交替交換器來路由消息,而并不會對生產者造成更多的干預,本食譜的代碼在Chapter02/Recipe05/Java/src/rmqexample .

          準備
          為了使用本食譜,我們需要設置Java開發環境,如第1章節(使用AMQP)介紹章節中說明的一樣。
          如何做
          在本食譜中,我們會在Producer.java中聲明交替交換器.
          1. 將交換器的名字(無目的地路由消息)-alternateExchange ,放到可選參數map的"alternate-exchange"中,如下所示:
          Map<String, Object> arguments = new HashMap<String,Object>();
          arguments.put("alternate-exchange", alternateExchange);
          2. 通過傳遞arguments map來聲明交換器來發送消息:
          channel.exchangeDeclare(exchange, "direct", false, false,arguments);
          3. 聲明alternateExchange自身(已經在步驟1中指定了),如下所示:
          channel.exchangeDeclare(alternateExchange, "direct",false);
          4. 聲明標準化持久化隊列,并使用路由鍵alertRK將其綁定到alternateExchange交換器中:
          channel.queueDeclare(missingAlertQueue, true, false, false,null);
          channel.queueBind(missingAlertQueue, alternateExchange,alertRK);
          如何工作
          在這個例子中,我們再次使用了生成統計數據的producer,正如先前的例子一樣.但這次,我們添加了路由鍵來讓producer指定一個重要的級別,名為infoRK或alertRK (在例子中是隨機分配的).如果你運行一個producer以及至少一個consumer,將不會丟失任何消息,并且一切都會正常工作.
          TIP
          Consumers在交換器和隊列的聲明中,必須傳遞相同的可選參數,否則會拋出異常。
          但如果沒有消費者監聽的話,而我們不想丟失報警的話,這就是為什么必須選擇讓producer創建alternateExchange (步驟3)并將其綁定到持久隊列-missingAlertQueue的原因 (步驟4).
          在單獨運行producer的時候,你將看到報警存儲在這里.alternate交換器讓我們在不丟失消息的情況下可以路由消息.你可通過調用rabbitmqctllist_queues或運行CheckAlerts.java來檢查狀態 .
          最后的代碼讓我們可以查看隊列的內容和第一個消息,但不會進行消費。完成這種行為是簡單的,它足可以避免這種事實:RabbitMQ client發送了ack,消息未消費,而只是進行監控。
          現在,如果我們再次運行Consumer.java文件,它會從missingAlertQueue隊列中獲取并消費消息.這不是自動的,我們可以選擇性地從此隊列中獲取消息。
          通過創建第二個消費者實例( missingAlertConsumer ) 并使用相同的代碼來從兩個不同隊列消費消息就可以完成這種效果。如果在處理實時消息時,想要得到不同的行為,那么我們可以創建一個不同的消費者。

          更多
          在這個例子中,步驟3和步驟4是可選的。 當定義交換器時,可為交替交換器指定名稱,對于其是否存在或是否綁定到任何隊列上,并不作強制要求 。如果交替交換器不存在,生產者可通過在丟失消息上設置mandatory標志來得到應答,就如在第1章中處理未路由消息食譜中看到的一樣。
          甚至有可能出現另一種交換器-它自己的備用交換器,備用交換器可以是鏈式的,并且無目的地消息在按序地重試,直到找到一個目的地。
          如果在交換器鏈的末尾仍然沒有找到目的地,消息將會丟失,生產者可通過調設置mandatory 標志和指定一個合適的ReturnListener參數得到通知。
          理解經過驗證的user-ID擴展
          依據AMQP, 當消費者得到消息時,它是不知道發送者信息的。一般說來,消費者不應該關心是誰生產的消息,對于生產者-消費者解藕來說是相當有利的。然而,有時出于認證需要,為了達到此目的,RabbitMQ 提供了有效的user-ID擴展。
          在本例中,我們使用有效user-IDs模擬了訂單。你可在Chapter02/Recipe06/Java/src/rmqexample中找到源碼.
          準備
          為了使用本食譜,我們需要設置Java開發環境,如第1章節(使用AMQP)介紹章節中說明的一樣。
          如何做
          完成下面的步驟,以使用經過驗證的user IDs來模擬訂單:
          1. 像下面一樣聲明或使用持久化隊列:
          channel.queueDeclare(queue, true, false, false, null);
          2.發送消息時,使用BasicProperties對象,在消息頭中指定一個user ID:
          BasicProperties messageProperties = new BasicProperties.Builder()
          .timestamp(new Date())
          .userId("guest");
          channel.basicPublish("",queue, messageProperties,bookOrderMsg.getBytes());
          3. 消費者獲取到訂單后,可像下面這樣打印訂單數據和消息頭:
          System.out.println("The message has been placed by "+properties.getUserId());
          如何工作
          當設置了user-ID時,RabbitMQ 會檢查是否是同一個用戶打開的連接。在這個例子中,用戶是guest ,即RabbitMQ默認用戶.
          通過調用properties.getUserId() 方法,消費者可以訪問發送者user ID。如果你想在步驟2中設置非當前用戶的userId,channel.basicPublish()會拋出異常.
          TIP
          如果不使用user-ID屬性,用戶將是非驗證的,properties.getUserId()方法會返回null.
          也可參考
          要更好的理解這個例子,你應該知道用戶和虛擬機管理,這部分內容將在下個章節中講解。在下個章節中,我們將了解如何通過在應用程序中使用SSL來提高程序的安全性。只使用user-ID屬性,我們可保證用戶已認證,但所有信息都是未加密的,因此很容易暴露。
          隊列失敗時通知消費者

          根據AMQP標準,消費者不會得到隊列刪除的通知。一個正在刪除隊列上等待消息的消費者不會收到任何錯誤信息,并會無限期地等待。然而,RabbitMQ client提供了一種擴展來讓消息收到一個cancel參數-即消費者cancel通知。我們馬上就會看到這個例子,你可在Chapter02/Recipe07/Java/src/rmqexample 中找到代碼.

          準備
          為了使用本食譜,我們需要設置Java開發環境,如第1章節(使用AMQP)介紹章節中說明的一樣。

          如何做
          為了能讓擴展工作,你只需要執行下面的步驟:
          1.在自定義的消費者中覆蓋handleCancel()方法,可繼承于com.rabbitmq.client.DefaultConsumer (指的是ActualConsumer.java ):
          public void handleCancel(String consumerTag) throws IOException {
          ...
          }
          如何工作
          在我們的例子中,我們選擇實現一個消費者,這個消費者只在生產者是持久化的,且隊列是由生產者創建的情況下才能工作。
          因此,如果隊列是非持久化的,Consumer.java文件會立即錯誤退出. 此行為可以通過調用channel.queueDeclarePassive()來完成 .
          Producer.java類在其啟動時會創建隊列,并在其關閉時調用channel.queueDelete()方法刪除隊列,如果當隊列關閉時,而消費者正在消費隊列,那么RabbitMQ client會調用步驟1中覆蓋的handleCancel()方法來立即通知消費者。
          相對于顯示調用channel.basicCancel() 消費者使用handleCancel()方法可以任意理由來退出。只有在這種情況下,RabbitMQ client library會調用Consumer接口的方法:  handleCancelOK() 
          更多
          消費者cancel通知是client library的擴展,而不是AMQP client libraries的常規方法.一個實例它們的library必須將其聲明為可選屬性(參考 http://www.rabbitmq.com/consumer-cancel. html#capabilities ).
          RabbitMQ client library 支持并聲明了這種特性。
          也可參考
          在集群中,如果一個節點失效了,也會發生同樣的事情:client在隊列刪除后仍然得不到通知,除非它定義了覆蓋了自己的handleCancel()方法。關于這點的更多信息,可參考Chapter 6,開發可伸縮性應用程序。
          理解交換器到交換器擴展
          默認情況下,AMQP支持交換器到隊列,但不支持交換器到交換器綁定。在本例中,我們將展示如何使用RabbitMQ 交換機到交換機擴展.
          在本例中,我們將合并來自兩個不同交換器的消息到第三個交換器中.你可以在Chapter02/Recipe08/Java/src/rmqexample找到源碼.
          準備
          為了使用本食譜,我們需要設置Java開發環境,如第1章節(使用AMQP)介紹章節中說明的一樣,并像廣播消息食譜中來運行生產者以及使用topic交換器來處理消息路由。
          如何做
          完成下面的步驟來使用RabbitMQ 交換器到交換器擴展:
          1. 使用下面的代碼來聲明我們需要追蹤消息的交換器:
          channel.exchangeDeclare(exchange, "topic", false);
          2. 使用exchangeBind()來綁定其它例子中的交換器 :
          channel.exchangeBind(exchange,ref_exchange_c1_8,"#");
          channel.exchangeBind(exchange,ref_exchange_c1_6,"#");
          3. 啟動追蹤消費者:
          TraceConsumer consumer = new TraceConsumer(channel);
          String consumerTag = channel.basicConsume(myqueue, false,consumer);
          如何工作
          在步驟1中,我們創建了一個新的交換器,在步驟2中我們綁定到了下面的交換器:
          1. ref_exchange_c1_6 (廣播消息) 與exchange綁定.
          2. ref_exchange_c1_8 (使用topic來處理消息路由)與exchange綁定 .
          在步驟3中, 消費者可以綁定一個隊列到exchange上以任意地獲取所有消息.
          交換器到交換器擴展的工作方式與交換器到隊列綁定過程類似,你也可以指定一個路由鍵來過濾消息.在步驟2中,我們可以使用#(匹配所有消息)來作為路由鍵。通過改變路由鍵你可以使用制作一個filter!
          在消息中內嵌消息目的地
          在本例子中,我們會展示如何發送單個發布帶路由鍵的的消息.標準AMQP不提供此特性,但幸運的是,RabbitMQ使用消息屬性header提供了此特性. 這種擴展稱為sender-selected分發.
          此擴展的行為類似于電子郵件邏輯.它使用Carbon Copy (CC)和Blind Carbon Copy (BCC).這也是為什么能在 Chapter02/Recipe09/Java/src/rmqexample中找到CC和BCC consumers的理由:
          1. Producer.java
          2. Consumer.java
          3. StatsConsumer.java
          4. CCStatsConsumer.java
          5. BCCStatsConsumer.java
          準備
          To use this recipe, we need to set up the Java development environment as indicated in the Introduction section of Chapter 1, Working with AMQP.
          如何做
          完成下面的步驟來使用單個發布帶路由鍵的的消息:
          1. 使用下面的代碼來創建或聲明交換器:
          channel.exchangeDeclare(exchange, "direct", false);
          2. 在消息的header屬性中指定CC , BCC路由鍵:
          List<String> ccList = new ArrayList<String>();
          ccList.add(backup_alert_routing_key);
          headerMap.put("CC", ccList);
          List<String> ccList = new ArrayList<String>();
          bccList.add(send_alert_routing_key);
          headerMap.put("BCC", bccList);
          BasicProperties messageProperties = new BasicProperties.Builder().headers(headerMap).build();
          channel.basicPublish(exchange, alert_routing_key,messageProperties, statMsg.getBytes());
          3. 使用下面的三個路由鍵來綁定三個隊列three queues to the exchange using the following three routing keys:
          channel.queueBind(myqueue,exchange, alert_routing_key);
          channel.queueBind(myqueueCC_BK,exchange,backup_alert_routing_key);
          channel.queueBind(myqueueBCC_SA,exchange,send_alert_routing_key);
          4. 使用三個消費者來消費消息
          如何工作
          當生產者使用CC和BCC消息屬性來發送消息時,broker會在所有路由鍵的隊列上拷貝消息 。在本例中,stat類會直接使用路由鍵alert_routing_key來向交換器發送消息,同時它也會將消息拷貝到使用CC和BCC參數信息來將消息拷貝到myqueueCC_BK,myqueueBCC_SA隊列中。
          當像e-mails一樣發生時,在分發消息到隊列前,BCC信息會被broker從消息頭中刪除,你可查看所有我們示例消費者的輸出來觀察這種行為。
          更多
          正常情況下,AMQP不會改變消息頭,但BCC擴展是例外。這種擴展可減少發往broker的消息數目。沒有此擴展,生產者只能使用不同的路由鍵來發送多個消息的拷貝。
          posted @ 2016-06-05 19:51 胡小軍 閱讀(1341) | 評論 (0)編輯 收藏

          概述

          RabbitMQ Java client 將com.rabbitmq.client作為其頂層包. 關鍵類和接口有:

          • Channel
          • Connection
          • ConnectionFactory
          • Consumer
          協議操作可通過Channel接口來進行.Connection用于開啟channels,注冊connection生命周期事件處理, 并在不需要時關閉connections.
          Connections是通過ConnectionFactory來初始化的,在ConnectionFactory中,你可以配置不同的connection設置,如:虛擬主機和用戶名等等.

          Connections 和 Channels

          核心API類是Connection和Channel, 它們代表對應AMQP 0-9-1 connection 和 channel. 在使用前,可像下面這樣來導入:

          import com.rabbitmq.client.Connection; 
          import com.rabbitmq.client.Channel;

          連接到broker

          下面的代碼會使用給定的參數連接到AMQP broker:

          ConnectionFactory factory = new ConnectionFactory(); 
          factory.setUsername(userName);
          factory.setPassword(password);
          factory.setVirtualHost(virtualHost);
          factory.setHost(hostName);
          factory.setPort(portNumber);
          Connection conn = factory.newConnection();

          也可以使用URIs 來設置連接參數:

          ConnectionFactory factory = new ConnectionFactory(); 
          factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
          Connection conn = factory.newConnection();


          Connection 接口可用來打開一個channel:

          Channel channel = conn.createChannel(); 

          channel現在可用來發送和接收消息,正如后續章節中描述的一樣.

          要斷開連接,只需要簡單地關閉channel和connection:

          channel.close(); conn.close();

          關閉channel被認為是最佳實踐,但在這里不是嚴格必須的 - 當底層連接關閉的時候,channel也會自動關閉.

          使用 Exchanges 和 Queues

          采用交換器和隊列工作的客戶端應用程序,是AMQP高級別構建模塊。在使用前,必須先聲明.聲明每種類型的對象都需要確保名稱存在,如果有必要須進行創建.

          繼續上面的例子,下面的代碼聲明了一個交換器和一個隊列,然后再將它們進行綁定.

          channel.exchangeDeclare(exchangeName, "direct", true); 
          String queueName = channel.queueDeclare().getQueue();
          channel.queueBind(queueName, exchangeName, routingKey);

          這實際上會聲明下面的對象,它們兩者都可以可選參數來定制. 在這里,它們兩個都沒有特定參數。

          1. 一個類型為direct,且持久化,非自動刪除的交換器
          2. 采用隨機生成名稱,且非持久化,私有的,自動刪除隊列

          上面的函數然后使用給定的路由鍵來綁定隊列和交換器.

          注意,當只有一個客戶端時,這是一種典型聲明隊列的方式:它不需要一個已知的名稱,其它的客戶端也不會使用它(exclusive),并會被自動清除(autodelete).
          如果多個客戶端想共享帶有名稱的隊列,下面的代碼應該更適合:

          channel.exchangeDeclare(exchangeName, "direct", true); 
          channel.queueDeclare(queueName, true, false, false, null);
          channel.queueBind(queueName, exchangeName, routingKey);

          這實際上會聲明:

          1. 一個類型為direct,且持久化,非自動刪除的交換器
          2. 一個已知名稱,且持久化的,非私有,非自動刪除隊列

          注意,Channel API 的方法都是重載的。這些 exchangeDeclarequeueDeclare 和queueBind 都使用的是預設行為.
          這里也有更多參數的長形式,它們允許你按需覆蓋默認行為,允許你完全控制。


          發由消息

          要向交換器中發布消息,可按下面這樣來使用Channel.basicPublish方法:

          byte[] messageBodyBytes = "Hello, world!".getBytes(); 
          channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

          為了更好的控制,你可以使用重載方法來指定mandatory標志,或使用預先設置的消息屬性來發送消息:

          channel.basicPublish(exchangeName, routingKey, mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes);

          這會使用分發模式2(持久化)來發送消息, 優先級為1,且content-type 為"text/plain".你可以使用Builder類來構建你自己的消息屬性對象:

          channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(1).userId("bob").build()),messageBodyBytes);

          下面的例子使用自定義的headers來發布消息:

          Map<String, Object> headers = new HashMap<String, Object>(); 
          headers.put("latitude", 51.5252949);
          headers.put("longitude", -0.0905493);
          channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().headers(headers).build()),messageBodyBytes);

          下面的例子使用expiration來發布消息:

          channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().expiration("60000").build()),messageBodyBytes);

          BasicProperties is an inner class of the autogenerated holder class AMQP.

          Invocations of Channel#basicPublish will eventually block if a resource-driven alarm is in effect.

          Channels 和并發考慮(線程安全性)

          Channel 實例不能在多個線程間共享。應用程序必須在每個線程中使用不同的channel實例,而不能將同個channel實例在多個線程間共享。 有些channl上的操作是線程安全的,有些則不是,這會導致傳輸時出現錯誤的幀交叉。
          在多個線程共享channels也會干擾Publisher Confirms.

          通過訂閱來來接收消息

          import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;

          接收消息最高效的方式是用Consumer接口來訂閱。當消息到達時,它們會自動地進行分發,而不需要顯示地請求

          當在調用Consumers的相關方法時, 個別訂閱總是通過它們的consumer tags來確定的, consumer tags可通過客戶端或服務端來生成,參考 the AMQP specification document.
          同一個channel上的消費者必須有不同的consumer tags.

          實現Consumer的最簡單方式是繼承便利類DefaultConsumer.子類可通過在設置訂閱時,將其傳遞給basicConsume調用:

          boolean autoAck = false; 
          channel.basicConsume(queueName, autoAck, "myConsumerTag",new DefaultConsumer(channel) {
          @Override
          publicvoid handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{
          String routingKey = envelope.getRoutingKey();
          String contentType = properties.getContentType();
          long deliveryTag = envelope.getDeliveryTag();
          // (process the message components here ...)
          channel.basicAck(deliveryTag, false);
          }
          });

          在這里,由于我們指定了autoAck = false,因此消費者有必要應答分發的消息,最便利的方式是在handleDelivery 方法中處理.

          更復雜的消費者可能需要覆蓋更多的方法,實踐中,handleShutdownSignal會在channels和connections關閉時調用,handleConsumeOk 會在其它消費者之前

          調用
          ,傳遞consumer tag(不明白,要研究)。

           

          消費者可實現handleCancelOk 和 handleCancel方法來接收顯示和隱式取消操作通知。

          你可以使用Channel.basicCancel來顯示地取消某個特定的消費者:

          channel.basicCancel(consumerTag);

          passing the consumer tag.

          消費者回調是在單獨線程上處理的,這意味著消費者可以安全地在Connection或Channel, 如queueDeclare, txCommit, basicCancel或basicPublish上調用阻塞方法。

          每個Channel都有其自己的dispatch線程.對于一個消費者一個channel的大部分情況來說,這意味著消費者不會阻擋其它的消費者。如果在一個channel上多個消費者,則必須意識到長時間運行的消費者可能阻擋此channel上其它消費者回調調度.

          獲取單個消息

          要顯示地獲取一個消息,可使用Channel.basicGet.返回值是一個GetResponse實例, 在它之中,header信息(屬性) 和消息body都可以提取:

          boolean autoAck = false; 
          GetResponse response = channel.basicGet(queueName, autoAck);
          if (response == null) {
          // No message retrieved.
          } else {
          AMQP.BasicProperties props = response.getProps();
          byte[] body = response.getBody();
          long deliveryTag = response.getEnvelope().getDeliveryTag(); ...

          因為autoAck = false,你必須調用Channel.basicAck來應答你已經成功地接收了消息:

          channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message }

          處理未路由消息

          如果發布消息時,設置了"mandatory"標志,但如果消息不能路由的話,broker會將其返回到發送客戶端 (通過 AMQP.Basic.Return 命令).

          要收到這種返回的通知, clients可實現ReturnListener接口,并調用Channel.setReturnListener.如果channel沒有配置return listener,那么返回的消息會默默地丟棄。

          channel.setReturnListener(new ReturnListener() {     
              publicvoid handleBasicReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body) throws IOException {
          ...
              }
          });

           return listener將被調用,例如,如果client使用"mandatory"標志向未綁定隊列的direct類型交換器發送了消息.

          關閉協議

          AMQP client 關閉概述

          AMQP 0-9-1 connection和channel 使用相同的方法來管理網絡故障,內部故障,以及顯示本地關閉.

          AMQP 0-9-1 connection  和 channel 有如下的生命周期狀態:

          • open: 準備要使用的對象
          • closing: 對象已顯示收到收到本地關閉通知, 并向任何支持的底層對象發出關閉請求,并等待其關閉程序完成
          • closed: 對象已收到所有底層對象的完成關閉通知,最終將執行關閉操作

          這些對象總是以closed狀態結束的,不管基于什么原因引發的關閉,比如:應用程序請求,內部client library故障, 遠程網絡請求或網絡故障.

          AMQP connection 和channel 對象會持有下面與關閉相關的方法:

          • addShutdownListener(ShutdownListener 監聽器)和removeShutdownListener(ShutdownListener 監聽器),用來管理監聽器,當對象轉為closed狀態時,將會觸發這些監聽器.注意,在已經關閉的對象上添加一個ShutdownListener將會立即觸發監聽器
          • getCloseReason(), 允許同其交互以了解對象關閉的理由
          • isOpen(), 用于測試對象是否處于open狀態
          • close(int closeCode, String closeMessage), 用于顯示通知對象關閉

          可以像這樣來簡單使用監聽器:

          import com.rabbitmq.client.ShutdownSignalException; 
          import com.rabbitmq.client.ShutdownListener;
          connection.addShutdownListener(new ShutdownListener() {
          public void shutdownCompleted(ShutdownSignalException cause) { ... } }
          );

          關閉環境信息

          可通過顯示調用getCloseReason()方法或通過使用ShutdownListener類中的業務方法的cause參數來ShutdownSignalException中獲取關閉原因的有用信息.

          ShutdownSignalException 類提供方法來分析關閉的原因.通過調用isHardError()方法,我們可以知道是connection錯誤還是channel錯誤.getReason()會返回相關cause的相關信息,這些引起cause的方法形式-要么是AMQP.Channel.Close方法,要么是AMQP.Connection.Close (或者是null,如果是library中引發的異常,如網絡通信故障,在這種情況下,可通過getCause()方法來獲取信息).

          public void shutdownCompleted(ShutdownSignalException cause) {   if (cause.isHardError())   {     
          Connection conn = (Connection)cause.getReference();
          if (!cause.isInitiatedByApplication()) {
          Method reason = cause.getReason(); ... } ... }
          else { Channel ch = (Channel)cause.getReference(); ... } }

          原子使用isOpen()方法

          channel和connection對象的isOpen()方法不建議在在生產代碼中使用,因為此方法的返回值依賴于shutdown cause的存在性. 下面的代碼演示了竟爭條件的可能性:

          public void brokenMethod(Channel channel) {     if (channel.isOpen())     {         // The following code depends on the channel being in open state.         // However there is a possibility of the change in the channel state         // between isOpen() and basicQos(1) call         ...         channel.basicQos(1);     } }

          相反,我們應該忽略這種檢查,并簡單地嘗試這種操作.如果代碼執行期間,connection的channel關閉了,那么將拋出ShutdownSignalException,這就表明對象處于一種無效狀態了.當broker意外關閉連接時,我們也應該捕獲由SocketException引發的IOException,或者當broker清理關閉時,捕獲ShutdownSignalException.

          public void validMethod(Channel channel) {     try {         ...         channel.basicQos(1);     } catch (ShutdownSignalException sse) {         // possibly check if channel was closed         // by the time we started action and reasons for         // closing it         ...     } catch (IOException ioe) {         // check why connection was closed         ...     } }

          高級連接選項

          Consumer線程池

          Consumer 線程默認是通過一個新的ExecutorService線程池來自動分配的(參考下面的Receiving).如果需要在newConnection() 方法中更好地控制ExecutorService,可以使用定制的線程池.下面的示例展示了一個比正常分配稍大的線程池:

          ExecutorService es = Executors.newFixedThreadPool(20); Connection conn = factory.newConnection(es); 
          Executors 和 ExecutorService 都是java.util.concurrent包中的類.

          當連接關閉時,默認的ExecutorService將會被shutdown(), 但用戶自定義的ExecutorService (如上面所示)將不會被shutdown(). 提供自定義ExecutorService的Clients必須確保最終它能被關閉(通過調用它的shutdown() 方法), 否則池中的線程可能會阻止JVM終止.

          同一個executor service,可在多個連接之間共享,或者連續地在重新連接上重用,但在shutdown()后,則不能再使用.

          使用這種特性時,唯一需要考慮的是:在消費者回調的處理過程中,是否有證據證明有嚴重的瓶頸. 如果沒有消費者執行回調,或很少,默認的配置是綽綽有余. 開銷最初是很小的,分配的全部線程資源也是有界限的,即使偶爾可能出現一陣消費活動.

          使用Host列表

          可以傳遞一個Address數組給newConnection()Address只是 com.rabbitmq.client 包中包含host和port組件的簡單便利類. 例如:

          Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)                                  , new Address(hostname2, portnumber2)}; Connection conn = factory.newConnection(addrArr); 
          將會嘗試連接hostname1:portnumber1, 如果不能連接,則會連接hostname2:portnumber2,然后會返回數組中第一個成功連接(不會拋出IOException)上broker的連接.這完全等價于在factory上重復調用factory.newConnection()方法來設置host和port, 直到有一個成功返回.

          如果提供了ExecutorService(在factory.newConnection(es, addrArr)中使用),那么線程池將與第一個成功連接相關聯.

          心跳超時

          參考Heartbeats guide 來了解更多關于心跳及其在Java client中如何配置的更多信息.

          自定義線程工廠

          像Google App Engine (GAE)這樣的環境會限制線程直接實例化. 在這樣的環境中使用RabbitMQ Java client, 需要配置一個定制的ThreadFactory,即使用合適的方法來實例化線程,如: GAE's ThreadManager. 下面是Google App Engine的相關代碼.

          import com.google.appengine.api.ThreadManager;  ConnectionFactory cf = new ConnectionFactory(); cf.setThreadFactory(ThreadManager.backgroundThreadFactory()); 

          網絡故障時自動恢復

          Connection恢復

          clients和RabbitMQ節點之間的連接可發生故障. RabbitMQ Java client 支持連接和拓撲(queues, exchanges, bindings, 和consumers)的自動恢復. 大多數應用程序的連接自動恢復過程會遵循下面的步驟:

          1. 重新連接
          2. 恢復連接監聽器
          3. 重新打開通道
          4. 恢復通道監聽器
          5. 恢復通道basic.qos 設置,發布者確認和事務設置
          拓撲恢復包含下面的操作,每個通道都會執行下面的步驟:
          1. 重新聲明交換器(except for predefined ones)
          2. 重新聲明隊列
          3. 恢復所有綁定
          4. 恢復所有消費者
          要啟用自動連接恢復,須使用factory.setAutomaticRecoveryEnabled(true):
          ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(true); // connection that will recover automatically Connection conn = factory.newConnection();
          如果恢復因異常失敗(如. RabbitMQ節點仍然不可達),它會在固定時間間隔后進行重試(默認是5秒). 時間間隔可以進行配置:
          ConnectionFactory factory = new ConnectionFactory(); // attempt recovery every 10 seconds factory.setNetworkRecoveryInterval(10000);
          當提供了address列表時,將會在所有address上逐個進行嘗試:
          ConnectionFactory factory = new ConnectionFactory();  Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")}; factory.newConnection(addresses);

          恢復監聽器

          可在可恢復連接和通道上注冊一個或多個恢復監聽器. 當啟用了連接恢復時,ConnectionFactory#newConnection 和 Connection#createChannel 的連接已實現了com.rabbitmq.client.Recoverable,并提供了兩個方法:

          • addRecoveryListener
          • removeRecoveryListener
          注意,在使用這些方法時,你需要將connections和channels強制轉換為Recoverable.

          發布影響

          當連接失敗時,使用Channel.basicPublish方法發送的消息將會丟失. client不會保證在連接恢復后,消息會得到分發.要確保發布的消息到達了RabbitMQ,應用程序必須使用Publisher Confirms 


          拓撲恢復

          拓撲恢復涉及交換器,隊列,綁定以及消費者恢復.默認是啟用的,但也可以禁用:

          ConnectionFactory factory = new ConnectionFactory();  Connection conn = factory.newConnection(); factory.setAutomaticRecoveryEnabled(true); factory.setTopologyRecoveryEnabled(false);

          手動應答和自動恢復

          當使用手動應答時,在消息分發與應答之間可能存在網絡連接中斷. 在連接恢復后,RabbitMQ會在所有通道上重設delivery標記. 也就是說,使用舊delivery標記的basic.ackbasic.nack, 以及basic.reject將會引發channel exception. 為了避免發生這種情況, RabbitMQ Java client可以跟蹤,更新,以使它們在恢復期間單調地增長. Channel.basicAckChannel.basicNack, 以及Channel.basicReject 然后可以轉換這些 delivery tags,并且不再發送過期的delivery tags. 使用手動應答和自動恢復的應用程序必須負責處理重新分發.

          未處理異常

          關于connection, channel, recovery, 和consumer lifecycle 的異常將會委派給exception handler,Exception handler是實現了ExceptionHandler接口的任何對象. 默認情況下,將會使用DefaultExceptionHandler實例,它會將異常細節輸出到標準輸出上.

          可使用ConnectionFactory#setExceptionHandler來覆蓋原始handler,它將被用于由此factory創建的所有連接:

          ConnectionFactory factory = new ConnectionFactory(); cf.setExceptionHandler(customHandler);         
          Exception handlers 應該用于異常日志.

          Google App Engine上的RabbitMQ Java Client

          在Google App Engine (GAE) 上使用RabbitMQ Java client,必須使用一個自定義的線程工廠來初始化線程,如使用GAE's ThreadManager. 此外,還需要設置一個較小的心跳間隔(4-5 seconds) 來避免InputStream 上讀超時:

          ConnectionFactory factory = new ConnectionFactory(); cf.setRequestedHeartbeat(5);         

          警告和限制

          為了能使拓撲恢復, RabbitMQ Java client 維持了聲明隊列,交換器,綁定的緩存. 緩存是基于每個連接的.某些RabbitMQ的特性使得客戶端不能觀察到拓撲的變化,如,當隊列因TTL被刪除時. RabbitMQ Java client 會嘗試在下面的情況中使用緩存實體失效:

          • 當隊列被刪除時.
          • 當交換器被刪除時.
          • 當綁定被刪除時.
          • 當消費者在自動刪除隊列上退出時.
          • 當隊列或交換器不再綁定到自動刪除的交換器上時.
          然而, 除了單個連接外,client不能跟蹤這些拓撲變化. 依賴于自動刪除隊列或交換器的應用程序,正如TTL隊列一樣 (注意:不是消息TTL!), 如果使用了自動連接恢復,在知道不再使用或要刪除時,必須明確地刪除這些緩存實體,以凈化 client-side 拓撲cache. 這些可通過Channel#queueDeleteChannel#exchangeDelete,Channel#queueUnbind, and Channel#exchangeUnbind來進行.

          RPC (Request/Reply) 模式

          為了便于編程, Java client API提供了一個使用臨時回復隊列的RpcClient類來提供簡單的RPC-style communication.

          此類不會在RPC參數和返回值上強加任何特定格式. 它只是簡單地提供一種機制來向帶特定路由鍵的交換器發送消息,并在回復隊列上等待響應.

          import com.rabbitmq.client.RpcClient;  
          RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);

          (其實現細節為:請求消息使用basic.correlation_id唯一值字段來發送消息,并使用basic.reply_to來設置響應隊列的名稱.)

          一旦你創建此類的實例,你可以使用下面的任意一個方法來發送RPC請求:

          byte[] primitiveCall(byte[] message); 
          String stringCall(String message) Map mapCall(Map message) Map mapCall(Object[] keyValuePairs)

          primitiveCall 方法會將原始byte數組轉換為請求和響應的消息體. stringCall只是一個primitiveCall的簡單包裝,將消息體視為帶有默認字符集編碼的String實例.

          mapCall 變種稍為有些復雜: 它會將原始java值包含在java.util.Map中,并將其編碼為AMQP 0-9-1二進制表示形式,并以同樣的方式來解碼response. (注意:在這里,對一些值對象類型有所限制,具體可參考javadoc.)

          所有的編組/解組便利方法都使用primitiveCall來作為傳輸機制,其它方法只是在它上面的做了一個封裝.

          posted @ 2016-06-04 00:37 胡小軍 閱讀(15639) | 評論 (1)編輯 收藏
          僅列出標題
          共5頁: 上一頁 1 2 3 4 5 下一頁 
          主站蜘蛛池模板: 蒲江县| 陕西省| 来凤县| 始兴县| 定远县| 千阳县| 泸溪县| 衡阳市| 丰台区| 西和县| 长宁县| 临颍县| 阳东县| 东兰县| 德保县| 广东省| 贵阳市| 巍山| 拉孜县| 蛟河市| 舒兰市| 武川县| 静安区| 德安县| 安吉县| 怀集县| 渝中区| 乐业县| 凉城县| 乌鲁木齐县| 景东| 新乡市| 余江县| 洪洞县| 九江市| 千阳县| 文成县| 锦屏县| 新绛县| 龙里县| 泾阳县|