莊周夢蝶

          生活、程序、未來
             :: 首頁 ::  ::  :: 聚合  :: 管理

          Storm源碼淺析之topology的提交

          Posted on 2011-12-01 21:48 dennis 閱讀(15228) 評論(10)  編輯  收藏 所屬分類: java源碼解讀Hadoop與分布式 、Clojure
              原文:http://www.aygfsteel.com/killme2008/archive/2011/11/17/364112.html
              作者:dennis (killme2008@gmail.com)
              轉載請注明出處。

              最近一直在讀twitter開源的這個分布式流計算框架——storm的源碼,還是有必要記錄下一些比較有意思的地方。我按照storm的主要概念進行組織,并且只分析我關注的東西,因此稱之為淺析。       

          一、介紹
              Storm的開發語言主要是Java和Clojure,其中Java定義骨架,而Clojure編寫核心邏輯。源碼統計結果:
               180 text files.
               
          177 unique files.                                          
                 
          7 files ignored.

          http:
          //cloc.sourceforge.net v 1.55  T=1.0 s (171.0 files/s, 46869.0 lines/s)
          -------------------------------------------------------------------------------
          Language                     files          blank        comment           code
          -------------------------------------------------------------------------------
          Java                           
          125           5010           2414          25661
          Lisp                            
          33            732            283           4871
          Python                           
          7            742            433           4675
          CSS                              
          1             12             45           1837
          ruby                             
          2             22              0            104
          Bourne Shell                     
          1              0              0              6
          Javascript                       
          2              1             15              6
          -------------------------------------------------------------------------------
          SUM:                           
          171           6519           3190          37160
          -------------------------------------------------------------------------------

              Java代碼25000多行,而Clojure(Lisp)只有4871行,說語言不重要再次證明是扯淡。
                  
          二、Topology和Nimbus       
              Topology是storm的核心理念,將spout和bolt組織成一個topology,運行在storm集群里,完成實時分析和計算的任務。這里我主要想介紹下topology部署到storm集群的大概過程。提交一個topology任務到Storm集群是通過StormSubmitter.submitTopology方法提交:
          StormSubmitter.submitTopology(name, conf, builder.createTopology());
              我們將topology打成jar包后,利用bin/storm這個python腳本,執行如下命令:
          bin/storm jar xxxx.jar com.taobao.MyTopology args
              將jar包提交給storm集群。storm腳本會啟動JVM執行Topology的main方法,執行submitTopology的過程。而submitTopology會將jar文件上傳到nimbus,上傳是通過socket傳輸。在storm這個python腳本的jar方法里可以看到:
          def jar(jarfile, klass, *args):                                                                                                                               
             exec_storm_class(                                                                                                                                          
                  klass,                                                                                                                                                
                  jvmtype
          ="-client",                                                                                                                                    
                  extrajars
          =[jarfile, CONF_DIR, STORM_DIR + "/bin"],                                                                                                    
                  args
          =args,                                                                                                                                            
                  prefix
          ="export STORM_JAR=" + jarfile + ";")
               將jar文件的地址設置為環境變量STORM_JAR,這個環境變量在執行submitTopology的時候用到:
          //StormSubmitter.java 
          private static void submitJar(Map conf) {
                  
          if(submittedJar==null) {
                      LOG.info(
          "Jar not uploaded to master yet. Submitting jar");
                      String localJar 
          = System.getenv("STORM_JAR");
                      submittedJar 
          = submitJar(conf, localJar);
                  } 
          else {
                      LOG.info(
          "Jar already uploaded to master. Not submitting jar.");
                  }
              }
              通過環境變量找到jar包的地址,然后上傳。利用環境變量傳參是個小技巧。

              其次,nimbus在接收到jar文件后,存放到數據目錄的inbox目錄,nimbus數據目錄的結構
          -nimbus
               
          -inbox
                   
          -stormjar-57f1d694-2865-4b3b-8a7c-99104fc0aea3.jar
                   
          -stormjar-76b4e316-b430-4215-9e26-4f33ba4ee520.jar

               
          -stormdist
                  
          -storm-id
                     
          -stormjar.jar
                     
          -stormconf.ser
                     
          -stormcode.ser
               其中inbox用于存放提交的jar文件,每個jar文件都重命名為stormjar加上一個32位的UUID。而stormdist存放的是啟動topology后生成的文件,每個topology都分配一個唯一的id,ID的規則是“name-計數-時間戳”。啟動后的topology的jar文件名命名為storm.jar ,而它的配置經過java序列化后存放在stormconf.ser文件,而stormcode.ser是將topology本身序列化后存放的文件。這些文件在部署的時候,supervisor會從這個目錄下載這些文件,然后在supervisor本地執行這些代碼。
              進入重點,topology任務的分配過程(zookeeper路徑說明忽略root):
          1.在zookeeper上創建/taskheartbeats/{storm id} 路徑,用于任務的心跳檢測。storm對zookeeper的一個重要應用就是利用zk的臨時節點做存活檢測。task將定時刷新節點的時間戳,然后nimbus會檢測這個時間戳是否超過timeout設置。
          2.從topology中獲取bolts,spouts設置的并行數目以及全局配置的最大并行數,然后產生task id列表,如[1 2 3 4]
          3.在zookeeper上創建/tasks/{strom id}/{task id}路徑,并存儲task信息
          4.開始分配任務(內部稱為assignment), 具體步驟:
           (1)從zk上獲得已有的assignment(新的toplogy當然沒有了)
           (2)查找所有可用的slot,所謂slot就是可用的worker,在所有supervisor上配置的多個worker的端口。
           (3)將任務均勻地分配給可用的worker,這里有兩種情況:
           (a)task數目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最終是這樣分配
          {1: [host1:port1] 2 : [host2:port1]
                   
          3 : [host1:port1] 4 : [host2:port1]}
          ,可以看到任務平均地分配在兩個worker上。
          (b)如果task數目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先會將woker排序,將不同host間隔排列,保證task不會全部分配到同一個worker上,也就是將worker排列成
          [host1:port1 host2:port1 host1:port2 host2:port2]
          ,然后分配任務為
          {1: host1:port1 , 2 : host2:port2}

          (4)記錄啟動時間
          (5)判斷現有的assignment是否跟重新分配的assignment相同,如果相同,不需要變更,否則更新assignment到zookeeper的/assignments/{storm id}上。
          5.啟動topology,所謂啟動,只是將zookeeper上/storms/{storm id}對應的數據里的active設置為true。
          6.nimbus會檢查task的心跳,如果發現task心跳超過超時時間,那么會重新跳到第4步做re-assignment。

          評論

          # re: Storm源碼淺析之topology的提交  回復  更多評論   

          2011-12-05 11:11 by
          Java代碼25000多行,而Clojure(Lisp)只有4871行5563。。。不明白是什么意思?是Storm中既有Java,又有Clojure?還是Storm有Java和Clojure兩個版本?

          # re: Storm源碼淺析之topology的提交  回復  更多評論   

          2011-12-14 23:56 by fiw
          非常感謝您的講解,給了我很大的幫助。有一個問題,Storm最后處理完的消息存到哪里了呢?如何查看處理的結果呢?我自己搭了一個Storm集群,跑了一下Storm_starter的wordCount例子,但是在StormUI上沒有找到結果,希望能得到您的幫助。

          # re: Storm源碼淺析之topology的提交  回復  更多評論   

          2011-12-15 12:02 by dennis
          @fiw
          處理完的消息怎么存儲是你自己負責的事情,storm不幫你處理的。
          wordcount的例子應該就是放在內存里了,掛了就沒了。
          storm ui只是統計,并不參與topology的邏輯展現。

          # re: Storm源碼淺析之topology的提交  回復  更多評論   

          2011-12-15 23:08 by coderplay
          很像hadoop :)

          # re: Storm源碼淺析之topology的提交  回復  更多評論   

          2011-12-16 16:58 by fork
          storm為了保證可靠性處理是否必須要存儲還沒有完全處理的Turple?這樣發送Turple的Spout是否會出現OOM?

          # re: Storm源碼淺析之topology的提交  回復  更多評論   

          2011-12-19 14:58 by dennis
          @fork
          不會的,storm只會存儲發送的tuple id,這只是一個8個字節的long類型,想要OOM還是比較困難的。

          # re: Storm源碼淺析之topology的提交  回復  更多評論   

          2012-01-06 11:34 by xiaofeng_metis
          期待Storm源碼淺析的其它內容

          # re: Storm源碼淺析之topology的提交[未登錄]  回復  更多評論   

          2012-02-20 01:03 by 胡楊
          前輩,您好!看到您讀過那么多的源代碼,真的是發自內心的崇拜,你的精力好旺盛,你對技術真的好執著!

          現在這幾天準備開始讀讀spring的源碼。但是把源代碼導入Eclipse并運行起來以后,在spring啟動的入口打了個斷點,可是總是進入不了這個斷點。在網上查了很多的資料,有的說是要編譯一下源代碼,我試過了,也不行,這個問題困擾了好幾天。不知道您剛開始的時候是怎么閱讀的? 用的什么工具?

          # re: Storm源碼淺析之topology的提交[未登錄]  回復  更多評論   

          2012-04-11 14:47 by dhc
          Storm中既有Java,又有Clojure

          # re: Storm源碼淺析之topology的提交[未登錄]  回復  更多評論   

          2012-04-11 14:50 by dhc
          你好,看了這篇文章很多以前沒有明白的點豁然開朗。但是這篇文章只是分析了storm client、nimbus上的流程,能夠介紹下supervisor啟動后的流程。謝謝!
          主站蜘蛛池模板: 锦屏县| 芦溪县| 合山市| 开封县| 呈贡县| 明水县| 涟源市| 同德县| 余姚市| 大荔县| 平定县| 温州市| 年辖:市辖区| 浦江县| 随州市| 武强县| 土默特左旗| 南靖县| 涡阳县| 金门县| 高清| 安多县| 余江县| 崇明县| 五峰| 马山县| 托克托县| 沙雅县| 九龙坡区| 丽水市| 长垣县| 沽源县| 淅川县| 阆中市| 新巴尔虎左旗| 黄冈市| 县级市| 曲松县| 泸水县| 陇西县| 观塘区|