莊周夢蝶

          生活、程序、未來
             :: 首頁 ::  ::  :: 聚合  :: 管理

          Storm源碼淺析之topology的提交

          Posted on 2011-12-01 21:48 dennis 閱讀(15227) 評論(10)  編輯  收藏 所屬分類: java源碼解讀Hadoop與分布式Clojure
              原文:http://www.aygfsteel.com/killme2008/archive/2011/11/17/364112.html
              作者:dennis (killme2008@gmail.com)
              轉(zhuǎn)載請注明出處。

              最近一直在讀twitter開源的這個分布式流計算框架——storm的源碼,還是有必要記錄下一些比較有意思的地方。我按照storm的主要概念進(jìn)行組織,并且只分析我關(guān)注的東西,因此稱之為淺析。       

          一、介紹
              Storm的開發(fā)語言主要是Java和Clojure,其中Java定義骨架,而Clojure編寫核心邏輯。源碼統(tǒng)計結(jié)果:
               180 text files.
               
          177 unique files.                                          
                 
          7 files ignored.

          http:
          //cloc.sourceforge.net v 1.55  T=1.0 s (171.0 files/s, 46869.0 lines/s)
          -------------------------------------------------------------------------------
          Language                     files          blank        comment           code
          -------------------------------------------------------------------------------
          Java                           
          125           5010           2414          25661
          Lisp                            
          33            732            283           4871
          Python                           
          7            742            433           4675
          CSS                              
          1             12             45           1837
          ruby                             
          2             22              0            104
          Bourne Shell                     
          1              0              0              6
          Javascript                       
          2              1             15              6
          -------------------------------------------------------------------------------
          SUM:                           
          171           6519           3190          37160
          -------------------------------------------------------------------------------

              Java代碼25000多行,而Clojure(Lisp)只有4871行,說語言不重要再次證明是扯淡。
                  
          二、Topology和Nimbus       
              Topology是storm的核心理念,將spout和bolt組織成一個topology,運(yùn)行在storm集群里,完成實(shí)時分析和計算的任務(wù)。這里我主要想介紹下topology部署到storm集群的大概過程。提交一個topology任務(wù)到Storm集群是通過StormSubmitter.submitTopology方法提交:
          StormSubmitter.submitTopology(name, conf, builder.createTopology());
              我們將topology打成jar包后,利用bin/storm這個python腳本,執(zhí)行如下命令:
          bin/storm jar xxxx.jar com.taobao.MyTopology args
              將jar包提交給storm集群。storm腳本會啟動JVM執(zhí)行Topology的main方法,執(zhí)行submitTopology的過程。而submitTopology會將jar文件上傳到nimbus,上傳是通過socket傳輸。在storm這個python腳本的jar方法里可以看到:
          def jar(jarfile, klass, *args):                                                                                                                               
             exec_storm_class(                                                                                                                                          
                  klass,                                                                                                                                                
                  jvmtype
          ="-client",                                                                                                                                    
                  extrajars
          =[jarfile, CONF_DIR, STORM_DIR + "/bin"],                                                                                                    
                  args
          =args,                                                                                                                                            
                  prefix
          ="export STORM_JAR=" + jarfile + ";")
               將jar文件的地址設(shè)置為環(huán)境變量STORM_JAR,這個環(huán)境變量在執(zhí)行submitTopology的時候用到:
          //StormSubmitter.java 
          private static void submitJar(Map conf) {
                  
          if(submittedJar==null) {
                      LOG.info(
          "Jar not uploaded to master yet. Submitting jar");
                      String localJar 
          = System.getenv("STORM_JAR");
                      submittedJar 
          = submitJar(conf, localJar);
                  } 
          else {
                      LOG.info(
          "Jar already uploaded to master. Not submitting jar.");
                  }
              }
              通過環(huán)境變量找到j(luò)ar包的地址,然后上傳。利用環(huán)境變量傳參是個小技巧。

              其次,nimbus在接收到j(luò)ar文件后,存放到數(shù)據(jù)目錄的inbox目錄,nimbus數(shù)據(jù)目錄的結(jié)構(gòu)
          -nimbus
               
          -inbox
                   
          -stormjar-57f1d694-2865-4b3b-8a7c-99104fc0aea3.jar
                   
          -stormjar-76b4e316-b430-4215-9e26-4f33ba4ee520.jar

               
          -stormdist
                  
          -storm-id
                     
          -stormjar.jar
                     
          -stormconf.ser
                     
          -stormcode.ser
               其中inbox用于存放提交的jar文件,每個jar文件都重命名為stormjar加上一個32位的UUID。而stormdist存放的是啟動topology后生成的文件,每個topology都分配一個唯一的id,ID的規(guī)則是“name-計數(shù)-時間戳”。啟動后的topology的jar文件名命名為storm.jar ,而它的配置經(jīng)過java序列化后存放在stormconf.ser文件,而stormcode.ser是將topology本身序列化后存放的文件。這些文件在部署的時候,supervisor會從這個目錄下載這些文件,然后在supervisor本地執(zhí)行這些代碼。
              進(jìn)入重點(diǎn),topology任務(wù)的分配過程(zookeeper路徑說明忽略root):
          1.在zookeeper上創(chuàng)建/taskheartbeats/{storm id} 路徑,用于任務(wù)的心跳檢測。storm對zookeeper的一個重要應(yīng)用就是利用zk的臨時節(jié)點(diǎn)做存活檢測。task將定時刷新節(jié)點(diǎn)的時間戳,然后nimbus會檢測這個時間戳是否超過timeout設(shè)置。
          2.從topology中獲取bolts,spouts設(shè)置的并行數(shù)目以及全局配置的最大并行數(shù),然后產(chǎn)生task id列表,如[1 2 3 4]
          3.在zookeeper上創(chuàng)建/tasks/{strom id}/{task id}路徑,并存儲task信息
          4.開始分配任務(wù)(內(nèi)部稱為assignment), 具體步驟:
           (1)從zk上獲得已有的assignment(新的toplogy當(dāng)然沒有了)
           (2)查找所有可用的slot,所謂slot就是可用的worker,在所有supervisor上配置的多個worker的端口。
           (3)將任務(wù)均勻地分配給可用的worker,這里有兩種情況:
           (a)task數(shù)目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最終是這樣分配
          {1: [host1:port1] 2 : [host2:port1]
                   
          3 : [host1:port1] 4 : [host2:port1]}
          ,可以看到任務(wù)平均地分配在兩個worker上。
          (b)如果task數(shù)目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先會將woker排序,將不同host間隔排列,保證task不會全部分配到同一個worker上,也就是將worker排列成
          [host1:port1 host2:port1 host1:port2 host2:port2]
          ,然后分配任務(wù)為
          {1: host1:port1 , 2 : host2:port2}

          (4)記錄啟動時間
          (5)判斷現(xiàn)有的assignment是否跟重新分配的assignment相同,如果相同,不需要變更,否則更新assignment到zookeeper的/assignments/{storm id}上。
          5.啟動topology,所謂啟動,只是將zookeeper上/storms/{storm id}對應(yīng)的數(shù)據(jù)里的active設(shè)置為true。
          6.nimbus會檢查task的心跳,如果發(fā)現(xiàn)task心跳超過超時時間,那么會重新跳到第4步做re-assignment。

          評論

          # re: Storm源碼淺析之topology的提交  回復(fù)  更多評論   

          2011-12-05 11:11 by
          Java代碼25000多行,而Clojure(Lisp)只有4871行5563。。。不明白是什么意思?是Storm中既有Java,又有Clojure?還是Storm有Java和Clojure兩個版本?

          # re: Storm源碼淺析之topology的提交  回復(fù)  更多評論   

          2011-12-14 23:56 by fiw
          非常感謝您的講解,給了我很大的幫助。有一個問題,Storm最后處理完的消息存到哪里了呢?如何查看處理的結(jié)果呢?我自己搭了一個Storm集群,跑了一下Storm_starter的wordCount例子,但是在StormUI上沒有找到結(jié)果,希望能得到您的幫助。

          # re: Storm源碼淺析之topology的提交  回復(fù)  更多評論   

          2011-12-15 12:02 by dennis
          @fiw
          處理完的消息怎么存儲是你自己負(fù)責(zé)的事情,storm不幫你處理的。
          wordcount的例子應(yīng)該就是放在內(nèi)存里了,掛了就沒了。
          storm ui只是統(tǒng)計,并不參與topology的邏輯展現(xiàn)。

          # re: Storm源碼淺析之topology的提交  回復(fù)  更多評論   

          2011-12-15 23:08 by coderplay
          很像hadoop :)

          # re: Storm源碼淺析之topology的提交  回復(fù)  更多評論   

          2011-12-16 16:58 by fork
          storm為了保證可靠性處理是否必須要存儲還沒有完全處理的Turple?這樣發(fā)送Turple的Spout是否會出現(xiàn)OOM?

          # re: Storm源碼淺析之topology的提交  回復(fù)  更多評論   

          2011-12-19 14:58 by dennis
          @fork
          不會的,storm只會存儲發(fā)送的tuple id,這只是一個8個字節(jié)的long類型,想要OOM還是比較困難的。

          # re: Storm源碼淺析之topology的提交  回復(fù)  更多評論   

          2012-01-06 11:34 by xiaofeng_metis
          期待Storm源碼淺析的其它內(nèi)容

          # re: Storm源碼淺析之topology的提交[未登錄]  回復(fù)  更多評論   

          2012-02-20 01:03 by 胡楊
          前輩,您好!看到您讀過那么多的源代碼,真的是發(fā)自內(nèi)心的崇拜,你的精力好旺盛,你對技術(shù)真的好執(zhí)著!

          現(xiàn)在這幾天準(zhǔn)備開始讀讀spring的源碼。但是把源代碼導(dǎo)入Eclipse并運(yùn)行起來以后,在spring啟動的入口打了個斷點(diǎn),可是總是進(jìn)入不了這個斷點(diǎn)。在網(wǎng)上查了很多的資料,有的說是要編譯一下源代碼,我試過了,也不行,這個問題困擾了好幾天。不知道您剛開始的時候是怎么閱讀的? 用的什么工具?

          # re: Storm源碼淺析之topology的提交[未登錄]  回復(fù)  更多評論   

          2012-04-11 14:47 by dhc
          Storm中既有Java,又有Clojure

          # re: Storm源碼淺析之topology的提交[未登錄]  回復(fù)  更多評論   

          2012-04-11 14:50 by dhc
          你好,看了這篇文章很多以前沒有明白的點(diǎn)豁然開朗。但是這篇文章只是分析了storm client、nimbus上的流程,能夠介紹下supervisor啟動后的流程。謝謝!
          主站蜘蛛池模板: 望谟县| 昌图县| 神农架林区| 子洲县| 林西县| 普定县| 齐齐哈尔市| 宁德市| 马鞍山市| 万荣县| 浮梁县| 肥城市| 武冈市| 泸西县| 庆阳市| 万荣县| 井陉县| 赤峰市| 济源市| 宁德市| 射阳县| 南华县| 襄樊市| 盐城市| 通道| 富蕴县| 万州区| 会昌县| 彩票| 洪湖市| 新乐市| 庐江县| 谢通门县| 偃师市| 玉龙| 云霄县| 大渡口区| 石渠县| 吴堡县| 阿瓦提县| 武山县|