隨筆-314  評論-209  文章-0  trackbacks-0
          王 騰騰 和 邵 兵 2015 年 11 月 26 日發布 WeiboGoogle+用電子郵件發送本頁面 Comments 1 引子 隨著云時代的來臨,大數據(Big data)也獲得了越來越多的關注。著云臺的分析師團隊認為,大數據(Big data)通常用來形容一個公司創造的大量非結構化和半結構化數據,這些數據在下載到關系型數據庫用于分析時會花費過多時間和金錢。大數據分析常和云計算聯系到一起,因為實時的大型數據集分析需要像 MapReduce 一樣的框架來向數十、數百或甚至數千的電腦分配工作。 “大數據”在互聯網行業指的是這樣一種現象:互聯網公司在日常運營中生成、累積的用戶網絡行為數據。這些數據的規模是如此龐大,以至于不能用 G 或 T 來衡量。所以如何高效的處理分析大數據的問題擺在了面前。對于大數據的處理優化方式有很多種,本文中主要介紹在使用 Hadoop 平臺中對數據進行壓縮處理來提高數據處理效率。 壓縮簡介 Hadoop 作為一個較通用的海量數據處理平臺,每次運算都會需要處理大量數據,我們會在 Hadoop 系統中對數據進行壓縮處理來優化磁盤使用率,提高數據在磁盤和網絡中的傳輸速度,從而提高系統處理數據的效率。在使用壓縮方式方面,主要考慮壓縮速度和壓縮文件的可分割性。綜合所述,使用壓縮的優點如下: 1. 節省數據占用的磁盤空間; 2. 加快數據在磁盤和網絡中的傳輸速度,從而提高系統的處理速度。 壓縮格式 Hadoop 對于壓縮格式的是自動識別。如果我們壓縮的文件有相應壓縮格式的擴展名(比如 lzo,gz,bzip2 等)。Hadoop 會根據壓縮格式的擴展名自動選擇相對應的解碼器來解壓數據,此過程完全是 Hadoop 自動處理,我們只需要確保輸入的壓縮文件有擴展名。 Hadoop 對每個壓縮格式的支持, 詳細見下表: 表 1. 壓縮格式 壓縮格式 工具 算法 擴展名 多文件 可分割性 DEFLATE 無 DEFLATE .deflate 不 不 GZIP gzip DEFLATE .gzp 不 不 ZIP zip DEFLATE .zip 是 是,在文件范圍內 BZIP2 bzip2 BZIP2 .bz2 不 是 LZO lzop LZO .lzo 不 是 如果壓縮的文件沒有擴展名,則需要在執行 MapReduce 任務的時候指定輸入格式。 1 2 3 4 5 hadoop jar /usr/home/hadoop/hadoop-0.20.2/contrib/streaming/ hadoop-streaming-0.20.2-CD H3B4.jar -file /usr/home/hadoop/hello/mapper.py -mapper / usr/home/hadoop/hello/mapper.py -file /usr/home/hadoop/hello/ reducer.py -reducer /usr/home/hadoop/hello/reducer.py -input lzotest -output result4 - jobconf mapred.reduce.tasks=1*-inputformatorg.apache.hadoop.mapred.LzoTextInputFormat* 性能對比 Hadoop 下各種壓縮算法的壓縮比,壓縮時間,解壓時間見下表: 表 2. 性能對比 壓縮算法 原始文件大小 壓縮文件大小 壓縮速度 解壓速度 gzip 8.3GB 1.8GB 17.5MB/s 58MB/s bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s LZO-bset 8.3GB 2GB 4MB/s 60.6MB/s LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s 因此我們可以得出: 1) Bzip2 壓縮效果明顯是最好的,但是 bzip2 壓縮速度慢,可分割。 2) Gzip 壓縮效果不如 Bzip2,但是壓縮解壓速度快,不支持分割。 3) LZO 壓縮效果不如 Bzip2 和 Gzip,但是壓縮解壓速度最快!并且支持分割! 這里提一下,文件的可分割性在 Hadoop 中是很非常重要的,它會影響到在執行作業時 Map 啟動的個數,從而會影響到作業的執行效率! 所有的壓縮算法都顯示出一種時間空間的權衡,更快的壓縮和解壓速度通常會耗費更多的空間。在選擇使用哪種壓縮格式時,我們應該根據自身的業務需求來選擇。 下圖是在本地壓縮與通過流將壓縮結果上傳到 BI 的時間對比。 圖 1. 時間對比 圖 1. 時間對比 使用方式 MapReduce 可以在三個階段中使用壓縮。 1. 輸入壓縮文件。如果輸入的文件是壓縮過的,那么在被 MapReduce 讀取時,它們會被自動解壓。 2.MapReduce 作業中,對 Map 輸出的中間結果集壓縮。實現方式如下: 1)可以在 core-site.xml 文件中配置,代碼如下 圖 2. core-site.xml 代碼示例 圖 2. core-site.xml 代碼示例 2)使用 Java 代碼指定 1 2 conf.setCompressMapOut(true); conf.setMapOutputCompressorClass(GzipCode.class); 最后一行代碼指定 Map 輸出結果的編碼器。 3.MapReduce 作業中,對 Reduce 輸出的最終結果集壓。實現方式如下: 1)可以在 core-site.xml 文件中配置,代碼如下 圖 3. core-site.xml 代碼示例 圖 3. core-site.xml 代碼示例 2)使用 Java 代碼指定 1 2 conf.setBoolean(“mapred.output.compress”,true); conf.setClass(“mapred.output.compression.codec”,GzipCode.class,CompressionCodec.class); 最后一行同樣指定 Reduce 輸出結果的編碼器。 壓縮框架 我們前面已經提到過關于壓縮的使用方式,其中第一種就是將壓縮文件直接作為入口參數交給 MapReduce 處理,MapReduce 會自動根據壓縮文件的擴展名來自動選擇合適解壓器處理數據。那么到底是怎么實現的呢?如下圖所示: 圖 4. 壓縮實現情形 圖 4. 壓縮實現情形 我們在配置 Job 作業的時候,會設置數據輸入的格式化方式,使用 conf.setInputFormat() 方法,這里的入口參數是 TextInputFormat.class。 TextInputFormat.class 繼承于 InputFormat.class,主要用于對數據進行兩方面的預處理。一是對輸入數據進行切分,生成一組 split,一個 split 會分發給一個 mapper 進行處理;二是針對每個 split,再創建一個 RecordReader 讀取 split 內的數據,并按照的形式組織成一條 record 傳給 map 函數進行處理。此類在對數據進行切分之前,會首先初始化壓縮解壓工程類 CompressionCodeFactory.class,通過工廠獲取實例化的編碼解碼器 CompressionCodec 后對數據處理操作。 下面我們來詳細的看一下從壓縮工廠獲取編碼解碼器的過程。 壓縮解壓工廠類 CompressionCodecFactory 壓縮解壓工廠類 CompressionCodeFactory.class 主要功能就是負責根據不同的文件擴展名來自動獲取相對應的壓縮解壓器 CompressionCodec.class,是整個壓縮框架的核心控制器。我們來看下 CompressionCodeFactory.class 中的幾個重要方法: 1. 初始化方法 圖 5. 代碼示例 圖 5. 代碼示例 ① getCodeClasses(conf) 負責獲取關于編碼解碼器 CompressionCodec.class 的配置信息。下面將會詳細講解。 ② 默認添加兩種編碼解碼器。當 getCodeClass(conf) 方法沒有讀取到相關的編碼解碼器 CompressionCodec.class 的配置信息時,系統會默認添加兩種編碼解碼器 CompressionCodec.class,分別是 GzipCode.class 和 DefaultCode.class。 ③ addCode(code) 此方法用于將編碼解碼器 CompressionCodec.class 添加到系統緩存中。下面將會詳細講解。 2. getCodeClasses(conf) 圖 6. 代碼示例 圖 6. 代碼示例 ① 這里我們可以看,系統讀取關于編碼解碼器 CompressionCodec.class 的配置信息在 core-site.xml 中 io.compression.codes 下。我們看下這段配置文件,如下圖所示: 圖 7. 代碼示例 圖 7. 代碼示例 Value 標簽中是每個編碼解碼 CompressionCodec.class 的完整路徑,中間用逗號分隔。我們只需要將自己需要使用到的編碼解碼配置到此屬性中,系統就會自動加載到緩存中。 除了上述的這種方式以外,Hadoop 為我們提供了另一種加載方式:代碼加載。同樣最終將信息配置在 io.compression.codes 屬性中,代碼如下: 1 2 conf.set("io.compression.codecs","org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec,com.hadoop.compression.lzo.LzopCodec");) 3. addCode(code) 方法添加編碼解碼器 圖 8. 代碼示例 圖 8. 代碼示例 addCodec(codec) 方法入口參數是個編碼解碼器 CompressionCodec.class,這里我們會首先接觸到它的一個方法。 ① codec.getDefaultExtension() 方法看方法名的字面意思我們就可以知道,此方法用于獲取此編碼解碼所對應文件的擴展名,比如,文件名是 xxxx.gz2,那么這個方法的返回值就是“.bz2”,我們來看下 org.apache.hadoop.io.compress.BZip2Codec 此方法的實現代碼: 圖 9. 代碼示例 圖 9. 代碼示例 ② Codecs 是一個 SortedMap 的示例。這里有個很有意思的地方,它將 Key 值,也就是通過 codec.getDefaultExtension() 方法獲取到的文件擴展名進行了翻轉,舉個例子,比如文件名擴展名“.bz2”,將文件名翻轉之后就變成了“2zb.”。 系統加載完所有的編碼解碼器后,我們可以得到這樣一個有序映射表,如下: 圖 10. 代碼示例 圖 10. 代碼示例 現在編碼解碼器都有了,我們怎么得到對應的編碼解碼器呢?看下面這個方法。 4. getCodec() 方法 此方法用于獲取文件所對應的的編碼解碼器 CompressionCodec.class。 圖 11. 代碼示例 圖 11. 代碼示例 getCodec(Path) 方法的輸入參數是 Path 對象,保存著文件路徑。 ① 將文件名翻轉。如 xxxx.bz2 翻轉成 2zb.xxxx。 ② 獲取 codecs 集合中最接近 2zb.xxxx 的值。此方法有返回值同樣是個 SortMap 對象。 在這里對返回的 SortMap 對象進行第二次篩選。 編碼解碼器 CompressionCodec 剛剛在介紹壓縮解壓工程類 CompressionCodeFactory.class 的時候,我們多次提到了壓縮解壓器 CompressionCodecclass,并且我們在上文中還提到了它其中的一個用于獲取文件擴展名的方法 getDefaultExtension()。 壓縮解壓工程類 CompressionCodeFactory.class 使用的是抽象工廠的設計模式。它是一個接口,制定了一系列方法,用于創建特定壓縮解壓算法。下面我們來看下比較重要的幾個方法: 1. createOutputStream() 方法對數據流進行壓縮。 圖 12. 代碼示例 圖 12. 代碼示例 此方法提供了方法重載。 ① 基于流的壓縮處理; ② 基于壓縮機 Compress.class 的壓縮處理 2. createInputStream() 方法對數據流進行解壓。 圖 13. 代碼示例 圖 13. 代碼示例 這里的解壓方法同樣提供了方法重載。 ① 基于流的解壓處理; ② 基于解壓機 Decompressor.class 的解壓處理; 關于壓縮/解壓流與壓縮/解壓機會在下面的文章中我們會詳細講解。此處暫作了解。 3. getCompressorType() 返回需要的編碼器的類型。 getDefaultExtension() 獲取對應文件擴展名的方法。前文已提到過,不再敖述。 壓縮機 Compressor 和解壓機 Decompressor 前面在編碼解碼器部分的 createInputStream() 和 createInputStream() 方法中我們提到過 Compressor.class 和 Decompressor.class 對象。在 Hadoop 的實現中,數據編碼器和解碼器被抽象成了兩個接口: 1. org.apache.hadoop.io.compress.Compressor; 2. org.apache.hadoop.io.compress.Decompressor; 它們規定了一系列的方法,所以在 Hadoop 內部的編碼/解碼算法實現都需要實現對應的接口。在實際的數據壓縮與解壓縮過程,Hadoop 為用戶提供了統一的 I/O 流處理模式。 我們看一下壓縮機 Compressor.class,代碼如下: 圖 14. 代碼示例 圖 14. 代碼示例 ① setInput() 方法接收數據到內部緩沖區,可以多次調用; ② needsInput() 方法用于檢查緩沖區是否已滿。如果是 false 則說明當前的緩沖區已滿; ③ getBytesRead() 輸入未壓縮字節的總數; ④ getBytesWritten() 輸出壓縮字節的總數; ⑤ finish() 方法結束數據輸入的過程; ⑥ finished() 方法用于檢查是否已經讀取完所有的等待壓縮的數據。如果返回 false,表明壓縮器中還有未讀取的壓縮數據,可以繼續通過 compress() 方法讀??; ⑦ compress() 方法獲取壓縮后的數據,釋放緩沖區空間; ⑧ reset() 方法用于重置壓縮器,以處理新的輸入數據集合; ⑨ end() 方法用于關閉解壓縮器并放棄所有未處理的輸入; ⑩ reinit() 方法更進一步允許使用 Hadoop 的配置系統,重置并重新配置壓縮器; 為了提高壓縮效率,并不是每次用戶調用 setInput() 方法,壓縮機就會立即工作,所以,為了通知壓縮機所有數據已經寫入,必須使用 finish() 方法。finish() 調用結束后,壓縮機緩沖區中保持的已經壓縮的數據,可以繼續通過 compress() 方法獲得。至于要判斷壓縮機中是否還有未讀取的壓縮數據,則需要利用 finished() 方法來判斷。 壓縮流 CompressionOutputStream 和解壓縮流 CompressionInputStream 前文編碼解碼器部分提到過 createInputStream() 方法返回 CompressionOutputStream 對象,createInputStream() 方法返回 CompressionInputStream 對象。這兩個類分別繼承自 java.io.OutputStream 和 java.io.InputStream。從而我們不難理解,這兩個對象的作用了吧。 我們來看下 CompressionInputStream.class 的代碼: 圖 15. 代碼示例 圖 15. 代碼示例 可以看到 CompressionOutputStream 實現了 OutputStream 的 close() 方法和 flush() 方法,但用于輸出數據的 write() 方法以及用于結束壓縮過程并將輸入寫到底層流的 finish() 方法和重置壓縮狀態的 resetState() 方法還是抽象方法,需要 CompressionOutputStream 的子類實現。 Hadoop 壓縮框架中為我們提供了一個實現了 CompressionOutputStream 類通用的子類 CompressorStream.class。 圖 16. 代碼示例 圖 16. 代碼示例 CompressorStream.class 提供了三個不同的構造函數,CompressorStream 需要的底層輸出流 out 和壓縮時使用的壓縮器,都作為參數傳入構造函數。另一個參數是 CompressorStream 工作時使用的緩沖區 buffer 的大小,構造時會利用這個參數分配該緩沖區。第一個可以手動設置緩沖區大小,第二個默認 512,第三個沒有緩沖區且不可使用壓縮器。 圖 17. 代碼示例 圖 17. 代碼示例 在 write()、compress()、finish() 以及 resetState() 方法中,我們發現了壓縮機 Compressor 的身影,前面文章我們已經介紹過壓縮機的的實現過程,通過調用 setInput() 方法將待壓縮數據填充到內部緩沖區,然后調用 needsInput() 方法檢查緩沖區是否已滿,如果緩沖區已滿,將調用 compress() 方法對數據進行壓縮。流程如下圖所示: 圖 18. 調用流程圖 圖 18. 調用流程圖 結束語 本文深入到 Hadoop 平臺壓縮框架內部,對其核心代碼以及各壓縮格式的效率進行對比分析,以幫助讀者在使用 Hadoop 平臺時,可以通過對數據進行壓縮處理來提高數據處理效率。當再次面臨海量數據處理時, Hadoop 平臺的壓縮機制可以讓我們事半功倍。 相關主題 Hadoop 在線 API 《Hadoop 技術內幕深入解析 HADOOP COMMON 和 HDFS 架構設計與實現原理》 developerWorks 開源技術主題:查找豐富的操作信息、工具和項目更新,幫助您掌握開源技術并將其用于 IBM 產品。
          posted on 2017-09-14 17:35 xzc 閱讀(560) 評論(0)  編輯  收藏 所屬分類: hadoop
          主站蜘蛛池模板: 吉林市| 莱西市| 罗田县| 安泽县| 治县。| 武川县| 乌鲁木齐县| 邢台县| 邛崃市| 桓仁| 治县。| 文昌市| 道孚县| 高碑店市| 大田县| 玉林市| 仁寿县| 龙胜| 灵武市| 扶风县| 勐海县| 刚察县| 屏山县| 仁化县| 西乌珠穆沁旗| 连云港市| 西乌| 遵义市| 长岭县| 台中市| 南江县| 阿拉善盟| 顺义区| 安庆市| 米易县| 图木舒克市| 泰兴市| 宁远县| 江口县| 巴塘县| 岐山县|