莊周夢(mèng)蝶

          生活、程序、未來
             :: 首頁 ::  ::  :: 聚合  :: 管理

          Storm源碼淺析之topology的提交

          Posted on 2011-12-01 21:48 dennis 閱讀(15245) 評(píng)論(10)  編輯  收藏 所屬分類: java源碼解讀Hadoop與分布式Clojure
              原文:http://www.aygfsteel.com/killme2008/archive/2011/11/17/364112.html
              作者:dennis (killme2008@gmail.com)
              轉(zhuǎn)載請(qǐng)注明出處。

              最近一直在讀twitter開源的這個(gè)分布式流計(jì)算框架——storm的源碼,還是有必要記錄下一些比較有意思的地方。我按照storm的主要概念進(jìn)行組織,并且只分析我關(guān)注的東西,因此稱之為淺析。       

          一、介紹
              Storm的開發(fā)語言主要是Java和Clojure,其中Java定義骨架,而Clojure編寫核心邏輯。源碼統(tǒng)計(jì)結(jié)果:
               180 text files.
               
          177 unique files.                                          
                 
          7 files ignored.

          http:
          //cloc.sourceforge.net v 1.55  T=1.0 s (171.0 files/s, 46869.0 lines/s)
          -------------------------------------------------------------------------------
          Language                     files          blank        comment           code
          -------------------------------------------------------------------------------
          Java                           
          125           5010           2414          25661
          Lisp                            
          33            732            283           4871
          Python                           
          7            742            433           4675
          CSS                              
          1             12             45           1837
          ruby                             
          2             22              0            104
          Bourne Shell                     
          1              0              0              6
          Javascript                       
          2              1             15              6
          -------------------------------------------------------------------------------
          SUM:                           
          171           6519           3190          37160
          -------------------------------------------------------------------------------

              Java代碼25000多行,而Clojure(Lisp)只有4871行,說語言不重要再次證明是扯淡。
                  
          二、Topology和Nimbus       
              Topology是storm的核心理念,將spout和bolt組織成一個(gè)topology,運(yùn)行在storm集群里,完成實(shí)時(shí)分析和計(jì)算的任務(wù)。這里我主要想介紹下topology部署到storm集群的大概過程。提交一個(gè)topology任務(wù)到Storm集群是通過StormSubmitter.submitTopology方法提交:
          StormSubmitter.submitTopology(name, conf, builder.createTopology());
              我們將topology打成jar包后,利用bin/storm這個(gè)python腳本,執(zhí)行如下命令:
          bin/storm jar xxxx.jar com.taobao.MyTopology args
              將jar包提交給storm集群。storm腳本會(huì)啟動(dòng)JVM執(zhí)行Topology的main方法,執(zhí)行submitTopology的過程。而submitTopology會(huì)將jar文件上傳到nimbus,上傳是通過socket傳輸。在storm這個(gè)python腳本的jar方法里可以看到:
          def jar(jarfile, klass, *args):                                                                                                                               
             exec_storm_class(                                                                                                                                          
                  klass,                                                                                                                                                
                  jvmtype
          ="-client",                                                                                                                                    
                  extrajars
          =[jarfile, CONF_DIR, STORM_DIR + "/bin"],                                                                                                    
                  args
          =args,                                                                                                                                            
                  prefix
          ="export STORM_JAR=" + jarfile + ";")
               將jar文件的地址設(shè)置為環(huán)境變量STORM_JAR,這個(gè)環(huán)境變量在執(zhí)行submitTopology的時(shí)候用到:
          //StormSubmitter.java 
          private static void submitJar(Map conf) {
                  
          if(submittedJar==null) {
                      LOG.info(
          "Jar not uploaded to master yet. Submitting jar");
                      String localJar 
          = System.getenv("STORM_JAR");
                      submittedJar 
          = submitJar(conf, localJar);
                  } 
          else {
                      LOG.info(
          "Jar already uploaded to master. Not submitting jar.");
                  }
              }
              通過環(huán)境變量找到j(luò)ar包的地址,然后上傳。利用環(huán)境變量傳參是個(gè)小技巧。

              其次,nimbus在接收到j(luò)ar文件后,存放到數(shù)據(jù)目錄的inbox目錄,nimbus數(shù)據(jù)目錄的結(jié)構(gòu)
          -nimbus
               
          -inbox
                   
          -stormjar-57f1d694-2865-4b3b-8a7c-99104fc0aea3.jar
                   
          -stormjar-76b4e316-b430-4215-9e26-4f33ba4ee520.jar

               
          -stormdist
                  
          -storm-id
                     
          -stormjar.jar
                     
          -stormconf.ser
                     
          -stormcode.ser
               其中inbox用于存放提交的jar文件,每個(gè)jar文件都重命名為stormjar加上一個(gè)32位的UUID。而stormdist存放的是啟動(dòng)topology后生成的文件,每個(gè)topology都分配一個(gè)唯一的id,ID的規(guī)則是“name-計(jì)數(shù)-時(shí)間戳”。啟動(dòng)后的topology的jar文件名命名為storm.jar ,而它的配置經(jīng)過java序列化后存放在stormconf.ser文件,而stormcode.ser是將topology本身序列化后存放的文件。這些文件在部署的時(shí)候,supervisor會(huì)從這個(gè)目錄下載這些文件,然后在supervisor本地執(zhí)行這些代碼。
              進(jìn)入重點(diǎn),topology任務(wù)的分配過程(zookeeper路徑說明忽略root):
          1.在zookeeper上創(chuàng)建/taskheartbeats/{storm id} 路徑,用于任務(wù)的心跳檢測(cè)。storm對(duì)zookeeper的一個(gè)重要應(yīng)用就是利用zk的臨時(shí)節(jié)點(diǎn)做存活檢測(cè)。task將定時(shí)刷新節(jié)點(diǎn)的時(shí)間戳,然后nimbus會(huì)檢測(cè)這個(gè)時(shí)間戳是否超過timeout設(shè)置。
          2.從topology中獲取bolts,spouts設(shè)置的并行數(shù)目以及全局配置的最大并行數(shù),然后產(chǎn)生task id列表,如[1 2 3 4]
          3.在zookeeper上創(chuàng)建/tasks/{strom id}/{task id}路徑,并存儲(chǔ)task信息
          4.開始分配任務(wù)(內(nèi)部稱為assignment), 具體步驟:
           (1)從zk上獲得已有的assignment(新的toplogy當(dāng)然沒有了)
           (2)查找所有可用的slot,所謂slot就是可用的worker,在所有supervisor上配置的多個(gè)worker的端口。
           (3)將任務(wù)均勻地分配給可用的worker,這里有兩種情況:
           (a)task數(shù)目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最終是這樣分配
          {1: [host1:port1] 2 : [host2:port1]
                   
          3 : [host1:port1] 4 : [host2:port1]}
          ,可以看到任務(wù)平均地分配在兩個(gè)worker上。
          (b)如果task數(shù)目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先會(huì)將woker排序,將不同host間隔排列,保證task不會(huì)全部分配到同一個(gè)worker上,也就是將worker排列成
          [host1:port1 host2:port1 host1:port2 host2:port2]
          ,然后分配任務(wù)為
          {1: host1:port1 , 2 : host2:port2}

          (4)記錄啟動(dòng)時(shí)間
          (5)判斷現(xiàn)有的assignment是否跟重新分配的assignment相同,如果相同,不需要變更,否則更新assignment到zookeeper的/assignments/{storm id}上。
          5.啟動(dòng)topology,所謂啟動(dòng),只是將zookeeper上/storms/{storm id}對(duì)應(yīng)的數(shù)據(jù)里的active設(shè)置為true。
          6.nimbus會(huì)檢查task的心跳,如果發(fā)現(xiàn)task心跳超過超時(shí)時(shí)間,那么會(huì)重新跳到第4步做re-assignment。

          評(píng)論

          # re: Storm源碼淺析之topology的提交  回復(fù)  更多評(píng)論   

          2011-12-05 11:11 by
          Java代碼25000多行,而Clojure(Lisp)只有4871行5563。。。不明白是什么意思?是Storm中既有Java,又有Clojure?還是Storm有Java和Clojure兩個(gè)版本?

          # re: Storm源碼淺析之topology的提交  回復(fù)  更多評(píng)論   

          2011-12-14 23:56 by fiw
          非常感謝您的講解,給了我很大的幫助。有一個(gè)問題,Storm最后處理完的消息存到哪里了呢?如何查看處理的結(jié)果呢?我自己搭了一個(gè)Storm集群,跑了一下Storm_starter的wordCount例子,但是在StormUI上沒有找到結(jié)果,希望能得到您的幫助。

          # re: Storm源碼淺析之topology的提交  回復(fù)  更多評(píng)論   

          2011-12-15 12:02 by dennis
          @fiw
          處理完的消息怎么存儲(chǔ)是你自己負(fù)責(zé)的事情,storm不幫你處理的。
          wordcount的例子應(yīng)該就是放在內(nèi)存里了,掛了就沒了。
          storm ui只是統(tǒng)計(jì),并不參與topology的邏輯展現(xiàn)。

          # re: Storm源碼淺析之topology的提交  回復(fù)  更多評(píng)論   

          2011-12-15 23:08 by coderplay
          很像hadoop :)

          # re: Storm源碼淺析之topology的提交  回復(fù)  更多評(píng)論   

          2011-12-16 16:58 by fork
          storm為了保證可靠性處理是否必須要存儲(chǔ)還沒有完全處理的Turple?這樣發(fā)送Turple的Spout是否會(huì)出現(xiàn)OOM?

          # re: Storm源碼淺析之topology的提交  回復(fù)  更多評(píng)論   

          2011-12-19 14:58 by dennis
          @fork
          不會(huì)的,storm只會(huì)存儲(chǔ)發(fā)送的tuple id,這只是一個(gè)8個(gè)字節(jié)的long類型,想要OOM還是比較困難的。

          # re: Storm源碼淺析之topology的提交  回復(fù)  更多評(píng)論   

          2012-01-06 11:34 by xiaofeng_metis
          期待Storm源碼淺析的其它內(nèi)容

          # re: Storm源碼淺析之topology的提交[未登錄]  回復(fù)  更多評(píng)論   

          2012-02-20 01:03 by 胡楊
          前輩,您好!看到您讀過那么多的源代碼,真的是發(fā)自內(nèi)心的崇拜,你的精力好旺盛,你對(duì)技術(shù)真的好執(zhí)著!

          現(xiàn)在這幾天準(zhǔn)備開始讀讀spring的源碼。但是把源代碼導(dǎo)入Eclipse并運(yùn)行起來以后,在spring啟動(dòng)的入口打了個(gè)斷點(diǎn),可是總是進(jìn)入不了這個(gè)斷點(diǎn)。在網(wǎng)上查了很多的資料,有的說是要編譯一下源代碼,我試過了,也不行,這個(gè)問題困擾了好幾天。不知道您剛開始的時(shí)候是怎么閱讀的? 用的什么工具?

          # re: Storm源碼淺析之topology的提交[未登錄]  回復(fù)  更多評(píng)論   

          2012-04-11 14:47 by dhc
          Storm中既有Java,又有Clojure

          # re: Storm源碼淺析之topology的提交[未登錄]  回復(fù)  更多評(píng)論   

          2012-04-11 14:50 by dhc
          你好,看了這篇文章很多以前沒有明白的點(diǎn)豁然開朗。但是這篇文章只是分析了storm client、nimbus上的流程,能夠介紹下supervisor啟動(dòng)后的流程。謝謝!
          主站蜘蛛池模板: 平武县| 盱眙县| 应用必备| 怀宁县| 监利县| 塔河县| 东光县| 集安市| 双峰县| 蒲城县| 阳高县| 定兴县| 岱山县| 卢氏县| 房产| 伊宁市| 贵定县| 防城港市| 栾川县| 泰和县| 荃湾区| 湟中县| 石泉县| 明光市| 徐水县| 大足县| 梁山县| 榆中县| 浙江省| 库尔勒市| 磴口县| 东阿县| 天峻县| 开阳县| 灵寿县| 锦州市| 合江县| 乐至县| 通道| 深泽县| 定州市|