posts - 28, comments - 37, trackbacks - 0, articles - 0


          mapreduce,一個(gè)jobmap個(gè)數(shù), 每個(gè)map處理的數(shù)據(jù)量是如何決定的呢? 另外每個(gè)map又是如何讀取輸入文件的內(nèi)容呢? 用戶是否可以自己決定輸入方式, 決定map個(gè)數(shù)呢? 這篇文章將詳細(xì)講述hadoop中各種InputFormat的功能和如何編寫自定義的InputFormat.

           

          簡(jiǎn)介: mapreduce作業(yè)會(huì)根據(jù)輸入目錄產(chǎn)生多個(gè)map任務(wù), 通過(guò)多個(gè)map任務(wù)并行執(zhí)行來(lái)提高作業(yè)運(yùn)行速度, 但如果map數(shù)量過(guò)少, 并行量低, 作業(yè)執(zhí)行慢, 如果map數(shù)過(guò)多, 資源有限, 也會(huì)增加調(diào)度開(kāi)銷. 因此, 根據(jù)輸入產(chǎn)生合理的map數(shù), 為每個(gè)map分配合適的數(shù)據(jù)量, 能有效的提升資源利用率, 并使作業(yè)運(yùn)行速度加快.

              mapreduce, 每個(gè)作業(yè)都會(huì)通過(guò) InputFormat來(lái)決定map數(shù)量. InputFormat是一個(gè)接口, 提供兩個(gè)方法:

          InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

          RecordReader<K, V> getRecordReader(InputSplit split,

                                               JobConf job,

                                               Reporter reporter) throws IOException;

              其中getSplits方法會(huì)根據(jù)輸入目錄產(chǎn)生InputSplit數(shù)組, 每個(gè)InputSplit會(huì)相應(yīng)產(chǎn)生一個(gè)map任務(wù), map的輸入定義在InputSplit. getRecordReader方法返回一個(gè)RecordReader對(duì)象, RecordReader決定了map任務(wù)如何讀取輸入數(shù)據(jù), 例如一行一行的讀取還是一個(gè)字節(jié)一個(gè)字節(jié)的讀取, 等等.

              下圖是InputFormat的實(shí)現(xiàn)類:

                 (暫時(shí)無(wú)法上傳)

              這理詳細(xì)介紹FileInputFormatCombineFileInputFormat, 其它不常用,有興趣的可以自己查看hadoop源碼.


           

          FileInputFormat(舊接口org.apache.hadoop.mapred)

           

          mapreduce默認(rèn)使用TextInputFormatTextInputFormat沒(méi)有實(shí)現(xiàn)自己的getSplits方法,繼承于FileInputFormat, 因此使用了FileInputFormat的.

          org.apache.hadoop.mapred.FileInputFormatgetSplits流程:

          兩個(gè)配置

          mapred.min.split.size        (一個(gè)map最小輸入長(zhǎng)度),

          mapred.map.tasks                (推薦map數(shù)量)

          如何決定每個(gè)map輸入長(zhǎng)度呢? 首先獲取輸入目錄下所有文件的長(zhǎng)度和, 除以mapred.map.tasks得到一個(gè)推薦長(zhǎng)度goalSize, 然后通過(guò)式子: Math.max(minSize, Math.min(goalSize, blockSize))決定map輸入長(zhǎng)度. 這里的minSizemapred.min.split.size, blockSize為相應(yīng)文件的block長(zhǎng)度. 這式子能保證一個(gè)map的輸入至少大于mapred.min.split.size, 對(duì)于推薦的map長(zhǎng)度,只有它的長(zhǎng)度小于blockSize且大于mapred.min.split.size才會(huì)有效果. 由于mapred.min.split.size默認(rèn)長(zhǎng)度為1, 因此通常情況下只要小于blockSize就有效果,否則使用blockSize做為map輸入長(zhǎng)度.

          因此, 如果想增加map數(shù), 可以把mapred.min.split.size調(diào)小(其實(shí)默認(rèn)值即可), 另外還需要把mapred.map.tasks設(shè)置大.

          如果需要減少map數(shù),可以把mapred.min.split.size調(diào)大, 另外把mapred.map.tasks調(diào)小.

          這里要特別指出的是FileInputFormat會(huì)讓每個(gè)輸入文件至少產(chǎn)生一個(gè)map任務(wù), 因此如果你的輸入目錄下有許多文件, 而每個(gè)文件都很小, 例如幾十kb, 那么每個(gè)文件都產(chǎn)生一個(gè)map會(huì)增加調(diào)度開(kāi)銷. 作業(yè)變慢.

          那么如何防止這種問(wèn)題呢? CombineFileInputFormat能有效的減少map數(shù)量.


          FileInputFormat(新接口org.apache.hadoop.mapreduce.lib.input)

          Hadoop 0.20開(kāi)始定義了一套新的mapreduce編程接口, 使用新的FileInputFormat, 它與舊接口下的FileInputFormat主要區(qū)別在于, 它不再使用mapred.map.tasks, 而使用mapred.max.split.size參數(shù)代替goalSize, 通過(guò)Math.max(minSize, Math.min(maxSize, blockSize))決定map輸入長(zhǎng)度, 一個(gè)map的輸入要大于minSize,小于

          Math.min(maxSize, blockSize).

              若需增加map數(shù),可以把mapred.min.split.size調(diào)小,mapred.max.split.size調(diào)大. 若需減少map數(shù), 可以把mapred.min.split.size調(diào)大, 并把mapred.max.split.size調(diào)小.


          CombineFileInputFormat

          顧名思義, CombineFileInputFormat的作用是把許多文件合并作為一個(gè)map的輸入.

          在它之前,可以使用MultiFileInputFormat,不過(guò)其功能太簡(jiǎn)單, 以文件為單位,一個(gè)文件至多分給一個(gè)map處理, 如果某個(gè)目錄下有許多小文件, 另外還有一個(gè)超大文件, 處理大文件的map會(huì)嚴(yán)重偏慢.

          CombineFileInputFormat是一個(gè)被推薦使用的InputFormat. 它有三個(gè)配置:

          mapred.min.split.size.per.node 一個(gè)節(jié)點(diǎn)上split的至少的大小

          mapred.min.split.size.per.rack   一個(gè)交換機(jī)下split至少的大小

          mapred.max.split.size             一個(gè)split最大的大小

          它的主要思路是把輸入目錄下的大文件分成多個(gè)map的輸入, 并合并小文件, 做為一個(gè)map的輸入. 具體的原理是下述三步:

          1.根據(jù)輸入目錄下的每個(gè)文件,如果其長(zhǎng)度超過(guò)mapred.max.split.size,block為單位分成多個(gè)split(一個(gè)split是一個(gè)map的輸入),每個(gè)split的長(zhǎng)度都大于mapred.max.split.size, 因?yàn)橐?/span>block為單位, 因此也會(huì)大于blockSize, 此文件剩下的長(zhǎng)度如果大于mapred.min.split.size.per.node, 則生成一個(gè)split, 否則先暫時(shí)保留.

          2. 現(xiàn)在剩下的都是一些長(zhǎng)度效短的碎片,把每個(gè)rack下碎片合并, 只要長(zhǎng)度超過(guò)mapred.max.split.size就合并成一個(gè)split, 最后如果剩下的碎片比mapred.min.split.size.per.rack, 就合并成一個(gè)split, 否則暫時(shí)保留.

          3. 把不同rack下的碎片合并, 只要長(zhǎng)度超過(guò)mapred.max.split.size就合并成一個(gè)split, 剩下的碎片無(wú)論長(zhǎng)度, 合并成一個(gè)split.

          舉例: mapred.max.split.size=1000

               mapred.min.split.size.per.node=300

                mapred.min.split.size.per.rack=100

          輸入目錄下五個(gè)文件,rack1下三個(gè)文件,長(zhǎng)度為2050,1499,10, rack2下兩個(gè)文件,長(zhǎng)度為1010,80. 另外blockSize500.

          經(jīng)過(guò)第一步, 生成五個(gè)split: 1000,1000,1000,499,1000. 剩下的碎片為rack1:50,10; rack210:80

          由于兩個(gè)rack下的碎片和都不超過(guò)100, 所以經(jīng)過(guò)第二步, split和碎片都沒(méi)有變化.

          第三步,合并四個(gè)碎片成一個(gè)split, 長(zhǎng)度為150.

           

          如果要減少map數(shù)量, 可以調(diào)大mapred.max.split.size, 否則調(diào)小即可.

          其特點(diǎn)是: 一個(gè)塊至多作為一個(gè)map的輸入,一個(gè)文件可能有多個(gè)塊,一個(gè)文件可能因?yàn)閴K多分給做為不同map的輸入, 一個(gè)map可能處理多個(gè)塊,可能處理多個(gè)文件。


          編寫自己的InputFormat

           

              待續(xù)


           

          Feedback

          # re: hadoop各種輸入方法(InputFormat)匯總  回復(fù)  更多評(píng)論   

          2013-08-28 17:36 by 辰采星
          作為一個(gè)初學(xué)者,我通過(guò)樓主的帖子理解了map的輸入機(jī)制,但是還是不夠明確,如給定輸入目錄,輸入目錄下有多個(gè)文件要進(jìn)行處理(但是不能合并處理)的情況。

          # re: hadoop各種輸入方法(InputFormat)匯總  回復(fù)  更多評(píng)論   

          2013-11-25 10:46 by jingmin
          mapred.max.split.size=1000 這里的1000是什么單位? 我不知道這一千具體意味著什么,不知道樓主能否解釋下?

          只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 洪湖市| 五莲县| 习水县| 库尔勒市| 陇川县| 黄龙县| 班玛县| 新野县| 阳信县| 汉源县| 合江县| 琼结县| 丹巴县| 剑阁县| 蒙阴县| 景谷| 东乌| 永顺县| 从化市| 乾安县| 女性| 会理县| 锡林郭勒盟| 久治县| 中山市| 侯马市| 延长县| 句容市| 定日县| 宁波市| 普格县| 海淀区| 三门县| 新田县| 乐东| 新河县| 大宁县| 中山市| 淮滨县| 南雄市| 太湖县|