posts - 495,comments - 227,trackbacks - 0
          http://www.rigongyizu.com/hadoop-job-optimize-combinefileinputformat/

          某日,接手了同事寫的從Hadoop集群拷貝數(shù)據(jù)到另外一個集群的程序,該程序是運行在Hadoop集群上的job。這個job只有map階段,讀取hdfs目錄下數(shù)據(jù)的數(shù)據(jù),然后寫入到另外一個集群。

          顯然,這個程序沒有考慮大數(shù)據(jù)量的情況,如果輸入目錄下文件很多或數(shù)據(jù)量很大,就會導致map數(shù)很多。而實際上我們需要拷貝的一個數(shù)據(jù)源就有近 6T,job啟動起來有1w多個map,一下子整個queue的資源就占滿了。雖然通過調(diào)整一些參數(shù)可以控制map數(shù)(也就是并發(fā)數(shù)),但是無法準確的控 制map數(shù),而且換個數(shù)據(jù)源又得重新配置參數(shù)。

          第一個改進的版本是,加了Reduce過程,以期望通過設(shè)置Reduce數(shù)量來控制并發(fā)數(shù)。這樣雖然能精確地控制并發(fā)數(shù),但是增加了shuffle 過程,實際運行中發(fā)現(xiàn)輸入數(shù)據(jù)有傾斜(而partition的key由于業(yè)務(wù)需要無法更改),導致部分機器網(wǎng)絡(luò)被打滿,從而影響到了集群中的其他應(yīng)用。即 使通過 mapred.reduce.parallel.copies 參數(shù)來限制shuffle也是治標不治本。這個平白增加的shuffle過程實際上浪費了很多網(wǎng)絡(luò)帶寬和IO。

          最理想的情況當然是只有map階段,而且能夠準確的控制并發(fā)數(shù)了。

          于是,第二個優(yōu)化版本誕生了。這個job只有map階段,采用CombineFileInputFormat, 它可以將多個小文件打包成一個InputSplit提供給一個Map處理,避免因為大量小文件問題,啟動大量map。通過 mapred.max.split.size 參數(shù)可以大概地控制并發(fā)數(shù)。本以為這樣就能解決問題了,結(jié)果又發(fā)現(xiàn)了數(shù)據(jù)傾斜的問題。這種粗略地分splits的方式,導致有的map處理的數(shù)據(jù)少,有的 map處理的數(shù)據(jù)多,并不均勻。幾個拖后退的map就導致job的實際運行時間長了一倍多。

          看來只有讓每個map處理的數(shù)據(jù)量一樣多,才能完美的解決這個問題了。

          第三個版本也誕生了,這次是重寫了CombineFileInputFormat,自己實現(xiàn)getSplits方法。由于輸入數(shù)據(jù)為SequenceFile格式,因此需要一個SequenceFileRecordReaderWrapper類。

          實現(xiàn)代碼如下:
          CustomCombineSequenceFileInputFormat.java

          import java.io.IOException;
           
          import org.apache.hadoop.classification.InterfaceAudience;
          import org.apache.hadoop.classification.InterfaceStability;
          import org.apache.hadoop.mapreduce.InputSplit;
          import org.apache.hadoop.mapreduce.RecordReader;
          import org.apache.hadoop.mapreduce.TaskAttemptContext;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
          import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
           
          /**
           * Input format that is a <code>CombineFileInputFormat</code>-equivalent for
           * <code>SequenceFileInputFormat</code>.
           *
           * @see CombineFileInputFormat
           */
          @InterfaceAudience.Public
          @InterfaceStability.Stable
          public class CustomCombineSequenceFileInputFormat<K, V> extends MultiFileInputFormat<K, V> {
              @SuppressWarnings({"rawtypes", "unchecked"})
              public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
                      throws IOException {
                  return new CombineFileRecordReader((CombineFileSplit) split, context,
                          SequenceFileRecordReaderWrapper.class);
              }
           
              /**
               * A record reader that may be passed to <code>CombineFileRecordReader</code> so that it can be
               * used in a <code>CombineFileInputFormat</code>-equivalent for
               * <code>SequenceFileInputFormat</code>.
               *
               * @see CombineFileRecordReader
               * @see CombineFileInputFormat
               * @see SequenceFileInputFormat
               */
              private static class SequenceFileRecordReaderWrapper<K, V>
                      extends CombineFileRecordReaderWrapper<K, V> {
                  // this constructor signature is required by CombineFileRecordReader
                  public SequenceFileRecordReaderWrapper(CombineFileSplit split, TaskAttemptContext context,
                          Integer idx) throws IOException, InterruptedException {
                      super(new SequenceFileInputFormat<K, V>(), split, context, idx);
                  }
              }
          }

          MultiFileInputFormat.java

          import java.io.IOException;
          import java.util.ArrayList;
          import java.util.List;
           
          import org.apache.commons.logging.Log;
          import org.apache.commons.logging.LogFactory;
          import org.apache.hadoop.fs.FileStatus;
          import org.apache.hadoop.fs.Path;
          import org.apache.hadoop.mapreduce.InputSplit;
          import org.apache.hadoop.mapreduce.Job;
          import org.apache.hadoop.mapreduce.JobContext;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
           
          /**
           * multiple files can be combined in one InputSplit so that InputSplit number can be limited!
           */
          public abstract class MultiFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {
           
              private static final Log LOG = LogFactory.getLog(MultiFileInputFormat.class);
              public static final String CONFNAME_INPUT_SPLIT_MAX_NUM = "multifileinputformat.max_split_num";
              public static final Integer DEFAULT_MAX_SPLIT_NUM = 50;
           
              public static void setMaxInputSplitNum(Job job, Integer maxSplitNum) {
                  job.getConfiguration().setInt(CONFNAME_INPUT_SPLIT_MAX_NUM, maxSplitNum);
              }
           
              @Override
              public List<InputSplit> getSplits(JobContext job) throws IOException {
                  // get all the files in input path
                  List<FileStatus> stats = listStatus(job);
                  List<InputSplit> splits = new ArrayList<InputSplit>();
                  if (stats.size() == 0) {
                      return splits;
                  }
                  // 計算split的平均長度
                  long totalLen = 0;
                  for (FileStatus stat : stats) {
                      totalLen += stat.getLen();
                  }
                  int maxSplitNum = job.getConfiguration().getInt(CONFNAME_INPUT_SPLIT_MAX_NUM, DEFAULT_MAX_SPLIT_NUM);
                  int expectSplitNum = maxSplitNum < stats.size() ? maxSplitNum : stats.size();
                  long averageLen = totalLen / expectSplitNum;
                  LOG.info("Prepare InputSplit : averageLen(" + averageLen + ") totalLen(" + totalLen
                          + ") expectSplitNum(" + expectSplitNum + ") ");
                  // 設(shè)置inputSplit
                  List<Path> pathLst = new ArrayList<Path>();
                  List<Long> offsetLst = new ArrayList<Long>();
                  List<Long> lengthLst = new ArrayList<Long>();
                  long currentLen = 0;
                  for (int i = 0; i < stats.size(); i++) {
                      FileStatus stat = stats.get(i);
                      pathLst.add(stat.getPath());
                      offsetLst.add(0L);
                      lengthLst.add(stat.getLen());
                      currentLen += stat.getLen();
                      if (splits.size() < expectSplitNum - 1   && currentLen > averageLen) {
                          Path[] pathArray = new Path[pathLst.size()];
                          CombineFileSplit thissplit = new CombineFileSplit(pathLst.toArray(pathArray),
                              getLongArray(offsetLst), getLongArray(lengthLst), new String[0]);
                          LOG.info("combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size()
                                  + ") length(" + currentLen + ")");
                          splits.add(thissplit);
                          //
                          pathLst.clear();
                          offsetLst.clear();
                          lengthLst.clear();
                          currentLen = 0;
                      }
                  }
                  if (pathLst.size() > 0) {
                      Path[] pathArray = new Path[pathLst.size()];
                      CombineFileSplit thissplit =
                              new CombineFileSplit(pathLst.toArray(pathArray), getLongArray(offsetLst),
                                      getLongArray(lengthLst), new String[0]);
                      LOG.info("combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size()
                              + ") length(" + currentLen + ")");
                      splits.add(thissplit);
                  }
                  return splits;
              }
           
              private long[] getLongArray(List<Long> lst) {
                  long[] rst = new long[lst.size()];
                  for (int i = 0; i < lst.size(); i++) {
                      rst[i] = lst.get(i);
                  }
                  return rst;
              }
          }

          通過 multifileinputformat.max_split_num 參數(shù)就可以較為準確的控制map數(shù)量,而且會發(fā)現(xiàn)每個map處理的數(shù)據(jù)量很均勻。至此,問題總算解決了。

          posted on 2014-09-16 09:25 SIMONE 閱讀(689) 評論(1)  編輯  收藏 所屬分類: hadoop

          FeedBack:
          # re: 一個Hadoop程序的優(yōu)化過程 – 根據(jù)文件實際大小實現(xiàn)CombineFileInputFormat[未登錄]
          2014-12-20 11:34 | 哈哈
          看了樓主的代碼,但是這種做法以文件為單位,一個文件至多分給一個map處理。如果某個目錄下有許多小文件, 另外還有一個超大文件, 處理大文件的map會嚴重偏慢,這個該怎么辦呢?  回復(fù)  更多評論
            
          主站蜘蛛池模板: 西吉县| 南昌县| 凉城县| 韩城市| 常德市| 泽州县| 简阳市| 仙游县| 威远县| 桦川县| 绥宁县| 平江县| 湄潭县| 贵港市| 冷水江市| 贺州市| 拉孜县| 宁南县| 长乐市| 崇礼县| 两当县| 宁都县| 镇赉县| 平山县| 邵东县| 扬州市| 东乌珠穆沁旗| 吉安市| 离岛区| 大田县| 霍城县| 许昌市| 定州市| 会泽县| 清远市| 塔城市| 曲麻莱县| 中牟县| 大姚县| 青川县| 万荣县|