在本章節(jié)中,我們將展現(xiàn)一些RabbitMQ中的可用插件.然后,我們將展示如何使用現(xiàn)實(shí)世界中的例子來(lái)開(kāi)發(fā)新插件.
- 啟用和配置STOMP插件
- 管理RabbitMQ集群
- 監(jiān)控Shovel狀態(tài)
- 開(kāi)發(fā)新插件– 使用ODBC連接關(guān)系數(shù)據(jù)庫(kù)
介紹
多虧了插件設(shè)施,使得RabbitMQ成為了一個(gè)可擴(kuò)展平臺(tái).它提供了許多通用插件,其中一些已經(jīng)在前面的章節(jié)中解釋過(guò)了.
例如, Federation和Shovel 插件已經(jīng)在第7章節(jié)開(kāi)發(fā)高可用應(yīng)用程序中詳細(xì)講解過(guò)了.
在下面的食譜中,我們將展示其它的可用插件,以及如何開(kāi)發(fā)新插件來(lái)擴(kuò)展RabbitMQ的功能.
啟用和配置STOMP插件
從本書(shū)的一開(kāi)始,我們就已經(jīng)看過(guò)了如何使用插件。管理插件本身實(shí)際上也是一個(gè)插件。然而,我們的意圖是顯示插件的進(jìn)一步使用。
在這個(gè)食譜中,我們將看到如何使STOMP插件,以及RabbitMQ提供的進(jìn)一步可能性。
通過(guò)簡(jiǎn)單的(或流)文本定向通訊協(xié)議(STOMP)(http://stomp.github.io/),RabbitMQ增加語(yǔ)言的互操作性。
當(dāng)安裝了插件后,RabbitMQ的broker不僅可以與AMQP協(xié)議操作,也可以與STOMP協(xié)議互操作。
準(zhǔn)備
對(duì)于本食譜,你只需要最新版本的RabbitMQ.
如何做
為了運(yùn)行本食譜,你需要執(zhí)行下面的步驟:
1. 從 root (Linux)或RabbitMQ命令提示窗口(Windows), 使用下面的檢查當(dāng)前插件的狀態(tài):
rabbitmq-plugins list
2. 然后使用下面的命令來(lái)啟用STOMP插件:
rabbitmq-plugins enable rabbitmq_stomp
3. 使用下面的命令來(lái)重啟RabbitMQ:
service rabbitmq-server restart
此時(shí),你可以嘗試使用 Netcat來(lái)提交STOMP消息,nc命令( http://en.wikipedia.org/wiki/Netcat ),來(lái)自于終端會(huì)話(Windows用戶可使用telnet ):
4. 在shell提示中,輸入下面的nc命令:
nc localhost 61613
CONNECT
^@
SEND
Destination:/queue/test
This the 1st stomp message
^@
TIP
注意 ^@ 代表的是CTRL + @的組合, 相當(dāng)于用一個(gè)ASCII碼等于零的性質(zhì)。
5. 打開(kāi)第二個(gè)終端,輸入下面的nc命令:
nc localhost 61613
CONNECT
^@
SUBSCRIBE
destination:/queue/test
^@
如何工作
要列出可用插(步驟1),我們將得到下面的屏幕輸出:

上圖顯示的是所有可用的插件,其中方括號(hào)中為空的表示還未安裝. 標(biāo)記為[E]的插件是明確安裝的. 標(biāo)記為[e]的插件是隱式安裝的,也就是說(shuō),這些插件是作為其它的插件的依賴而進(jìn)行安裝的.
在安裝了STOMP插件之后,我們必須重啟broker以讓修改生效.
一旦插件激活了,你可以使用簡(jiǎn)單的文本STOMP協(xié)議來(lái)向隊(duì)列test發(fā)送消息(步驟4). 這樣你將看到下面的截圖:

在另一個(gè)終端,我們可以啟動(dòng)隊(duì)列test來(lái)消費(fèi)消息(步驟5). 這樣你將看到下面的截圖:

TIP
也可參考
管理RabbitMQ集群
如何做

只要我們創(chuàng)建了隊(duì)列(步驟5),頁(yè)面將在5秒內(nèi)自動(dòng)刷新:

在前面的食譜中,我們已經(jīng)看過(guò)了,如何使用一些現(xiàn)有的插件。現(xiàn)在,我們將了解如何來(lái)開(kāi)發(fā)自定義新插件.
注意,這不是一種典型實(shí)踐. 大部分的操作通常可通過(guò) RabbitMQ/AMQP client API來(lái)執(zhí)行.
然而,在某些必要的情況下,也需要執(zhí)行一些定制化的操作.
這是出于優(yōu)化目的,或需要嚴(yán)格監(jiān)控broker自己的行為.
在本食譜中,我們將展示如何讓RabbitMQ來(lái)消費(fèi)消息,并使用ODBC驅(qū)動(dòng)將其存入關(guān)系型數(shù)據(jù)庫(kù)中.

上圖顯示的是所有可用的插件,其中方括號(hào)中為空的表示還未安裝. 標(biāo)記為[E]的插件是明確安裝的. 標(biāo)記為[e]的插件是隱式安裝的,也就是說(shuō),這些插件是作為其它的插件的依賴而進(jìn)行安裝的.
在安裝了STOMP插件之后,我們必須重啟broker以讓修改生效.
一旦插件激活了,你可以使用簡(jiǎn)單的文本STOMP協(xié)議來(lái)向隊(duì)列test發(fā)送消息(步驟4). 這樣你將看到下面的截圖:

在另一個(gè)終端,我們可以啟動(dòng)隊(duì)列test來(lái)消費(fèi)消息(步驟5). 這樣你將看到下面的截圖:

請(qǐng)注意,從消息到最后的文本沒(méi)有被類(lèi)型化,這是實(shí)際接收到的消息。
更多在這個(gè)食譜中,我們使用了標(biāo)準(zhǔn)的STOMP配置。然而,也可以通過(guò)RabbitMQ配置文件的選項(xiàng)來(lái)定制其端口,SSL使用.
你可在http://www.rabbitmq.com/stomp.html查找到更多詳細(xì)信息.TIP
注意STOMP完全不同于Web-Stomp, Web-Stomp是封裝在WebSockets中的STOMP.參考第5章節(jié)在web應(yīng)用程序中使用STOMP來(lái)開(kāi)發(fā)web監(jiān)控程序食譜的內(nèi)容.這兩種協(xié)議不具有互操作性。
此外,也可以使用STOMP來(lái)發(fā)送其它類(lèi)型的消息。如使用臨時(shí)隊(duì)列來(lái)發(fā)送RPC消息,向交換器發(fā)送消息,或從交換器中接收消息.
也可參考
在本食譜中,我們已經(jīng)展示了如何使用Netcat作為文本客戶端來(lái)讓RabbitMQ與STOMP插件交互. 但,這不是一種典型在客戶端使用STOMP的方式.
這里有許多可用的STOMP客戶端APIs.
你可在http://stomp.github.io/implementations.html中找到可用客戶端包的列表.
這里有許多可用的STOMP客戶端APIs.
你可在http://stomp.github.io/implementations.html中找到可用客戶端包的列表.
管理RabbitMQ集群
當(dāng)部署了一個(gè)大的RabbitMQ集群時(shí),管理插件會(huì)對(duì)集群操作造成一定的開(kāi)銷(xiāo).在本食譜中,我們將展示如何減輕這種開(kāi)銷(xiāo)。
準(zhǔn)備
準(zhǔn)備
為了驗(yàn)證這個(gè)食譜,你需要一個(gè)具有至少兩個(gè)節(jié)點(diǎn)的RabbitMQ集群。如果他們已經(jīng)安裝了管理插件,你需要?jiǎng)h除它。
如何做
1. 使用下面的命令在一個(gè)節(jié)點(diǎn)上安裝管理插件:
rabbitmq-plugins enable rabbitmq_management
2.使用以下命令在所有其他節(jié)點(diǎn)上安裝管理代理插件:
rabbitmq-plugins enable rabbitmq_management_agent
rabbitmq-plugins enable rabbitmq_management
2.使用以下命令在所有其他節(jié)點(diǎn)上安裝管理代理插件:
rabbitmq-plugins enable rabbitmq_management_agent
如何工作
執(zhí)行這些步驟后,您只能從第一個(gè)節(jié)點(diǎn)監(jiān)視整個(gè)群集。其他節(jié)點(diǎn)將通過(guò)代理來(lái)更新第一個(gè)節(jié)點(diǎn)的控制臺(tái)狀態(tài),但您無(wú)法訪問(wèn)他們的端口15672。
通常,你會(huì)在幾個(gè)節(jié)點(diǎn)安裝完整的管理插件,如前端或管理節(jié)點(diǎn)和管理代理插件的其它節(jié)點(diǎn)。
監(jiān)控Shovel狀態(tài)
在第7章節(jié),開(kāi)發(fā)高可用應(yīng)用程序中,我們已經(jīng)看過(guò)了如何使用Shovel插件.在本食譜中,我們將展示如何使用一個(gè)適當(dāng)?shù)牟寮?lái)監(jiān)控其正確行為. 這是rabbitmq_management插件的擴(kuò)展.
準(zhǔn)備
要測(cè)試這個(gè)食譜,我們需要運(yùn)行兩個(gè)RabbitMQ brokers. 在本食譜中,我們將其稱為rabbit@node01和rabbit@node02.
如何做
如何做
我們假設(shè)這兩個(gè)broker已經(jīng)在他們各自的節(jié)點(diǎn)上運(yùn)行了.我們將使用配置文件來(lái)配置節(jié)點(diǎn)node01上的broker,你也可以從Chapter07/Recipe05中拷貝配置文件:
1. 在RabbitMQ配置文件中,一般是/etc/rabbit/rabbitmq.config,插入下面的Shovel配置:
[{rabbitmq_shovel,
[ {shovels, [ {my_books_shovel,
[
{sources, [ {broker, "amqp://node02"}]}
, {destinations, [ {broker, "amqp://"}]}
, {queue, <<"myBooksQueueCopy">>}
, {prefetch_count, 10}
, {reconnect_delay, 5}
]}]}].
2. 輸入下面的命令來(lái)激活插件:
rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
3. 為修改的broker重啟broker以使配置生效,命令如下:
service rabbitmq-server restart
4. 使用下面的地址來(lái)訪問(wèn)RabbitMQ 管理界面:
http://node01:15672/
5. 從管理界面中,分別在節(jié)點(diǎn)node01和node02上創(chuàng)建myBooksQueueCopy隊(duì)列.
6. 在管理界面,導(dǎo)航至Admin | Shovel Status.
如何工作
1. 在RabbitMQ配置文件中,一般是/etc/rabbit/rabbitmq.config,插入下面的Shovel配置:
[{rabbitmq_shovel,
[ {shovels, [ {my_books_shovel,
[
{sources, [ {broker, "amqp://node02"}]}
, {destinations, [ {broker, "amqp://"}]}
, {queue, <<"myBooksQueueCopy">>}
, {prefetch_count, 10}
, {reconnect_delay, 5}
]}]}].
2. 輸入下面的命令來(lái)激活插件:
rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
3. 為修改的broker重啟broker以使配置生效,命令如下:
service rabbitmq-server restart
4. 使用下面的地址來(lái)訪問(wèn)RabbitMQ 管理界面:
http://node01:15672/
5. 從管理界面中,分別在節(jié)點(diǎn)node01和node02上創(chuàng)建myBooksQueueCopy隊(duì)列.
6. 在管理界面,導(dǎo)航至Admin | Shovel Status.
如何工作
當(dāng)我們激活Shovel插件時(shí),可通過(guò)rabbitmq_shovel_management 插件來(lái)監(jiān)控其行為. 一旦我們激活了插件,由于重定向隊(duì)列不存在,你會(huì)看到錯(cuò)誤信息。顯示如下:

只要我們創(chuàng)建了隊(duì)列(步驟5),頁(yè)面將在5秒內(nèi)自動(dòng)刷新:

在這個(gè)頁(yè)面上, RabbitMQ 允許我們監(jiān)視所有已配置的Shovels.
開(kāi)發(fā)新插件 – 使用ODBC來(lái)操作關(guān)系型數(shù)據(jù)庫(kù)
注意,這不是一種典型實(shí)踐. 大部分的操作通常可通過(guò) RabbitMQ/AMQP client API來(lái)執(zhí)行.
然而,在某些必要的情況下,也需要執(zhí)行一些定制化的操作.
這是出于優(yōu)化目的,或需要嚴(yán)格監(jiān)控broker自己的行為.
在本食譜中,我們將展示如何讓RabbitMQ來(lái)消費(fèi)消息,并使用ODBC驅(qū)動(dòng)將其存入關(guān)系型數(shù)據(jù)庫(kù)中.
準(zhǔn)備
在這個(gè)食譜中,我們需要RabbitMQ 和PostgreSQL. 然而, 由于ODBC驅(qū)動(dòng)的普及,本食譜可以很容易地適應(yīng)到幾乎所有關(guān)系數(shù)據(jù)庫(kù)上。
另外,我們還需要安裝Mercurial (http://mercurial.selenic.com/),它是RabbitMQ的版本系統(tǒng),用來(lái)檢出RabbitMQ的源代碼.
另外,我們還需要安裝Mercurial (http://mercurial.selenic.com/),它是RabbitMQ的版本系統(tǒng),用來(lái)檢出RabbitMQ的源代碼.
雖然這里的環(huán)境是Linux,但只需要稍為修改,本食譜也能工作在Windows上.
如何做
如何做
現(xiàn)在,我們將安裝和配置PostgreSQL.
1. 安裝PostgreSQL和它的ODBC驅(qū)動(dòng). 例如,在基于yum的Linux發(fā)行版上(如RedHat, CentOS, Fedor或其它),用roo執(zhí)行下面的命令.
yum install postgresql-server
yum install postgresql-odbc
postgresql-setup initdb
service postgresql start
2. 從RabbitMQ插件中,使用下面的命令來(lái)創(chuàng)建一個(gè)新用戶(密碼為rmq_plugin_password) 和一個(gè)新數(shù)據(jù)庫(kù):
su postgres createuser --no-superuser --no-createdb --no-createrole --pwpromptrmq_plugin_user
createdb --owner rmq_plugin_userrmqdb
3. ,通過(guò)在文件/var/lib/pgsql/data/pg_hba.conf末尾追加下面的行,以允許新創(chuàng)建的用戶訪問(wèn)給定數(shù)據(jù)庫(kù)(只允許從本地訪問(wèn)):
# TYPE DATABASE USER ADDRESS METHOD
local rmqdb rmq_plugin_user md5
host rmqdb rmq_plugin_user 127.0.0.1/32 md5
4.以root身份重新加載PostgreSQL配置:
service postgresql reload
5. 此時(shí),你應(yīng)該可以從本地連接PostgreSQL了,例如,通過(guò)執(zhí)行下面的命令:
psql --username=rmq_plugin_user --dbname=rmqdb
6. 完成ODBC驅(qū)動(dòng)的配置來(lái)訪問(wèn)數(shù)據(jù)庫(kù),要做到這一點(diǎn),必須在文件/etc/odbc.ini中插入下面的行:
[rmqDSN]
Driver = PostgreSQL
Description = PostgreSQL data source for RabbitMQ
Servername = localhost
Port = 5432
Protocol = 8.4
Database = rmqdb
7.現(xiàn)在我們可以使用isql命令來(lái)測(cè)試ODBC連接,同時(shí)我們也可以直接使用Erlang來(lái)測(cè)試. erl命令如下:
odbc:start()
odbc:connect("DSN=rmqDSN;UID=rmq_plugin_user;PWD=rmq_plugin_password", [])
到目前為止,我們已經(jīng)準(zhǔn)備好了開(kāi)發(fā)插件所需要的東西.現(xiàn)在我們來(lái)了解如何啟用metronome 插件.它存在于官方RabbitMQ文檔(http://www.rabbitmq.com/plugin-development.html:
8. 使用下面的命令來(lái)檢出RabbitMQ開(kāi)發(fā)源代碼樹(shù):
cd $HOME
hg clone http://hg.rabbitmq.com/rabbitmq-public-umbrella
cd rabbitmq-public-umbrella
make co
9. 使用下面的命令來(lái)編譯metronome插件:
cd rabbitmq-metronome
make
10. 在開(kāi)發(fā)的broker中安裝插件及其依賴.
cd ../rabbitmq-server
mkdir plugins
cd plugins
ln –s ../../rabbitmq-metronome
ln–s../../rabbitmq-erlang-client
11.為了避免覆蓋產(chǎn)品環(huán)境安裝,即使是我們不用root用戶,我們也需要停止產(chǎn)品服務(wù)器,設(shè)置一些環(huán)境變量,并創(chuàng)建一些相應(yīng)的目錄以讓RabbitMQ以標(biāo)準(zhǔn)用戶啟動(dòng):
export RABBITMQ_LOG_BASE=$HOME/rmq/log
export RABBITMQ_MNESIA_BASE=$HOME/rmq/mnesia
export RABBITMQ_ENABLED_PLUGINS_FILE=$HOME/rmq/enabled_plugins
mkdir –p $RABBITMQ_LOG_BASE $RABBITMQ_MNESIA_BASE
12. 現(xiàn)在我們可以啟用開(kāi)發(fā)插件了.
cd $HOME/rabbitmq-public-umbrella/rabbitmq-server
scripts/rabbitmq-plugins list
scripts/rabbitmq-plugins enable rabbitmq_metronome
13. 最后,啟動(dòng)開(kāi)發(fā)服務(wù)器:
scripts/rabbitmq-server
14. 此刻,我們已經(jīng)準(zhǔn)備好了如何開(kāi)發(fā)一個(gè)新插件, 它使得通過(guò)ODBC的方式,將RabbitMQ與第三方數(shù)據(jù)庫(kù)連接到了一起.你可在Chapter09/Recipe04中找到本食譜的源碼. 你可從Chapter09/Recipe04/
rabbitmq-odbctap目錄拷貝源碼進(jìn)RabbitMQ開(kāi)發(fā)樹(shù)-rabbitmq-publicumbrella(在步驟8中檢出來(lái)的). 通過(guò)這種方式,你可以獲得類(lèi)似于下面截圖的源碼樹(shù):
1. 安裝PostgreSQL和它的ODBC驅(qū)動(dòng). 例如,在基于yum的Linux發(fā)行版上(如RedHat, CentOS, Fedor或其它),用roo執(zhí)行下面的命令.
yum install postgresql-server
yum install postgresql-odbc
postgresql-setup initdb
service postgresql start
2. 從RabbitMQ插件中,使用下面的命令來(lái)創(chuàng)建一個(gè)新用戶(密碼為rmq_plugin_password) 和一個(gè)新數(shù)據(jù)庫(kù):
su postgres createuser --no-superuser --no-createdb --no-createrole --pwpromptrmq_plugin_user
createdb --owner rmq_plugin_userrmqdb
3. ,通過(guò)在文件/var/lib/pgsql/data/pg_hba.conf末尾追加下面的行,以允許新創(chuàng)建的用戶訪問(wèn)給定數(shù)據(jù)庫(kù)(只允許從本地訪問(wèn)):
# TYPE DATABASE USER ADDRESS METHOD
local rmqdb rmq_plugin_user md5
host rmqdb rmq_plugin_user 127.0.0.1/32 md5
4.以root身份重新加載PostgreSQL配置:
service postgresql reload
5. 此時(shí),你應(yīng)該可以從本地連接PostgreSQL了,例如,通過(guò)執(zhí)行下面的命令:
psql --username=rmq_plugin_user --dbname=rmqdb
6. 完成ODBC驅(qū)動(dòng)的配置來(lái)訪問(wèn)數(shù)據(jù)庫(kù),要做到這一點(diǎn),必須在文件/etc/odbc.ini中插入下面的行:
[rmqDSN]
Driver = PostgreSQL
Description = PostgreSQL data source for RabbitMQ
Servername = localhost
Port = 5432
Protocol = 8.4
Database = rmqdb
7.現(xiàn)在我們可以使用isql命令來(lái)測(cè)試ODBC連接,同時(shí)我們也可以直接使用Erlang來(lái)測(cè)試. erl命令如下:
odbc:start()
odbc:connect("DSN=rmqDSN;UID=rmq_plugin_user;PWD=rmq_plugin_password", [])
到目前為止,我們已經(jīng)準(zhǔn)備好了開(kāi)發(fā)插件所需要的東西.現(xiàn)在我們來(lái)了解如何啟用metronome 插件.它存在于官方RabbitMQ文檔(http://www.rabbitmq.com/plugin-development.html:
8. 使用下面的命令來(lái)檢出RabbitMQ開(kāi)發(fā)源代碼樹(shù):
cd $HOME
hg clone http://hg.rabbitmq.com/rabbitmq-public-umbrella
cd rabbitmq-public-umbrella
make co
9. 使用下面的命令來(lái)編譯metronome插件:
cd rabbitmq-metronome
make
10. 在開(kāi)發(fā)的broker中安裝插件及其依賴.
cd ../rabbitmq-server
mkdir plugins
cd plugins
ln –s ../../rabbitmq-metronome
ln–s../../rabbitmq-erlang-client
11.為了避免覆蓋產(chǎn)品環(huán)境安裝,即使是我們不用root用戶,我們也需要停止產(chǎn)品服務(wù)器,設(shè)置一些環(huán)境變量,并創(chuàng)建一些相應(yīng)的目錄以讓RabbitMQ以標(biāo)準(zhǔn)用戶啟動(dòng):
export RABBITMQ_LOG_BASE=$HOME/rmq/log
export RABBITMQ_MNESIA_BASE=$HOME/rmq/mnesia
export RABBITMQ_ENABLED_PLUGINS_FILE=$HOME/rmq/enabled_plugins
mkdir –p $RABBITMQ_LOG_BASE $RABBITMQ_MNESIA_BASE
12. 現(xiàn)在我們可以啟用開(kāi)發(fā)插件了.
cd $HOME/rabbitmq-public-umbrella/rabbitmq-server
scripts/rabbitmq-plugins list
scripts/rabbitmq-plugins enable rabbitmq_metronome
13. 最后,啟動(dòng)開(kāi)發(fā)服務(wù)器:
scripts/rabbitmq-server
14. 此刻,我們已經(jīng)準(zhǔn)備好了如何開(kāi)發(fā)一個(gè)新插件, 它使得通過(guò)ODBC的方式,將RabbitMQ與第三方數(shù)據(jù)庫(kù)連接到了一起.你可在Chapter09/Recipe04中找到本食譜的源碼. 你可從Chapter09/Recipe04/
rabbitmq-odbctap目錄拷貝源碼進(jìn)RabbitMQ開(kāi)發(fā)樹(shù)-rabbitmq-publicumbrella(在步驟8中檢出來(lái)的). 通過(guò)這種方式,你可以獲得類(lèi)似于下面截圖的源碼樹(shù):

15. 要開(kāi)始開(kāi)發(fā)一個(gè)新插件,需要從現(xiàn)有插件rabbitmq-metronome 或 rabbitmq-shovel中拷貝和重命名文件. The makefile will be left unmodified.
16. 修改package.mk文件以反映所需的依賴與測(cè)試模塊.
17. 編輯rabbitmq_odbctap.app.src. 此文件包含Erlang項(xiàng)目所需的資源,在我們的例子中,一般和慣例都必須包含下面的配置:
{application, rabbitmq_odbctap,
[{description, "Embedded Rabbit ODBC tap"},
{vsn, "0.0.0"},
{modules, []},
{registered, []},
{mod, {rabbit_odbctap, []}},
{env, [{dsn,"DSN"},
{user,"guest"},
{password, "guest"},
{queue, "tapped_queue"}
]},
{applications, [kernel, stdlib, rabbit, amqp_client]}]}.
18. 定制rabbit_odbctap.erl, 模塊的入口點(diǎn),為了讓它啟動(dòng)或停止,需要配置插件自身.
19. 定制rabbit_odbctap_sup.erl, 對(duì)于Erlang管理源節(jié)點(diǎn),通過(guò)定義回調(diào)來(lái)定義主管行為的回調(diào).
20. 在rabbit_odbctap_worker.erl中實(shí)現(xiàn)插件邏輯.這是實(shí)際模塊的入口點(diǎn),在我們這里,它會(huì)連接RabbitMQ broker,綁定隊(duì)列,并通過(guò)ODBC來(lái)消費(fèi)指定數(shù)據(jù)庫(kù)包含的消息.
21.多個(gè)模塊共用的數(shù)據(jù)定義(也就是Erlang記錄)可放置在include目錄中.如,在rabbit_odbctap.hrl中,你可以找到Erlang記錄odbctap_config的定義,此定義是被rabbit_odbctap.erl、 rabbit_odbctap_worker.erl所共用的.
22. 準(zhǔn)備一個(gè)或多個(gè)測(cè)試模塊.在我們的例子中,在rabbit_odbctap_tests.erl中只能找到一個(gè)骨架.
23. 編譯插件并執(zhí)行自動(dòng)化測(cè)試.
cd $HOME/rabbitmq-public-umbrella/rabbimq-odbctap
make
make test
24. 在開(kāi)發(fā)服務(wù)器中安裝插件.
cd ../rabbitmq-server/plugins
ln –s ../../rabbitmq-odbctap .
25. 在設(shè)置步驟11中所示的環(huán)境后,我們可以啟用插件和(重新)啟動(dòng)服務(wù)器了.
cd $HOME/rabbitmq-public-umbrella/rabbitmq-server
scripts/rabbitmq-plugins enable rabbitmq_odbctap
scripts/rabbitmq-server
如何工作
16. 修改package.mk文件以反映所需的依賴與測(cè)試模塊.
17. 編輯rabbitmq_odbctap.app.src. 此文件包含Erlang項(xiàng)目所需的資源,在我們的例子中,一般和慣例都必須包含下面的配置:
{application, rabbitmq_odbctap,
[{description, "Embedded Rabbit ODBC tap"},
{vsn, "0.0.0"},
{modules, []},
{registered, []},
{mod, {rabbit_odbctap, []}},
{env, [{dsn,"DSN"},
{user,"guest"},
{password, "guest"},
{queue, "tapped_queue"}
]},
{applications, [kernel, stdlib, rabbit, amqp_client]}]}.
18. 定制rabbit_odbctap.erl, 模塊的入口點(diǎn),為了讓它啟動(dòng)或停止,需要配置插件自身.
19. 定制rabbit_odbctap_sup.erl, 對(duì)于Erlang管理源節(jié)點(diǎn),通過(guò)定義回調(diào)來(lái)定義主管行為的回調(diào).
20. 在rabbit_odbctap_worker.erl中實(shí)現(xiàn)插件邏輯.這是實(shí)際模塊的入口點(diǎn),在我們這里,它會(huì)連接RabbitMQ broker,綁定隊(duì)列,并通過(guò)ODBC來(lái)消費(fèi)指定數(shù)據(jù)庫(kù)包含的消息.
21.多個(gè)模塊共用的數(shù)據(jù)定義(也就是Erlang記錄)可放置在include目錄中.如,在rabbit_odbctap.hrl中,你可以找到Erlang記錄odbctap_config的定義,此定義是被rabbit_odbctap.erl、 rabbit_odbctap_worker.erl所共用的.
22. 準(zhǔn)備一個(gè)或多個(gè)測(cè)試模塊.在我們的例子中,在rabbit_odbctap_tests.erl中只能找到一個(gè)骨架.
23. 編譯插件并執(zhí)行自動(dòng)化測(cè)試.
cd $HOME/rabbitmq-public-umbrella/rabbimq-odbctap
make
make test
24. 在開(kāi)發(fā)服務(wù)器中安裝插件.
cd ../rabbitmq-server/plugins
ln –s ../../rabbitmq-odbctap .
25. 在設(shè)置步驟11中所示的環(huán)境后,我們可以啟用插件和(重新)啟動(dòng)服務(wù)器了.
cd $HOME/rabbitmq-public-umbrella/rabbitmq-server
scripts/rabbitmq-plugins enable rabbitmq_odbctap
scripts/rabbitmq-server
如何工作
為了實(shí)戰(zhàn)本食譜,我們一開(kāi)始就配置了一個(gè)PostgreSQL數(shù)據(jù)庫(kù),以及它相應(yīng)的ODBC驅(qū)動(dòng).這部分內(nèi)容已經(jīng)在食譜的前半部分中展示過(guò)了.
然后,我們展示了如何啟用樣例模板插件rabbitmq_metronome(RabbitMQ自身在開(kāi)發(fā)樹(shù)中提供的).如果一切正常, 步驟13啟動(dòng)的開(kāi)發(fā)broker將以會(huì)以我們標(biāo)準(zhǔn)用戶運(yùn)行,并在日志文件$HOME/rmq/log/rabbit@localhost.log中,你可能會(huì)看到下面的行:
=INFO REPORT==== 21-Oct-2013::03:28:50 ===
Server startup complete; 2 plugins started.
* amqp_client
* rabbitmq_metronome
這是開(kāi)發(fā)新插件的起點(diǎn),即使用Erlang ODBC client來(lái)監(jiān)控已配置PostgreSQL數(shù)據(jù)庫(kù)中的某些給定隊(duì)列.這部分內(nèi)容將在食譜的第三部分展示。
然后,我們展示了如何啟用樣例模板插件rabbitmq_metronome(RabbitMQ自身在開(kāi)發(fā)樹(shù)中提供的).如果一切正常, 步驟13啟動(dòng)的開(kāi)發(fā)broker將以會(huì)以我們標(biāo)準(zhǔn)用戶運(yùn)行,并在日志文件$HOME/rmq/log/rabbit@localhost.log中,你可能會(huì)看到下面的行:
=INFO REPORT==== 21-Oct-2013::03:28:50 ===
Server startup complete; 2 plugins started.
* amqp_client
* rabbitmq_metronome
這是開(kāi)發(fā)新插件的起點(diǎn),即使用Erlang ODBC client來(lái)監(jiān)控已配置PostgreSQL數(shù)據(jù)庫(kù)中的某些給定隊(duì)列.這部分內(nèi)容將在食譜的第三部分展示。
要實(shí)現(xiàn)我們的插件,我們選擇遵循一種公共的方案,即使用模塊來(lái)實(shí)現(xiàn)Erlang行為.實(shí)際上, Erlang 鼓勵(lì)使用通過(guò)supervisor來(lái)解藕應(yīng)用程序(這里是我們新開(kāi)發(fā)的插件)和運(yùn)行者(即RabbitMQ自身) 的松耦合架構(gòu)。
關(guān)于這方面的內(nèi)容,可參考http://www.erlang.org/doc/man/supervisor.html and http://www.erlang.org/doc/design_principles/sup_princ.html.
supervisor唯一需要定制的是,可從startup模塊中獲取配置信息,并將其傳遞給worker模塊. 為了達(dá)到目的,startup 模塊(rabbit_odbctap.erl)包含了讀取配置信息的代碼。
read_config() ->
{ok, Dsn} = application:get_env(dsn),
{ok, User} = application:get_env(user),
{ok, Password} = application:get_env(password),
{ok, Queue} = application:get_env(queue),
Config = #odbctap_config{
dsn = Dsn,
user = User,
password = Password,
queue = Queue},
Config.
調(diào)用application:get_env/1可讓模塊自動(dòng)訪問(wèn)如下設(shè)置的定義:
1.在Erlang資源文件rabbitmq_odbctap.app.src中,在編譯時(shí),設(shè)置了env鍵:
...
{env, [{dsn,"DSN"},
{user,"guest"},
{password, "guest"},
{queue, "tapped_queue"}
]},
...
在rabbitmq.config文件中是以運(yùn)行時(shí)讀取的,其配置遵循典型的RabbitMQ (即Erlang)格式 , 且在下面的代碼中指定了rabbitmq_odbctap鍵:
[{rabbitmq_odbctap,
[{dsn, "rmqDSN"},
{user,"rmq_plugin_user"},
{password, "rmq_plugin_password"},
{queue, "tapped_queue"}]}].
在本書(shū)的歸檔目錄中,你能找到完整的文件.
rabbit_odbctap_worker.erl worker 模塊實(shí)現(xiàn)gen_server行為.
你可在http://www.erlang.org/doc/design_principles/gen_server_concepts.html找到更多詳細(xì)信息.
在我們的模塊中,一旦模塊啟動(dòng)了就會(huì)調(diào)用 init/2,然后它會(huì)通過(guò)ODBC客戶端 Erlang API http://www.erlang.org/
supervisor唯一需要定制的是,可從startup模塊中獲取配置信息,并將其傳遞給worker模塊. 為了達(dá)到目的,startup 模塊(rabbit_odbctap.erl)包含了讀取配置信息的代碼。
read_config() ->
{ok, Dsn} = application:get_env(dsn),
{ok, User} = application:get_env(user),
{ok, Password} = application:get_env(password),
{ok, Queue} = application:get_env(queue),
Config = #odbctap_config{
dsn = Dsn,
user = User,
password = Password,
queue = Queue},
Config.
調(diào)用application:get_env/1可讓模塊自動(dòng)訪問(wèn)如下設(shè)置的定義:
1.在Erlang資源文件rabbitmq_odbctap.app.src中,在編譯時(shí),設(shè)置了env鍵:
...
{env, [{dsn,"DSN"},
{user,"guest"},
{password, "guest"},
{queue, "tapped_queue"}
]},
...
在rabbitmq.config文件中是以運(yùn)行時(shí)讀取的,其配置遵循典型的RabbitMQ (即Erlang)格式 , 且在下面的代碼中指定了rabbitmq_odbctap鍵:
[{rabbitmq_odbctap,
[{dsn, "rmqDSN"},
{user,"rmq_plugin_user"},
{password, "rmq_plugin_password"},
{queue, "tapped_queue"}]}].
在本書(shū)的歸檔目錄中,你能找到完整的文件.
rabbit_odbctap_worker.erl worker 模塊實(shí)現(xiàn)gen_server行為.
你可在http://www.erlang.org/doc/design_principles/gen_server_concepts.html找到更多詳細(xì)信息.
在我們的模塊中,一旦模塊啟動(dòng)了就會(huì)調(diào)用 init/2,然后它會(huì)通過(guò)ODBC客戶端 Erlang API http://www.erlang.org/
doc/apps/odbc/來(lái)連接ODBC連接器,并觸發(fā)RabbitMQ消費(fèi)者. 重要的一點(diǎn)是,在這種情況下,內(nèi)嵌RabbitMQ client是連接到本地broker的,即嵌入到同一個(gè)Erlang虛擬機(jī)中,并會(huì)執(zhí)行下面的調(diào)用:
{ok, Connection} = amqp_connection:start(#amqp_params_direct{})
通過(guò)這種方式,插件可使用內(nèi)部Erlang連接和更為高效的消息協(xié)議, 因?yàn)橥耆苊饬薃MQP通信協(xié)議的編組.
{ok, Connection} = amqp_connection:start(#amqp_params_direct{})
通過(guò)這種方式,插件可使用內(nèi)部Erlang連接和更為高效的消息協(xié)議, 因?yàn)橥耆苊饬薃MQP通信協(xié)議的編組.
TIP
為了能在RabbitMQ日志文件中記錄一些信息,你可以使用與rabbit_log:info/2相似的調(diào)用,正如init/2中定義的一樣.
通過(guò)使用gen_server行為, broker 的消息不是從接收塊中消費(fèi)的(http://www.rabbitmq.com/erlang-client-user-guide.html, 參考訂閱隊(duì)列章節(jié)), 而是通過(guò)handle_info/2回調(diào)實(shí)施的:
handle_info({#'basic.deliver'{delivery_tag = Tag},
#amqp_msg{payload = Payload}},State = #state{channel=Channel,odbcHandle = Ohandle}) ->
Query = "INSERT INTO rmqmessages VALUES ('now','"++binary_to_list(Payload) ++"')",
{updated, _} = odbc:sql_query(Ohandle, Query),
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
{noreply, State};
這是本食譜的核心.當(dāng)RabbitMQ自身初始化的時(shí)候,且數(shù)據(jù)庫(kù)處于打開(kāi)狀態(tài)時(shí),每個(gè)消費(fèi)消息都將得到存儲(chǔ)。
通過(guò)使用gen_server行為, broker 的消息不是從接收塊中消費(fèi)的(http://www.rabbitmq.com/erlang-client-user-guide.html, 參考訂閱隊(duì)列章節(jié)), 而是通過(guò)handle_info/2回調(diào)實(shí)施的:
handle_info({#'basic.deliver'{delivery_tag = Tag},
#amqp_msg{payload = Payload}},State = #state{channel=Channel,odbcHandle = Ohandle}) ->
Query = "INSERT INTO rmqmessages VALUES ('now','"++binary_to_list(Payload) ++"')",
{updated, _} = odbc:sql_query(Ohandle, Query),
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
{noreply, State};
這是本食譜的核心.當(dāng)RabbitMQ自身初始化的時(shí)候,且數(shù)據(jù)庫(kù)處于打開(kāi)狀態(tài)時(shí),每個(gè)消費(fèi)消息都將得到存儲(chǔ)。
更多
非常重要的一點(diǎn)是開(kāi)發(fā)插件是最后的選擇,最好是通過(guò)使用AMQP client library來(lái)開(kāi)發(fā)一個(gè)外部應(yīng)用程序.
TIP
一個(gè)插件可能連累RabbitMQ的穩(wěn)定性.
因此,只有當(dāng)極端的整合或?qū)π阅苡行枰獣r(shí),才可以評(píng)估是否開(kāi)發(fā)一個(gè)插件。