隨筆 - 41  文章 - 7  trackbacks - 0
          <2016年6月>
          2930311234
          567891011
          12131415161718
          19202122232425
          262728293012
          3456789

          常用鏈接

          留言簿

          隨筆分類

          隨筆檔案

          搜索

          •  

          最新評論

          閱讀排行榜

          評論排行榜

          本章我們將覆蓋:
          1. 如何使用消息過期
          2. 如何使指定隊(duì)列上的消息過期
          3. 如何讓隊(duì)列過期
          4. 管理駁回的(rejected)或過期的消息
          5. 理解其它備用交換器擴(kuò)展
          6. 理解有效user-ID擴(kuò)展
          7. 通知隊(duì)列消息者失敗
          8. 理解交換器到交換器擴(kuò)展
          9. 在消息中嵌入消息目的地
          介紹
          在本章中,我們將展示關(guān)于RabbitMQ擴(kuò)展上的一些食譜.這些擴(kuò)展不是AMQP 0-9-1標(biāo)準(zhǔn)的一部分,使用它們會破壞其它AMQPbroker的兼容性。
          另一方面, 在AMQP 0-10 (http://www.amqp.org/specification/0-10/amqp-org-download)中也出現(xiàn)了輕微的變化,這是一個簡單通往那里的路徑.最后, 它們通常是優(yōu)化問題的有效解決方案。

          本章中的例子將更為真實(shí),例如,配置參數(shù),如列表和交換器, 以及路由鍵名稱將定義在Constants接口中。事實(shí)上,一個真正的應(yīng)用程序會遵循這樣的準(zhǔn)則從配置文件中讀取配置文件,以在不同應(yīng)用程序中共享。
          然而,在下面的例子中,為了更簡短和較好的可讀性,我們并沒有指定Constants的命名空間。

          如何讓消息過期
          在本食譜中,我們將展示如何讓消息過期.食譜的資源可在Chapter02/Recipe01/Java/src/rmqexample中找到,如:
          1. Producer.java
          2. Consumer.java
          3. GetOne.java
          準(zhǔn)備
          為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
          如何做
          本示例的核心是Producer.java文件.為了產(chǎn)生在給定生存時(shí)間(TTL)后過期的消息,我們需要執(zhí)行下面的步驟:
          1. 創(chuàng)建或聲明一個用來發(fā)送消息的交換器, 并將其綁定到隊(duì)列上,就像第1章使用AMQP看到的一樣:
          channel.exchangeDeclare(exchange, "direct", false);
          channel.queueDeclare(queue, false, false, false, null);
          channel.queueBind(queue, exchange, routingKey);
          2. 像下面這樣初始化可選消息屬性TTL:
          BasicPropertiesmsgProperties = new BasicProperties.Builder().expiration("20000").build();
          3. 使用下面的代碼來發(fā)布消息:
          channel.basicPublish(exchange, routingKey, msgProperties,statMsg.getBytes());
          如何工作
          在這個例子中,生產(chǎn)者創(chuàng)建了一個交換器,一個命名隊(duì)列,并將它們進(jìn)行了綁定,當(dāng)隊(duì)列上沒有附著任何消費(fèi)者,過期消息就顯得非常有意義了。
          設(shè)置過期時(shí)間TTL (以毫秒設(shè)置),會促使RabbitMQ在消息過期時(shí),如果消息沒有被客戶端及時(shí)消費(fèi),立即刪除消息.
          在我們的例子中,我們假設(shè)應(yīng)用程序發(fā)布了JVM資源統(tǒng)計(jì)信息到給定隊(duì)列,如果存在消費(fèi)者,那么會像平時(shí)一樣,獲取到實(shí)時(shí)數(shù)據(jù),反之,如果不存在這樣的消費(fèi)者,那么消息會給定生存時(shí)間后立即過期。通過這種方式,可以避免我們收集大量的數(shù)據(jù)。一旦消息者綁定到了隊(duì)列中,它會得到先前的消息(未過期)。進(jìn)一步的試驗(yàn),你可以用GetOne.java文件來替換Consumer.java文件運(yùn)行.
          在調(diào)用 channel.basicGet() 時(shí),會使你一次只能消費(fèi)一個消息。
          TIP
          可使用channel.basicGet()方法來檢查未消費(fèi)消息的隊(duì)列.也可以通過為第二參數(shù)傳遞false來調(diào)用,即autoAck標(biāo)志.

          在這里我們可以通過調(diào)用rabbitmqctl list_queues來監(jiān)控RabbitMQ隊(duì)列的狀態(tài)。  

          也可參考
          默認(rèn)情況下,過期消息會丟失,但它們可以路由到其它地方。可參考管理拒絕消息或過期消息食譜來了解更多信息.

          如何讓指定隊(duì)列上的消息過期
          在本食譜中,我們將展示指定消息TTL的第二種方式.這次,我們不再通過消息屬性來指定,而是通過緩存消息的隊(duì)列來進(jìn)行指定。在這種情況下,生產(chǎn)者只是簡單地發(fā)布消息到交換器中,因此,在交換器上綁定標(biāo)準(zhǔn)隊(duì)列和過期消息隊(duì)列是可行的。
          要在這方面進(jìn)行備注,須存在一個創(chuàng)建自定義的隊(duì)列的消費(fèi)者。生產(chǎn)者是相當(dāng)標(biāo)準(zhǔn)的.
          像前面的食譜一樣,你可以在Chapter02/Recipe02/Java/src/rmqexample找到這三個源碼。
           
          準(zhǔn)備
          為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
          如何做
          現(xiàn)在我們將展示創(chuàng)建特定消息TTL隊(duì)列的必要步驟。在我們的例子中,需要在Consumer.java文件中執(zhí)行下面的步驟:
          1. 按下面來聲明交換器:
          channel.exchangeDeclare(exchange, "direct", false);
          2. 創(chuàng)建或聲明隊(duì)列,像下在這樣為x-message-ttl可選參數(shù)指定10,000毫秒的超時(shí)時(shí)間:
          Map<String, Object> arguments = new HashMap<String, Object>();
          arguments.put("x-message-ttl", 10000);
          channel.queueDeclare(queue, false, false, false, arguments);
          3. 綁定隊(duì)列到交換器上:
          channel.queueBind(queue, exchange, routingKey);
          如何工作
          在這個例子中,為了最終分析,我們再次假設(shè)生產(chǎn)者發(fā)送了JVM統(tǒng)計(jì)數(shù)據(jù)給RabbitMQ。最終因?yàn)?span style="line-height: 1.5; background-color: inherit;">Producer.java文件將其發(fā)到一個交換機(jī),如果無消費(fèi)者連接的話,消息最終會丟失。
          想要監(jiān)控或分析這些統(tǒng)計(jì)數(shù)據(jù)的消費(fèi)有下面三種選擇:
          1. 綁定到一個臨時(shí)隊(duì)列,即調(diào)用無參的channel.queueDeclare()方法
          2. 綁定到一個非自動刪除的命名隊(duì)列
          3. 綁定到一個非自動刪除的命名隊(duì)列,并且指定x-message-ttl ,如步驟2中展示的一樣.
          在第一種情況中,消費(fèi)者將獲取實(shí)時(shí)統(tǒng)計(jì)數(shù)據(jù),但當(dāng)它掉線期間,它將不能在數(shù)據(jù)上執(zhí)行分析。
          在第二種情況中,為了能讓它掉線期間,能獲取到發(fā)送的消息,可以使用一個命名隊(duì)列(最終是持久化的).但在掉線較長時(shí)間后,再重啟時(shí),它將有巨大的backlog來進(jìn)行恢復(fù),因此在隊(duì)列中可能存在大部分舊消息的垃圾。
          在第三種情況中,舊消息垃圾會通過RabbitMQ自己來執(zhí)行,以使我們從消費(fèi)者和broker中獲益。
          更多
          當(dāng)設(shè)置per-queue TTL, 就像本食譜中看到的一樣,只要未到超時(shí)時(shí)間,消息就不會被丟棄,此時(shí)消費(fèi)者還可以嘗試消費(fèi)它們。
          當(dāng)使用queue TTL時(shí), 這里有一個細(xì)微的變化,但使用per-message TTL時(shí),在broker隊(duì)列中可能會存在過期消息.
          在這種情況下,這些過期消息仍然會占據(jù)資源(內(nèi)存),同時(shí)broker統(tǒng)計(jì)數(shù)據(jù)中仍然會計(jì)數(shù),直到它們不會到隊(duì)列頭部時(shí)。
          也中參考
          在這種情況下,過期消息也會恢復(fù)。參考管理駁回或過期消息食譜.
          如何讓隊(duì)列過期
          在第三種情況中,TTL不關(guān)聯(lián)任何消息,只關(guān)聯(lián)對列。這種情況對于服務(wù)器重啟和更新,是一個完美的選擇。一旦TTL超時(shí),在最后一個消費(fèi)者停止消費(fèi)后,RabbitMQ會丟棄隊(duì)列.
          前面TTL相關(guān)食譜,你可在Chapter02/Recipe03/Java/src/rmqexample 中找到 Producer.java ,  Consumer.java ,and  GetOne.java 相關(guān)文件。
          準(zhǔn)備
          為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
          如何做
          在前面的例子中,擴(kuò)展只需要關(guān)注Consumer.java :
          1. 使用下面的代碼來創(chuàng)建或聲明交換器:
          channel.exchangeDeclare(exchange, "direct", false);
          2. 創(chuàng)建或聲明隊(duì)列,并為x-expires可選參數(shù)指定30,000毫秒的超時(shí)時(shí)間:
          Map<String, Object> arguments = new HashMap<String,Object>();
          arguments.put("x-expires", 30000);
          channel.queueDeclare(queue, false, false, false,arguments);
          3. 將隊(duì)列綁定到交換器上:
          channel.queueBind(queue, exchange, routingKey);
          如何工作
          當(dāng)我們運(yùn)行Consumer.java或 GetOne.java 文件的時(shí)候, 超時(shí)隊(duì)列已經(jīng)創(chuàng)建好了,在消費(fèi)者附著到隊(duì)列上或調(diào)用channel.basicGet()時(shí),它將持續(xù)存在.
          只有當(dāng)我們停止這兩個操作超過30秒時(shí),隊(duì)列才會被刪除,并且隊(duì)列包含的消息也會清除。
          TIP
          無論生產(chǎn)者是否向其發(fā)送了消息,隊(duì)列事實(shí)上都是獨(dú)立刪除的。

          在這個試驗(yàn)課程中,我們可通過 rabbitmqctl list_queues 命令來監(jiān)控RabbitMQ 隊(duì)列狀態(tài).
          因此,我們可以想像一種場景,有一個統(tǒng)計(jì)分析程序需要重啟來更新其代碼。由于命名隊(duì)列有較長的超時(shí)時(shí)間,因此重啟時(shí),不會丟失任何消息。如果我們停止,隊(duì)列會在超過TTL后被刪除,無價(jià)值的消息將不再存儲。
          管理駁回或過期消息
          在這個例子中,我們將展示如何使用死信交換器來管理過期或駁回的消息. 死信交換器是一種正常的交換器,死消息會在這里重定向,如果沒有指定,死消息會被broker丟棄。
          你可以在Chapter02/Recipe04/Java/src/rmqexample中找到源碼文件:
          1. Producer.java
          2. Consumer.java
          要嘗試過期消息,你可以使用第一個代碼來發(fā)送帶TTL的消息,就如如何使指定隊(duì)列上消息過期食譜中描述的一樣.
          一旦啟動了,消費(fèi)者不允許消息過期,但可以可以駁回消息,最終導(dǎo)致成為死消息。
          準(zhǔn)備
          為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
          如何做
          下面的步驟展示了使用死信交換器來管理過期或駁回消息:
          1. 創(chuàng)建一個工作交換品節(jié)和死信交換器:
          channel.exchangeDeclare(Constants.exchange, "direct", false);
          channel.exchangeDeclare(Constants.exchange_dead_letter,"direct", false);
          2. 創(chuàng)建使用使用死信交換器和 x-message-ttle參數(shù)的隊(duì)列:
          arguments.put("x-message-ttl", 10000);
          arguments.put("x-dead-letter-exchange",exchange_dead_letter);
          channel.queueDeclare(queue, false, false, false,arguments);
          3. 然后像下面這樣綁定隊(duì)列:
          channel.queueBind(queue, exchange, "");
          4. 最后使用channel.basicPublish()來向交換器發(fā)送消息 .
          5. 要嘗試駁回消息,我們需要配置一個消費(fèi)者,就像前面例子中看到的一樣,并使用下面的代碼來駁回消息:
          basicReject(envelope.getDeliveryTag(), false);
          如何工作
          我們先從第一個場景開始(單獨(dú)使用producer): the expired messages. 在步驟中,我們創(chuàng)建兩個交換器,工作交換器和死信交換器。在步驟2中,我們使用下面兩個可選參數(shù)來創(chuàng)建隊(duì)列:
          1. 使用arguments.put("x-message-ttl", 10000)來設(shè)置消息TTL ,正如如何使指定隊(duì)列上消息過期食譜中描述的一樣.
          2. 使用arguments.put("x-dead-letter-exchange", exchange_dead_letter)來設(shè)置死信交換器名稱;
          正如你所看到的,我們只是在配置中添加了可選的隊(duì)列參數(shù)。因此,當(dāng)生產(chǎn)者發(fā)送消息到交換器時(shí),它會隊(duì)列參數(shù)來路由。消息會在10秒后過期,之后它會重定向到exchange_dead_letter 
          TIP
          死信交換器是一個標(biāo)準(zhǔn)的交換器,因此你可以基于任何目的來使用.
          對于第二種場景,食譜的消費(fèi)者會駁回消息.當(dāng)消費(fèi)者得到消息后, 它會使用basicReject()方法來發(fā)回一個否定應(yīng)答(nack),當(dāng)broker收到nack時(shí),它會將消息重定向到exchange_dead_letter. 通過在死信交換器上綁定隊(duì)列,你可以管理這些消息。
          當(dāng)消息重定向到死信隊(duì)列時(shí),broker會修改header消息,并在x-dead鍵中增加下面的值:
          1. reason : 表示隊(duì)列是否過期的或駁回的(requeue =false )
          2. queue : 表示隊(duì)列源,例如stat_queue_02/05
          3. time : 表示消息變?yōu)樗佬诺娜掌诤蜁r(shí)間
          4. exchange : 表示交換器,如monitor_exchange_02/05
          5. routing-keys : 表示發(fā)送消息時(shí)原先使用的路由鍵
          要在實(shí)踐中查看這些值,你可使用GetOneDeadLetterQ 類.這可以創(chuàng)建queue_dead_letter隊(duì)列并會綁定到exchange_dead_letter 
          更多
          你也可以使用arguments.put("x-dead-letter-routing-key", "myroutingkey")來指定死信路由鍵 ,它將會代替原來的路由鍵.這也就意味著你可以用不同的路由鍵來將不同消息路由到同一個隊(duì)列中。相當(dāng)棒。
          理解交替交換器擴(kuò)展
          目前,在第1章使用 AMQP中我們已經(jīng)展示了如何來處理未路由消息(消息發(fā)布到了交換器,但未能達(dá)到隊(duì)列). AMQP讓生產(chǎn)者通過此條件進(jìn)行應(yīng)答,并最終決定是否有需要再次將消息分發(fā)到不同的目的地。通過這種擴(kuò)展,我們可在broker中指定一個交替交換器來路由消息,而并不會對生產(chǎn)者造成更多的干預(yù),本食譜的代碼在Chapter02/Recipe05/Java/src/rmqexample .

          準(zhǔn)備
          為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
          如何做
          在本食譜中,我們會在Producer.java中聲明交替交換器.
          1. 將交換器的名字(無目的地路由消息)-alternateExchange ,放到可選參數(shù)map的"alternate-exchange"中,如下所示:
          Map<String, Object> arguments = new HashMap<String,Object>();
          arguments.put("alternate-exchange", alternateExchange);
          2. 通過傳遞arguments map來聲明交換器來發(fā)送消息:
          channel.exchangeDeclare(exchange, "direct", false, false,arguments);
          3. 聲明alternateExchange自身(已經(jīng)在步驟1中指定了),如下所示:
          channel.exchangeDeclare(alternateExchange, "direct",false);
          4. 聲明標(biāo)準(zhǔn)化持久化隊(duì)列,并使用路由鍵alertRK將其綁定到alternateExchange交換器中:
          channel.queueDeclare(missingAlertQueue, true, false, false,null);
          channel.queueBind(missingAlertQueue, alternateExchange,alertRK);
          如何工作
          在這個例子中,我們再次使用了生成統(tǒng)計(jì)數(shù)據(jù)的producer,正如先前的例子一樣.但這次,我們添加了路由鍵來讓producer指定一個重要的級別,名為infoRK或alertRK (在例子中是隨機(jī)分配的).如果你運(yùn)行一個producer以及至少一個consumer,將不會丟失任何消息,并且一切都會正常工作.
          TIP
          Consumers在交換器和隊(duì)列的聲明中,必須傳遞相同的可選參數(shù),否則會拋出異常。
          但如果沒有消費(fèi)者監(jiān)聽的話,而我們不想丟失報(bào)警的話,這就是為什么必須選擇讓producer創(chuàng)建alternateExchange (步驟3)并將其綁定到持久隊(duì)列-missingAlertQueue的原因 (步驟4).
          在單獨(dú)運(yùn)行producer的時(shí)候,你將看到報(bào)警存儲在這里.alternate交換器讓我們在不丟失消息的情況下可以路由消息.你可通過調(diào)用rabbitmqctllist_queues或運(yùn)行CheckAlerts.java來檢查狀態(tài) .
          最后的代碼讓我們可以查看隊(duì)列的內(nèi)容和第一個消息,但不會進(jìn)行消費(fèi)。完成這種行為是簡單的,它足可以避免這種事實(shí):RabbitMQ client發(fā)送了ack,消息未消費(fèi),而只是進(jìn)行監(jiān)控。
          現(xiàn)在,如果我們再次運(yùn)行Consumer.java文件,它會從missingAlertQueue隊(duì)列中獲取并消費(fèi)消息.這不是自動的,我們可以選擇性地從此隊(duì)列中獲取消息。
          通過創(chuàng)建第二個消費(fèi)者實(shí)例( missingAlertConsumer ) 并使用相同的代碼來從兩個不同隊(duì)列消費(fèi)消息就可以完成這種效果。如果在處理實(shí)時(shí)消息時(shí),想要得到不同的行為,那么我們可以創(chuàng)建一個不同的消費(fèi)者。

          更多
          在這個例子中,步驟3和步驟4是可選的。 當(dāng)定義交換器時(shí),可為交替交換器指定名稱,對于其是否存在或是否綁定到任何隊(duì)列上,并不作強(qiáng)制要求 。如果交替交換器不存在,生產(chǎn)者可通過在丟失消息上設(shè)置mandatory標(biāo)志來得到應(yīng)答,就如在第1章中處理未路由消息食譜中看到的一樣。
          甚至有可能出現(xiàn)另一種交換器-它自己的備用交換器,備用交換器可以是鏈?zhǔn)降模⑶覠o目的地消息在按序地重試,直到找到一個目的地。
          如果在交換器鏈的末尾仍然沒有找到目的地,消息將會丟失,生產(chǎn)者可通過調(diào)設(shè)置mandatory 標(biāo)志和指定一個合適的ReturnListener參數(shù)得到通知。
          理解經(jīng)過驗(yàn)證的user-ID擴(kuò)展
          依據(jù)AMQP, 當(dāng)消費(fèi)者得到消息時(shí),它是不知道發(fā)送者信息的。一般說來,消費(fèi)者不應(yīng)該關(guān)心是誰生產(chǎn)的消息,對于生產(chǎn)者-消費(fèi)者解藕來說是相當(dāng)有利的。然而,有時(shí)出于認(rèn)證需要,為了達(dá)到此目的,RabbitMQ 提供了有效的user-ID擴(kuò)展。
          在本例中,我們使用有效user-IDs模擬了訂單。你可在Chapter02/Recipe06/Java/src/rmqexample中找到源碼.
          準(zhǔn)備
          為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。
          如何做
          完成下面的步驟,以使用經(jīng)過驗(yàn)證的user IDs來模擬訂單:
          1. 像下面一樣聲明或使用持久化隊(duì)列:
          channel.queueDeclare(queue, true, false, false, null);
          2.發(fā)送消息時(shí),使用BasicProperties對象,在消息頭中指定一個user ID:
          BasicProperties messageProperties = new BasicProperties.Builder()
          .timestamp(new Date())
          .userId("guest");
          channel.basicPublish("",queue, messageProperties,bookOrderMsg.getBytes());
          3. 消費(fèi)者獲取到訂單后,可像下面這樣打印訂單數(shù)據(jù)和消息頭:
          System.out.println("The message has been placed by "+properties.getUserId());
          如何工作
          當(dāng)設(shè)置了user-ID時(shí),RabbitMQ 會檢查是否是同一個用戶打開的連接。在這個例子中,用戶是guest ,即RabbitMQ默認(rèn)用戶.
          通過調(diào)用properties.getUserId() 方法,消費(fèi)者可以訪問發(fā)送者user ID。如果你想在步驟2中設(shè)置非當(dāng)前用戶的userId,channel.basicPublish()會拋出異常.
          TIP
          如果不使用user-ID屬性,用戶將是非驗(yàn)證的,properties.getUserId()方法會返回null.
          也可參考
          要更好的理解這個例子,你應(yīng)該知道用戶和虛擬機(jī)管理,這部分內(nèi)容將在下個章節(jié)中講解。在下個章節(jié)中,我們將了解如何通過在應(yīng)用程序中使用SSL來提高程序的安全性。只使用user-ID屬性,我們可保證用戶已認(rèn)證,但所有信息都是未加密的,因此很容易暴露。
          隊(duì)列失敗時(shí)通知消費(fèi)者

          根據(jù)AMQP標(biāo)準(zhǔn),消費(fèi)者不會得到隊(duì)列刪除的通知。一個正在刪除隊(duì)列上等待消息的消費(fèi)者不會收到任何錯誤信息,并會無限期地等待。然而,RabbitMQ client提供了一種擴(kuò)展來讓消息收到一個cancel參數(shù)-即消費(fèi)者cancel通知。我們馬上就會看到這個例子,你可在Chapter02/Recipe07/Java/src/rmqexample 中找到代碼.

          準(zhǔn)備
          為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣。

          如何做
          為了能讓擴(kuò)展工作,你只需要執(zhí)行下面的步驟:
          1.在自定義的消費(fèi)者中覆蓋handleCancel()方法,可繼承于com.rabbitmq.client.DefaultConsumer (指的是ActualConsumer.java ):
          public void handleCancel(String consumerTag) throws IOException {
          ...
          }
          如何工作
          在我們的例子中,我們選擇實(shí)現(xiàn)一個消費(fèi)者,這個消費(fèi)者只在生產(chǎn)者是持久化的,且隊(duì)列是由生產(chǎn)者創(chuàng)建的情況下才能工作。
          因此,如果隊(duì)列是非持久化的,Consumer.java文件會立即錯誤退出. 此行為可以通過調(diào)用channel.queueDeclarePassive()來完成 .
          Producer.java類在其啟動時(shí)會創(chuàng)建隊(duì)列,并在其關(guān)閉時(shí)調(diào)用channel.queueDelete()方法刪除隊(duì)列,如果當(dāng)隊(duì)列關(guān)閉時(shí),而消費(fèi)者正在消費(fèi)隊(duì)列,那么RabbitMQ client會調(diào)用步驟1中覆蓋的handleCancel()方法來立即通知消費(fèi)者。
          相對于顯示調(diào)用channel.basicCancel() 消費(fèi)者使用handleCancel()方法可以任意理由來退出。只有在這種情況下,RabbitMQ client library會調(diào)用Consumer接口的方法:  handleCancelOK() 
          更多
          消費(fèi)者cancel通知是client library的擴(kuò)展,而不是AMQP client libraries的常規(guī)方法.一個實(shí)例它們的library必須將其聲明為可選屬性(參考 http://www.rabbitmq.com/consumer-cancel. html#capabilities ).
          RabbitMQ client library 支持并聲明了這種特性。
          也可參考
          在集群中,如果一個節(jié)點(diǎn)失效了,也會發(fā)生同樣的事情:client在隊(duì)列刪除后仍然得不到通知,除非它定義了覆蓋了自己的handleCancel()方法。關(guān)于這點(diǎn)的更多信息,可參考Chapter 6,開發(fā)可伸縮性應(yīng)用程序。
          理解交換器到交換器擴(kuò)展
          默認(rèn)情況下,AMQP支持交換器到隊(duì)列,但不支持交換器到交換器綁定。在本例中,我們將展示如何使用RabbitMQ 交換機(jī)到交換機(jī)擴(kuò)展.
          在本例中,我們將合并來自兩個不同交換器的消息到第三個交換器中.你可以在Chapter02/Recipe08/Java/src/rmqexample找到源碼.
          準(zhǔn)備
          為了使用本食譜,我們需要設(shè)置Java開發(fā)環(huán)境,如第1章節(jié)(使用AMQP)介紹章節(jié)中說明的一樣,并像廣播消息食譜中來運(yùn)行生產(chǎn)者以及使用topic交換器來處理消息路由。
          如何做
          完成下面的步驟來使用RabbitMQ 交換器到交換器擴(kuò)展:
          1. 使用下面的代碼來聲明我們需要追蹤消息的交換器:
          channel.exchangeDeclare(exchange, "topic", false);
          2. 使用exchangeBind()來綁定其它例子中的交換器 :
          channel.exchangeBind(exchange,ref_exchange_c1_8,"#");
          channel.exchangeBind(exchange,ref_exchange_c1_6,"#");
          3. 啟動追蹤消費(fèi)者:
          TraceConsumer consumer = new TraceConsumer(channel);
          String consumerTag = channel.basicConsume(myqueue, false,consumer);
          如何工作
          在步驟1中,我們創(chuàng)建了一個新的交換器,在步驟2中我們綁定到了下面的交換器:
          1. ref_exchange_c1_6 (廣播消息) 與exchange綁定.
          2. ref_exchange_c1_8 (使用topic來處理消息路由)與exchange綁定 .
          在步驟3中, 消費(fèi)者可以綁定一個隊(duì)列到exchange上以任意地獲取所有消息.
          交換器到交換器擴(kuò)展的工作方式與交換器到隊(duì)列綁定過程類似,你也可以指定一個路由鍵來過濾消息.在步驟2中,我們可以使用#(匹配所有消息)來作為路由鍵。通過改變路由鍵你可以使用制作一個filter!
          在消息中內(nèi)嵌消息目的地
          在本例子中,我們會展示如何發(fā)送單個發(fā)布帶路由鍵的的消息.標(biāo)準(zhǔn)AMQP不提供此特性,但幸運(yùn)的是,RabbitMQ使用消息屬性header提供了此特性. 這種擴(kuò)展稱為sender-selected分發(fā).
          此擴(kuò)展的行為類似于電子郵件邏輯.它使用Carbon Copy (CC)和Blind Carbon Copy (BCC).這也是為什么能在 Chapter02/Recipe09/Java/src/rmqexample中找到CC和BCC consumers的理由:
          1. Producer.java
          2. Consumer.java
          3. StatsConsumer.java
          4. CCStatsConsumer.java
          5. BCCStatsConsumer.java
          準(zhǔn)備
          To use this recipe, we need to set up the Java development environment as indicated in the Introduction section of Chapter 1, Working with AMQP.
          如何做
          完成下面的步驟來使用單個發(fā)布帶路由鍵的的消息:
          1. 使用下面的代碼來創(chuàng)建或聲明交換器:
          channel.exchangeDeclare(exchange, "direct", false);
          2. 在消息的header屬性中指定CC , BCC路由鍵:
          List<String> ccList = new ArrayList<String>();
          ccList.add(backup_alert_routing_key);
          headerMap.put("CC", ccList);
          List<String> ccList = new ArrayList<String>();
          bccList.add(send_alert_routing_key);
          headerMap.put("BCC", bccList);
          BasicProperties messageProperties = new BasicProperties.Builder().headers(headerMap).build();
          channel.basicPublish(exchange, alert_routing_key,messageProperties, statMsg.getBytes());
          3. 使用下面的三個路由鍵來綁定三個隊(duì)列three queues to the exchange using the following three routing keys:
          channel.queueBind(myqueue,exchange, alert_routing_key);
          channel.queueBind(myqueueCC_BK,exchange,backup_alert_routing_key);
          channel.queueBind(myqueueBCC_SA,exchange,send_alert_routing_key);
          4. 使用三個消費(fèi)者來消費(fèi)消息
          如何工作
          當(dāng)生產(chǎn)者使用CC和BCC消息屬性來發(fā)送消息時(shí),broker會在所有路由鍵的隊(duì)列上拷貝消息 。在本例中,stat類會直接使用路由鍵alert_routing_key來向交換器發(fā)送消息,同時(shí)它也會將消息拷貝到使用CC和BCC參數(shù)信息來將消息拷貝到myqueueCC_BK,myqueueBCC_SA隊(duì)列中。
          當(dāng)像e-mails一樣發(fā)生時(shí),在分發(fā)消息到隊(duì)列前,BCC信息會被broker從消息頭中刪除,你可查看所有我們示例消費(fèi)者的輸出來觀察這種行為。
          更多
          正常情況下,AMQP不會改變消息頭,但BCC擴(kuò)展是例外。這種擴(kuò)展可減少發(fā)往broker的消息數(shù)目。沒有此擴(kuò)展,生產(chǎn)者只能使用不同的路由鍵來發(fā)送多個消息的拷貝。
          posted on 2016-06-05 19:51 胡小軍 閱讀(1341) 評論(0)  編輯  收藏 所屬分類: RabbitMQ
          主站蜘蛛池模板: 桦南县| 东山县| 曲阳县| 南召县| 平武县| 堆龙德庆县| 阳江市| 宜川县| 定州市| 喀喇沁旗| 星座| 兴义市| 定陶县| 许昌市| 克拉玛依市| 贵州省| 上饶市| 维西| 枝江市| 龙陵县| 积石山| 张家港市| 青州市| 佛山市| 若羌县| 乌审旗| 象山县| 翼城县| 成武县| 西藏| 丹巴县| 大埔县| 信阳市| 河北区| 延寿县| 商南县| 阳朔县| 台南县| 眉山市| 抚松县| 海南省|