隨筆 - 41  文章 - 7  trackbacks - 0
          <2016年6月>
          2930311234
          567891011
          12131415161718
          19202122232425
          262728293012
          3456789

          常用鏈接

          留言簿

          隨筆分類

          隨筆檔案

          搜索

          •  

          最新評論

          閱讀排行榜

          評論排行榜

          概述

          RabbitMQ Java client 將com.rabbitmq.client作為其頂層包. 關鍵類和接口有:

          • Channel
          • Connection
          • ConnectionFactory
          • Consumer
          協議操作可通過Channel接口來進行.Connection用于開啟channels,注冊connection生命周期事件處理, 并在不需要時關閉connections.
          Connections是通過ConnectionFactory來初始化的,在ConnectionFactory中,你可以配置不同的connection設置,如:虛擬主機和用戶名等等.

          Connections 和 Channels

          核心API類是Connection和Channel, 它們代表對應AMQP 0-9-1 connection 和 channel. 在使用前,可像下面這樣來導入:

          import com.rabbitmq.client.Connection; 
          import com.rabbitmq.client.Channel;

          連接到broker

          下面的代碼會使用給定的參數連接到AMQP broker:

          ConnectionFactory factory = new ConnectionFactory(); 
          factory.setUsername(userName);
          factory.setPassword(password);
          factory.setVirtualHost(virtualHost);
          factory.setHost(hostName);
          factory.setPort(portNumber);
          Connection conn = factory.newConnection();

          也可以使用URIs 來設置連接參數:

          ConnectionFactory factory = new ConnectionFactory(); 
          factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
          Connection conn = factory.newConnection();


          Connection 接口可用來打開一個channel:

          Channel channel = conn.createChannel(); 

          channel現在可用來發送和接收消息,正如后續章節中描述的一樣.

          要斷開連接,只需要簡單地關閉channel和connection:

          channel.close(); conn.close();

          關閉channel被認為是最佳實踐,但在這里不是嚴格必須的 - 當底層連接關閉的時候,channel也會自動關閉.

          使用 Exchanges 和 Queues

          采用交換器和隊列工作的客戶端應用程序,是AMQP高級別構建模塊。在使用前,必須先聲明.聲明每種類型的對象都需要確保名稱存在,如果有必要須進行創建.

          繼續上面的例子,下面的代碼聲明了一個交換器和一個隊列,然后再將它們進行綁定.

          channel.exchangeDeclare(exchangeName, "direct", true); 
          String queueName = channel.queueDeclare().getQueue();
          channel.queueBind(queueName, exchangeName, routingKey);

          這實際上會聲明下面的對象,它們兩者都可以可選參數來定制. 在這里,它們兩個都沒有特定參數。

          1. 一個類型為direct,且持久化,非自動刪除的交換器
          2. 采用隨機生成名稱,且非持久化,私有的,自動刪除隊列

          上面的函數然后使用給定的路由鍵來綁定隊列和交換器.

          注意,當只有一個客戶端時,這是一種典型聲明隊列的方式:它不需要一個已知的名稱,其它的客戶端也不會使用它(exclusive),并會被自動清除(autodelete).
          如果多個客戶端想共享帶有名稱的隊列,下面的代碼應該更適合:

          channel.exchangeDeclare(exchangeName, "direct", true); 
          channel.queueDeclare(queueName, true, false, false, null);
          channel.queueBind(queueName, exchangeName, routingKey);

          這實際上會聲明:

          1. 一個類型為direct,且持久化,非自動刪除的交換器
          2. 一個已知名稱,且持久化的,非私有,非自動刪除隊列

          注意,Channel API 的方法都是重載的。這些 exchangeDeclarequeueDeclare 和queueBind 都使用的是預設行為.
          這里也有更多參數的長形式,它們允許你按需覆蓋默認行為,允許你完全控制。


          發由消息

          要向交換器中發布消息,可按下面這樣來使用Channel.basicPublish方法:

          byte[] messageBodyBytes = "Hello, world!".getBytes(); 
          channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

          為了更好的控制,你可以使用重載方法來指定mandatory標志,或使用預先設置的消息屬性來發送消息:

          channel.basicPublish(exchangeName, routingKey, mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes);

          這會使用分發模式2(持久化)來發送消息, 優先級為1,且content-type 為"text/plain".你可以使用Builder類來構建你自己的消息屬性對象:

          channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(1).userId("bob").build()),messageBodyBytes);

          下面的例子使用自定義的headers來發布消息:

          Map<String, Object> headers = new HashMap<String, Object>(); 
          headers.put("latitude", 51.5252949);
          headers.put("longitude", -0.0905493);
          channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().headers(headers).build()),messageBodyBytes);

          下面的例子使用expiration來發布消息:

          channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().expiration("60000").build()),messageBodyBytes);

          BasicProperties is an inner class of the autogenerated holder class AMQP.

          Invocations of Channel#basicPublish will eventually block if a resource-driven alarm is in effect.

          Channels 和并發考慮(線程安全性)

          Channel 實例不能在多個線程間共享。應用程序必須在每個線程中使用不同的channel實例,而不能將同個channel實例在多個線程間共享。 有些channl上的操作是線程安全的,有些則不是,這會導致傳輸時出現錯誤的幀交叉。
          在多個線程共享channels也會干擾Publisher Confirms.

          通過訂閱來來接收消息

          import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;

          接收消息最高效的方式是用Consumer接口來訂閱。當消息到達時,它們會自動地進行分發,而不需要顯示地請求

          當在調用Consumers的相關方法時, 個別訂閱總是通過它們的consumer tags來確定的, consumer tags可通過客戶端或服務端來生成,參考 the AMQP specification document.
          同一個channel上的消費者必須有不同的consumer tags.

          實現Consumer的最簡單方式是繼承便利類DefaultConsumer.子類可通過在設置訂閱時,將其傳遞給basicConsume調用:

          boolean autoAck = false; 
          channel.basicConsume(queueName, autoAck, "myConsumerTag",new DefaultConsumer(channel) {
          @Override
          publicvoid handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{
          String routingKey = envelope.getRoutingKey();
          String contentType = properties.getContentType();
          long deliveryTag = envelope.getDeliveryTag();
          // (process the message components here ...)
          channel.basicAck(deliveryTag, false);
          }
          });

          在這里,由于我們指定了autoAck = false,因此消費者有必要應答分發的消息,最便利的方式是在handleDelivery 方法中處理.

          更復雜的消費者可能需要覆蓋更多的方法,實踐中,handleShutdownSignal會在channels和connections關閉時調用,handleConsumeOk 會在其它消費者之前

          調用
          ,傳遞consumer tag(不明白,要研究)。

           

          消費者可實現handleCancelOk 和 handleCancel方法來接收顯示和隱式取消操作通知。

          你可以使用Channel.basicCancel來顯示地取消某個特定的消費者:

          channel.basicCancel(consumerTag);

          passing the consumer tag.

          消費者回調是在單獨線程上處理的,這意味著消費者可以安全地在Connection或Channel, 如queueDeclare, txCommit, basicCancel或basicPublish上調用阻塞方法。

          每個Channel都有其自己的dispatch線程.對于一個消費者一個channel的大部分情況來說,這意味著消費者不會阻擋其它的消費者。如果在一個channel上多個消費者,則必須意識到長時間運行的消費者可能阻擋此channel上其它消費者回調調度.

          獲取單個消息

          要顯示地獲取一個消息,可使用Channel.basicGet.返回值是一個GetResponse實例, 在它之中,header信息(屬性) 和消息body都可以提取:

          boolean autoAck = false; 
          GetResponse response = channel.basicGet(queueName, autoAck);
          if (response == null) {
          // No message retrieved.
          } else {
          AMQP.BasicProperties props = response.getProps();
          byte[] body = response.getBody();
          long deliveryTag = response.getEnvelope().getDeliveryTag(); ...

          因為autoAck = false,你必須調用Channel.basicAck來應答你已經成功地接收了消息:

          channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message }

          處理未路由消息

          如果發布消息時,設置了"mandatory"標志,但如果消息不能路由的話,broker會將其返回到發送客戶端 (通過 AMQP.Basic.Return 命令).

          要收到這種返回的通知, clients可實現ReturnListener接口,并調用Channel.setReturnListener.如果channel沒有配置return listener,那么返回的消息會默默地丟棄。

          channel.setReturnListener(new ReturnListener() {     
              publicvoid handleBasicReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body) throws IOException {
          ...
              }
          });

           return listener將被調用,例如,如果client使用"mandatory"標志向未綁定隊列的direct類型交換器發送了消息.

          關閉協議

          AMQP client 關閉概述

          AMQP 0-9-1 connection和channel 使用相同的方法來管理網絡故障,內部故障,以及顯示本地關閉.

          AMQP 0-9-1 connection  和 channel 有如下的生命周期狀態:

          • open: 準備要使用的對象
          • closing: 對象已顯示收到收到本地關閉通知, 并向任何支持的底層對象發出關閉請求,并等待其關閉程序完成
          • closed: 對象已收到所有底層對象的完成關閉通知,最終將執行關閉操作

          這些對象總是以closed狀態結束的,不管基于什么原因引發的關閉,比如:應用程序請求,內部client library故障, 遠程網絡請求或網絡故障.

          AMQP connection 和channel 對象會持有下面與關閉相關的方法:

          • addShutdownListener(ShutdownListener 監聽器)和removeShutdownListener(ShutdownListener 監聽器),用來管理監聽器,當對象轉為closed狀態時,將會觸發這些監聽器.注意,在已經關閉的對象上添加一個ShutdownListener將會立即觸發監聽器
          • getCloseReason(), 允許同其交互以了解對象關閉的理由
          • isOpen(), 用于測試對象是否處于open狀態
          • close(int closeCode, String closeMessage), 用于顯示通知對象關閉

          可以像這樣來簡單使用監聽器:

          import com.rabbitmq.client.ShutdownSignalException; 
          import com.rabbitmq.client.ShutdownListener;
          connection.addShutdownListener(new ShutdownListener() {
          public void shutdownCompleted(ShutdownSignalException cause) { ... } }
          );

          關閉環境信息

          可通過顯示調用getCloseReason()方法或通過使用ShutdownListener類中的業務方法的cause參數來ShutdownSignalException中獲取關閉原因的有用信息.

          ShutdownSignalException 類提供方法來分析關閉的原因.通過調用isHardError()方法,我們可以知道是connection錯誤還是channel錯誤.getReason()會返回相關cause的相關信息,這些引起cause的方法形式-要么是AMQP.Channel.Close方法,要么是AMQP.Connection.Close (或者是null,如果是library中引發的異常,如網絡通信故障,在這種情況下,可通過getCause()方法來獲取信息).

          public void shutdownCompleted(ShutdownSignalException cause) {   if (cause.isHardError())   {     
          Connection conn = (Connection)cause.getReference();
          if (!cause.isInitiatedByApplication()) {
          Method reason = cause.getReason(); ... } ... }
          else { Channel ch = (Channel)cause.getReference(); ... } }

          原子使用isOpen()方法

          channel和connection對象的isOpen()方法不建議在在生產代碼中使用,因為此方法的返回值依賴于shutdown cause的存在性. 下面的代碼演示了竟爭條件的可能性:

          public void brokenMethod(Channel channel) {     if (channel.isOpen())     {         // The following code depends on the channel being in open state.         // However there is a possibility of the change in the channel state         // between isOpen() and basicQos(1) call         ...         channel.basicQos(1);     } }

          相反,我們應該忽略這種檢查,并簡單地嘗試這種操作.如果代碼執行期間,connection的channel關閉了,那么將拋出ShutdownSignalException,這就表明對象處于一種無效狀態了.當broker意外關閉連接時,我們也應該捕獲由SocketException引發的IOException,或者當broker清理關閉時,捕獲ShutdownSignalException.

          public void validMethod(Channel channel) {     try {         ...         channel.basicQos(1);     } catch (ShutdownSignalException sse) {         // possibly check if channel was closed         // by the time we started action and reasons for         // closing it         ...     } catch (IOException ioe) {         // check why connection was closed         ...     } }

          高級連接選項

          Consumer線程池

          Consumer 線程默認是通過一個新的ExecutorService線程池來自動分配的(參考下面的Receiving).如果需要在newConnection() 方法中更好地控制ExecutorService,可以使用定制的線程池.下面的示例展示了一個比正常分配稍大的線程池:

          ExecutorService es = Executors.newFixedThreadPool(20); Connection conn = factory.newConnection(es); 
          Executors 和 ExecutorService 都是java.util.concurrent包中的類.

          當連接關閉時,默認的ExecutorService將會被shutdown(), 但用戶自定義的ExecutorService (如上面所示)將不會被shutdown(). 提供自定義ExecutorService的Clients必須確保最終它能被關閉(通過調用它的shutdown() 方法), 否則池中的線程可能會阻止JVM終止.

          同一個executor service,可在多個連接之間共享,或者連續地在重新連接上重用,但在shutdown()后,則不能再使用.

          使用這種特性時,唯一需要考慮的是:在消費者回調的處理過程中,是否有證據證明有嚴重的瓶頸. 如果沒有消費者執行回調,或很少,默認的配置是綽綽有余. 開銷最初是很小的,分配的全部線程資源也是有界限的,即使偶爾可能出現一陣消費活動.

          使用Host列表

          可以傳遞一個Address數組給newConnection()Address只是 com.rabbitmq.client 包中包含host和port組件的簡單便利類. 例如:

          Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)                                  , new Address(hostname2, portnumber2)}; Connection conn = factory.newConnection(addrArr); 
          將會嘗試連接hostname1:portnumber1, 如果不能連接,則會連接hostname2:portnumber2,然后會返回數組中第一個成功連接(不會拋出IOException)上broker的連接.這完全等價于在factory上重復調用factory.newConnection()方法來設置host和port, 直到有一個成功返回.

          如果提供了ExecutorService(在factory.newConnection(es, addrArr)中使用),那么線程池將與第一個成功連接相關聯.

          心跳超時

          參考Heartbeats guide 來了解更多關于心跳及其在Java client中如何配置的更多信息.

          自定義線程工廠

          像Google App Engine (GAE)這樣的環境會限制線程直接實例化. 在這樣的環境中使用RabbitMQ Java client, 需要配置一個定制的ThreadFactory,即使用合適的方法來實例化線程,如: GAE's ThreadManager. 下面是Google App Engine的相關代碼.

          import com.google.appengine.api.ThreadManager;  ConnectionFactory cf = new ConnectionFactory(); cf.setThreadFactory(ThreadManager.backgroundThreadFactory()); 

          網絡故障時自動恢復

          Connection恢復

          clients和RabbitMQ節點之間的連接可發生故障. RabbitMQ Java client 支持連接和拓撲(queues, exchanges, bindings, 和consumers)的自動恢復. 大多數應用程序的連接自動恢復過程會遵循下面的步驟:

          1. 重新連接
          2. 恢復連接監聽器
          3. 重新打開通道
          4. 恢復通道監聽器
          5. 恢復通道basic.qos 設置,發布者確認和事務設置
          拓撲恢復包含下面的操作,每個通道都會執行下面的步驟:
          1. 重新聲明交換器(except for predefined ones)
          2. 重新聲明隊列
          3. 恢復所有綁定
          4. 恢復所有消費者
          要啟用自動連接恢復,須使用factory.setAutomaticRecoveryEnabled(true):
          ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(true); // connection that will recover automatically Connection conn = factory.newConnection();
          如果恢復因異常失敗(如. RabbitMQ節點仍然不可達),它會在固定時間間隔后進行重試(默認是5秒). 時間間隔可以進行配置:
          ConnectionFactory factory = new ConnectionFactory(); // attempt recovery every 10 seconds factory.setNetworkRecoveryInterval(10000);
          當提供了address列表時,將會在所有address上逐個進行嘗試:
          ConnectionFactory factory = new ConnectionFactory();  Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")}; factory.newConnection(addresses);

          恢復監聽器

          可在可恢復連接和通道上注冊一個或多個恢復監聽器. 當啟用了連接恢復時,ConnectionFactory#newConnection 和 Connection#createChannel 的連接已實現了com.rabbitmq.client.Recoverable,并提供了兩個方法:

          • addRecoveryListener
          • removeRecoveryListener
          注意,在使用這些方法時,你需要將connections和channels強制轉換為Recoverable.

          發布影響

          當連接失敗時,使用Channel.basicPublish方法發送的消息將會丟失. client不會保證在連接恢復后,消息會得到分發.要確保發布的消息到達了RabbitMQ,應用程序必須使用Publisher Confirms 


          拓撲恢復

          拓撲恢復涉及交換器,隊列,綁定以及消費者恢復.默認是啟用的,但也可以禁用:

          ConnectionFactory factory = new ConnectionFactory();  Connection conn = factory.newConnection(); factory.setAutomaticRecoveryEnabled(true); factory.setTopologyRecoveryEnabled(false);

          手動應答和自動恢復

          當使用手動應答時,在消息分發與應答之間可能存在網絡連接中斷. 在連接恢復后,RabbitMQ會在所有通道上重設delivery標記. 也就是說,使用舊delivery標記的basic.ackbasic.nack, 以及basic.reject將會引發channel exception. 為了避免發生這種情況, RabbitMQ Java client可以跟蹤,更新,以使它們在恢復期間單調地增長. Channel.basicAckChannel.basicNack, 以及Channel.basicReject 然后可以轉換這些 delivery tags,并且不再發送過期的delivery tags. 使用手動應答和自動恢復的應用程序必須負責處理重新分發.

          未處理異常

          關于connection, channel, recovery, 和consumer lifecycle 的異常將會委派給exception handler,Exception handler是實現了ExceptionHandler接口的任何對象. 默認情況下,將會使用DefaultExceptionHandler實例,它會將異常細節輸出到標準輸出上.

          可使用ConnectionFactory#setExceptionHandler來覆蓋原始handler,它將被用于由此factory創建的所有連接:

          ConnectionFactory factory = new ConnectionFactory(); cf.setExceptionHandler(customHandler);         
          Exception handlers 應該用于異常日志.

          Google App Engine上的RabbitMQ Java Client

          在Google App Engine (GAE) 上使用RabbitMQ Java client,必須使用一個自定義的線程工廠來初始化線程,如使用GAE's ThreadManager. 此外,還需要設置一個較小的心跳間隔(4-5 seconds) 來避免InputStream 上讀超時:

          ConnectionFactory factory = new ConnectionFactory(); cf.setRequestedHeartbeat(5);         

          警告和限制

          為了能使拓撲恢復, RabbitMQ Java client 維持了聲明隊列,交換器,綁定的緩存. 緩存是基于每個連接的.某些RabbitMQ的特性使得客戶端不能觀察到拓撲的變化,如,當隊列因TTL被刪除時. RabbitMQ Java client 會嘗試在下面的情況中使用緩存實體失效:

          • 當隊列被刪除時.
          • 當交換器被刪除時.
          • 當綁定被刪除時.
          • 當消費者在自動刪除隊列上退出時.
          • 當隊列或交換器不再綁定到自動刪除的交換器上時.
          然而, 除了單個連接外,client不能跟蹤這些拓撲變化. 依賴于自動刪除隊列或交換器的應用程序,正如TTL隊列一樣 (注意:不是消息TTL!), 如果使用了自動連接恢復,在知道不再使用或要刪除時,必須明確地刪除這些緩存實體,以凈化 client-side 拓撲cache. 這些可通過Channel#queueDeleteChannel#exchangeDelete,Channel#queueUnbind, and Channel#exchangeUnbind來進行.

          RPC (Request/Reply) 模式

          為了便于編程, Java client API提供了一個使用臨時回復隊列的RpcClient類來提供簡單的RPC-style communication.

          此類不會在RPC參數和返回值上強加任何特定格式. 它只是簡單地提供一種機制來向帶特定路由鍵的交換器發送消息,并在回復隊列上等待響應.

          import com.rabbitmq.client.RpcClient;  
          RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);

          (其實現細節為:請求消息使用basic.correlation_id唯一值字段來發送消息,并使用basic.reply_to來設置響應隊列的名稱.)

          一旦你創建此類的實例,你可以使用下面的任意一個方法來發送RPC請求:

          byte[] primitiveCall(byte[] message); 
          String stringCall(String message) Map mapCall(Map message) Map mapCall(Object[] keyValuePairs)

          primitiveCall 方法會將原始byte數組轉換為請求和響應的消息體. stringCall只是一個primitiveCall的簡單包裝,將消息體視為帶有默認字符集編碼的String實例.

          mapCall 變種稍為有些復雜: 它會將原始java值包含在java.util.Map中,并將其編碼為AMQP 0-9-1二進制表示形式,并以同樣的方式來解碼response. (注意:在這里,對一些值對象類型有所限制,具體可參考javadoc.)

          所有的編組/解組便利方法都使用primitiveCall來作為傳輸機制,其它方法只是在它上面的做了一個封裝.

          posted on 2016-06-04 00:37 胡小軍 閱讀(15639) 評論(1)  編輯  收藏 所屬分類: RabbitMQ

          FeedBack:
          # re: Java Client API Guide 2016-06-05 17:10 有機綠茶
          非常詳細的介紹!學習啦!  回復  更多評論
            
          主站蜘蛛池模板: 武隆县| 盐城市| 苗栗县| 张家界市| 河北区| 曲水县| 武安市| 桂东县| 崇信县| 两当县| 太仓市| 宁乡县| 达日县| 江都市| 剑河县| 昆明市| 西贡区| 满洲里市| 长治县| 漳浦县| 雷州市| 云梦县| 漠河县| 阿坝| 光山县| 安岳县| 西华县| 塘沽区| 株洲县| 图片| 茂名市| 土默特右旗| 淮滨县| 托里县| 新田县| 镇巴县| 阿尔山市| 京山县| 云安县| 铜川市| 山西省|