本章我們將覆蓋:
- 如何使用消息過期
- 如何使指定隊(duì)列上的消息過期
- 如何讓隊(duì)列過期
- 管理駁回的(rejected)或過期的消息
- 理解其它備用交換器擴(kuò)展
- 理解有效user-ID擴(kuò)展
- 通知隊(duì)列消息者失敗
- 理解交換器到交換器擴(kuò)展
- 在消息中嵌入消息目的地
介紹
在本章中,我們將展示關(guān)于RabbitMQ擴(kuò)展上的一些食譜.這些擴(kuò)展不是AMQP 0-9-1標(biāo)準(zhǔn)的一部分,使用它們會破壞其它AMQPbroker的兼容性。
另一方面, 在AMQP 0-10 (http://www.amqp.org/specification/0-10/amqp-org-download)中也出現(xiàn)了輕微的變化,這是一個簡單通往那里的路徑.最后, 它們通常是優(yōu)化問題的有效解決方案。
另一方面, 在AMQP 0-10 (http://www.amqp.org/specification/0-10/amqp-org-download)中也出現(xiàn)了輕微的變化,這是一個簡單通往那里的路徑.最后, 它們通常是優(yōu)化問題的有效解決方案。
本章中的例子將更為真實(shí),例如,配置參數(shù),如列表和交換器, 以及路由鍵名稱將定義在Constants接口中。事實(shí)上,一個真正的應(yīng)用程序會遵循這樣的準(zhǔn)則從配置文件中讀取配置文件,以在不同應(yīng)用程序中共享。
然而,在下面的例子中,為了更簡短和較好的可讀性,我們并沒有指定Constants的命名空間。
如何讓消息過期
在本食譜中,我們將展示如何讓消息過期.食譜的資源可在Chapter02/Recipe01/Java/src/rmqexample中找到,如:
- Producer.java
- Consumer.java
- GetOne.java
準(zhǔn)備
為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
如何做
本示例的核心是Producer.java文件.為了產(chǎn)生在給定生存時(shí)間(TTL)后過期的消息,我們需要執(zhí)行下面的步驟:
1. 創(chuàng)建或聲明一個用來發(fā)送消息的交換器, 并將其綁定到隊(duì)列上,就像第1章使用AMQP看到的一樣:
channel.exchangeDeclare(exchange, "direct", false);
channel.queueDeclare(queue, false, false, false, null);
channel.queueBind(queue, exchange, routingKey);
2. 像下面這樣初始化可選消息屬性TTL:
BasicPropertiesmsgProperties = new BasicProperties.Builder().expiration("20000").build();
3. 使用下面的代碼來發(fā)布消息:
channel.basicPublish(exchange, routingKey, msgProperties,statMsg.getBytes());
如何工作
在這個例子中,生產(chǎn)者創(chuàng)建了一個交換器,一個命名隊(duì)列,并將它們進(jìn)行了綁定,當(dāng)隊(duì)列上沒有附著任何消費(fèi)者,過期消息就顯得非常有意義了。
設(shè)置過期時(shí)間TTL (以毫秒設(shè)置),會促使RabbitMQ在消息過期時(shí),如果消息沒有被客戶端及時(shí)消費(fèi),立即刪除消息.
在我們的例子中,我們假設(shè)應(yīng)用程序發(fā)布了JVM資源統(tǒng)計(jì)信息到給定隊(duì)列,如果存在消費(fèi)者,那么會像平時(shí)一樣,獲取到實(shí)時(shí)數(shù)據(jù),反之,如果不存在這樣的消費(fèi)者,那么消息會給定生存時(shí)間后立即過期。通過這種方式,可以避免我們收集大量的數(shù)據(jù)。一旦消息者綁定到了隊(duì)列中,它會得到先前的消息(未過期)。進(jìn)一步的試驗(yàn),你可以用GetOne.java文件來替換Consumer.java文件運(yùn)行.
在調(diào)用 channel.basicGet() 時(shí),會使你一次只能消費(fèi)一個消息。
TIP
可使用channel.basicGet()方法來檢查未消費(fèi)消息的隊(duì)列.也可以通過為第二參數(shù)傳遞false來調(diào)用,即autoAck標(biāo)志.
在這里我們可以通過調(diào)用rabbitmqctl list_queues來監(jiān)控RabbitMQ隊(duì)列的狀態(tài)。
也可參考
默認(rèn)情況下,過期消息會丟失,但它們可以路由到其它地方。可參考管理拒絕消息或過期消息食譜來了解更多信息.
如何讓指定隊(duì)列上的消息過期
在本食譜中,我們將展示指定消息TTL的第二種方式.這次,我們不再通過消息屬性來指定,而是通過緩存消息的隊(duì)列來進(jìn)行指定。在這種情況下,生產(chǎn)者只是簡單地發(fā)布消息到交換器中,因此,在交換器上綁定標(biāo)準(zhǔn)隊(duì)列和過期消息隊(duì)列是可行的。
要在這方面進(jìn)行備注,須存在一個創(chuàng)建自定義的隊(duì)列的消費(fèi)者。生產(chǎn)者是相當(dāng)標(biāo)準(zhǔn)的.
像前面的食譜一樣,你可以在Chapter02/Recipe02/Java/src/rmqexample找到這三個源碼。
準(zhǔn)備
為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
如何做現(xiàn)在我們將展示創(chuàng)建特定消息TTL隊(duì)列的必要步驟。在我們的例子中,需要在Consumer.java文件中執(zhí)行下面的步驟:
1. 按下面來聲明交換器:
channel.exchangeDeclare(exchange, "direct", false);
2. 創(chuàng)建或聲明隊(duì)列,像下在這樣為x-message-ttl可選參數(shù)指定10,000毫秒的超時(shí)時(shí)間:
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 10000);
channel.queueDeclare(queue, false, false, false, arguments);
3. 綁定隊(duì)列到交換器上:
channel.queueBind(queue, exchange, routingKey);
如何工作
在這個例子中,為了最終分析,我們再次假設(shè)生產(chǎn)者發(fā)送了JVM統(tǒng)計(jì)數(shù)據(jù)給RabbitMQ。最終因?yàn)?span style="line-height: 1.5; background-color: inherit;">Producer.java文件將其發(fā)到一個交換機(jī),如果無消費(fèi)者連接的話,消息最終會丟失。
想要監(jiān)控或分析這些統(tǒng)計(jì)數(shù)據(jù)的消費(fèi)有下面三種選擇:
- 綁定到一個臨時(shí)隊(duì)列,即調(diào)用無參的channel.queueDeclare()方法
- 綁定到一個非自動刪除的命名隊(duì)列
- 綁定到一個非自動刪除的命名隊(duì)列,并且指定x-message-ttl ,如步驟2中展示的一樣.
在第一種情況中,消費(fèi)者將獲取實(shí)時(shí)統(tǒng)計(jì)數(shù)據(jù),但當(dāng)它掉線期間,它將不能在數(shù)據(jù)上執(zhí)行分析。
在第二種情況中,為了能讓它掉線期間,能獲取到發(fā)送的消息,可以使用一個命名隊(duì)列(最終是持久化的).但在掉線較長時(shí)間后,再重啟時(shí),它將有巨大的backlog來進(jìn)行恢復(fù),因此在隊(duì)列中可能存在大部分舊消息的垃圾。
在第三種情況中,舊消息垃圾會通過RabbitMQ自己來執(zhí)行,以使我們從消費(fèi)者和broker中獲益。
更多
當(dāng)設(shè)置per-queue TTL, 就像本食譜中看到的一樣,只要未到超時(shí)時(shí)間,消息就不會被丟棄,此時(shí)消費(fèi)者還可以嘗試消費(fèi)它們。
當(dāng)使用queue TTL時(shí), 這里有一個細(xì)微的變化,但使用per-message TTL時(shí),在broker隊(duì)列中可能會存在過期消息.
在這種情況下,這些過期消息仍然會占據(jù)資源(內(nèi)存),同時(shí)broker統(tǒng)計(jì)數(shù)據(jù)中仍然會計(jì)數(shù),直到它們不會到隊(duì)列頭部時(shí)。
也中參考
在這種情況下,過期消息也會恢復(fù)。參考管理駁回或過期消息食譜.
如何讓隊(duì)列過期
在第三種情況中,TTL不關(guān)聯(lián)任何消息,只關(guān)聯(lián)對列。這種情況對于服務(wù)器重啟和更新,是一個完美的選擇。一旦TTL超時(shí),在最后一個消費(fèi)者停止消費(fèi)后,RabbitMQ會丟棄隊(duì)列.
前面TTL相關(guān)食譜,你可在Chapter02/Recipe03/Java/src/rmqexample 中找到 Producer.java , Consumer.java ,and GetOne.java 相關(guān)文件。
準(zhǔn)備
為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
如何做
在前面的例子中,擴(kuò)展只需要關(guān)注Consumer.java :
1. 使用下面的代碼來創(chuàng)建或聲明交換器:
channel.exchangeDeclare(exchange, "direct", false);
2. 創(chuàng)建或聲明隊(duì)列,并為x-expires可選參數(shù)指定30,000毫秒的超時(shí)時(shí)間:
Map<String, Object> arguments = new HashMap<String,Object>();
arguments.put("x-expires", 30000);
channel.queueDeclare(queue, false, false, false,arguments);
3. 將隊(duì)列綁定到交換器上:
channel.queueBind(queue, exchange, routingKey);
如何工作
當(dāng)我們運(yùn)行Consumer.java或 GetOne.java 文件的時(shí)候, 超時(shí)隊(duì)列已經(jīng)創(chuàng)建好了,在消費(fèi)者附著到隊(duì)列上或調(diào)用channel.basicGet()時(shí),它將持續(xù)存在.
只有當(dāng)我們停止這兩個操作超過30秒時(shí),隊(duì)列才會被刪除,并且隊(duì)列包含的消息也會清除。
TIP
無論生產(chǎn)者是否向其發(fā)送了消息,隊(duì)列事實(shí)上都是獨(dú)立刪除的。
在這個試驗(yàn)課程中,我們可通過 rabbitmqctl list_queues 命令來監(jiān)控RabbitMQ 隊(duì)列狀態(tài).
因此,我們可以想像一種場景,有一個統(tǒng)計(jì)分析程序需要重啟來更新其代碼。由于命名隊(duì)列有較長的超時(shí)時(shí)間,因此重啟時(shí),不會丟失任何消息。如果我們停止,隊(duì)列會在超過TTL后被刪除,無價(jià)值的消息將不再存儲。
管理駁回或過期消息
在這個例子中,我們將展示如何使用死信交換器來管理過期或駁回的消息. 死信交換器是一種正常的交換器,死消息會在這里重定向,如果沒有指定,死消息會被broker丟棄。
你可以在Chapter02/Recipe04/Java/src/rmqexample中找到源碼文件:
- Producer.java
- Consumer.java
要嘗試過期消息,你可以使用第一個代碼來發(fā)送帶TTL的消息,就如如何使指定隊(duì)列上消息過期食譜中描述的一樣.
一旦啟動了,消費(fèi)者不允許消息過期,但可以可以駁回消息,最終導(dǎo)致成為死消息。
準(zhǔn)備
為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
如何做
下面的步驟展示了使用死信交換器來管理過期或駁回消息:
1. 創(chuàng)建一個工作交換品節(jié)和死信交換器:
channel.exchangeDeclare(Constants.exchange, "direct", false);
channel.exchangeDeclare(Constants.exchange_dead_letter,"direct", false);
2. 創(chuàng)建使用使用死信交換器和 x-message-ttle參數(shù)的隊(duì)列:
arguments.put("x-message-ttl", 10000);
arguments.put("x-dead-letter-exchange",exchange_dead_letter);
channel.queueDeclare(queue, false, false, false,arguments);
3. 然后像下面這樣綁定隊(duì)列:
channel.queueBind(queue, exchange, "");
4. 最后使用channel.basicPublish()來向交換器發(fā)送消息 .
5. 要嘗試駁回消息,我們需要配置一個消費(fèi)者,就像前面例子中看到的一樣,并使用下面的代碼來駁回消息:
basicReject(envelope.getDeliveryTag(), false);
如何工作
我們先從第一個場景開始(單獨(dú)使用producer): the expired messages. 在步驟中,我們創(chuàng)建兩個交換器,工作交換器和死信交換器。在步驟2中,我們使用下面兩個可選參數(shù)來創(chuàng)建隊(duì)列:
- 使用arguments.put("x-message-ttl", 10000)來設(shè)置消息TTL ,正如如何使指定隊(duì)列上消息過期食譜中描述的一樣.
- 使用arguments.put("x-dead-letter-exchange", exchange_dead_letter)來設(shè)置死信交換器名稱;
正如你所看到的,我們只是在配置中添加了可選的隊(duì)列參數(shù)。因此,當(dāng)生產(chǎn)者發(fā)送消息到交換器時(shí),它會隊(duì)列參數(shù)來路由。消息會在10秒后過期,之后它會重定向到exchange_dead_letter
TIP
死信交換器是一個標(biāo)準(zhǔn)的交換器,因此你可以基于任何目的來使用.
對于第二種場景,食譜的消費(fèi)者會駁回消息.當(dāng)消費(fèi)者得到消息后, 它會使用basicReject()方法來發(fā)回一個否定應(yīng)答(nack),當(dāng)broker收到nack時(shí),它會將消息重定向到exchange_dead_letter. 通過在死信交換器上綁定隊(duì)列,你可以管理這些消息。
當(dāng)消息重定向到死信隊(duì)列時(shí),broker會修改header消息,并在x-dead鍵中增加下面的值:
- reason : 表示隊(duì)列是否過期的或駁回的(requeue =false )
- queue : 表示隊(duì)列源,例如stat_queue_02/05
- time : 表示消息變?yōu)樗佬诺娜掌诤蜁r(shí)間
- exchange : 表示交換器,如monitor_exchange_02/05
- routing-keys : 表示發(fā)送消息時(shí)原先使用的路由鍵
要在實(shí)踐中查看這些值,你可使用GetOneDeadLetterQ 類.這可以創(chuàng)建queue_dead_letter隊(duì)列并會綁定到exchange_dead_letter
更多
你也可以使用arguments.put("x-dead-letter-routing-key", "myroutingkey")來指定死信路由鍵 ,它將會代替原來的路由鍵.這也就意味著你可以用不同的路由鍵來將不同消息路由到同一個隊(duì)列中。相當(dāng)棒。
理解交替交換器擴(kuò)展
目前,在第1章使用 AMQP中我們已經(jīng)展示了如何來處理未路由消息(消息發(fā)布到了交換器,但未能達(dá)到隊(duì)列). AMQP讓生產(chǎn)者通過此條件進(jìn)行應(yīng)答,并最終決定是否有需要再次將消息分發(fā)到不同的目的地。通過這種擴(kuò)展,我們可在broker中指定一個交替交換器來路由消息,而并不會對生產(chǎn)者造成更多的干預(yù),本食譜的代碼在Chapter02/Recipe05/Java/src/rmqexample .
準(zhǔn)備
為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
如何做
在本食譜中,我們會在Producer.java中聲明交替交換器.
1. 將交換器的名字(無目的地路由消息)-alternateExchange ,放到可選參數(shù)map的"alternate-exchange"中,如下所示:
Map<String, Object> arguments = new HashMap<String,Object>();
arguments.put("alternate-exchange", alternateExchange);
2. 通過傳遞arguments map來聲明交換器來發(fā)送消息:
channel.exchangeDeclare(exchange, "direct", false, false,arguments);
3. 聲明alternateExchange自身(已經(jīng)在步驟1中指定了),如下所示:
channel.exchangeDeclare(alternateExchange, "direct",false);
4. 聲明標(biāo)準(zhǔn)化持久化隊(duì)列,并使用路由鍵alertRK將其綁定到alternateExchange交換器中:
channel.queueDeclare(missingAlertQueue, true, false, false,null);
channel.queueBind(missingAlertQueue, alternateExchange,alertRK);
如何工作
在這個例子中,我們再次使用了生成統(tǒng)計(jì)數(shù)據(jù)的producer,正如先前的例子一樣.但這次,我們添加了路由鍵來讓producer指定一個重要的級別,名為infoRK或alertRK (在例子中是隨機(jī)分配的).如果你運(yùn)行一個producer以及至少一個consumer,將不會丟失任何消息,并且一切都會正常工作.
TIP
Consumers在交換器和隊(duì)列的聲明中,必須傳遞相同的可選參數(shù),否則會拋出異常。
但如果沒有消費(fèi)者監(jiān)聽的話,而我們不想丟失報(bào)警的話,這就是為什么必須選擇讓producer創(chuàng)建alternateExchange (步驟3)并將其綁定到持久隊(duì)列-missingAlertQueue的原因 (步驟4).
在單獨(dú)運(yùn)行producer的時(shí)候,你將看到報(bào)警存儲在這里.alternate交換器讓我們在不丟失消息的情況下可以路由消息.你可通過調(diào)用rabbitmqctllist_queues或運(yùn)行CheckAlerts.java來檢查狀態(tài) .
最后的代碼讓我們可以查看隊(duì)列的內(nèi)容和第一個消息,但不會進(jìn)行消費(fèi)。完成這種行為是簡單的,它足可以避免這種事實(shí):RabbitMQ client發(fā)送了ack,消息未消費(fèi),而只是進(jìn)行監(jiān)控。
現(xiàn)在,如果我們再次運(yùn)行Consumer.java文件,它會從missingAlertQueue隊(duì)列中獲取并消費(fèi)消息.這不是自動的,我們可以選擇性地從此隊(duì)列中獲取消息。
通過創(chuàng)建第二個消費(fèi)者實(shí)例( missingAlertConsumer ) 并使用相同的代碼來從兩個不同隊(duì)列消費(fèi)消息就可以完成這種效果。如果在處理實(shí)時(shí)消息時(shí),想要得到不同的行為,那么我們可以創(chuàng)建一個不同的消費(fèi)者。
更多
在這個例子中,步驟3和步驟4是可選的。 當(dāng)定義交換器時(shí),可為交替交換器指定名稱,對于其是否存在或是否綁定到任何隊(duì)列上,并不作強(qiáng)制要求 。如果交替交換器不存在,生產(chǎn)者可通過在丟失消息上設(shè)置mandatory標(biāo)志來得到應(yīng)答,就如在第1章中處理未路由消息食譜中看到的一樣。
甚至有可能出現(xiàn)另一種交換器-它自己的備用交換器,備用交換器可以是鏈?zhǔn)降模⑶覠o目的地消息在按序地重試,直到找到一個目的地。
如果在交換器鏈的末尾仍然沒有找到目的地,消息將會丟失,生產(chǎn)者可通過調(diào)設(shè)置mandatory 標(biāo)志和指定一個合適的ReturnListener參數(shù)得到通知。
理解經(jīng)過驗(yàn)證的user-ID擴(kuò)展
依據(jù)AMQP, 當(dāng)消費(fèi)者得到消息時(shí),它是不知道發(fā)送者信息的。一般說來,消費(fèi)者不應(yīng)該關(guān)心是誰生產(chǎn)的消息,對于生產(chǎn)者-消費(fèi)者解藕來說是相當(dāng)有利的。然而,有時(shí)出于認(rèn)證需要,為了達(dá)到此目的,RabbitMQ 提供了有效的user-ID擴(kuò)展。
在本例中,我們使用有效user-IDs模擬了訂單。你可在Chapter02/Recipe06/Java/src/rmqexample中找到源碼.
準(zhǔn)備
為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
如何做
完成下面的步驟,以使用經(jīng)過驗(yàn)證的user IDs來模擬訂單:
1. 像下面一樣聲明或使用持久化隊(duì)列:
channel.queueDeclare(queue, true, false, false, null);
2.發(fā)送消息時(shí),使用BasicProperties對象,在消息頭中指定一個user ID:
BasicProperties messageProperties = new BasicProperties.Builder()
.timestamp(new Date())
.userId("guest");
channel.basicPublish("",queue, messageProperties,bookOrderMsg.getBytes());
3. 消費(fèi)者獲取到訂單后,可像下面這樣打印訂單數(shù)據(jù)和消息頭:
System.out.println("The message has been placed by "+properties.getUserId());
如何工作
當(dāng)設(shè)置了user-ID時(shí),RabbitMQ 會檢查是否是同一個用戶打開的連接。在這個例子中,用戶是guest ,即RabbitMQ默認(rèn)用戶.
通過調(diào)用properties.getUserId() 方法,消費(fèi)者可以訪問發(fā)送者user ID。如果你想在步驟2中設(shè)置非當(dāng)前用戶的userId,channel.basicPublish()會拋出異常.
TIP
如果不使用user-ID屬性,用戶將是非驗(yàn)證的,properties.getUserId()方法會返回null.
也可參考
要更好的理解這個例子,你應(yīng)該知道用戶和虛擬機(jī)管理,這部分內(nèi)容將在下個章節(jié)中講解。在下個章節(jié)中,我們將了解如何通過在應(yīng)用程序中使用SSL來提高程序的安全性。只使用user-ID屬性,我們可保證用戶已認(rèn)證,但所有信息都是未加密的,因此很容易暴露。
隊(duì)列失敗時(shí)通知消費(fèi)者
根據(jù)AMQP標(biāo)準(zhǔn),消費(fèi)者不會得到隊(duì)列刪除的通知。一個正在刪除隊(duì)列上等待消息的消費(fèi)者不會收到任何錯誤信息,并會無限期地等待。然而,RabbitMQ client提供了一種擴(kuò)展來讓消息收到一個cancel參數(shù)-即消費(fèi)者cancel通知。我們馬上就會看到這個例子,你可在Chapter02/Recipe07/Java/src/rmqexample 中找到代碼.
準(zhǔn)備
為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
如何做
為了能讓擴(kuò)展工作,你只需要執(zhí)行下面的步驟:
1.在自定義的消費(fèi)者中覆蓋handleCancel()方法,可繼承于com.rabbitmq.client.DefaultConsumer (指的是ActualConsumer.java ):
public void handleCancel(String consumerTag) throws IOException {
...
}
如何工作
在我們的例子中,我們選擇實(shí)現(xiàn)一個消費(fèi)者,這個消費(fèi)者只在生產(chǎn)者是持久化的,且隊(duì)列是由生產(chǎn)者創(chuàng)建的情況下才能工作。
因此,如果隊(duì)列是非持久化的,Consumer.java文件會立即錯誤退出. 此行為可以通過調(diào)用channel.queueDeclarePassive()來完成 .
Producer.java類在其啟動時(shí)會創(chuàng)建隊(duì)列,并在其關(guān)閉時(shí)調(diào)用channel.queueDelete()方法刪除隊(duì)列,如果當(dāng)隊(duì)列關(guān)閉時(shí),而消費(fèi)者正在消費(fèi)隊(duì)列,那么RabbitMQ client會調(diào)用步驟1中覆蓋的handleCancel()方法來立即通知消費(fèi)者。
相對于顯示調(diào)用channel.basicCancel() 消費(fèi)者使用handleCancel()方法可以任意理由來退出。只有在這種情況下,RabbitMQ client library會調(diào)用Consumer接口的方法: handleCancelOK()
更多
消費(fèi)者cancel通知是client library的擴(kuò)展,而不是AMQP client libraries的常規(guī)方法.一個實(shí)例它們的library必須將其聲明為可選屬性(參考 http://www.rabbitmq.com/consumer-cancel. html#capabilities ).
RabbitMQ client library 支持并聲明了這種特性。
也可參考
在集群中,如果一個節(jié)點(diǎn)失效了,也會發(fā)生同樣的事情:client在隊(duì)列刪除后仍然得不到通知,除非它定義了覆蓋了自己的handleCancel()方法。關(guān)于這點(diǎn)的更多信息,可參考Chapter 6,開發(fā)可伸縮性應(yīng)用程序。
理解交換器到交換器擴(kuò)展
默認(rèn)情況下,AMQP支持交換器到隊(duì)列,但不支持交換器到交換器綁定。在本例中,我們將展示如何使用RabbitMQ 交換機(jī)到交換機(jī)擴(kuò)展.
在本例中,我們將合并來自兩個不同交換器的消息到第三個交換器中.你可以在Chapter02/Recipe08/Java/src/rmqexample找到源碼.
準(zhǔn)備
為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣,并像廣播消息食譜中來運(yùn)行生產(chǎn)者以及使用topic交換器來處理消息路由。
如何做
完成下面的步驟來使用RabbitMQ 交換器到交換器擴(kuò)展:
1. 使用下面的代碼來聲明我們需要追蹤消息的交換器:
channel.exchangeDeclare(exchange, "topic", false);
2. 使用exchangeBind()來綁定其它例子中的交換器 :
channel.exchangeBind(exchange,ref_exchange_c1_8,"#");
channel.exchangeBind(exchange,ref_exchange_c1_6,"#");
3. 啟動追蹤消費(fèi)者:
TraceConsumer consumer = new TraceConsumer(channel);
String consumerTag = channel.basicConsume(myqueue, false,consumer);
如何工作
在步驟1中,我們創(chuàng)建了一個新的交換器,在步驟2中我們綁定到了下面的交換器:
- ref_exchange_c1_6 (廣播消息) 與exchange綁定.
- ref_exchange_c1_8 (使用topic來處理消息路由)與exchange綁定 .
在步驟3中, 消費(fèi)者可以綁定一個隊(duì)列到exchange上以任意地獲取所有消息.
交換器到交換器擴(kuò)展的工作方式與交換器到隊(duì)列綁定過程類似,你也可以指定一個路由鍵來過濾消息.在步驟2中,我們可以使用#(匹配所有消息)來作為路由鍵。通過改變路由鍵你可以使用制作一個filter!
在消息中內(nèi)嵌消息目的地
在本例子中,我們會展示如何發(fā)送單個發(fā)布帶路由鍵的的消息.標(biāo)準(zhǔn)AMQP不提供此特性,但幸運(yùn)的是,RabbitMQ使用消息屬性header提供了此特性. 這種擴(kuò)展稱為sender-selected分發(fā).
此擴(kuò)展的行為類似于電子郵件邏輯.它使用Carbon Copy (CC)和Blind Carbon Copy (BCC).這也是為什么能在 Chapter02/Recipe09/Java/src/rmqexample中找到CC和BCC consumers的理由:
- Producer.java
- Consumer.java
- StatsConsumer.java
- CCStatsConsumer.java
- BCCStatsConsumer.java
準(zhǔn)備
To use this recipe, we need to set up the Java development environment as indicated in the Introduction section of Chapter 1, Working with AMQP.
如何做
完成下面的步驟來使用單個發(fā)布帶路由鍵的的消息:
1. 使用下面的代碼來創(chuàng)建或聲明交換器:
channel.exchangeDeclare(exchange, "direct", false);
2. 在消息的header屬性中指定CC , BCC路由鍵:
List<String> ccList = new ArrayList<String>();
ccList.add(backup_alert_routing_key);
headerMap.put("CC", ccList);
List<String> ccList = new ArrayList<String>();
bccList.add(send_alert_routing_key);
headerMap.put("BCC", bccList);
BasicProperties messageProperties = new BasicProperties.Builder().headers(headerMap).build();
channel.basicPublish(exchange, alert_routing_key,messageProperties, statMsg.getBytes());
3. 使用下面的三個路由鍵來綁定三個隊(duì)列three queues to the exchange using the following three routing keys:
channel.queueBind(myqueue,exchange, alert_routing_key);
channel.queueBind(myqueueCC_BK,exchange,backup_alert_routing_key);
channel.queueBind(myqueueBCC_SA,exchange,send_alert_routing_key);
4. 使用三個消費(fèi)者來消費(fèi)消息
如何工作
當(dāng)生產(chǎn)者使用CC和BCC消息屬性來發(fā)送消息時(shí),broker會在所有路由鍵的隊(duì)列上拷貝消息 。在本例中,stat類會直接使用路由鍵alert_routing_key來向交換器發(fā)送消息,同時(shí)它也會將消息拷貝到使用CC和BCC參數(shù)信息來將消息拷貝到myqueueCC_BK,myqueueBCC_SA隊(duì)列中。
當(dāng)像e-mails一樣發(fā)生時(shí),在分發(fā)消息到隊(duì)列前,BCC信息會被broker從消息頭中刪除,你可查看所有我們示例消費(fèi)者的輸出來觀察這種行為。
更多
正常情況下,AMQP不會改變消息頭,但BCC擴(kuò)展是例外。這種擴(kuò)展可減少發(fā)往broker的消息數(shù)目。沒有此擴(kuò)展,生產(chǎn)者只能使用不同的路由鍵來發(fā)送多個消息的拷貝。