posts - 495,comments - 227,trackbacks - 0
          <2014年9月>
          31123456
          78910111213
          14151617181920
          21222324252627
          2829301234
          567891011

          常用鏈接

          留言簿(46)

          隨筆分類(476)

          隨筆檔案(495)

          最新隨筆

          搜索

          •  

          積分與排名

          • 積分 - 1396625
          • 排名 - 16

          最新評論

          閱讀排行榜

          評論排行榜

          http://www.rigongyizu.com/hadoop-job-optimize-combinefileinputformat/

          某日,接手了同事寫的從Hadoop集群拷貝數據到另外一個集群的程序,該程序是運行在Hadoop集群上的job。這個job只有map階段,讀取hdfs目錄下數據的數據,然后寫入到另外一個集群。

          顯然,這個程序沒有考慮大數據量的情況,如果輸入目錄下文件很多或數據量很大,就會導致map數很多。而實際上我們需要拷貝的一個數據源就有近 6T,job啟動起來有1w多個map,一下子整個queue的資源就占滿了。雖然通過調整一些參數可以控制map數(也就是并發數),但是無法準確的控 制map數,而且換個數據源又得重新配置參數。

          第一個改進的版本是,加了Reduce過程,以期望通過設置Reduce數量來控制并發數。這樣雖然能精確地控制并發數,但是增加了shuffle 過程,實際運行中發現輸入數據有傾斜(而partition的key由于業務需要無法更改),導致部分機器網絡被打滿,從而影響到了集群中的其他應用。即 使通過 mapred.reduce.parallel.copies 參數來限制shuffle也是治標不治本。這個平白增加的shuffle過程實際上浪費了很多網絡帶寬和IO。

          最理想的情況當然是只有map階段,而且能夠準確的控制并發數了。

          于是,第二個優化版本誕生了。這個job只有map階段,采用CombineFileInputFormat, 它可以將多個小文件打包成一個InputSplit提供給一個Map處理,避免因為大量小文件問題,啟動大量map。通過 mapred.max.split.size 參數可以大概地控制并發數。本以為這樣就能解決問題了,結果又發現了數據傾斜的問題。這種粗略地分splits的方式,導致有的map處理的數據少,有的 map處理的數據多,并不均勻。幾個拖后退的map就導致job的實際運行時間長了一倍多。

          看來只有讓每個map處理的數據量一樣多,才能完美的解決這個問題了。

          第三個版本也誕生了,這次是重寫了CombineFileInputFormat,自己實現getSplits方法。由于輸入數據為SequenceFile格式,因此需要一個SequenceFileRecordReaderWrapper類。

          實現代碼如下:
          CustomCombineSequenceFileInputFormat.java

          import java.io.IOException;
           
          import org.apache.hadoop.classification.InterfaceAudience;
          import org.apache.hadoop.classification.InterfaceStability;
          import org.apache.hadoop.mapreduce.InputSplit;
          import org.apache.hadoop.mapreduce.RecordReader;
          import org.apache.hadoop.mapreduce.TaskAttemptContext;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
          import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
           
          /**
           * Input format that is a <code>CombineFileInputFormat</code>-equivalent for
           * <code>SequenceFileInputFormat</code>.
           *
           * @see CombineFileInputFormat
           */
          @InterfaceAudience.Public
          @InterfaceStability.Stable
          public class CustomCombineSequenceFileInputFormat<K, V> extends MultiFileInputFormat<K, V> {
              @SuppressWarnings({"rawtypes", "unchecked"})
              public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
                      throws IOException {
                  return new CombineFileRecordReader((CombineFileSplit) split, context,
                          SequenceFileRecordReaderWrapper.class);
              }
           
              /**
               * A record reader that may be passed to <code>CombineFileRecordReader</code> so that it can be
               * used in a <code>CombineFileInputFormat</code>-equivalent for
               * <code>SequenceFileInputFormat</code>.
               *
               * @see CombineFileRecordReader
               * @see CombineFileInputFormat
               * @see SequenceFileInputFormat
               */
              private static class SequenceFileRecordReaderWrapper<K, V>
                      extends CombineFileRecordReaderWrapper<K, V> {
                  // this constructor signature is required by CombineFileRecordReader
                  public SequenceFileRecordReaderWrapper(CombineFileSplit split, TaskAttemptContext context,
                          Integer idx) throws IOException, InterruptedException {
                      super(new SequenceFileInputFormat<K, V>(), split, context, idx);
                  }
              }
          }

          MultiFileInputFormat.java

          import java.io.IOException;
          import java.util.ArrayList;
          import java.util.List;
           
          import org.apache.commons.logging.Log;
          import org.apache.commons.logging.LogFactory;
          import org.apache.hadoop.fs.FileStatus;
          import org.apache.hadoop.fs.Path;
          import org.apache.hadoop.mapreduce.InputSplit;
          import org.apache.hadoop.mapreduce.Job;
          import org.apache.hadoop.mapreduce.JobContext;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
           
          /**
           * multiple files can be combined in one InputSplit so that InputSplit number can be limited!
           */
          public abstract class MultiFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {
           
              private static final Log LOG = LogFactory.getLog(MultiFileInputFormat.class);
              public static final String CONFNAME_INPUT_SPLIT_MAX_NUM = "multifileinputformat.max_split_num";
              public static final Integer DEFAULT_MAX_SPLIT_NUM = 50;
           
              public static void setMaxInputSplitNum(Job job, Integer maxSplitNum) {
                  job.getConfiguration().setInt(CONFNAME_INPUT_SPLIT_MAX_NUM, maxSplitNum);
              }
           
              @Override
              public List<InputSplit> getSplits(JobContext job) throws IOException {
                  // get all the files in input path
                  List<FileStatus> stats = listStatus(job);
                  List<InputSplit> splits = new ArrayList<InputSplit>();
                  if (stats.size() == 0) {
                      return splits;
                  }
                  // 計算split的平均長度
                  long totalLen = 0;
                  for (FileStatus stat : stats) {
                      totalLen += stat.getLen();
                  }
                  int maxSplitNum = job.getConfiguration().getInt(CONFNAME_INPUT_SPLIT_MAX_NUM, DEFAULT_MAX_SPLIT_NUM);
                  int expectSplitNum = maxSplitNum < stats.size() ? maxSplitNum : stats.size();
                  long averageLen = totalLen / expectSplitNum;
                  LOG.info("Prepare InputSplit : averageLen(" + averageLen + ") totalLen(" + totalLen
                          + ") expectSplitNum(" + expectSplitNum + ") ");
                  // 設置inputSplit
                  List<Path> pathLst = new ArrayList<Path>();
                  List<Long> offsetLst = new ArrayList<Long>();
                  List<Long> lengthLst = new ArrayList<Long>();
                  long currentLen = 0;
                  for (int i = 0; i < stats.size(); i++) {
                      FileStatus stat = stats.get(i);
                      pathLst.add(stat.getPath());
                      offsetLst.add(0L);
                      lengthLst.add(stat.getLen());
                      currentLen += stat.getLen();
                      if (splits.size() < expectSplitNum - 1   && currentLen > averageLen) {
                          Path[] pathArray = new Path[pathLst.size()];
                          CombineFileSplit thissplit = new CombineFileSplit(pathLst.toArray(pathArray),
                              getLongArray(offsetLst), getLongArray(lengthLst), new String[0]);
                          LOG.info("combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size()
                                  + ") length(" + currentLen + ")");
                          splits.add(thissplit);
                          //
                          pathLst.clear();
                          offsetLst.clear();
                          lengthLst.clear();
                          currentLen = 0;
                      }
                  }
                  if (pathLst.size() > 0) {
                      Path[] pathArray = new Path[pathLst.size()];
                      CombineFileSplit thissplit =
                              new CombineFileSplit(pathLst.toArray(pathArray), getLongArray(offsetLst),
                                      getLongArray(lengthLst), new String[0]);
                      LOG.info("combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size()
                              + ") length(" + currentLen + ")");
                      splits.add(thissplit);
                  }
                  return splits;
              }
           
              private long[] getLongArray(List<Long> lst) {
                  long[] rst = new long[lst.size()];
                  for (int i = 0; i < lst.size(); i++) {
                      rst[i] = lst.get(i);
                  }
                  return rst;
              }
          }

          通過 multifileinputformat.max_split_num 參數就可以較為準確的控制map數量,而且會發現每個map處理的數據量很均勻。至此,問題總算解決了。

          posted on 2014-09-16 09:25 SIMONE 閱讀(689) 評論(1)  編輯  收藏 所屬分類: hadoop

          FeedBack:
          # re: 一個Hadoop程序的優化過程 – 根據文件實際大小實現CombineFileInputFormat[未登錄]
          2014-12-20 11:34 | 哈哈
          看了樓主的代碼,但是這種做法以文件為單位,一個文件至多分給一個map處理。如果某個目錄下有許多小文件, 另外還有一個超大文件, 處理大文件的map會嚴重偏慢,這個該怎么辦呢?  回復  更多評論
            
          主站蜘蛛池模板: 金寨县| 顺平县| 德惠市| 江津市| 防城港市| 天津市| 金沙县| 云浮市| 柘城县| 东城区| 镇远县| 二手房| 阿鲁科尔沁旗| 潢川县| 株洲县| 三都| 阳城县| 开封市| 隆子县| 绵竹市| 乌什县| 南投市| 山丹县| 兴安盟| 石景山区| 铁岭县| 泰州市| 广西| 西吉县| 吴旗县| 德庆县| 伊川县| 灵台县| 万山特区| 泗洪县| 惠州市| 班戈县| 新昌县| 分宜县| 牟定县| 肇源县|