posts - 495,comments - 227,trackbacks - 0
          http://www.linuxidc.com/Linux/2012-01/51615.htm

          1 Map side tuning 參數(shù)

          1.1 MapTask 運(yùn)行內(nèi)部原理


          當(dāng)map task 開(kāi)始運(yùn)算,并產(chǎn)生中間數(shù)據(jù)時(shí),其產(chǎn)生的中間結(jié)果并非直接就簡(jiǎn)單的寫入磁盤。這中間的過(guò)程比較復(fù)雜,并且利用到了內(nèi)存buffer 來(lái)進(jìn)行已經(jīng)產(chǎn)生的部分結(jié)果的緩存,并在內(nèi)存buffer 中進(jìn)行一些預(yù)排序來(lái)優(yōu)化整個(gè)map 的性能。如上圖所示,每一個(gè)map 都會(huì)對(duì)應(yīng)存在一個(gè)內(nèi)存buffer (MapOutputBuffer ,即上圖的buffer in memory ),map 會(huì)將已經(jīng)產(chǎn)生的部分結(jié)果先寫入到該buffer 中,這個(gè)buffer 默認(rèn)是100MB 大小,但是這個(gè)大小是可以根據(jù)job 提交時(shí)的參數(shù)設(shè)定來(lái)調(diào)整的,該參數(shù)即為: io.sort.mb 。當(dāng)map 的產(chǎn)生數(shù)據(jù)非常大時(shí),并且把io.sort.mb 調(diào)大,那么map 在整個(gè)計(jì)算過(guò)程中spill 的次數(shù)就勢(shì)必會(huì)降低,map task 對(duì)磁盤的操作就會(huì)變少,如果map tasks 的瓶頸在磁盤上,這樣調(diào)整就會(huì)大大提高map 的計(jì)算性能。map 做sort 和spill 的內(nèi)存結(jié)構(gòu)如下如所示:


          map 在運(yùn)行過(guò)程中,不停的向該buffer 中寫入已有的計(jì)算結(jié)果,但是該buffer 并不一定能將全部的map 輸出緩存下來(lái),當(dāng)map 輸出超出一定閾值(比如100M ),那么map 就必須將該buffer 中的數(shù)據(jù)寫入到磁盤中去,這個(gè)過(guò)程在mapreduce 中叫做spill 。map 并不是要等到將該buffer 全部寫滿時(shí)才進(jìn)行spill ,因?yàn)槿绻繉憹M了再去寫spill ,勢(shì)必會(huì)造成map 的計(jì)算部分等待buffer 釋放空間的情況。所以,map 其實(shí)是當(dāng)buffer 被寫滿到一定程度(比如80% )時(shí),就開(kāi)始進(jìn)行spill 。這個(gè)閾值也是由一個(gè)job 的配置參數(shù)來(lái)控制,即 io.sort.spill.percent ,默認(rèn)為0.80 或80% 。這個(gè)參數(shù)同樣也是影響spill 頻繁程度,進(jìn)而影響map task 運(yùn)行周期對(duì)磁盤的讀寫頻率的。但非特殊情況下,通常不需要人為的調(diào)整。調(diào)整io.sort.mb 對(duì)用戶來(lái)說(shuō)更加方便。

          當(dāng)map task 的計(jì)算部分全部完成后,如果map 有輸出,就會(huì)生成一個(gè)或者多個(gè)spill 文件,這些文件就是map 的輸出結(jié)果。map 在正常退出之前,需要將這些spill 合并(merge )成一個(gè),所以map 在結(jié)束之前還有一個(gè)merge 的過(guò)程。merge 的過(guò)程中,有一個(gè)參數(shù)可以調(diào)整這個(gè)過(guò)程的行為,該參數(shù)為: io.sort.factor 。該參數(shù)默認(rèn)為10 。它表示當(dāng)merge spill 文件時(shí),最多能有多少并行的stream 向merge 文件中寫入。比如如果map 產(chǎn)生的數(shù)據(jù)非常的大,產(chǎn)生的spill 文件大于10 ,而io.sort.factor 使用的是默認(rèn)的10 ,那么當(dāng)map 計(jì)算完成做merge 時(shí),就沒(méi)有辦法一次將所有的spill 文件merge 成一個(gè),而是會(huì)分多次,每次最多10 個(gè)stream 。這也就是說(shuō),當(dāng)map 的中間結(jié)果非常大,調(diào)大io.sort.factor ,有利于減少merge 次數(shù),進(jìn)而減少map 對(duì)磁盤的讀寫頻率,有可能達(dá)到優(yōu)化作業(yè)的目的。

          當(dāng)job 指定了combiner 的時(shí)候,我們都知道m(xù)ap 介紹后會(huì)在map 端根據(jù)combiner 定義的函數(shù)將map 結(jié)果進(jìn)行合并。運(yùn)行combiner 函數(shù)的時(shí)機(jī)有可能會(huì)是merge 完成之前,或者之后,這個(gè)時(shí)機(jī)可以由一個(gè)參數(shù)控制,即 min.num.spill.for.combine (default 3 ),當(dāng)job 中設(shè)定了combiner ,并且spill 數(shù)最少有3 個(gè)的時(shí)候,那么combiner 函數(shù)就會(huì)在merge 產(chǎn)生結(jié)果文件之前運(yùn)行。通過(guò)這樣的方式,就可以在spill 非常多需要merge ,并且很多數(shù)據(jù)需要做conbine 的時(shí)候,減少寫入到磁盤文件的數(shù)據(jù)數(shù)量,同樣是為了減少對(duì)磁盤的讀寫頻率,有可能達(dá)到優(yōu)化作業(yè)的目的。

          減少中間結(jié)果讀寫進(jìn)出磁盤的方法不止這些,還有就是壓縮。也就是說(shuō)map 的中間,無(wú)論是spill 的時(shí)候,還是最后merge 產(chǎn)生的結(jié)果文件,都是可以壓縮的。壓縮的好處在于,通過(guò)壓縮減少寫入讀出磁盤的數(shù)據(jù)量。對(duì)中間結(jié)果非常大,磁盤速度成為map 執(zhí)行瓶頸的job ,尤其有用。控制map 中間結(jié)果是否使用壓縮的參數(shù)為: mapred.compress.map.output (true/false) 。將這個(gè)參數(shù)設(shè)置為true 時(shí),那么map 在寫中間結(jié)果時(shí),就會(huì)將數(shù)據(jù)壓縮后再寫入磁盤,讀結(jié)果時(shí)也會(huì)采用先解壓后讀取數(shù)據(jù)。這樣做的后果就是:寫入磁盤的中間結(jié)果數(shù)據(jù)量會(huì)變少,但是cpu 會(huì)消耗一些用來(lái)壓縮和解壓。所以這種方式通常適合job 中間結(jié)果非常大,瓶頸不在cpu ,而是在磁盤的讀寫的情況。說(shuō)的直白一些就是用cpu 換IO 。根據(jù)觀察,通常大部分的作業(yè)cpu 都不是瓶頸,除非運(yùn)算邏輯異常復(fù)雜。所以對(duì)中間結(jié)果采用壓縮通常來(lái)說(shuō)是有收益的。以下是一個(gè)wordcount 中間結(jié)果采用壓縮和不采用壓縮產(chǎn)生的map 中間結(jié)果本地磁盤讀寫的數(shù)據(jù)量對(duì)比:

          map 中間結(jié)果不壓縮:


          map 中間結(jié)果壓縮:


          可以看出,同樣的job ,同樣的數(shù)據(jù),在采用壓縮的情況下,map 中間結(jié)果能縮小將近10 倍,如果map 的瓶頸在磁盤,那么job 的性能提升將會(huì)非常可觀。

          當(dāng)采用map 中間結(jié)果壓縮的情況下,用戶還可以選擇壓縮時(shí)???用哪種壓縮格式進(jìn)行壓縮,現(xiàn)在Hadoop 支持的壓縮格式有: GzipCodec LzoCodec BZip2Codec LzmaCodec 等壓縮格式。通常來(lái)說(shuō),想要達(dá)到比較平衡的 cpu 和磁盤壓縮比, LzoCodec 比較適合。但也要取決于 job 的具體情況。用戶若想要自行選擇中間結(jié)果的壓縮算法,可以設(shè)置配置參數(shù): mapred.map.output.compression.codec =org.apache.hadoop.io.compress.DefaultCodec 或者其他用戶自行選擇的壓縮方式。


          1.2 Map side 相關(guān)參數(shù)調(diào)優(yōu)

          選項(xiàng)

          類型

          默認(rèn)值

          描述

          io.sort.mb

          int

          100

          緩存 map 中間結(jié)果的 buffer 大小 (in MB)

          io.sort.record.percent

          float

          0.05

          io.sort.mb 中用來(lái)保存 map output 記錄邊界的百分比,其他緩存用來(lái)保存數(shù)據(jù)

          io.sort.spill.percent

          float

          0.80

          map 開(kāi)始做 spill 操作的閾值

          io.sort.factor

          int

          10

          merge 操作時(shí)同時(shí)操作的 stream 數(shù)上限。

          min.num.spill.for.combine

          int

          3

          combiner 函數(shù)運(yùn)行的最小 spill 數(shù)

          mapred.compress.map.output

          boolean

          false

          map 中間結(jié)果是否采用壓縮

          mapred.map.output.compression.codec

          class name

          org.apache.Hadoop.io.

          compress.DefaultCodec

          map 中間結(jié)果的壓縮格式

           

          2 Reduce side tuning 參數(shù)

          2.1 ReduceTask 運(yùn)行內(nèi)部原理


          reduce 的運(yùn)行是分成三個(gè)階段的。分別為 copy->sort->reduce 。由于 job 的每一個(gè) map 都會(huì)根據(jù) reduce(n) 數(shù)將數(shù)據(jù)分成 map 輸出結(jié)果分成 n 個(gè) partition ,所以 map 的中間結(jié)果中是有可能包含每一個(gè) reduce 需要處理的部分?jǐn)?shù)據(jù)的。所以,為了優(yōu)化 reduce 的執(zhí)行時(shí)間, hadoop 中是等 job 的第一個(gè) map 結(jié)束后,所有的 reduce 就開(kāi)始嘗試從完成的 map 中下載該 reduce 對(duì)應(yīng)的 partition 部分?jǐn)?shù)據(jù)。這個(gè)過(guò)程就是通常所說(shuō)的 shuffle ,也就是 copy 過(guò)程。

                 Reduce task 在做 shuffle 時(shí),實(shí)際上就是從不同的已經(jīng)完成的 map 上去下載屬于自己這個(gè) reduce 的部分?jǐn)?shù)據(jù),由于 map 通常有許多個(gè),所以對(duì)一個(gè) reduce 來(lái)說(shuō),下載也可以是并行的從多個(gè) map 下載,這個(gè)并行度是可以調(diào)整的,調(diào)整參數(shù)為: mapred.reduce.parallel.copies default 5 )。默認(rèn)情況下,每個(gè)只會(huì)有 5 個(gè)并行的下載線程在從 map 下數(shù)據(jù),如果一個(gè)時(shí)間段內(nèi) job 完成的 map 100 個(gè)或者更多,那么 reduce 也最多只能同時(shí)下載 5 個(gè) map 的數(shù)據(jù),所以這個(gè)參數(shù)比較適合 map 很多并且完成的比較快的 job 的情況下調(diào)大,有利于 reduce 更快的獲取屬于自己部分的數(shù)據(jù)。

                 reduce 的每一個(gè)下載線程在下載某個(gè) map 數(shù)據(jù)的時(shí)候,有可能因?yàn)槟莻€(gè) map 中間結(jié)果所在機(jī)器發(fā)生錯(cuò)誤,或者中間結(jié)果的文件丟失,或者網(wǎng)絡(luò)瞬斷等等情況,這樣 reduce 的下載就有可能失敗,所以 reduce 的下載線程并不會(huì)無(wú)休止的等待下去,當(dāng)一定時(shí)間后下載仍然失敗,那么下載線程就會(huì)放棄這次下載,并在隨后嘗試從另外的地方下載(因?yàn)檫@段時(shí)間 map 可能重跑)。所以 reduce 下載線程的這個(gè)最大的下載時(shí)間段是可以調(diào)整的,調(diào)整參數(shù)為: mapred.reduce.copy.backoff default 300 秒)。如果集群環(huán)境的網(wǎng)絡(luò)本身是瓶頸,那么用戶可以通過(guò)調(diào)大這個(gè)參數(shù)來(lái)避免 reduce 下載線程被誤判為失敗的情況。不過(guò)在網(wǎng)絡(luò)環(huán)境比較好的情況下,沒(méi)有必要調(diào)整。通常來(lái)說(shuō)專業(yè)的集群網(wǎng)絡(luò)不應(yīng)該有太大問(wèn)題,所以這個(gè)參數(shù)需要調(diào)整的情況不多。

                 Reduce map 結(jié)果下載到本地時(shí),同樣也是需要進(jìn)行 merge 的,所以 io.sort.factor 的配置選項(xiàng)同樣會(huì)影響 reduce 進(jìn)行 merge 時(shí)的行為,該參數(shù)的詳細(xì)介紹上文已經(jīng)提到,當(dāng)發(fā)現(xiàn) reduce shuffle 階段 iowait 非常的高的時(shí)候,就有可能通過(guò)調(diào)大這個(gè)參數(shù)來(lái)加大一次 merge 時(shí)的并發(fā)吞吐,優(yōu)化 reduce 效率。

                 Reduce shuffle 階段對(duì)下載來(lái)的 map 數(shù)據(jù),并不是立刻就寫入磁盤的,而是會(huì)先緩存在內(nèi)存中,然后當(dāng)使用內(nèi)存達(dá)到一定量的時(shí)候才刷入磁盤。這個(gè)內(nèi)存大小的控制就不像 map 一樣可以通過(guò) io.sort.mb 來(lái)設(shè)定了,而是通過(guò)另外一個(gè)參數(shù)來(lái)設(shè)置: mapred.job.shuffle.input.buffer.percent default 0.7 ),這個(gè)參數(shù)其實(shí)是一個(gè)百分比,意思是說(shuō), shuffile reduce 內(nèi)存中的數(shù)據(jù)最多使用內(nèi)存量為: 0.7 × maxHeap of reduce task 。也就是說(shuō),如果該 reduce task 的最大 heap 使用量(通常通過(guò) mapred.child.java.opts 來(lái)設(shè)置,比如設(shè)置為 -Xmx1024m )的一定比例用來(lái)緩存數(shù)據(jù)。默認(rèn)情況下, reduce 會(huì)使用其 heapsize 70% 來(lái)在內(nèi)存中緩存數(shù)據(jù)。如果 reduce heap 由于業(yè)務(wù)原因調(diào)整的比較大,相應(yīng)的緩存大小也會(huì)變大,這也是為什么 reduce 用來(lái)做緩存的參數(shù)是一個(gè)百分比,而不是一個(gè)固定的值了。

          假設(shè) mapred.job.shuffle.input.buffer.percent 0.7 reduce task max heapsize 1G ,那么用來(lái)做下載數(shù)據(jù)緩存的內(nèi)存就為大概 700MB 左右,這 700M 的內(nèi)存,跟 map 端一樣,也不是要等到全部寫滿才會(huì)往磁盤刷的,而是當(dāng)這 700M 中被使用到了一定的限度(通常是一個(gè)百分比),就會(huì)開(kāi)始往磁盤刷。這個(gè)限度閾值也是可以通過(guò) job 參數(shù)來(lái)設(shè)定的,設(shè)定參數(shù)為: mapred.job.shuffle.merge.percent default 0.66 )。如果下載速度很快,很容易就把內(nèi)存緩存撐大,那么調(diào)整一下這個(gè)參數(shù)有可能會(huì)對(duì) reduce 的性能有所幫助。

          當(dāng) reduce 將所有的 map 上對(duì)應(yīng)自己 partition 的數(shù)據(jù)下載完成后,就會(huì)開(kāi)始真正的 reduce 計(jì)算階段(中間有個(gè) sort 階段通常時(shí)間非常短,幾秒鐘就完成了,因?yàn)檎麄€(gè)下載階段就已經(jīng)是邊下載邊 sort ,然后邊 merge 的)。當(dāng) reduce task 真正進(jìn)入 reduce 函數(shù)的計(jì)算階段的時(shí)候,有一個(gè)參數(shù)也是可以調(diào)整 reduce 的計(jì)算行為。也就是: mapred.job.reduce.input.buffer.percent default 0.0 )。由于 reduce 計(jì)算時(shí)肯定也是需要消耗內(nèi)存的,而在讀取 reduce 需要的數(shù)據(jù)時(shí),同樣是需要內(nèi)存作為 buffer ,這個(gè)參數(shù)是控制,需要多少的內(nèi)存百分比來(lái)作為 reduce 讀已經(jīng) sort 好的數(shù)據(jù)的 buffer 百分比。默認(rèn)情況下為 0 ,也就是說(shuō),默認(rèn)情況下, reduce 是全部從磁盤開(kāi)始讀處理數(shù)據(jù)。如果這個(gè)參數(shù)大于 0 ,那么就會(huì)有一定量的數(shù)據(jù)被緩存在內(nèi)存并輸送給 reduce ,當(dāng) reduce 計(jì)算邏輯消耗內(nèi)存很小時(shí),可以分一部分內(nèi)存用來(lái)緩存數(shù)據(jù),反正 reduce 的內(nèi)存閑著也是閑著。

          2.2 Reduce side 相關(guān)參數(shù)調(diào)優(yōu)

          選項(xiàng)

          類型

          默認(rèn)值

          描述

          mapred.reduce.parallel.copies

          int

          5

          每個(gè) reduce 并行下載 map 結(jié)果的最大線程數(shù)

          mapred.reduce.copy.backoff

          int

          300

          reduce 下載線程最大等待時(shí)間( in sec

          io.sort.factor

          int

          10

          同上

          mapred.job.shuffle.input.buffer.percent

          float

          0.7

          用來(lái)緩存 shuffle 數(shù)據(jù)的 reduce task heap 百分比

          mapred.job.shuffle.merge.percent

          float

          0.66

          緩存的內(nèi)存中多少百分比后開(kāi)始做 merge 操作

          mapred.job.reduce.input.buffer.percent

          float

          0.0

          sort 完成后 reduce 計(jì)算階段用來(lái)緩存數(shù)據(jù)的百分比

          linux
          posted on 2014-11-19 13:42 SIMONE 閱讀(571) 評(píng)論(0)  編輯  收藏 所屬分類: hadoop
          主站蜘蛛池模板: 闸北区| 韩城市| 长子县| 彰武县| 固安县| 安宁市| 饶阳县| 湖北省| 饶平县| 平江县| 伽师县| 瓮安县| 日土县| 泸水县| 焉耆| 华坪县| 彭水| 永福县| 原阳县| 乌拉特前旗| 高州市| 林口县| 芜湖市| 贞丰县| 海宁市| 游戏| 陆丰市| 平山县| 息烽县| 建德市| 施甸县| 尉犁县| 满洲里市| 宁陵县| 永仁县| 宜川县| 大姚县| 靖边县| 兴文县| 万盛区| 奉新县|