在本章中,將包含下面的內(nèi)容:
- 連接中間件
- 生產(chǎn)消息
- 消費(fèi)消息
- 使用JSON序列化消息
- 使用RPC消息
- 廣播消息
- 使用direct交換器來處理消息路由
- 使用topic交換器來處理消息路由
- 保證消息處理
- 分發(fā)消息到多個(gè)消費(fèi)者
- 使用消息屬性
- 事務(wù)消息
- 處理未路由消息
- 安裝Java JDK 1.6+
- 安裝Java RabbitMQ client library
- 正確地配置CLASSPATH 以及你喜歡的開發(fā)環(huán)境(Eclipse,NetBeans, 等等)
- 在某臺(tái)機(jī)器上安裝RabbitMQ server (也可以是同一個(gè)本地機(jī)器)
每個(gè)使用AMQP的應(yīng)用程序都必須建立一個(gè)與AMQP中間件的連接.默認(rèn)情況下,RabbitMQ (以及任何其它1.0版本之前的AMQP中間件) 通過運(yùn)行于5672端口之上且相當(dāng)可靠傳輸協(xié)議-TCP來工作的, 即IANA分配的端口.
要?jiǎng)?chuàng)建一個(gè)連接RabbitMQ中間件的Java客戶端,你必須執(zhí)行下面的步驟:
1. 從Java RabbitMQ client library中必須的類:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
2. 創(chuàng)建客戶端ConnectionFactory的實(shí)例:
ConnectionFactory factory = new ConnectionFactory();
3. 設(shè)置ConnectionFactory 選項(xiàng):
factory.setHost(rabbitMQhostname);
4. 連接RabbitMQ broker:
Connection connection = factory.newConnection();
5. 從剛創(chuàng)建的連接中創(chuàng)建一個(gè)通道:
Channel channel = connection.createChannel();
6. 一旦在RabbitMQ上完成了工作,就需要釋放通道和連接:
channel.close();
connection.close();
How it works…
使用Java client API, 應(yīng)用程序必須創(chuàng)建一個(gè)ConnectionFactory實(shí)例,并且使用setHost()方法來設(shè)置運(yùn)行RabbitMQ的主機(jī).在導(dǎo)入相關(guān)類后(第1步),我們實(shí)例化了工廠對(duì)象(第2步).在這個(gè)例子中,我們只是用可選的命令行參數(shù)來設(shè)置主機(jī)名稱,但是,在后面的章節(jié)中,你可以找到更多關(guān)于連接選項(xiàng)的信息.第4步,實(shí)際上我們已經(jīng)創(chuàng)建了連接到RabbitMQ中間件的連接.
1. 從Java RabbitMQ client library中必須的類:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
2. 創(chuàng)建客戶端ConnectionFactory的實(shí)例:
ConnectionFactory factory = new ConnectionFactory();
3. 設(shè)置ConnectionFactory 選項(xiàng):
factory.setHost(rabbitMQhostname);
4. 連接RabbitMQ broker:
Connection connection = factory.newConnection();
5. 從剛創(chuàng)建的連接中創(chuàng)建一個(gè)通道:
Channel channel = connection.createChannel();
6. 一旦在RabbitMQ上完成了工作,就需要釋放通道和連接:
channel.close();
connection.close();
How it works…
使用Java client API, 應(yīng)用程序必須創(chuàng)建一個(gè)ConnectionFactory實(shí)例,并且使用setHost()方法來設(shè)置運(yùn)行RabbitMQ的主機(jī).在導(dǎo)入相關(guān)類后(第1步),我們實(shí)例化了工廠對(duì)象(第2步).在這個(gè)例子中,我們只是用可選的命令行參數(shù)來設(shè)置主機(jī)名稱,但是,在后面的章節(jié)中,你可以找到更多關(guān)于連接選項(xiàng)的信息.第4步,實(shí)際上我們已經(jīng)創(chuàng)建了連接到RabbitMQ中間件的連接.
在這里,我們使用了默認(rèn)的連接參數(shù),用戶:guest,密碼:guest,以及虛擬主機(jī):/,后面我們會(huì)討論這些參數(shù).
但現(xiàn)在我還沒有準(zhǔn)備好與中間件通信,我們必須設(shè)置一個(gè)通信的channel(第5步).這是AMQP中的一個(gè)高級(jí)概念,使用此抽象,可以讓多個(gè)不同的消息會(huì)話使用同一個(gè)邏輯connection.
實(shí)際上, Java client library 中的所有通信操作都是通過channel實(shí)例的方法來執(zhí)行的.如果你正在開發(fā)多線程應(yīng)用程序,強(qiáng)烈建議在每個(gè)線程中使用不同的channel.如果多個(gè)線程使用同一個(gè)channel,在channel方法調(diào)用中會(huì)順序執(zhí)行,從而導(dǎo)致性能損失.最佳實(shí)踐是打開一個(gè)connection,并將其在多個(gè)不同線程之間分享.每個(gè)線程負(fù)責(zé)其獨(dú)立channel的創(chuàng)建,使用和銷毀.
實(shí)際上, Java client library 中的所有通信操作都是通過channel實(shí)例的方法來執(zhí)行的.如果你正在開發(fā)多線程應(yīng)用程序,強(qiáng)烈建議在每個(gè)線程中使用不同的channel.如果多個(gè)線程使用同一個(gè)channel,在channel方法調(diào)用中會(huì)順序執(zhí)行,從而導(dǎo)致性能損失.最佳實(shí)踐是打開一個(gè)connection,并將其在多個(gè)不同線程之間分享.每個(gè)線程負(fù)責(zé)其獨(dú)立channel的創(chuàng)建,使用和銷毀.
可對(duì)任何RabbitMQ connection指定多個(gè)不同的可選屬性.你可以在在線文檔(http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc)上找到它們. 除了AMQP 虛擬主機(jī)外,其它選項(xiàng)都不需要說明.
虛擬主機(jī)是一個(gè)管理容器,在單個(gè)RabbitMQ實(shí)例中,允許配置多個(gè)邏輯上獨(dú)立的中間件主機(jī), 以讓多個(gè)不同獨(dú)立應(yīng)用程序能夠共享同一個(gè)RabbitMQ server. 每個(gè)虛擬主機(jī)都能獨(dú)立地配置權(quán)限,交換器,隊(duì)列,并在邏輯上獨(dú)立的環(huán)境中工作.
也可以連接字符串(連接URI)來指定連接選項(xiàng),即使用factory.setUri() 方法:
ConnectionFactory factory = new ConnectionFactory();
String uri="amqp://user:pass@hostname:port/vhost";
factory.setUri(uri);
URI必須與 RFC3 (http://www.ietf.org/rfc/rfc3986.txt)的語(yǔ)法規(guī)范保持一致.
ConnectionFactory factory = new ConnectionFactory();
String uri="amqp://user:pass@hostname:port/vhost";
factory.setUri(uri);
URI必須與 RFC3 (http://www.ietf.org/rfc/rfc3986.txt)的語(yǔ)法規(guī)范保持一致.
生產(chǎn)消息
在本配方中, 我們將學(xué)習(xí)了如何將消息發(fā)送到AMQP隊(duì)列. 我們將介紹AMQP消息的構(gòu)建塊:消息,隊(duì)列,以及交換器.你可以在Chapter01/Recipe02/src/rmqexample中找到代碼.
w to do it…
在連接到中間件后, 像前面配方中看到的一樣,你可以按下面的步驟來來發(fā)送消息:
1. 聲明隊(duì)列, 在 com.rabbitmq.client.Channel上調(diào)用queueDeclare()方法:
String myQueue = "myFirstQueue";
channel.queueDeclare(myQueue, true, false, false, null); //創(chuàng)建一個(gè)名為myFirstQueue,持久化的,非限制的,不自動(dòng)刪除的隊(duì)列,
2. 發(fā)送第一個(gè)消息到RabbitMQ broker:
String message = "My message to myFirstQueue";
channel.basicPublish("",myQueue, null, message.getBytes());
3. 使用不同的選項(xiàng)發(fā)送第二個(gè)消息:
channel.basicPublish("",myQueue,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
注意:隊(duì)列名稱是大小寫敏感的: MYFIRSTQUEUE與myFirstQueue是不同的.
如何工作
w to do it…
在連接到中間件后, 像前面配方中看到的一樣,你可以按下面的步驟來來發(fā)送消息:
1. 聲明隊(duì)列, 在 com.rabbitmq.client.Channel上調(diào)用queueDeclare()方法:
String myQueue = "myFirstQueue";
channel.queueDeclare(myQueue, true, false, false, null); //創(chuàng)建一個(gè)名為myFirstQueue,持久化的,非限制的,不自動(dòng)刪除的隊(duì)列,
2. 發(fā)送第一個(gè)消息到RabbitMQ broker:
String message = "My message to myFirstQueue";
channel.basicPublish("",myQueue, null, message.getBytes());
3. 使用不同的選項(xiàng)發(fā)送第二個(gè)消息:
channel.basicPublish("",myQueue,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
注意:隊(duì)列名稱是大小寫敏感的: MYFIRSTQUEUE與myFirstQueue是不同的.
如何工作
在第一個(gè)基本例子中,我們能夠發(fā)送一個(gè)消息到RabbitMQ.在信道建立后,第一個(gè)步驟可以確保目標(biāo)隊(duì)列存在,這項(xiàng)任務(wù)是通過調(diào)用queueDeclare()方法來聲明隊(duì)列的(步驟1).
如果隊(duì)列已經(jīng)存在的話,此方法不會(huì)做任何事情,否則,它會(huì)自己創(chuàng)建一個(gè)隊(duì)列.如果隊(duì)列已存在,但使用了不同的參數(shù)進(jìn)行創(chuàng)建,queueDeclare() 方法會(huì)拋出異常.
如果隊(duì)列已經(jīng)存在的話,此方法不會(huì)做任何事情,否則,它會(huì)自己創(chuàng)建一個(gè)隊(duì)列.如果隊(duì)列已存在,但使用了不同的參數(shù)進(jìn)行創(chuàng)建,queueDeclare() 方法會(huì)拋出異常.
注意,大部分的AMQP操作只是Channel Java接口的方法.
所有與broker交互的操作都需要通過channel來實(shí)施.
讓我們來深入探討queueDeclare() 方法. 其模板可以在http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/查看. 文檔看起來像下面這樣:

實(shí)際上我們使用了第二個(gè)重載的方法:
AMQP.Queue.DeclareOk queueDeclare(java.lang.String queue, boolean durable, boolean exclusive, booleanautoDelete,java.util.Map<java.lang.String,java.lang.Object> arguments)
AMQP.Queue.DeclareOk queueDeclare(java.lang.String queue, boolean durable, boolean exclusive, booleanautoDelete,java.util.Map<java.lang.String,java.lang.Object> arguments)
throws java.io.IOException
其參數(shù)含義是:
其參數(shù)含義是:
- queue: 用來存儲(chǔ)消息的隊(duì)列名稱
- durable: 用于指定當(dāng)服務(wù)器重啟時(shí),隊(duì)列是否能復(fù)活.注意,當(dāng)服務(wù)器重啟時(shí),如果要保證消息持久性,必須將隊(duì)列聲明為持久化的
- exclusive: 用于指定此隊(duì)列是否只限制于當(dāng)前連接.
- autoDelete:當(dāng)隊(duì)列不再使用時(shí),用于指示RabbitMQ broker是否要自動(dòng)刪除隊(duì)列.
- arguments: 這是一個(gè)可選的隊(duì)列構(gòu)建參數(shù)map.
在第二步中,實(shí)際上我們才會(huì)將消息發(fā)送到RabbitMQ broker.
RabbitMQ絕不打開消息體,對(duì)它來說,消息是透明的,因此你可以使用任何喜歡的序列化格式.通常我們會(huì)使用JSON, 但也可以使用XML, ASN.1, 標(biāo)準(zhǔn)的或自定義的ASCII或二進(jìn)制格式. 最重要的事情是客戶端程序需要知道如何來解析數(shù)據(jù).
現(xiàn)在我們來深度解析 basicPublish()方法:
void basicPublish(java.lang.String exchange,java.lang.String routingKey, AMQP.BasicProperties props, byte[] body) throws java.io.IOException
在我們的例子中,exchange參數(shù)被設(shè)置成空字符串"", 即默認(rèn)的交換器, routingKey 參數(shù)設(shè)置成了隊(duì)列的名稱. 在這種情況下,消息將直接發(fā)送到routingKey指定的隊(duì)列中. body 參數(shù)設(shè)置成了字符串的字節(jié)數(shù)組,也就是我們想要發(fā)送的消息. props 參數(shù)默認(rèn)設(shè)置成了null; 這些是消息屬性,我們將在Using message properties中深入討論.
在步驟3中,我們發(fā)送了完全相同的消息,但將消息屬性設(shè)置成了MessageProperties.PERSISTENT_TEXT_PLAIN;通過這種方式我們要求RabbitMQ將此消息標(biāo)記為持久化消息.
兩個(gè)消息已經(jīng)分發(fā)到了RabbitMQ broker, 邏輯上已經(jīng)在myFirstQueue隊(duì)列上排隊(duì)了. 消息會(huì)駐留在緩沖區(qū)中,直到有一個(gè)客戶端來消費(fèi)(通常來說,是一個(gè)不同的客戶端).
如果隊(duì)列和消息都聲明為持久化,消息就會(huì)被標(biāo)記為持久化的,broker會(huì)將其存儲(chǔ)在磁盤上.如果兩個(gè)條件中的任何一個(gè)缺失,消息將會(huì)存儲(chǔ)在內(nèi)存中. 對(duì)于后者來說,當(dāng)服務(wù)器重啟時(shí),緩沖消息將不會(huì)復(fù)活,但消息的投遞和獲取會(huì)更快.我們將Chapter 8, Performance Tuning for RabbitMQ來深入探討這個(gè)主題.
更多
在本章節(jié)中,我們將討論檢查RabbitMQ狀態(tài)的方法,以及隊(duì)列是否存在的方法.
在本章節(jié)中,我們將討論檢查RabbitMQ狀態(tài)的方法,以及隊(duì)列是否存在的方法.
如何檢查RabbitMQ狀態(tài)
要檢查RabbitMQ狀態(tài),你可以使用rabbitmqctl命令行工具.在Linux設(shè)置中,它應(yīng)該在PATH環(huán)境變量中.在Windows中,可在programs |
RabbitMQ Server | RabbitMQ Command Prompt (sbin dir). 我們可從命令行提示窗口中運(yùn)行rabbitmqctl.bat.
我們可以使用rbbitmqclt list_queues來檢查隊(duì)列狀態(tài).在下面的截屏中,顯示了運(yùn)行例子之前和之后的情景.

在上面的截屏中,我們可以看到myfirstqueue隊(duì)列,其后跟著數(shù)字2, 它表示緩存在我們隊(duì)列中的消息數(shù)目(待發(fā)送消息數(shù)目).
現(xiàn)在我們可以嘗試重啟RabbitMQ, 或者重啟主機(jī).成功重啟RabbitMQ依賴于使用的OS:
在Linux, RedHat, Centos, Fedora, Raspbian上:
service rabbitmq-server restart
在Linux, Ubuntu, Debian上:
/etc/init.d/rabbitmq restart
在Windows上:
sc stop rabbitmq / sc start rabbitmq
當(dāng)我們?cè)俅芜\(yùn)行rabbitmqclt list_queues 時(shí),能期望有多少個(gè)消息呢?
現(xiàn)在我們可以嘗試重啟RabbitMQ, 或者重啟主機(jī).成功重啟RabbitMQ依賴于使用的OS:
在Linux, RedHat, Centos, Fedora, Raspbian上:
service rabbitmq-server restart
在Linux, Ubuntu, Debian上:
/etc/init.d/rabbitmq restart
在Windows上:
sc stop rabbitmq / sc start rabbitmq
當(dāng)我們?cè)俅芜\(yùn)行rabbitmqclt list_queues 時(shí),能期望有多少個(gè)消息呢?
檢查隊(duì)列是否已經(jīng)存在
要確定特定隊(duì)列是否已經(jīng)存在, 用channel.queueDeclarePassive()來代替channel.queueDeclare(). 兩個(gè)方法在隊(duì)列已經(jīng)存在的情況下,會(huì)表現(xiàn)出相同的行為,否則,channel.queueDeclare()會(huì)創(chuàng)建隊(duì)列,但channel.queueDeclarePassive()會(huì)拋出異常.
消費(fèi)消息
在本配方中,我們將關(guān)閉此回路.我們已經(jīng)知道了如何將消息發(fā)送到RabbitMQ—或者任何AMQP broker—現(xiàn)在,我們要學(xué)習(xí)如何獲取這些消息.
你可以在Chapter01/Recipe03/src/rmqexample/ nonblocking 找到源碼.
你可以在Chapter01/Recipe03/src/rmqexample/ nonblocking 找到源碼.
如何做
要消費(fèi)前面配方中發(fā)送的消息,我們需要執(zhí)行下面的步驟:
1. 聲明我們要從哪里消費(fèi)消息的隊(duì)列:
String myQueue="myFirstQueue";
channel.queueDeclare(myQueue, true, false, false, null);
2. 定義一個(gè)繼承自DefaultConsumer的消費(fèi)類:
public class ActualConsumer extends DefaultConsumer {
public ActualConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body) throws java.io.IOException {
String message = new String(body);
System.out.println("Received: " + message);
}
}
3. 創(chuàng)建一個(gè)消費(fèi)對(duì)象實(shí)例,再綁定到我們的channel上:
ActualConsumer consumer = new ActualConsumer(channel);
4. 開始消費(fèi)消息:
String consumerTag = channel.basicConsume(myQueue, true,consumer);
5. 一旦完成,取消消費(fèi)者(其API含義為:取消當(dāng)前消費(fèi)者(不能再收到此隊(duì)列上的消息,重新運(yùn)行消費(fèi)者可以收到消息)并調(diào)用consumer的handleCancelOk方法):
1. 聲明我們要從哪里消費(fèi)消息的隊(duì)列:
String myQueue="myFirstQueue";
channel.queueDeclare(myQueue, true, false, false, null);
2. 定義一個(gè)繼承自DefaultConsumer的消費(fèi)類:
public class ActualConsumer extends DefaultConsumer {
public ActualConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body) throws java.io.IOException {
String message = new String(body);
System.out.println("Received: " + message);
}
}
3. 創(chuàng)建一個(gè)消費(fèi)對(duì)象實(shí)例,再綁定到我們的channel上:
ActualConsumer consumer = new ActualConsumer(channel);
4. 開始消費(fèi)消息:
String consumerTag = channel.basicConsume(myQueue, true,consumer);
5. 一旦完成,取消消費(fèi)者(其API含義為:取消當(dāng)前消費(fèi)者(不能再收到此隊(duì)列上的消息,重新運(yùn)行消費(fèi)者可以收到消息)并調(diào)用consumer的handleCancelOk方法):
channel.basicCancel(consumerTag);
如何運(yùn)作
在我們建立了與AMQP broker的connection 和channel后,我們需要確保我們從哪個(gè)隊(duì)列來消費(fèi)消息(step 1).事實(shí)上,消費(fèi)者可以在生產(chǎn)者發(fā)送消息到隊(duì)列之前就已經(jīng)啟動(dòng)了,此時(shí)有可能隊(duì)列還不存在,為了避免隊(duì)列上后續(xù)操作失敗,我們需要聲明隊(duì)列(譯者注:但消費(fèi)聲明隊(duì)列這個(gè)動(dòng)作并不必須的,只要生產(chǎn)者聲明了隊(duì)列,消費(fèi)者不需要調(diào)用queueDeclare方法同樣可以消費(fèi)消息,在這里只能認(rèn)為是一種保險(xiǎn)措施).
TIP:
如何運(yùn)作
在我們建立了與AMQP broker的connection 和channel后,我們需要確保我們從哪個(gè)隊(duì)列來消費(fèi)消息(step 1).事實(shí)上,消費(fèi)者可以在生產(chǎn)者發(fā)送消息到隊(duì)列之前就已經(jīng)啟動(dòng)了,此時(shí)有可能隊(duì)列還不存在,為了避免隊(duì)列上后續(xù)操作失敗,我們需要聲明隊(duì)列(譯者注:但消費(fèi)聲明隊(duì)列這個(gè)動(dòng)作并不必須的,只要生產(chǎn)者聲明了隊(duì)列,消費(fèi)者不需要調(diào)用queueDeclare方法同樣可以消費(fèi)消息,在這里只能認(rèn)為是一種保險(xiǎn)措施).
TIP:
通過允許生產(chǎn)者和消費(fèi)者聲明相同的隊(duì)列,我們可以解藕其存在性,同時(shí)啟動(dòng)的順序也不重要.
步驟2的核心,我們通過覆蓋handleDelivery()方法定義了我們特定的消費(fèi)者,以及在步驟3中我們進(jìn)行實(shí)例化。在Java client API中,消費(fèi)者回調(diào)是通過com.rabbitmq.client.Consumer接口定義的.我們從 DefaultConsumer擴(kuò)展了我們的消費(fèi)者,DefaultConsumer提供了Consumer 接口所有方法中無具體操作的實(shí)現(xiàn).在步驟3中,通過調(diào)用channel.basicConsume(),我們讓消費(fèi)者開始了消費(fèi)消息.每個(gè)channel的消費(fèi)者總是同一個(gè)線程上執(zhí)行,而且是獨(dú)立于調(diào)用者的.
現(xiàn)在我們已經(jīng)從myQueue中激活了一個(gè)消費(fèi)者,Java client library就會(huì)開始從RabbitMQ broker的隊(duì)列中獲取消息,并且會(huì)對(duì)每個(gè)消費(fèi)者都調(diào)用handleDelivery().
在channel.basicConsume()方法調(diào)用后,我們會(huì)坐等主線程結(jié)束. 消息正在以非阻塞方式進(jìn)行消費(fèi)。
只有當(dāng)我們按Enter之后, 執(zhí)行過程會(huì)到步驟5,然后消費(fèi)者退出.在這個(gè)時(shí)刻,消費(fèi)者線程會(huì)停止調(diào)用我們的消費(fèi)者對(duì)象,因此我們可以釋放資源并退出。
現(xiàn)在我們已經(jīng)從myQueue中激活了一個(gè)消費(fèi)者,Java client library就會(huì)開始從RabbitMQ broker的隊(duì)列中獲取消息,并且會(huì)對(duì)每個(gè)消費(fèi)者都調(diào)用handleDelivery().
在channel.basicConsume()方法調(diào)用后,我們會(huì)坐等主線程結(jié)束. 消息正在以非阻塞方式進(jìn)行消費(fèi)。
只有當(dāng)我們按Enter之后, 執(zhí)行過程會(huì)到步驟5,然后消費(fèi)者退出.在這個(gè)時(shí)刻,消費(fèi)者線程會(huì)停止調(diào)用我們的消費(fèi)者對(duì)象,因此我們可以釋放資源并退出。
更多
在本章節(jié)中,我們將了解更多關(guān)于消費(fèi)者線程以及阻塞語(yǔ)義的用法.
更多的消費(fèi)者線程
在連接定義期間,RabbitMQ Java API 會(huì)按消費(fèi)者線程需要分配一個(gè)線程池。所有綁定到同一個(gè)channel的消費(fèi)者都會(huì)使用線程池中的單個(gè)線程來運(yùn)行;但是,有可能不同channel的消費(fèi)者也可通過同一個(gè)線程來處理. 這就是為什么要在消費(fèi)方法避免長(zhǎng)時(shí)間操作的原因,為了避免阻塞其它消費(fèi)者,可以在我們的自己定義的線程池中進(jìn)行處理,就像我們例子中展示的一樣,但這不是必須的。我們已經(jīng)定義了一個(gè)線程池, java.util.concurrent.ExecutorService, 因此可在連接期間將其傳入:
ExecutorService eService = Executors.newFixedThreadPool(10);
Connection connection = factory.newConnection(eService);
這是由我們來進(jìn)行管理的,因此我們要負(fù)責(zé)對(duì)其終止:
eService.shutdown();
但是,必須要記住的是,如果你沒有定義你自己的ExecutorService線程池,Java client library會(huì)在連接創(chuàng)建期間創(chuàng)建一個(gè),并會(huì)在銷毀對(duì)應(yīng)連接時(shí),自動(dòng)銷毀連接池。
更多的消費(fèi)者線程
在連接定義期間,RabbitMQ Java API 會(huì)按消費(fèi)者線程需要分配一個(gè)線程池。所有綁定到同一個(gè)channel的消費(fèi)者都會(huì)使用線程池中的單個(gè)線程來運(yùn)行;但是,有可能不同channel的消費(fèi)者也可通過同一個(gè)線程來處理. 這就是為什么要在消費(fèi)方法避免長(zhǎng)時(shí)間操作的原因,為了避免阻塞其它消費(fèi)者,可以在我們的自己定義的線程池中進(jìn)行處理,就像我們例子中展示的一樣,但這不是必須的。我們已經(jīng)定義了一個(gè)線程池, java.util.concurrent.ExecutorService, 因此可在連接期間將其傳入:
ExecutorService eService = Executors.newFixedThreadPool(10);
Connection connection = factory.newConnection(eService);
這是由我們來進(jìn)行管理的,因此我們要負(fù)責(zé)對(duì)其終止:
eService.shutdown();
但是,必須要記住的是,如果你沒有定義你自己的ExecutorService線程池,Java client library會(huì)在連接創(chuàng)建期間創(chuàng)建一個(gè),并會(huì)在銷毀對(duì)應(yīng)連接時(shí),自動(dòng)銷毀連接池。
阻塞語(yǔ)義
也可以使用阻塞語(yǔ)義,但如果不是用于簡(jiǎn)單程序和測(cè)試用例,我們強(qiáng)烈反對(duì)這種方法;本配方中的消息消費(fèi)是非阻塞的。然而,如果你要查找阻塞方案的源代碼的話,可以參考Chapter01/Recipe03/
src/rmqexample/blocking.
See also
在官方的http://www.rabbitmq.com/releases/rabbitmq-java-client/currentjavadoc/com/rabbitmq/client/Consumer.html 的Javadoc文檔中,你可以找到消費(fèi)者接口的所有可用方法。
使用JSON來序列化消息體(y)tion with JSON
在AMQP中,消息是不透明的實(shí)體,AMQP不提供任何標(biāo)準(zhǔn)的方式來編解碼消息.但是,web應(yīng)用程序經(jīng)常使用JSON來作為應(yīng)用程序?qū)痈袷剑琂avaSciprt序列化格式已經(jīng)變成了事實(shí)上的標(biāo)準(zhǔn),在這種情況下,RabbitMQ client Java library 可以包含一些實(shí)用函數(shù).另一方面,這也不是唯一的協(xié)議,任何程序可以選擇它自己的協(xié)議(XML, Google Protocol Buffers, ASN.1, or proprietary).
在這個(gè)例子中,我們將展示如何使用JSON協(xié)議來編解碼消息 體. 我們會(huì)使用Java編寫的發(fā)布者(Chapter01/Recipe04/Java_4/src/rmqexample)來發(fā)送消息,并用 Python語(yǔ)言編寫的消費(fèi)者來消費(fèi)消息 (Chapter01/Recipe04/Python04).
在AMQP中,消息是不透明的實(shí)體,AMQP不提供任何標(biāo)準(zhǔn)的方式來編解碼消息.但是,web應(yīng)用程序經(jīng)常使用JSON來作為應(yīng)用程序?qū)痈袷剑琂avaSciprt序列化格式已經(jīng)變成了事實(shí)上的標(biāo)準(zhǔn),在這種情況下,RabbitMQ client Java library 可以包含一些實(shí)用函數(shù).另一方面,這也不是唯一的協(xié)議,任何程序可以選擇它自己的協(xié)議(XML, Google Protocol Buffers, ASN.1, or proprietary).
在這個(gè)例子中,我們將展示如何使用JSON協(xié)議來編解碼消息 體. 我們會(huì)使用Java編寫的發(fā)布者(Chapter01/Recipe04/Java_4/src/rmqexample)來發(fā)送消息,并用 Python語(yǔ)言編寫的消費(fèi)者來消費(fèi)消息 (Chapter01/Recipe04/Python04).
如何做How to do it…
要實(shí)現(xiàn)一個(gè)Java生產(chǎn)者和一個(gè)Python消費(fèi)者, 你可以執(zhí)行下面的步驟:
1. Java: 除了導(dǎo)入Connecting to the broker配方中提到的包外,我們還要導(dǎo)入:
import com.rabbitmq.tools.json.JSONWriter;
2. Java: 創(chuàng)建一個(gè)非持久化隊(duì)列:
String myQueue="myJSONBodyQueue_4";
channel.queueDeclare(MyQueue, false, false, false, null);
3. Java: 創(chuàng)建一個(gè)使用樣例數(shù)據(jù)的Book列表:
List<Book>newBooks = new ArrayList<Book>();
for (inti = 1; i< 11; i++) {
Book book = new Book();
book.setBookID(i);
book.setBookDescription("History VOL: " + i );
book.setAuthor("John Doe");
newBooks.add(book);
}
4. Java: 使用JSONwriter來序列化newBooks實(shí)例:
JSONWriter rabbitmqJson = new JSONWriter();
String jsonmessage = rabbitmqJson.write(newBooks);
5. Java: 最后發(fā)送jsonmessage:
channel.basicPublish("",MyQueue,null,jsonmessage.getBytes());
6. Python: 要使用Pika library,我們必須要導(dǎo)入下面的包:
import pika;
import json;
Python 有JSON處理的內(nèi)鍵包.
7. Python: 創(chuàng)建RabbitMQ的連接,使用下面的代碼:
connection =pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host));
8. Python: 聲明隊(duì)列,綁定消費(fèi)者,然后再注冊(cè)回調(diào):
channel = connection.channel()
my_queue = "myJSONBodyQueue_4"
channel.queue_declare(queue=my_queue)
channel.basic_consume(consumer_callback, queue=my_queue,no_ack=True)
channel.start_consuming()
How it works…
在我們?cè)O(shè)置環(huán)境后(步驟1和步驟2),我們使用了write(newbooks)來序列化newbooks類。此方法返回返回的JSON字符串,就像下面的展示的一樣:
[
{
"author" : "John Doe",
"bookDescription" : "History VOL: 1",
"bookID" : 1
},
{
"author" : "John Doe",
"bookDescription" : "History VOL: 2",
"bookID" : 2
}
]
步驟4中,我們發(fā)布了一個(gè)jsonmessage到myJSONBodyQueue_4隊(duì)列中.現(xiàn)在Python消費(fèi)者可以從同一個(gè)隊(duì)列中獲取消息。在Python中我們看如何操作:
connection =pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host));
channel = connection.channel()
queue_name = "myJSONBodyQueue_4"
channel.queue_declare(queue=my_queue)
..
channel.basic_consume(consumer_callback, queue=my_queue,no_ack=True)
channel.start_consuming()
正如Java實(shí)現(xiàn)中看到的一樣,我們必須創(chuàng)建一個(gè)連接,然后再創(chuàng)建一個(gè)通道.channel.queue_declare(queue=myQueue)方法,我們聲明了非持久化,不受連接限制,不會(huì)自己刪除的隊(duì)列。 如果要改變隊(duì)列的屬性,我們方法中添加參數(shù),就像下面這樣:
channel.queue_declare(queue=myQueue,durable=True)
當(dāng)不同AMQP clients聲明了相同隊(duì)列時(shí),那么確保有相同的durable, exclusive, 和autodelete 屬性是相當(dāng)重要的(如果隊(duì)列名稱相同,但屬性不同會(huì)拋異常),否則, channel.queue_declare()會(huì)拋異常。
對(duì)于channel.basic_consume()方法, client會(huì)從給定的隊(duì)列中消費(fèi)消息,當(dāng)接收到消息后,會(huì)調(diào)用consumer_callback()回調(diào)方法。
在Java中我們是在消費(fèi)者接口中定義的回調(diào),但在Python中,它們只是傳遞給basic_consume()方法, 更多的功能,更少的聲明,是Python的典范.
consumer_callback回調(diào)如下:
def consumer_callback(ch, method, properties, body):
newBooks=json.loads(body);
print" Count books:",len(newBooks);
for item in newBooks:
print 'ID:',item['bookID'], '-
Description:',item['bookDescription'],' -
Author:',item['author']
回調(diào)接收到消息后,使用json.loads()來反序列化消息,然后就可以準(zhǔn)備讀取newBooks的結(jié)構(gòu)了。
要實(shí)現(xiàn)一個(gè)Java生產(chǎn)者和一個(gè)Python消費(fèi)者, 你可以執(zhí)行下面的步驟:
1. Java: 除了導(dǎo)入Connecting to the broker配方中提到的包外,我們還要導(dǎo)入:
import com.rabbitmq.tools.json.JSONWriter;
2. Java: 創(chuàng)建一個(gè)非持久化隊(duì)列:
String myQueue="myJSONBodyQueue_4";
channel.queueDeclare(MyQueue, false, false, false, null);
3. Java: 創(chuàng)建一個(gè)使用樣例數(shù)據(jù)的Book列表:
List<Book>newBooks = new ArrayList<Book>();
for (inti = 1; i< 11; i++) {
Book book = new Book();
book.setBookID(i);
book.setBookDescription("History VOL: " + i );
book.setAuthor("John Doe");
newBooks.add(book);
}
4. Java: 使用JSONwriter來序列化newBooks實(shí)例:
JSONWriter rabbitmqJson = new JSONWriter();
String jsonmessage = rabbitmqJson.write(newBooks);
5. Java: 最后發(fā)送jsonmessage:
channel.basicPublish("",MyQueue,null,jsonmessage.getBytes());
6. Python: 要使用Pika library,我們必須要導(dǎo)入下面的包:
import pika;
import json;
Python 有JSON處理的內(nèi)鍵包.
7. Python: 創(chuàng)建RabbitMQ的連接,使用下面的代碼:
connection =pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host));
8. Python: 聲明隊(duì)列,綁定消費(fèi)者,然后再注冊(cè)回調(diào):
channel = connection.channel()
my_queue = "myJSONBodyQueue_4"
channel.queue_declare(queue=my_queue)
channel.basic_consume(consumer_callback, queue=my_queue,no_ack=True)
channel.start_consuming()
How it works…
在我們?cè)O(shè)置環(huán)境后(步驟1和步驟2),我們使用了write(newbooks)來序列化newbooks類。此方法返回返回的JSON字符串,就像下面的展示的一樣:
[
{
"author" : "John Doe",
"bookDescription" : "History VOL: 1",
"bookID" : 1
},
{
"author" : "John Doe",
"bookDescription" : "History VOL: 2",
"bookID" : 2
}
]
步驟4中,我們發(fā)布了一個(gè)jsonmessage到myJSONBodyQueue_4隊(duì)列中.現(xiàn)在Python消費(fèi)者可以從同一個(gè)隊(duì)列中獲取消息。在Python中我們看如何操作:
connection =pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host));
channel = connection.channel()
queue_name = "myJSONBodyQueue_4"
channel.queue_declare(queue=my_queue)
..
channel.basic_consume(consumer_callback, queue=my_queue,no_ack=True)
channel.start_consuming()
正如Java實(shí)現(xiàn)中看到的一樣,我們必須創(chuàng)建一個(gè)連接,然后再創(chuàng)建一個(gè)通道.channel.queue_declare(queue=myQueue)方法,我們聲明了非持久化,不受連接限制,不會(huì)自己刪除的隊(duì)列。 如果要改變隊(duì)列的屬性,我們方法中添加參數(shù),就像下面這樣:
channel.queue_declare(queue=myQueue,durable=True)
當(dāng)不同AMQP clients聲明了相同隊(duì)列時(shí),那么確保有相同的durable, exclusive, 和autodelete 屬性是相當(dāng)重要的(如果隊(duì)列名稱相同,但屬性不同會(huì)拋異常),否則, channel.queue_declare()會(huì)拋異常。
對(duì)于channel.basic_consume()方法, client會(huì)從給定的隊(duì)列中消費(fèi)消息,當(dāng)接收到消息后,會(huì)調(diào)用consumer_callback()回調(diào)方法。
在Java中我們是在消費(fèi)者接口中定義的回調(diào),但在Python中,它們只是傳遞給basic_consume()方法, 更多的功能,更少的聲明,是Python的典范.
consumer_callback回調(diào)如下:
def consumer_callback(ch, method, properties, body):
newBooks=json.loads(body);
print" Count books:",len(newBooks);
for item in newBooks:
print 'ID:',item['bookID'], '-
Description:',item['bookDescription'],' -
Author:',item['author']
回調(diào)接收到消息后,使用json.loads()來反序列化消息,然后就可以準(zhǔn)備讀取newBooks的結(jié)構(gòu)了。
更多
包含在RabbitMQ client library中的JSON幫助類是非常簡(jiǎn)單的,在真實(shí)項(xiàng)目中,你可以使用外部JSON library.如:強(qiáng)大的google-gson (https://code.google.com/p/google-gson/) 或 jackson (http://jackson.codehaus.org/).
包含在RabbitMQ client library中的JSON幫助類是非常簡(jiǎn)單的,在真實(shí)項(xiàng)目中,你可以使用外部JSON library.如:強(qiáng)大的google-gson (https://code.google.com/p/google-gson/) 或 jackson (http://jackson.codehaus.org/).
使用RPC消息
遠(yuǎn)程過程調(diào)用(RPC)通常用于client-server架構(gòu). client提出需要執(zhí)行服務(wù)器上的某些操作請(qǐng)求,然后等待服務(wù)器響應(yīng).
消息架構(gòu)試圖使用發(fā)后即忘(fire-and-forget)的消息形式來實(shí)施一種完全不同的解決方案,但是可以使用設(shè)計(jì)合理的AMQP隊(duì)列和增加型RPC來實(shí)施,如下所示:

上面的圖形描述了request queue是與responder相關(guān)聯(lián)的,reply queues 與callers是相聯(lián)的.但是,當(dāng)我們?cè)谑褂肦abbitMQ的時(shí)候,所有的涉及的端點(diǎn)(callers和responders) 都是AMQP clients.現(xiàn)在我們將描述Chapter01/Recipe05/Java_5/src/rmqexample/rpc例子中的操作步驟.
遠(yuǎn)程過程調(diào)用(RPC)通常用于client-server架構(gòu). client提出需要執(zhí)行服務(wù)器上的某些操作請(qǐng)求,然后等待服務(wù)器響應(yīng).
消息架構(gòu)試圖使用發(fā)后即忘(fire-and-forget)的消息形式來實(shí)施一種完全不同的解決方案,但是可以使用設(shè)計(jì)合理的AMQP隊(duì)列和增加型RPC來實(shí)施,如下所示:

上面的圖形描述了request queue是與responder相關(guān)聯(lián)的,reply queues 與callers是相聯(lián)的.但是,當(dāng)我們?cè)谑褂肦abbitMQ的時(shí)候,所有的涉及的端點(diǎn)(callers和responders) 都是AMQP clients.現(xiàn)在我們將描述Chapter01/Recipe05/Java_5/src/rmqexample/rpc例子中的操作步驟.
如何做
執(zhí)行下面的步驟來實(shí)現(xiàn)RPC responder:
1. 聲明一個(gè)請(qǐng)求隊(duì)列, responder會(huì)在此處來等候RPC請(qǐng)求:
channel.queueDeclare(requestQueue, false, false, false,null);
2. 通過覆蓋DefaultConsumer.handleDelivery()來定義我們特定的RpcResponderConsumer消費(fèi)者, 在接收到每個(gè)RPC請(qǐng)求的時(shí),消費(fèi)者將:
? 執(zhí)行RPC請(qǐng)求中的操作
? 準(zhǔn)備回復(fù)消息
? 通過下面的代碼在回復(fù)屬性中設(shè)置correlation ID:
BasicProperties replyProperties = new BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
將答案發(fā)送到回復(fù)隊(duì)列中:
getChannel().basicPublish("", properties.getReplyTo(),replyProperties, reply.getBytes());
?發(fā)送應(yīng)答給RPC request:
getChannel().basicAck(envelope.getDeliveryTag(), false);
3. 開始消費(fèi)消息,直到我們看到了回復(fù)消息才停止:
現(xiàn)在讓我們來執(zhí)行下面的步驟來實(shí)現(xiàn)RPC caller:
1. 聲明請(qǐng)求隊(duì)列,在這里responder會(huì)等待RPC請(qǐng)求:
channel.queueDeclare(requestQueue, false, false, false,null);
2. 創(chuàng)建一個(gè)臨時(shí)的,私有的,自動(dòng)刪除的回復(fù)隊(duì)列:
String replyQueue = channel.queueDeclare().getQueue();
3. 定義我們特定的消費(fèi)者RpcCallerConsumer, 它用于接收和處理RPC回復(fù). 它將:
? 當(dāng)收到回復(fù)時(shí),通過覆蓋handleDelivery()用于指明要做什么(在我們的例子中,定義了AddAction()):
執(zhí)行下面的步驟來實(shí)現(xiàn)RPC responder:
1. 聲明一個(gè)請(qǐng)求隊(duì)列, responder會(huì)在此處來等候RPC請(qǐng)求:
channel.queueDeclare(requestQueue, false, false, false,null);
2. 通過覆蓋DefaultConsumer.handleDelivery()來定義我們特定的RpcResponderConsumer消費(fèi)者, 在接收到每個(gè)RPC請(qǐng)求的時(shí),消費(fèi)者將:
? 執(zhí)行RPC請(qǐng)求中的操作
? 準(zhǔn)備回復(fù)消息
? 通過下面的代碼在回復(fù)屬性中設(shè)置correlation ID:
BasicProperties replyProperties = new BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
將答案發(fā)送到回復(fù)隊(duì)列中:
getChannel().basicPublish("", properties.getReplyTo(),replyProperties, reply.getBytes());
?發(fā)送應(yīng)答給RPC request:
getChannel().basicAck(envelope.getDeliveryTag(), false);
3. 開始消費(fèi)消息,直到我們看到了回復(fù)消息才停止:
現(xiàn)在讓我們來執(zhí)行下面的步驟來實(shí)現(xiàn)RPC caller:
1. 聲明請(qǐng)求隊(duì)列,在這里responder會(huì)等待RPC請(qǐng)求:
channel.queueDeclare(requestQueue, false, false, false,null);
2. 創(chuàng)建一個(gè)臨時(shí)的,私有的,自動(dòng)刪除的回復(fù)隊(duì)列:
String replyQueue = channel.queueDeclare().getQueue();
3. 定義我們特定的消費(fèi)者RpcCallerConsumer, 它用于接收和處理RPC回復(fù). 它將:
? 當(dāng)收到回復(fù)時(shí),通過覆蓋handleDelivery()用于指明要做什么(在我們的例子中,定義了AddAction()):
public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws java.io.IOException {
String messageIdentifier =properties.getCorrelationId();
String action = actions.get(messageIdentifier);
actions.remove(messageIdentifier);
String response = new String(body);
OnReply(action, response);
}
4. 調(diào)用channel.basicConsume()方法啟動(dòng)消息消費(fèi).
5. 準(zhǔn)備和序列化請(qǐng)求(我們例子中是messageRequest).
6. 初始化一個(gè)任意的唯一消息標(biāo)識(shí)符(messageIdentifier).
7. 當(dāng)消費(fèi)者收到相應(yīng)回復(fù)的時(shí)候,定義應(yīng)該做什么,通過使用messageIdentifier來綁定動(dòng)作.在我們的例子中,我們通過調(diào)用我們自定義的方法 consumer.AddAction()來完成的.
8. 發(fā)布消息到請(qǐng)求隊(duì)列,設(shè)置其屬性:
BasicProperties props = new BasicProperties.Builder().correlationId(messageIdentifier)replyTo(replyQueue).build();
channel.basicPublish("", requestQueue,props,messageRequest.getBytes());
如何工作
在我們的例子中,RPC responder扮演的是RPC server的角色; responder會(huì)監(jiān)聽requestQueue公共隊(duì)列(步驟1),這里放置了調(diào)用者的請(qǐng)求.
另一方面,每個(gè)調(diào)用者會(huì)在其私有隊(duì)列上消費(fèi)responder的回復(fù)信息(步驟5).當(dāng)caller發(fā)送消息時(shí)(步驟11),它包含兩個(gè)屬性:一個(gè)是用于監(jiān)聽的臨時(shí)回復(fù)隊(duì)列 (replyTo())名稱,另一個(gè)是消息標(biāo)識(shí)(correlationId()),當(dāng)回復(fù)消息時(shí),用于標(biāo)識(shí)caller.事實(shí)上,在我們的例子中,我們已經(jīng)實(shí)現(xiàn)了一個(gè)異步的RPC caller. The action to be performed by the RpcCallerConsumer (step 6) when the reply comes back is recorded by the nonblocking consumer by calling AddAction() (step 10).
回到responder, RPC邏輯全在RpcResponderConsumer中.這不同于特定的非阻塞consumer,就像我們?cè)谙M(fèi)消息配方中看到的一樣,但不同的是下面兩點(diǎn)細(xì)節(jié):回復(fù)的隊(duì)列名稱是通過消息屬性來獲取的,即properties.getReplyTo().其值已經(jīng)被caller設(shè)成了私有,臨時(shí)回復(fù)隊(duì)列.
回復(fù)消息必須包含在correlation ID標(biāo)識(shí)的隊(duì)列中(待查)
TIP
RPC responde不會(huì)使用correlation ID;它只用來讓caller收到對(duì)應(yīng)請(qǐng)求的回復(fù)消息
更多
本章節(jié)我們會(huì)討論阻塞RPC的使用.
使用阻塞RPC
有時(shí),簡(jiǎn)單性比可擴(kuò)展性更重要.在這種情況下,可以使用包含在 Java RabbitMQ client library中實(shí)現(xiàn)了阻塞RPC語(yǔ)義的幫助類:
com.rabbitmq.client.RpcClient
com.rabbitmq.client.StringRpcServer
邏輯是相同的,但沒有非阻塞消費(fèi)者參與, 并且臨時(shí)隊(duì)列和correlation IDs 的處理對(duì)于用戶來說是透明的.
你可以在Chapter01/Recipe05/Java_5/src/rmqexample/simplerpc找到相關(guān)的例子.
擴(kuò)展注意
當(dāng)有多個(gè)callers會(huì)發(fā)生什么呢?它主要以標(biāo)準(zhǔn)的RPC client/server 架構(gòu)來工作.但如果運(yùn)行多個(gè)reponders會(huì)怎樣呢?
在這種情況下,所有的responders都會(huì)從請(qǐng)求隊(duì)列中關(guān)注消費(fèi)消息. 此外, responders可位于不同的主機(jī). 這個(gè)主題的更多信息請(qǐng)參考配方-分發(fā)消息給多個(gè)消費(fèi)者.
本章節(jié)我們會(huì)討論阻塞RPC的使用.
使用阻塞RPC
有時(shí),簡(jiǎn)單性比可擴(kuò)展性更重要.在這種情況下,可以使用包含在 Java RabbitMQ client library中實(shí)現(xiàn)了阻塞RPC語(yǔ)義的幫助類:
com.rabbitmq.client.RpcClient
com.rabbitmq.client.StringRpcServer
邏輯是相同的,但沒有非阻塞消費(fèi)者參與, 并且臨時(shí)隊(duì)列和correlation IDs 的處理對(duì)于用戶來說是透明的.
你可以在Chapter01/Recipe05/Java_5/src/rmqexample/simplerpc找到相關(guān)的例子.
擴(kuò)展注意
當(dāng)有多個(gè)callers會(huì)發(fā)生什么呢?它主要以標(biāo)準(zhǔn)的RPC client/server 架構(gòu)來工作.但如果運(yùn)行多個(gè)reponders會(huì)怎樣呢?
在這種情況下,所有的responders都會(huì)從請(qǐng)求隊(duì)列中關(guān)注消費(fèi)消息. 此外, responders可位于不同的主機(jī). 這個(gè)主題的更多信息請(qǐng)參考配方-分發(fā)消息給多個(gè)消費(fèi)者.
廣播消息es
在本例中,我們看到如何將同一個(gè)消息發(fā)送給有可能很大量的消費(fèi)者.這是一個(gè)典型的廣播消息到大量客戶端的消息應(yīng)用.舉例來說,在大型多人游戲中更新記分板的時(shí)候,或在一個(gè)社交網(wǎng)絡(luò)應(yīng)用中發(fā)布新聞的時(shí)候,都需要將消息廣播給多個(gè)消費(fèi)者.
在本例中,我們看到如何將同一個(gè)消息發(fā)送給有可能很大量的消費(fèi)者.這是一個(gè)典型的廣播消息到大量客戶端的消息應(yīng)用.舉例來說,在大型多人游戲中更新記分板的時(shí)候,或在一個(gè)社交網(wǎng)絡(luò)應(yīng)用中發(fā)布新聞的時(shí)候,都需要將消息廣播給多個(gè)消費(fèi)者.
在本配方中,我們同時(shí)探討生產(chǎn)者和消費(fèi)者實(shí)現(xiàn).因?yàn)樗欠浅5湫偷南M(fèi)者可以使用不同的技術(shù)和編程語(yǔ)言,在AMQP中,我們將使用Java, Python, 以及Ruby來展示這種互通性.
我們會(huì)感謝AMQP中隔離交換器和隊(duì)列帶來的好處.在Chapter01/Recipe06/中找到源碼.
如何做
要做這道菜,我們需要四個(gè)不同的代碼:
1. 聲明一個(gè)fanout類型的交換器:
channel.exchangeDeclare(myExchange, "fanout");
2. 發(fā)送一個(gè)消息到交換器:
channel.basicPublish(myExchange, "", null,jsonmessage.getBytes());
然后準(zhǔn)備Java消費(fèi)者:
1. 聲明同一個(gè)生產(chǎn)者聲明的fanout交換器:
channel.exchangeDeclare(myExchange, "fanout");
2. 自動(dòng)創(chuàng)建一個(gè)新的臨時(shí)隊(duì)列:
String queueName = channel.queueDeclare().getQueue();
3. 將隊(duì)列綁定到交換器上:
channel.queueBind(queueName, myExchange, "");
4. 定義一個(gè)自定義,非阻塞消費(fèi)者,這部分內(nèi)容已經(jīng)在消費(fèi)消息食譜中看到過了.
5. 調(diào)用channel.basicConsume()來消費(fèi)消息
相對(duì)于Java消費(fèi)者來說,Python消費(fèi)者的源碼是非常簡(jiǎn)單的,因此這里沒必要再重復(fù)必要的步驟,只需要遵循Java消費(fèi)者的步驟,可參考Chapter01/Recipe06/Python_6/PyConsumer.py的代碼.
Ruby消費(fèi)者中,你必須使用"bunny" 然后再使用URI連接.
可查看在Chapter01/Recipe06/Ruby_6/RbConsumer.rb的源碼
現(xiàn)在我們要把這些整合到一起來看食譜:
1. 啟動(dòng)一個(gè)Java生產(chǎn)者的實(shí)例; 消息將立即進(jìn)行發(fā)布.
2. 啟動(dòng)一個(gè)或多個(gè)Java/Python/Ruby的消費(fèi)者實(shí)例; 消費(fèi)者只有當(dāng)它們運(yùn)行的時(shí)候,才能接接收到消息.
3. 停止其中一個(gè)消費(fèi)者,而生產(chǎn)者繼續(xù)運(yùn)行,然后再重啟這個(gè)消費(fèi)者,我們可以看到消費(fèi)者在停止期間會(huì)丟失消息.
要做這道菜,我們需要四個(gè)不同的代碼:
- Java發(fā)布者
- Java消費(fèi)者
- Python消費(fèi)者
- Ruby消費(fèi)者
1. 聲明一個(gè)fanout類型的交換器:
channel.exchangeDeclare(myExchange, "fanout");
2. 發(fā)送一個(gè)消息到交換器:
channel.basicPublish(myExchange, "", null,jsonmessage.getBytes());
然后準(zhǔn)備Java消費(fèi)者:
1. 聲明同一個(gè)生產(chǎn)者聲明的fanout交換器:
channel.exchangeDeclare(myExchange, "fanout");
2. 自動(dòng)創(chuàng)建一個(gè)新的臨時(shí)隊(duì)列:
String queueName = channel.queueDeclare().getQueue();
3. 將隊(duì)列綁定到交換器上:
channel.queueBind(queueName, myExchange, "");
4. 定義一個(gè)自定義,非阻塞消費(fèi)者,這部分內(nèi)容已經(jīng)在消費(fèi)消息食譜中看到過了.
5. 調(diào)用channel.basicConsume()來消費(fèi)消息
相對(duì)于Java消費(fèi)者來說,Python消費(fèi)者的源碼是非常簡(jiǎn)單的,因此這里沒必要再重復(fù)必要的步驟,只需要遵循Java消費(fèi)者的步驟,可參考Chapter01/Recipe06/Python_6/PyConsumer.py的代碼.
Ruby消費(fèi)者中,你必須使用"bunny" 然后再使用URI連接.
可查看在Chapter01/Recipe06/Ruby_6/RbConsumer.rb的源碼
現(xiàn)在我們要把這些整合到一起來看食譜:
1. 啟動(dòng)一個(gè)Java生產(chǎn)者的實(shí)例; 消息將立即進(jìn)行發(fā)布.
2. 啟動(dòng)一個(gè)或多個(gè)Java/Python/Ruby的消費(fèi)者實(shí)例; 消費(fèi)者只有當(dāng)它們運(yùn)行的時(shí)候,才能接接收到消息.
3. 停止其中一個(gè)消費(fèi)者,而生產(chǎn)者繼續(xù)運(yùn)行,然后再重啟這個(gè)消費(fèi)者,我們可以看到消費(fèi)者在停止期間會(huì)丟失消息.
如何運(yùn)作
生產(chǎn)者和消費(fèi)者都通過單個(gè)連接連上了RabbitMQ,消息的邏輯路徑如下圖所示:

生產(chǎn)者和消費(fèi)者都通過單個(gè)連接連上了RabbitMQ,消息的邏輯路徑如下圖所示:

在步驟1中,我們已經(jīng)聲明了交換器,與隊(duì)列聲明的邏輯一樣: 如果指定的交換器不存在,將會(huì)進(jìn)行創(chuàng)建;否則,不做任何事情.exchangeDeclare()方法的第二個(gè)參數(shù)是一個(gè)字符串, 它用于指定交換器的類型,在我們這里,交換器類型是fanout.
在步驟2中,生產(chǎn)者向交換器發(fā)送了一條消息. 你可以使用下面的命令來查看它以及其它已定義的交換器:
rabbitmqctl list_exchanges
channel.basicPublish() 方法的第二個(gè)參數(shù)是路由鍵(routing key),在使用fanout交換器時(shí),此參數(shù)通常會(huì)忽略.第三個(gè)設(shè)置為null的參數(shù), 此參數(shù)代表可選的消息屬性(更多信息可參考使用消息屬性食譜).第四個(gè)參數(shù)是消息本身.
當(dāng)我們啟動(dòng)一個(gè)消費(fèi)者的時(shí)候,它創(chuàng)建一個(gè)它自己的臨時(shí)隊(duì)列(步驟9). 使用channel.queueDeclare()空重載,我們會(huì)創(chuàng)建一個(gè)非持久化,私有的,自動(dòng)刪除的,隊(duì)列名稱自動(dòng)生成的隊(duì)列.
運(yùn)行一對(duì)消費(fèi)者,并用rabbitmqctl list_queues查看,我們可以兩個(gè)隊(duì)列,每個(gè)消費(fèi)者一個(gè), 還有奇怪的名字,還有前面食譜中用到的持久化隊(duì)列myFirstQueue ,如下圖所示:
在步驟2中,生產(chǎn)者向交換器發(fā)送了一條消息. 你可以使用下面的命令來查看它以及其它已定義的交換器:
rabbitmqctl list_exchanges
channel.basicPublish() 方法的第二個(gè)參數(shù)是路由鍵(routing key),在使用fanout交換器時(shí),此參數(shù)通常會(huì)忽略.第三個(gè)設(shè)置為null的參數(shù), 此參數(shù)代表可選的消息屬性(更多信息可參考使用消息屬性食譜).第四個(gè)參數(shù)是消息本身.
當(dāng)我們啟動(dòng)一個(gè)消費(fèi)者的時(shí)候,它創(chuàng)建一個(gè)它自己的臨時(shí)隊(duì)列(步驟9). 使用channel.queueDeclare()空重載,我們會(huì)創(chuàng)建一個(gè)非持久化,私有的,自動(dòng)刪除的,隊(duì)列名稱自動(dòng)生成的隊(duì)列.
運(yùn)行一對(duì)消費(fèi)者,并用rabbitmqctl list_queues查看,我們可以兩個(gè)隊(duì)列,每個(gè)消費(fèi)者一個(gè), 還有奇怪的名字,還有前面食譜中用到的持久化隊(duì)列myFirstQueue ,如下圖所示:

在步驟5中,我們將隊(duì)列綁定到了myExchange交換器上.可以用下面的命令來監(jiān)控這些綁定:
rabbitmqctl list_bindings
監(jiān)控是AMQP非常重要的一面; 消息是通過交換器來路由到綁定隊(duì)列的,且會(huì)在隊(duì)列中緩存.
TIP
rabbitmqctl list_bindings
監(jiān)控是AMQP非常重要的一面; 消息是通過交換器來路由到綁定隊(duì)列的,且會(huì)在隊(duì)列中緩存.
TIP
交換器不會(huì)緩存消息,它只是邏輯元素.
fanout交換器在通過消息拷貝,來將消息路由到每個(gè)綁定的隊(duì)列中,因此,如果沒有綁定隊(duì)列,消息就不會(huì)被消費(fèi)者接收(參考處理未路由消息食譜來了解更多信息).
一旦我們關(guān)閉了消費(fèi)者,我們暗中地銷毀了其私有臨時(shí)隊(duì)列(這就是為什么隊(duì)列是自動(dòng)刪除的,否則,這些隊(duì)列在未使用后會(huì)保留下來,broker上的隊(duì)列數(shù)目會(huì)無限地增長(zhǎng)), 消息也不會(huì)緩存了.
當(dāng)重啟消費(fèi)者的時(shí)候,它會(huì)創(chuàng)建一個(gè)新的獨(dú)立的隊(duì)列,只要我們將其綁定到myExchange上,發(fā)布者發(fā)送的消息就會(huì)緩存到這個(gè)隊(duì)列上,并被消費(fèi)者消費(fèi).
更多
fanout交換器在通過消息拷貝,來將消息路由到每個(gè)綁定的隊(duì)列中,因此,如果沒有綁定隊(duì)列,消息就不會(huì)被消費(fèi)者接收(參考處理未路由消息食譜來了解更多信息).
一旦我們關(guān)閉了消費(fèi)者,我們暗中地銷毀了其私有臨時(shí)隊(duì)列(這就是為什么隊(duì)列是自動(dòng)刪除的,否則,這些隊(duì)列在未使用后會(huì)保留下來,broker上的隊(duì)列數(shù)目會(huì)無限地增長(zhǎng)), 消息也不會(huì)緩存了.
當(dāng)重啟消費(fèi)者的時(shí)候,它會(huì)創(chuàng)建一個(gè)新的獨(dú)立的隊(duì)列,只要我們將其綁定到myExchange上,發(fā)布者發(fā)送的消息就會(huì)緩存到這個(gè)隊(duì)列上,并被消費(fèi)者消費(fèi).
更多
當(dāng)RabbitMQ第一次啟動(dòng)的時(shí)候,它創(chuàng)建一些預(yù)定的交換器. 執(zhí)行rabbitmqctl list_exchanges命令,我們可以觀察到許多存在的交換器,也包含了我們?cè)诒臼匙V中定義的交換器:


所有出現(xiàn)在這里的amq.*交換器都是由AMQP brokers預(yù)先定義的,它可用來代替你定義你自己的交換器;它們不需要聲明.
我們可以使用amq.fanout來替換myLastnews.fanout_6, 對(duì)于簡(jiǎn)單應(yīng)用程序來說,這是很好的選擇. 但一般來說,應(yīng)用程序來聲明和使用它們自己的交換器.
本食譜使用的重載,交換器是非自動(dòng)刪除的(won't be deleted as soon as the last client detaches it) 和非持久化的(won't survive server restarts). 你可以在http://www.rabbitmq.com/releases/ rabbitmq-java-client/current-javadoc/找到更多的選項(xiàng)和重載.
我們可以使用amq.fanout來替換myLastnews.fanout_6, 對(duì)于簡(jiǎn)單應(yīng)用程序來說,這是很好的選擇. 但一般來說,應(yīng)用程序來聲明和使用它們自己的交換器.
本食譜使用的重載,交換器是非自動(dòng)刪除的(won't be deleted as soon as the last client detaches it) 和非持久化的(won't survive server restarts). 你可以在http://www.rabbitmq.com/releases/ rabbitmq-java-client/current-javadoc/找到更多的選項(xiàng)和重載.
使用Direct交換器來路由消息
要本食譜中,我們將看到如何選擇消費(fèi)消息子集(部分消息), 只路由那些感涂在的AMQP隊(duì)列,以及忽略其它隊(duì)列.
一個(gè)典型的使用場(chǎng)景是實(shí)現(xiàn)一個(gè)聊天器, 在這里每個(gè)隊(duì)列代表了一個(gè)用戶.我們可以查看下面的目錄找到相關(guān)的例子:Chapter01/Recipe07/Java_7/src/rmqexample/direct
一個(gè)典型的使用場(chǎng)景是實(shí)現(xiàn)一個(gè)聊天器, 在這里每個(gè)隊(duì)列代表了一個(gè)用戶.我們可以查看下面的目錄找到相關(guān)的例子:Chapter01/Recipe07/Java_7/src/rmqexample/direct
我們將展示如何同時(shí)實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者.實(shí)現(xiàn)生產(chǎn)者,執(zhí)行下面的步驟:
1. 聲明一個(gè)direct交換器:
channel.exchangeDeclare(exchangeName, "direct", false,false, null);
2. 發(fā)送一些消息到交換器,使用任意的routingKey 值:
channel.basicPublish(exchangeName, routingKey, null,jsonBook.getBytes());
要實(shí)現(xiàn)消費(fèi)者,執(zhí)行下面的步驟:
1. 聲明同樣的交換器,步驟與上面步驟相同.
2. 創(chuàng)建一個(gè)臨時(shí)隊(duì)列:
String myQueue = channel.queueDeclare().getQueue();
3. 使用bindingKey將隊(duì)列綁定到交換器上. 假如你要使用多個(gè)binding key,可按需要多次執(zhí)行這個(gè)操作:
channel.queueBind(myQueue,exchangeName,bindingKey);
4. 在創(chuàng)建了適當(dāng)?shù)南M(fèi)對(duì)象后,可以參考消費(fèi)消息食譜來消費(fèi)消息.
如何工作
在本食譜中,我們使用任意的字符串(也稱為路由鍵)來向direct交換器發(fā)布消息(step 2).在fanout交換器中,如果沒有綁定隊(duì)列的話,消息是不是存儲(chǔ)的,但在這里,根據(jù)在綁定時(shí)指定的綁定鍵,消費(fèi)者可以選擇消息轉(zhuǎn)發(fā)這些隊(duì)列(步驟5).
僅當(dāng)路由鍵與綁定鍵相同的消息才會(huì)被投遞到這些隊(duì)列.
僅當(dāng)路由鍵與綁定鍵相同的消息才會(huì)被投遞到這些隊(duì)列.
TIP
過濾操作是由AMQP broker來操作,而不是消費(fèi)者;路由鍵與綁定鍵不同的消息是不會(huì)放置到隊(duì)列中的.但是,可允許多個(gè)隊(duì)列使用相同的綁定鍵,broker會(huì)將匹配的消息進(jìn)行拷貝,并投遞給它們.也允許在同一個(gè)隊(duì)列/交換綁定上綁定多個(gè)不同的綁定鍵,這樣就可以投遞所有相應(yīng)的消息.
過濾操作是由AMQP broker來操作,而不是消費(fèi)者;路由鍵與綁定鍵不同的消息是不會(huì)放置到隊(duì)列中的.但是,可允許多個(gè)隊(duì)列使用相同的綁定鍵,broker會(huì)將匹配的消息進(jìn)行拷貝,并投遞給它們.也允許在同一個(gè)隊(duì)列/交換綁定上綁定多個(gè)不同的綁定鍵,這樣就可以投遞所有相應(yīng)的消息.
更多
假如我們使用指定的路由鍵來將消息投遞到交換器,但在這個(gè)指定鍵上卻沒有綁定隊(duì)列,那么消息會(huì)默默的銷毀.
然而, 當(dāng)發(fā)生這種情況時(shí),生產(chǎn)者可以檢測(cè)這種行為,正如處理未路由消息食譜中描述的一樣.
然而, 當(dāng)發(fā)生這種情況時(shí),生產(chǎn)者可以檢測(cè)這種行為,正如處理未路由消息食譜中描述的一樣.
使用topic交換器來路由消息
Direct 和topic 交換器在概念上有點(diǎn)相似,最大的不同點(diǎn)是direct交換器使用精準(zhǔn)匹配來選擇消息的目的地,而topic交換器允許使用通配符來進(jìn)行模式匹配.
例如, BBC使用使用topic交換器來將新故事路由到恰當(dāng)?shù)腞SS訂閱.
你可以在這里找到topic交換器的例子:Chapter01/Recipe08/Java_8/src/rmqexample/topic
如何做
例如, BBC使用使用topic交換器來將新故事路由到恰當(dāng)?shù)腞SS訂閱.
你可以在這里找到topic交換器的例子:Chapter01/Recipe08/Java_8/src/rmqexample/topic
如何做
我們先從生產(chǎn)者開始:
1. 聲明一個(gè)topic交換器:
channel.exchangeDeclare(exchangeName, "topic", false,false, null);
2. 使用任意的路由鍵將消息發(fā)送到交換器:
channel.basicPublish(exchangeName, routingKey, null,jsonBook.getBytes());
接下來,消費(fèi)者:
1. 聲明相同的交換,如步驟1做的一樣.
2. 創(chuàng)建一個(gè)臨時(shí)隊(duì)列:
String myQueue = channel.queueDeclare().getQueue();
3. 使用綁定鍵將隊(duì)列綁定到交換器上,這里也可以包含通配符:
channel.queueBind(myQueue,exchangeName,bindingKey);
4. 在創(chuàng)建適當(dāng)?shù)南M(fèi)者對(duì)象后,可以像消息消息食譜中一樣來消費(fèi)消息.
1. 聲明一個(gè)topic交換器:
channel.exchangeDeclare(exchangeName, "topic", false,false, null);
2. 使用任意的路由鍵將消息發(fā)送到交換器:
channel.basicPublish(exchangeName, routingKey, null,jsonBook.getBytes());
接下來,消費(fèi)者:
1. 聲明相同的交換,如步驟1做的一樣.
2. 創(chuàng)建一個(gè)臨時(shí)隊(duì)列:
String myQueue = channel.queueDeclare().getQueue();
3. 使用綁定鍵將隊(duì)列綁定到交換器上,這里也可以包含通配符:
channel.queueBind(myQueue,exchangeName,bindingKey);
4. 在創(chuàng)建適當(dāng)?shù)南M(fèi)者對(duì)象后,可以像消息消息食譜中一樣來消費(fèi)消息.
如何工作
以先前的食譜中,用字符串標(biāo)記來將消息發(fā)送到topic交換器中(步驟2),但對(duì)于topic交換器來說,組合多個(gè)逗號(hào)分隔的單詞也是很重要的;它們會(huì)被當(dāng)作主題消息.例如,在我們的例子中,我們用:
technology.rabbitmq.ebook
sport.golf.paper
sport.tennis.ebook
要消息這些消息,消費(fèi)者需要將myQueue綁定到交換器上(步驟5)
使用topic交換器, 步驟5中指定的訂閱綁定/綁定鍵可以是一系列逗號(hào)分隔的單詞或通配符. AMQP通配符只包括:
technology.rabbitmq.ebook
sport.golf.paper
sport.tennis.ebook
要消息這些消息,消費(fèi)者需要將myQueue綁定到交換器上(步驟5)
使用topic交換器, 步驟5中指定的訂閱綁定/綁定鍵可以是一系列逗號(hào)分隔的單詞或通配符. AMQP通配符只包括:
- #: 匹配0或多個(gè)單詞
- *: 只精確匹配一個(gè)單詞
- #.ebook 和 *.*.ebook 可匹配第一個(gè)和第三個(gè)發(fā)送消息
- sport.# and sport.*.* 可匹配第二個(gè)和第三個(gè)發(fā)送消息
- # 可匹配任何消息
更多
再次說明,如果消息不能投遞到任何隊(duì)列,它們會(huì)被默默地銷毀.當(dāng)發(fā)生此種情況時(shí),生產(chǎn)者可以檢測(cè)這種行為,就如處理未路由消息食譜中描述的一樣.
保證消息處理
在這個(gè)例子中,我們將展示在消費(fèi)消息時(shí),我們?nèi)绾蝸硎褂妹鞔_的應(yīng)答.消息在消費(fèi)者獲取并對(duì)broker作出應(yīng)答前,它會(huì)一直存在于隊(duì)列中.應(yīng)答可以是明確的或隱含的.在先前的例子中,我們使用的是隱含應(yīng)答.為了能實(shí)際查看這個(gè)例子,你可以運(yùn)行生產(chǎn)消息食譜中的發(fā)布者,然后你運(yùn)行消費(fèi)者來獲取消息,可在Chapter01/Recipe09/Java_9/中找到.
如何做
為了能保證消費(fèi)者處理完消息后能應(yīng)答消息,你可以執(zhí)行下面的步驟:
1. 聲明一個(gè)隊(duì)列:
channel.queueDeclare(myQueue, true, false, false,null);
2. 綁定消費(fèi)者與隊(duì)列,并設(shè)置basicConsume()方法的autoAck參數(shù)為false:
ActualConsumer consumer = new ActualConsumer(channel);
boolean autoAck = false; // n.b.
channel.basicConsume(MyQueue, autoAck, consumer);
3. 消費(fèi)消息,并發(fā)送應(yīng)答:
public void handleDelivery(String consumerTag,Envelope envelope, BasicPropertiesproperties,byte[] body) throws java.io.IOException {
String message = new String(body);
this.getChannel().basicAck(envelope.getDeliveryTag(),false);
1. 聲明一個(gè)隊(duì)列:
channel.queueDeclare(myQueue, true, false, false,null);
2. 綁定消費(fèi)者與隊(duì)列,并設(shè)置basicConsume()方法的autoAck參數(shù)為false:
ActualConsumer consumer = new ActualConsumer(channel);
boolean autoAck = false; // n.b.
channel.basicConsume(MyQueue, autoAck, consumer);
3. 消費(fèi)消息,并發(fā)送應(yīng)答:
public void handleDelivery(String consumerTag,Envelope envelope, BasicPropertiesproperties,byte[] body) throws java.io.IOException {
String message = new String(body);
this.getChannel().basicAck(envelope.getDeliveryTag(),false);
}
如何工作
如何工作
在創(chuàng)建隊(duì)列后(步驟1),我們將消費(fèi)者加入到隊(duì)列中,并且定義了應(yīng)答行為(步驟2).
參數(shù)autoack = false表明RabbitMQ client API會(huì)自己來發(fā)送明確的應(yīng)答.
在我們從隊(duì)列收到消息后,我們必須向RabbitMQ發(fā)送應(yīng)答,以表示我們收到到消息并適當(dāng)?shù)靥幚砹?因此我們調(diào)用了channel.basicAck()(步驟3).
RabbitMQ只有在收到了應(yīng)答后,才會(huì)從隊(duì)列中刪除消息.
參數(shù)autoack = false表明RabbitMQ client API會(huì)自己來發(fā)送明確的應(yīng)答.
在我們從隊(duì)列收到消息后,我們必須向RabbitMQ發(fā)送應(yīng)答,以表示我們收到到消息并適當(dāng)?shù)靥幚砹?因此我們調(diào)用了channel.basicAck()(步驟3).
RabbitMQ只有在收到了應(yīng)答后,才會(huì)從隊(duì)列中刪除消息.
TIP
如果在消費(fèi)者不發(fā)送應(yīng)答,消費(fèi)者會(huì)繼續(xù)接收后面的消息;但是,當(dāng)你斷開了消費(fèi)者后,所有的消息仍會(huì)保留在隊(duì)列中.消息在RabbitMQ收到應(yīng)答前,都認(rèn)為沒有被消費(fèi).可以注解basicAck()調(diào)用來演示這種行為.
channel.basicAck()方法有兩個(gè)參數(shù):
如果multiple設(shè)置為false,client只會(huì)應(yīng)答deliveryTag參數(shù)的消息, 否則,client會(huì)應(yīng)答此消息之前的所有消息. 通過向RabbitMQ應(yīng)答一組消息而不是單個(gè)消息,此標(biāo)志允許我們優(yōu)化消費(fèi)消息.
如果在消費(fèi)者不發(fā)送應(yīng)答,消費(fèi)者會(huì)繼續(xù)接收后面的消息;但是,當(dāng)你斷開了消費(fèi)者后,所有的消息仍會(huì)保留在隊(duì)列中.消息在RabbitMQ收到應(yīng)答前,都認(rèn)為沒有被消費(fèi).可以注解basicAck()調(diào)用來演示這種行為.
channel.basicAck()方法有兩個(gè)參數(shù):
- deliveryTag
- multiple
如果multiple設(shè)置為false,client只會(huì)應(yīng)答deliveryTag參數(shù)的消息, 否則,client會(huì)應(yīng)答此消息之前的所有消息. 通過向RabbitMQ應(yīng)答一組消息而不是單個(gè)消息,此標(biāo)志允許我們優(yōu)化消費(fèi)消息.
TIP
消息只能應(yīng)答一次,如果對(duì)同一個(gè)消息應(yīng)答了多次,方法會(huì)拋出preconditionfailed 異常.
調(diào)用channel.basicAck(0,true),則所有未應(yīng)答的消息都會(huì)得到應(yīng)答,0 代表所有消息.此外,調(diào)用channel.basicAck(0,false) 會(huì)引發(fā)異常.
消息只能應(yīng)答一次,如果對(duì)同一個(gè)消息應(yīng)答了多次,方法會(huì)拋出preconditionfailed 異常.
調(diào)用channel.basicAck(0,true),則所有未應(yīng)答的消息都會(huì)得到應(yīng)答,0 代表所有消息.此外,調(diào)用channel.basicAck(0,false) 會(huì)引發(fā)異常.
更多
下面的章節(jié),我們還會(huì)討論basicReject()方法,此方法是RabbitMQ擴(kuò)展,它允許更好的靈活性.
也可參考
分發(fā)消息到多個(gè)消費(fèi)者食譜是一個(gè)更好解釋明確應(yīng)答真實(shí)例子.
分發(fā)消息到多個(gè)消費(fèi)者
在這個(gè)例子中,我們將展示如何來創(chuàng)建一個(gè)動(dòng)態(tài)負(fù)責(zé)均衡器,以及如何將消息分發(fā)到多個(gè)消費(fèi)者.我們將創(chuàng)建一個(gè)文件下載器.
你可在Chapter01/Recipe10/Java_10/找到源碼.
你可在Chapter01/Recipe10/Java_10/找到源碼.
如何做
為了能讓兩個(gè)以上的RabbitMQ clients能盡可能的負(fù)載均衡來消費(fèi)消息,你必須遵循下面的步驟:
1. 聲明一個(gè)命令隊(duì)列, 并按下面這樣指定basicQos:
channel.queueDeclare(myQueue, false, false, false,null);
channel.basicQos(1);
2. 使用明確應(yīng)答來綁定一個(gè)消費(fèi)者:
channel.basicConsume(myQueue, false, consumer);
3. 使用channel.basicPublish()來發(fā)送一個(gè)或多個(gè)消息.
4. 運(yùn)行兩個(gè)或多個(gè)消費(fèi)者.
1. 聲明一個(gè)命令隊(duì)列, 并按下面這樣指定basicQos:
channel.queueDeclare(myQueue, false, false, false,null);
channel.basicQos(1);
2. 使用明確應(yīng)答來綁定一個(gè)消費(fèi)者:
channel.basicConsume(myQueue, false, consumer);
3. 使用channel.basicPublish()來發(fā)送一個(gè)或多個(gè)消息.
4. 運(yùn)行兩個(gè)或多個(gè)消費(fèi)者.
如何工作
發(fā)布者發(fā)送了一條帶下載地址的消息:
String messageUrlToDownload="http://www.rabbitmq.com/releases/rabbitmq-dotnetclient/v3.0.2/rabbitmq-dotnet-client-3.0.2-user-guide.pdf";
channel.basicPublish("",MyQueue,null,messageUrlToDownload.getBytes());
消費(fèi)者獲取到了這個(gè)消息:
System.out.println("Url to download:" + messageURL);
downloadUrl(messageURL);
一旦下載完成,消費(fèi)者將向broker發(fā)送應(yīng)答,并開始準(zhǔn)備下載下一個(gè):
getChannel().basicAck(envelope.getDeliveryTag(),false);
System.out.println("Ack sent!");
System.out.println("Wait for the next download...");
消費(fèi)者按塊的方式能獲取消息,但實(shí)際上,當(dāng)消費(fèi)者發(fā)送應(yīng)答時(shí),消息就會(huì)從隊(duì)列中刪除,在先前的食譜中,我們已經(jīng)看過這種情況了.
另一個(gè)方面,在本食譜中使用了多個(gè)消費(fèi)才,第一個(gè)會(huì)預(yù)先提取消息,其它后啟動(dòng)的消費(fèi)者在隊(duì)列中找不到任何可用的消息.為了平等地發(fā)分發(fā)消息,我們需要使用channel.basicQos(1)來指定一次只預(yù)先提取一個(gè)消息.
也可參考
String messageUrlToDownload="http://www.rabbitmq.com/releases/rabbitmq-dotnetclient/v3.0.2/rabbitmq-dotnet-client-3.0.2-user-guide.pdf";
channel.basicPublish("",MyQueue,null,messageUrlToDownload.getBytes());
消費(fèi)者獲取到了這個(gè)消息:
System.out.println("Url to download:" + messageURL);
downloadUrl(messageURL);
一旦下載完成,消費(fèi)者將向broker發(fā)送應(yīng)答,并開始準(zhǔn)備下載下一個(gè):
getChannel().basicAck(envelope.getDeliveryTag(),false);
System.out.println("Ack sent!");
System.out.println("Wait for the next download...");
消費(fèi)者按塊的方式能獲取消息,但實(shí)際上,當(dāng)消費(fèi)者發(fā)送應(yīng)答時(shí),消息就會(huì)從隊(duì)列中刪除,在先前的食譜中,我們已經(jīng)看過這種情況了.
另一個(gè)方面,在本食譜中使用了多個(gè)消費(fèi)才,第一個(gè)會(huì)預(yù)先提取消息,其它后啟動(dòng)的消費(fèi)者在隊(duì)列中找不到任何可用的消息.為了平等地發(fā)分發(fā)消息,我們需要使用channel.basicQos(1)來指定一次只預(yù)先提取一個(gè)消息.
也可參考
在Chapter 8, Performance Tuning for RabbitMQ中可以找到更多負(fù)載均衡的信息.
使用消息屬性
使用消息屬性
在這個(gè)例子中,我們將展示如何AMQP消息是如何分解的,以及如何使用消息屬性.
你可在Chapter01/Recipe11/Java_11/找到源碼.
如何做
你可在Chapter01/Recipe11/Java_11/找到源碼.
如何做
要訪問消息屬性,你必須執(zhí)行下面的步驟:
1. 聲明一個(gè)隊(duì)列:
channel.queueDeclare(MyQueue, false, false, false,null);
2. 創(chuàng)建一個(gè)BasicProperties類:
Map<String,Object>headerMap = new HashMap<String,Object>();
headerMap.put("key1", "value1");
headerMap.put("key2", new Integer(50) );
headerMap.put("key3", new Boolean(false));
headerMap.put("key4", "value4");
BasicProperties messageProperties = new BasicProperties.Builder()
.timestamp(new Date())
.contentType("text/plain")
.userId("guest")
.appId("app id: 20")
.deliveryMode(1)
.priority(1)
.headers(headerMap)
.clusterId("cluster id: 1")
.build();
3. 使用消息屬性來發(fā)布消息:
channel.basicPublish("",myQueue,messageProperties,message.getBytes())
4. 消費(fèi)消息并打印屬性:
System.out.println("Property:" + properties.toString());
如何工作
1. 聲明一個(gè)隊(duì)列:
channel.queueDeclare(MyQueue, false, false, false,null);
2. 創(chuàng)建一個(gè)BasicProperties類:
Map<String,Object>headerMap = new HashMap<String,Object>();
headerMap.put("key1", "value1");
headerMap.put("key2", new Integer(50) );
headerMap.put("key3", new Boolean(false));
headerMap.put("key4", "value4");
BasicProperties messageProperties = new BasicProperties.Builder()
.timestamp(new Date())
.contentType("text/plain")
.userId("guest")
.appId("app id: 20")
.deliveryMode(1)
.priority(1)
.headers(headerMap)
.clusterId("cluster id: 1")
.build();
3. 使用消息屬性來發(fā)布消息:
channel.basicPublish("",myQueue,messageProperties,message.getBytes())
4. 消費(fèi)消息并打印屬性:
System.out.println("Property:" + properties.toString());
如何工作
AMQP 消息(也稱為內(nèi)容)被分成了兩部分:
Map<String,Object>headerMap = new HashMap<String, Object>();
BasicProperties messageProperties = new BasicProperties.Builder()
.timestamp(new Date())
.userId("guest")
.deliveryMode(1)
.priority(1)
.headers(headerMap)
.build();
在這個(gè)對(duì)象中,我們?cè)O(shè)置了下面的屬性:
- 內(nèi)容頭
- 內(nèi)容體(先前例子我們已經(jīng)看到過了)
Map<String,Object>headerMap = new HashMap<String, Object>();
BasicProperties messageProperties = new BasicProperties.Builder()
.timestamp(new Date())
.userId("guest")
.deliveryMode(1)
.priority(1)
.headers(headerMap)
.build();
在這個(gè)對(duì)象中,我們?cè)O(shè)置了下面的屬性:
- timestamp: 消息時(shí)間戳.
- userId: 哪個(gè)用戶發(fā)送的消息(默認(rèn)是"guest"). 在下面的章節(jié)中,我們將了解用戶管理.
- deliveryMode: 如果設(shè)置為1,則消息是非持久化的, 如果設(shè)置為2,則消息是持久化的(你可以參考食譜連接broker).
- priority: 用于定義消息的優(yōu)先級(jí),其值可以是0到9.
- headers: 一個(gè)HashMap<String, Object>頭,你可以在其中自由地定義字段.
RabbitMQ BasicProperties 類是一個(gè)AMQP內(nèi)容頭實(shí)現(xiàn).BasicProperties的屬性可通過BasicProperties.Builder()構(gòu)建.頭準(zhǔn)備好了,我們可使用
channel.basicPublish("",myQueue, messageProperties,message.getBytes())來發(fā)送消息,在這里,messageProperties是消息頭,message是消息體.
在步驟中,消費(fèi)者獲得了一個(gè)消息:
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body) throws java.io.IOException {
System.out.println("***********message header****************");
System.out.println("Message sent at:"+ properties.getTimestamp());
System.out.println("Message sent by user:"+ properties.getUserId());
System.out.println("Message sent by App:"+properties.getAppId());
System.out.println("all properties :" + properties.toString());
System.out.println("**********message body**************");
String message = new String(body);
System.out.println("Message Body:"+message);
}
參數(shù)properties包含了消息頭,body包含了消息體.
更多
在步驟中,消費(fèi)者獲得了一個(gè)消息:
public void handleDelivery(String consumerTag,Envelope envelope,BasicProperties properties,byte[] body) throws java.io.IOException {
System.out.println("***********message header****************");
System.out.println("Message sent at:"+ properties.getTimestamp());
System.out.println("Message sent by user:"+ properties.getUserId());
System.out.println("Message sent by App:"+properties.getAppId());
System.out.println("all properties :" + properties.toString());
System.out.println("**********message body**************");
String message = new String(body);
System.out.println("Message Body:"+message);
}
參數(shù)properties包含了消息頭,body包含了消息體.
更多
使用消息屬性可以優(yōu)化性能.將審計(jì)信息或日志信息寫入body,通常是一種典型的錯(cuò)誤,因?yàn)橄M(fèi)者需要解析body來獲取它們.
body 消息只可以包含應(yīng)用程序數(shù)據(jù)(如,一個(gè)Book class),而消息屬性可以持有消息機(jī)制相關(guān)或其它實(shí)現(xiàn)細(xì)節(jié)的相關(guān)信息.
例如 ,如果消費(fèi)者想知道消息是何時(shí)發(fā)送的,那么你可以使用timestamp屬性, 或者消費(fèi)者需要根據(jù)一個(gè)定制標(biāo)記來區(qū)分消息,你可以將它們放入header HashMap屬性中.
也可參考
body 消息只可以包含應(yīng)用程序數(shù)據(jù)(如,一個(gè)Book class),而消息屬性可以持有消息機(jī)制相關(guān)或其它實(shí)現(xiàn)細(xì)節(jié)的相關(guān)信息.
例如 ,如果消費(fèi)者想知道消息是何時(shí)發(fā)送的,那么你可以使用timestamp屬性, 或者消費(fèi)者需要根據(jù)一個(gè)定制標(biāo)記來區(qū)分消息,你可以將它們放入header HashMap屬性中.
也可參考
MessageProperties類對(duì)于標(biāo)準(zhǔn)情況,包含了一些預(yù)先構(gòu)建的BasicProperties類. 可查看http://www.rabbitmq.com/releases//rabbitmq-java-client/current-javadoc/com/rabbitmq/client/
MessageProperties.html
在這個(gè)例子中,我們只是使用了一些屬性,你可在http://www.rabbitmq.com/releases//rabbitmq-java-client/currentjavadoc/com/rabbitmq/client/AMQP.BasicProperties.html獲取更多信息.
MessageProperties.html
在這個(gè)例子中,我們只是使用了一些屬性,你可在http://www.rabbitmq.com/releases//rabbitmq-java-client/currentjavadoc/com/rabbitmq/client/AMQP.BasicProperties.html獲取更多信息.
消息事務(wù)
在本例中,我們將討論如何使用channel事務(wù). 在生產(chǎn)消息食譜中,我們已經(jīng)了解了如何來使用持久化消息,但如果broker不能將消息寫入磁盤的話,那么你就會(huì)丟失消息.使用AQMP事務(wù),你可以確保消息不會(huì)丟失.
你可在Chapter01/Recipe12/Java_12/找到相關(guān)源碼.
如何做
你可在Chapter01/Recipe12/Java_12/找到相關(guān)源碼.
如何做
通過下面的步驟,你可以使用事務(wù)性消息:
1. 創(chuàng)建持久化隊(duì)列
channel.queueDeclare(myQueue, true, false, false, null);
2. 設(shè)置channel為事務(wù)模式:
channel.txSelect();
3. 發(fā)送消息到隊(duì)列,然后提交操作:
channel.basicPublish("", myQueue,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
channel.txCommit();
1. 創(chuàng)建持久化隊(duì)列
channel.queueDeclare(myQueue, true, false, false, null);
2. 設(shè)置channel為事務(wù)模式:
channel.txSelect();
3. 發(fā)送消息到隊(duì)列,然后提交操作:
channel.basicPublish("", myQueue,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
channel.txCommit();
如何工作
在創(chuàng)建了持久化隊(duì)列后(step 1),我們將channel設(shè)置成了事務(wù)模式,使用的方法是txSelect() (step 2). 使用 txCommit()確保消息存儲(chǔ)在隊(duì)列并寫入磁盤,然后消息將投遞給消費(fèi)者.在txCommit() 或txRollback()之前,必須至少調(diào)用一次txSelect().
在一個(gè)DBMS中,你可以使用回滾方法.在下面的情況下,消息不會(huì)被存儲(chǔ)或投遞:
channel.basicPublish("",myQueue,MessageProperties.PERSISTENT_TEXT_PLAIN ,message.getBytes());
channel.txRollback();
更多
在創(chuàng)建了持久化隊(duì)列后(step 1),我們將channel設(shè)置成了事務(wù)模式,使用的方法是txSelect() (step 2). 使用 txCommit()確保消息存儲(chǔ)在隊(duì)列并寫入磁盤,然后消息將投遞給消費(fèi)者.在txCommit() 或txRollback()之前,必須至少調(diào)用一次txSelect().
在一個(gè)DBMS中,你可以使用回滾方法.在下面的情況下,消息不會(huì)被存儲(chǔ)或投遞:
channel.basicPublish("",myQueue,MessageProperties.PERSISTENT_TEXT_PLAIN ,message.getBytes());
channel.txRollback();
更多
事務(wù)會(huì)降低應(yīng)用程序的性能,因?yàn)閎roker不會(huì)緩存消息,且tx操作是同步的.
也可參考
在后面的章節(jié)中,我們會(huì)討論發(fā)布確認(rèn)插件,這是一種較快確認(rèn)操作的方式.
處理未路由消息
在這個(gè)例子中,我們將展示如何管理未路由的消息. 未路由消息指的是沒有目的地的消息.如,一個(gè)消息發(fā)送到了無任何綁定隊(duì)列的交換器上.
未路由消息不同于死消息 ,前者是發(fā)送到無任何隊(duì)列目的地的交換器上,而后者指的是消息到達(dá)了隊(duì)列,但由于消費(fèi)者的決策,過期TTL,或者超過隊(duì)列長(zhǎng)度限制而被拒絕的消息 你可以在Chapter01/Recipe13/Java_13/找到源碼.
如何做
未路由消息不同于死消息 ,前者是發(fā)送到無任何隊(duì)列目的地的交換器上,而后者指的是消息到達(dá)了隊(duì)列,但由于消費(fèi)者的決策,過期TTL,或者超過隊(duì)列長(zhǎng)度限制而被拒絕的消息 你可以在Chapter01/Recipe13/Java_13/找到源碼.
如何做
為了處理未路由的消息,你需要執(zhí)行下面的操作:
1. 第一步實(shí)現(xiàn)ReturnListener接口:
public class HandlingReturnListener implements ReturnListener
@Override
public void handleReturn…
2. 將HandlingReturnListener類添加到channel.addReturnListener():
channel.addReturnListener(new HandlingReturnListener());
3. 然后創(chuàng)建一個(gè)交換機(jī):
channel.exchangeDeclare(myExchange, "direct", false, false,null);
4. 最后發(fā)布一個(gè)強(qiáng)制消息到交換器:
boolean isMandatory = true;
channel.basicPublish(myExchange, "",isMandatory, null,message.getBytes());
1. 第一步實(shí)現(xiàn)ReturnListener接口:
public class HandlingReturnListener implements ReturnListener
@Override
public void handleReturn…
2. 將HandlingReturnListener類添加到channel.addReturnListener():
channel.addReturnListener(new HandlingReturnListener());
3. 然后創(chuàng)建一個(gè)交換機(jī):
channel.exchangeDeclare(myExchange, "direct", false, false,null);
4. 最后發(fā)布一個(gè)強(qiáng)制消息到交換器:
boolean isMandatory = true;
channel.basicPublish(myExchange, "",isMandatory, null,message.getBytes());
如何工作
當(dāng)我們運(yùn)行發(fā)布者的時(shí)候,發(fā)送到myExchange的消息因?yàn)闆]有綁定任何隊(duì)列不會(huì)到達(dá)任何目的地.但這些消息不會(huì),它們會(huì)被重定向到一個(gè)內(nèi)部隊(duì)列. .HandlingReturnListener類會(huì)使用handleReturn()來處理這些消息.ReturnListener類綁定到了一個(gè)發(fā)布者channel上, 且它會(huì)獵捕那些不能路由的消息.
在源碼示例中,你可以找到消費(fèi)者,你也可以一起運(yùn)行生產(chǎn)者和消費(fèi)者,然后再停止消費(fèi)者.
更多
在源碼示例中,你可以找到消費(fèi)者,你也可以一起運(yùn)行生產(chǎn)者和消費(fèi)者,然后再停止消費(fèi)者.
更多
如果沒有設(shè)置channel ReturnListener, 未路由的消息只是被broker默默的拋棄.在這種情況下,你必須注意未路由消息,將mandatory 標(biāo)記設(shè)置為true是相當(dāng)重要的,如果為false,未路由的消息也會(huì)被拋棄.