posts - 495,comments - 227,trackbacks - 0
          http://www.rigongyizu.com/hadoop-job-optimize-combinefileinputformat/

          某日,接手了同事寫的從Hadoop集群拷貝數(shù)據(jù)到另外一個集群的程序,該程序是運行在Hadoop集群上的job。這個job只有map階段,讀取hdfs目錄下數(shù)據(jù)的數(shù)據(jù),然后寫入到另外一個集群。

          顯然,這個程序沒有考慮大數(shù)據(jù)量的情況,如果輸入目錄下文件很多或數(shù)據(jù)量很大,就會導(dǎo)致map數(shù)很多。而實際上我們需要拷貝的一個數(shù)據(jù)源就有近 6T,job啟動起來有1w多個map,一下子整個queue的資源就占滿了。雖然通過調(diào)整一些參數(shù)可以控制map數(shù)(也就是并發(fā)數(shù)),但是無法準(zhǔn)確的控 制map數(shù),而且換個數(shù)據(jù)源又得重新配置參數(shù)。

          第一個改進(jìn)的版本是,加了Reduce過程,以期望通過設(shè)置Reduce數(shù)量來控制并發(fā)數(shù)。這樣雖然能精確地控制并發(fā)數(shù),但是增加了shuffle 過程,實際運行中發(fā)現(xiàn)輸入數(shù)據(jù)有傾斜(而partition的key由于業(yè)務(wù)需要無法更改),導(dǎo)致部分機(jī)器網(wǎng)絡(luò)被打滿,從而影響到了集群中的其他應(yīng)用。即 使通過 mapred.reduce.parallel.copies 參數(shù)來限制shuffle也是治標(biāo)不治本。這個平白增加的shuffle過程實際上浪費了很多網(wǎng)絡(luò)帶寬和IO。

          最理想的情況當(dāng)然是只有map階段,而且能夠準(zhǔn)確的控制并發(fā)數(shù)了。

          于是,第二個優(yōu)化版本誕生了。這個job只有map階段,采用CombineFileInputFormat, 它可以將多個小文件打包成一個InputSplit提供給一個Map處理,避免因為大量小文件問題,啟動大量map。通過 mapred.max.split.size 參數(shù)可以大概地控制并發(fā)數(shù)。本以為這樣就能解決問題了,結(jié)果又發(fā)現(xiàn)了數(shù)據(jù)傾斜的問題。這種粗略地分splits的方式,導(dǎo)致有的map處理的數(shù)據(jù)少,有的 map處理的數(shù)據(jù)多,并不均勻。幾個拖后退的map就導(dǎo)致job的實際運行時間長了一倍多。

          看來只有讓每個map處理的數(shù)據(jù)量一樣多,才能完美的解決這個問題了。

          第三個版本也誕生了,這次是重寫了CombineFileInputFormat,自己實現(xiàn)getSplits方法。由于輸入數(shù)據(jù)為SequenceFile格式,因此需要一個SequenceFileRecordReaderWrapper類。

          實現(xiàn)代碼如下:
          CustomCombineSequenceFileInputFormat.java

          import java.io.IOException;
           
          import org.apache.hadoop.classification.InterfaceAudience;
          import org.apache.hadoop.classification.InterfaceStability;
          import org.apache.hadoop.mapreduce.InputSplit;
          import org.apache.hadoop.mapreduce.RecordReader;
          import org.apache.hadoop.mapreduce.TaskAttemptContext;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
          import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
           
          /**
           * Input format that is a <code>CombineFileInputFormat</code>-equivalent for
           * <code>SequenceFileInputFormat</code>.
           *
           * @see CombineFileInputFormat
           */
          @InterfaceAudience.Public
          @InterfaceStability.Stable
          public class CustomCombineSequenceFileInputFormat<K, V> extends MultiFileInputFormat<K, V> {
              @SuppressWarnings({"rawtypes", "unchecked"})
              public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
                      throws IOException {
                  return new CombineFileRecordReader((CombineFileSplit) split, context,
                          SequenceFileRecordReaderWrapper.class);
              }
           
              /**
               * A record reader that may be passed to <code>CombineFileRecordReader</code> so that it can be
               * used in a <code>CombineFileInputFormat</code>-equivalent for
               * <code>SequenceFileInputFormat</code>.
               *
               * @see CombineFileRecordReader
               * @see CombineFileInputFormat
               * @see SequenceFileInputFormat
               */
              private static class SequenceFileRecordReaderWrapper<K, V>
                      extends CombineFileRecordReaderWrapper<K, V> {
                  // this constructor signature is required by CombineFileRecordReader
                  public SequenceFileRecordReaderWrapper(CombineFileSplit split, TaskAttemptContext context,
                          Integer idx) throws IOException, InterruptedException {
                      super(new SequenceFileInputFormat<K, V>(), split, context, idx);
                  }
              }
          }

          MultiFileInputFormat.java

          import java.io.IOException;
          import java.util.ArrayList;
          import java.util.List;
           
          import org.apache.commons.logging.Log;
          import org.apache.commons.logging.LogFactory;
          import org.apache.hadoop.fs.FileStatus;
          import org.apache.hadoop.fs.Path;
          import org.apache.hadoop.mapreduce.InputSplit;
          import org.apache.hadoop.mapreduce.Job;
          import org.apache.hadoop.mapreduce.JobContext;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
          import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
           
          /**
           * multiple files can be combined in one InputSplit so that InputSplit number can be limited!
           */
          public abstract class MultiFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {
           
              private static final Log LOG = LogFactory.getLog(MultiFileInputFormat.class);
              public static final String CONFNAME_INPUT_SPLIT_MAX_NUM = "multifileinputformat.max_split_num";
              public static final Integer DEFAULT_MAX_SPLIT_NUM = 50;
           
              public static void setMaxInputSplitNum(Job job, Integer maxSplitNum) {
                  job.getConfiguration().setInt(CONFNAME_INPUT_SPLIT_MAX_NUM, maxSplitNum);
              }
           
              @Override
              public List<InputSplit> getSplits(JobContext job) throws IOException {
                  // get all the files in input path
                  List<FileStatus> stats = listStatus(job);
                  List<InputSplit> splits = new ArrayList<InputSplit>();
                  if (stats.size() == 0) {
                      return splits;
                  }
                  // 計算split的平均長度
                  long totalLen = 0;
                  for (FileStatus stat : stats) {
                      totalLen += stat.getLen();
                  }
                  int maxSplitNum = job.getConfiguration().getInt(CONFNAME_INPUT_SPLIT_MAX_NUM, DEFAULT_MAX_SPLIT_NUM);
                  int expectSplitNum = maxSplitNum < stats.size() ? maxSplitNum : stats.size();
                  long averageLen = totalLen / expectSplitNum;
                  LOG.info("Prepare InputSplit : averageLen(" + averageLen + ") totalLen(" + totalLen
                          + ") expectSplitNum(" + expectSplitNum + ") ");
                  // 設(shè)置inputSplit
                  List<Path> pathLst = new ArrayList<Path>();
                  List<Long> offsetLst = new ArrayList<Long>();
                  List<Long> lengthLst = new ArrayList<Long>();
                  long currentLen = 0;
                  for (int i = 0; i < stats.size(); i++) {
                      FileStatus stat = stats.get(i);
                      pathLst.add(stat.getPath());
                      offsetLst.add(0L);
                      lengthLst.add(stat.getLen());
                      currentLen += stat.getLen();
                      if (splits.size() < expectSplitNum - 1   && currentLen > averageLen) {
                          Path[] pathArray = new Path[pathLst.size()];
                          CombineFileSplit thissplit = new CombineFileSplit(pathLst.toArray(pathArray),
                              getLongArray(offsetLst), getLongArray(lengthLst), new String[0]);
                          LOG.info("combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size()
                                  + ") length(" + currentLen + ")");
                          splits.add(thissplit);
                          //
                          pathLst.clear();
                          offsetLst.clear();
                          lengthLst.clear();
                          currentLen = 0;
                      }
                  }
                  if (pathLst.size() > 0) {
                      Path[] pathArray = new Path[pathLst.size()];
                      CombineFileSplit thissplit =
                              new CombineFileSplit(pathLst.toArray(pathArray), getLongArray(offsetLst),
                                      getLongArray(lengthLst), new String[0]);
                      LOG.info("combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size()
                              + ") length(" + currentLen + ")");
                      splits.add(thissplit);
                  }
                  return splits;
              }
           
              private long[] getLongArray(List<Long> lst) {
                  long[] rst = new long[lst.size()];
                  for (int i = 0; i < lst.size(); i++) {
                      rst[i] = lst.get(i);
                  }
                  return rst;
              }
          }

          通過 multifileinputformat.max_split_num 參數(shù)就可以較為準(zhǔn)確的控制map數(shù)量,而且會發(fā)現(xiàn)每個map處理的數(shù)據(jù)量很均勻。至此,問題總算解決了。

          posted on 2014-09-16 09:25 SIMONE 閱讀(690) 評論(1)  編輯  收藏 所屬分類: hadoop

          FeedBack:
          # re: 一個Hadoop程序的優(yōu)化過程 – 根據(jù)文件實際大小實現(xiàn)CombineFileInputFormat[未登錄]
          2014-12-20 11:34 | 哈哈
          看了樓主的代碼,但是這種做法以文件為單位,一個文件至多分給一個map處理。如果某個目錄下有許多小文件, 另外還有一個超大文件, 處理大文件的map會嚴(yán)重偏慢,這個該怎么辦呢?  回復(fù)  更多評論
            
          主站蜘蛛池模板: 宾川县| 建湖县| 鄯善县| 郓城县| 新化县| 嘉义县| 河西区| 宁陵县| 茌平县| 石嘴山市| 太原市| 梅州市| 曲水县| 绥江县| 遂宁市| 融水| 托里县| 伊金霍洛旗| 连江县| 深圳市| 青阳县| 阳曲县| 延长县| 隆回县| 建平县| 承德县| 沂南县| 屏山县| 依安县| 巧家县| 开鲁县| 大英县| 色达县| 开化县| 祁东县| 巴林左旗| 濮阳市| 汕头市| 康保县| 虎林市| 福州市|