Big Data Road

          Twitter Storm 分布式RPC

          本文轉載自

          http://chenlx.blog.51cto.com/4096635/748348
          本來準備自己翻譯一下,后來Google一下,發現網上已有,遂轉載。

          分布式RPC

          分布式RPC(DRPC)的真正目的是使用storm實時并行計算極端功能。Storm拓撲需要一個輸入流作為函數參數,以一個輸出流的形式發射每個函數調用的結果。
           
          DRPC沒有多少storm特性,因為它是從storm的原始流,spouts,bolts,拓撲來表達一個模式。DRPC沒有單獨打包,但它如此有用,以至于和storm捆綁在一起。
           
          概述
          分布式RPC通過“DRPC server”協調。DRPC服務器協調接收一個RPC請求,發送請求到storm拓撲,從storm拓撲接收結果,發送結果回等待的客戶端。從一個客戶端的角度來看,一個分布式RPC調用就像是一個常規的RPC調用。例如,一個客戶端如何為帶“http://twitter.com”參數的“reach”功能計算結果。
          1. DRPCClient client = new DRPCClient("drpc-host", 3772); 
          2. String result = client.execute("reach", "http://twitter.com"); 
          分布式RPC工作流程如下:

          客戶端發送功能名稱及功能所需參數到DRPC服務器去執行。圖中的拓撲實現了此功能,它使用DRPCSpout從DRPC服務器接收功能調用流。每個功能調用通過DRPC服務器使用唯一ID標記,隨后拓撲計算結果,在拓撲的最后,一個稱之為“ReturnResults”的bolt連接到DRPC服務器,把結果交給這個功能調用(根據功能調用ID),DRPC服務器根據ID找到等待中的客戶端,為等待中的客戶端消除阻塞,并發送結果給客戶端。

          LinearDRPCTopologyBuilder
          Storm有一個稱之為LinearDRPCTopologyBuilder的拓撲Builder幾乎自動完成DRPC所需的所有相關步驟。包括:
          1.設置spout
          2.返回結果給DRPC服務器
          3.為bolt提供對一組元組的有限聚合功能
          讓我們看一個簡單的例子。這是一個DRPC拓撲的實現,在輸入參數的尾部追加“!”并返回:
          1. public static class ExclaimBolt implements IBasicBolt { 
          2.     public void prepare(Map conf, TopologyContext context) { 
          3.     } 
          4.  
          5.     public void execute(Tuple tuple, BasicOutputCollector collector) { 
          6.         String input = tuple.getString(1); 
          7.         collector.emit(new Values(tuple.getValue(0), input + "!")); 
          8.     } 
          9.  
          10.     public void cleanup() { 
          11.     } 
          12.  
          13.     public void declareOutputFields(OutputFieldsDeclarer declarer) { 
          14.         declarer.declare(new Fields("id", "result")); 
          15.     } 
          16.  
          17.  
          18. public static void main(String[] args) throws Exception { 
          19.     LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation"); 
          20.     builder.addBolt(new ExclaimBolt(), 3); 
          21.     // ... 

          如你所見,代碼非常少。當創建LinearDRPCTopologyBuilder時,你把這個拓撲的DRPC功能名稱告訴storm。一個DRPC服務器可以協調許多功能,功能名稱用于區別不同的功能,首先聲明的bolt將接收一個輸入的2-tuples,第一個字段是請求ID,第二個字段是請求參數。LinearDRPCTopologyBuilder認為最后的bolt會發射一個輸出流,該輸出流包含[id, result]格式的2-tuples。最后,所有拓撲中間過程產生的元組(tuple)都包含請求id作為其第一個字段。

          在這個例子中,ExclaimBolt只是簡單地在元組的第二個字段尾部追加“!”字符。LinearDRPCTopologyBuilder處理其余的協調工作,包括連接DRPC服務器,發送最終結果。
           
          本地模式DRPC
          DRPC可以運行在本地模式。這是如何在本地模式運行上述例子:
          1. LocalDRPC drpc = new LocalDRPC(); 
          2. LocalCluster cluster = new LocalCluster(); 
          3.  
          4. cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc)); 
          5.  
          6. System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello")); 
          7.  
          8. cluster.shutdown(); 
          9. drpc.shutdown(); 
          首先你創建一個LocalDRPC對象。這個對象在進程內模擬一個DRPC服務器,就像在進程內模擬一個storm集群一樣。然后你創建本地集群,在本地模式運行這個拓撲。創建本地拓撲和遠程拓撲,LinearDRPCTopologyBuilder有不同的方法。在本地模式,LocalDRPC未綁定任何端口,拓撲也需要知道與哪個對象通訊,這是為什么createLocaclTopology方法需要接受LocalDRPC對象作為輸入參數的原因。
           
          載入拓撲后,你可以用LocalDRPC的execute方法執行DRPC調用。
           
          遠程模式DRPC
          在實際的集群使用DRPC也很簡單。有三個步驟:
          1.      啟動DRPC服務器
          2.      配置DRPC服務器位置
          3.      提交DRPC拓撲到storm集群
          使用storm腳本啟動DRPC服務器,和啟動nimbus和ui一樣:
          1. bin/storm drpc 
          接下來,配置你的storm集群,讓集群知道DRPC服務器的位置,這樣DRPCSpout就知道從哪里讀取功能調用??梢酝ㄟ^修改storm.yaml配置文件或拓撲配置完成配置DRPC服務器位置。修改storm.yaml配置文件如下所示:
          1. drpc.servers: 
          2.   - "drpc1.foo.com" 
          3.   - "drpc2.foo.com" 
          最后,使用StormSubmitter啟動DRPC拓撲,就像啟動其它拓撲一樣。在遠程模式運行上述例子,代碼如下所示:
          1. StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology()); 
          createRemoteTopology方法用于在storm集群創建拓撲。
           
          一個更完整的例子
          這個exclaimation DRPC例子只是一個用來說明DRPC概念的玩具。讓我們看一個更完整的例子,該例子是一個真正需要storm集群的并行計算的DRPC功能。我們將要看的例子是對twitter網站上的一個URL的接觸用戶進行統計。
          一個URL的接觸用戶數是在twitter網站上接觸一個URL的用戶數,你需要以下4步:
          1. 獲取tweeted the URL的全部用戶
          2. 獲取這些用戶的全部追隨者
          3. 使追隨者集合中的用戶唯一
          4. 統計唯一的用戶數
          一個單獨的reach計算在計算期間涉及到數千數據庫訪問和數千萬追隨者記錄。它是一個真正的耗時計算。正如你將要看到的,在storm上實現這個功能非常簡單。在一臺機器上,reach計算花費數分鐘,在storm集群,最難計算reach的URL也只需數秒。
          Storm-starter項目
          這個拓撲以4個步驟的形式執行:

          1. GetTweeters獲取tweeted the URL的用戶。它轉換一個[id, url]形式的輸入流到[id, tweeter]形式的輸出流。每個url元組將映射到多個tweeter元組。

          2. GetFollowers獲取這些tweeter的追隨者。它轉換一個[id, tweeter]形式的輸入流到[id, follower]形式的輸出流??缢腥蝿?,當某人追隨多個tweeter,這些tweeter又tweeted相同的URL時,這可能會得到重復的追隨者。

          3. PartialUniquer按追隨者ID對追隨者數據流進行分組。同一的追隨者去到同一的任務,因此每個PartialUniquer任務都接收到獨立的相互獨立的追隨者集合。PartialUniquer一旦收到請求ID用于它的所有追隨者元組,它就發射追隨者子集的唯一總數。
          4. 最后,CountAggregator從每個PartialUniquer任務接收計數并對它們求和。
          讓我們來看看PartialUniquer:
          1. public static class PartialUniquer implements IRichBolt, FinishedCallback { 
          2.     OutputCollector _collector; 
          3.     Map<Object, Set<String>> _sets = new HashMap<Object, Set<String>>(); 
          4.      
          5.     public void prepare(Map conf, TopologyContext context, OutputCollector collector) { 
          6.         _collector = collector; 
          7.     } 
          8.  
          9.     public void execute(Tuple tuple) { 
          10.         Object id = tuple.getValue(0); 
          11.         Set<String> curr = _sets.get(id); 
          12.         if(curr==null) { 
          13.             curr = new HashSet<String>(); 
          14.             _sets.put(id, curr); 
          15.         } 
          16.         curr.add(tuple.getString(1)); 
          17.         _collector.ack(tuple); 
          18.     } 
          19.  
          20.     public void cleanup() { 
          21.     } 
          22.  
          23.     public void finishedId(Object id) { 
          24.         Set<String> curr = _sets.remove(id); 
          25.         int count; 
          26.         if(curr!=null) { 
          27.             count = curr.size(); 
          28.         } else { 
          29.             count = 0; 
          30.         } 
          31.         _collector.emit(new Values(id, count)); 
          32.     } 
          33.  
          34.     public void declareOutputFields(OutputFieldsDeclarer declarer) { 
          35.         declarer.declare(new Fields("id", "partial-count")); 
          36.     } 
          當PartialUniquer在exectue方法中接收一個follower元組時,它用一個內部HashMap添加它到與請求ID對應的集合。
          PartialUniquer也實現了FinishedCallback接口,它告訴LinearDRPCTopologyBuilder,對于任意給定的請求ID,當它已收到所有指向它的元組時,請通知它。這個回調是finishedId方法。在這個回調中,PartialUniquer發射單一的元組,元組包含它的追隨者子集的唯一總數。 

          在底層,CoordinatedBolt用于檢測一個bolt何時收到該請求ID的所有元組。CoordinatedBolt使用direct stream管理協調。

          其它的拓撲應該是不言自明。如你所見,reach計算的每一單步都是并行執行的,而且定義一個DRPC拓撲也非常簡單。
          Non-Linear DRPC拓撲
          LinearDRPCTopologyBuilder僅處理“線性的”DRPC拓撲,計算以一連串步驟的形式表達(像reach)。不難想象某些功能將需要更復雜的拓撲結構,這些拓撲帶有帶分支和合并bolt。目前,要做到這一點,你需要直接使用CoordinateBolt。務必在郵件列表中談談你的非線性DRPC拓撲用例,寫下DRPC拓撲更普遍的抽象結構。
          LinearDRPCTopologyBuilder如何工作?

          DRPCSpout發射[args, return-info],return-info是DRPC服務器的主機和端口,還有DRPC服務器生成的ID。

          拓撲組成部分:
          • DRPCSpout
          • PrepareRequest(生成一個請求ID,創建一個返回信息流,一個參數流)
          • CoordinatedBolt包裝器和直接分組
          • JoinResult(同返回信息一起連接結果)
          • ReturnResult(連接DRPC服務器并返回結果)
          • LinearDRPCTopologyBuilder是一個構建在Storm原語之上的高層次抽象的好例子。
          高級
          • 同時編排處理多個請的KeyedFairBolt
          • 如何直接使用CoordinateBolt

          posted on 2012-01-19 14:17 徐紅星 閱讀(1680) 評論(0)  編輯  收藏 所屬分類: Storm


          只有注冊用戶登錄后才能發表評論。


          網站導航:
           

          My Links

          Blog Stats

          • 隨筆 - 0
          • 文章 - 4
          • 評論 - 0
          • Trackbacks - 0

          留言簿

          隨筆分類

          文章分類

          文章檔案

          搜索

          •  

          最新評論

          主站蜘蛛池模板: 穆棱市| 平和县| 象州县| 彰化市| 清涧县| 乐清市| 新化县| 清河县| 蕲春县| 五家渠市| 佛山市| 碌曲县| 金秀| 乡宁县| 尤溪县| 孝感市| 南岸区| 建始县| 宜昌市| 广灵县| 秦皇岛市| 遂昌县| 历史| 马关县| 福州市| 道真| 精河县| 安化县| 海兴县| 雷波县| 五寨县| 县级市| 绩溪县| 柞水县| 滁州市| 浏阳市| 翁源县| 大邑县| 宁陕县| 泾源县| 安福县|