Twitter Storm 分布式RPC
本文轉載自
分布式RPC
分布式RPC(DRPC)的真正目的是使用storm實時并行計算極端功能。Storm拓撲需要一個輸入流作為函數參數,以一個輸出流的形式發射每個函數調用的結果。
DRPC沒有多少storm特性,因為它是從storm的原始流,spouts,bolts,拓撲來表達一個模式。DRPC沒有單獨打包,但它如此有用,以至于和storm捆綁在一起。
概述
分布式RPC通過“DRPC server”協調。DRPC服務器協調接收一個RPC請求,發送請求到storm拓撲,從storm拓撲接收結果,發送結果回等待的客戶端。從一個客戶端的角度來看,一個分布式RPC調用就像是一個常規的RPC調用。例如,一個客戶端如何為帶“http://twitter.com”參數的“reach”功能計算結果。
- DRPCClient client = new DRPCClient("drpc-host", 3772);
- String result = client.execute("reach", "http://twitter.com");
客戶端發送功能名稱及功能所需參數到DRPC服務器去執行。圖中的拓撲實現了此功能,它使用DRPCSpout從DRPC服務器接收功能調用流。每個功能調用通過DRPC服務器使用唯一ID標記,隨后拓撲計算結果,在拓撲的最后,一個稱之為“ReturnResults”的bolt連接到DRPC服務器,把結果交給這個功能調用(根據功能調用ID),DRPC服務器根據ID找到等待中的客戶端,為等待中的客戶端消除阻塞,并發送結果給客戶端。
LinearDRPCTopologyBuilder
Storm有一個稱之為LinearDRPCTopologyBuilder的拓撲Builder幾乎自動完成DRPC所需的所有相關步驟。包括:
1.設置spout
2.返回結果給DRPC服務器
3.為bolt提供對一組元組的有限聚合功能
讓我們看一個簡單的例子。這是一個DRPC拓撲的實現,在輸入參數的尾部追加“!”并返回:
- public static class ExclaimBolt implements IBasicBolt {
- public void prepare(Map conf, TopologyContext context) {
- }
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String input = tuple.getString(1);
- collector.emit(new Values(tuple.getValue(0), input + "!"));
- }
- public void cleanup() {
- }
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "result"));
- }
- }
- public static void main(String[] args) throws Exception {
- LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
- builder.addBolt(new ExclaimBolt(), 3);
- // ...
- }
如你所見,代碼非常少。當創建LinearDRPCTopologyBuilder時,你把這個拓撲的DRPC功能名稱告訴storm。一個DRPC服務器可以協調許多功能,功能名稱用于區別不同的功能,首先聲明的bolt將接收一個輸入的2-tuples,第一個字段是請求ID,第二個字段是請求參數。LinearDRPCTopologyBuilder認為最后的bolt會發射一個輸出流,該輸出流包含[id, result]格式的2-tuples。最后,所有拓撲中間過程產生的元組(tuple)都包含請求id作為其第一個字段。
在這個例子中,ExclaimBolt只是簡單地在元組的第二個字段尾部追加“!”字符。LinearDRPCTopologyBuilder處理其余的協調工作,包括連接DRPC服務器,發送最終結果。
本地模式DRPC
DRPC可以運行在本地模式。這是如何在本地模式運行上述例子:
- LocalDRPC drpc = new LocalDRPC();
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
- System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));
- cluster.shutdown();
- drpc.shutdown();
首先你創建一個LocalDRPC對象。這個對象在進程內模擬一個DRPC服務器,就像在進程內模擬一個storm集群一樣。然后你創建本地集群,在本地模式運行這個拓撲。創建本地拓撲和遠程拓撲,LinearDRPCTopologyBuilder有不同的方法。在本地模式,LocalDRPC未綁定任何端口,拓撲也需要知道與哪個對象通訊,這是為什么createLocaclTopology方法需要接受LocalDRPC對象作為輸入參數的原因。
載入拓撲后,你可以用LocalDRPC的execute方法執行DRPC調用。
遠程模式DRPC
在實際的集群使用DRPC也很簡單。有三個步驟:
1. 啟動DRPC服務器
2. 配置DRPC服務器位置
3. 提交DRPC拓撲到storm集群
使用storm腳本啟動DRPC服務器,和啟動nimbus和ui一樣:
- bin/storm drpc
接下來,配置你的storm集群,讓集群知道DRPC服務器的位置,這樣DRPCSpout就知道從哪里讀取功能調用??梢酝ㄟ^修改storm.yaml配置文件或拓撲配置完成配置DRPC服務器位置。修改storm.yaml配置文件如下所示:
- drpc.servers:
- - "drpc1.foo.com"
- - "drpc2.foo.com"
最后,使用StormSubmitter啟動DRPC拓撲,就像啟動其它拓撲一樣。在遠程模式運行上述例子,代碼如下所示:
- StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());
createRemoteTopology方法用于在storm集群創建拓撲。
一個更完整的例子
這個exclaimation DRPC例子只是一個用來說明DRPC概念的玩具。讓我們看一個更完整的例子,該例子是一個真正需要storm集群的并行計算的DRPC功能。我們將要看的例子是對twitter網站上的一個URL的接觸用戶進行統計。
一個URL的接觸用戶數是在twitter網站上接觸一個URL的用戶數,你需要以下4步:
1. 獲取tweeted the URL的全部用戶
2. 獲取這些用戶的全部追隨者
3. 使追隨者集合中的用戶唯一
4. 統計唯一的用戶數
一個單獨的reach計算在計算期間涉及到數千數據庫訪問和數千萬追隨者記錄。它是一個真正的耗時計算。正如你將要看到的,在storm上實現這個功能非常簡單。在一臺機器上,reach計算花費數分鐘,在storm集群,最難計算reach的URL也只需數秒。
posted on 2012-01-19 14:17 徐紅星 閱讀(1680) 評論(0) 編輯 收藏 所屬分類: Storm