paulwong

          Implementation for CombineFileInputFormat Hadoop 0.20.205

          運行MAPREDUCE JOB時,如果輸入的文件比較小而多時,默認情況下會生成很多的MAP JOB,即一個文件一個MAP JOB,因此需要優化,使多個文件能合成一個MAP JOB的輸入。

          具體的原理是下述三步:

          1.根據輸入目錄下的每個文件,如果其長度超過mapred.max.split.size,以block為單位分成多個split(一個split是一個map的輸入),每個split的長度都大于mapred.max.split.size, 因為以block為單位, 因此也會大于blockSize, 此文件剩下的長度如果大于mapred.min.split.size.per.node, 則生成一個split, 否則先暫時保留.

          2. 現在剩下的都是一些長度效短的碎片,把每個rack下碎片合并, 只要長度超過mapred.max.split.size就合并成一個split, 最后如果剩下的碎片比mapred.min.split.size.per.rack大, 就合并成一個split, 否則暫時保留.

          3. 把不同rack下的碎片合并, 只要長度超過mapred.max.split.size就合并成一個split, 剩下的碎片無論長度, 合并成一個split.
          舉例: mapred.max.split.size=1000
          mapred.min.split.size.per.node=300
          mapred.min.split.size.per.rack=100
          輸入目錄下五個文件,rack1下三個文件,長度為2050,1499,10, rack2下兩個文件,長度為1010,80. 另外blockSize為500.
          經過第一步, 生成五個split: 1000,1000,1000,499,1000. 剩下的碎片為rack1下:50,10; rack2下10:80
          由于兩個rack下的碎片和都不超過100, 所以經過第二步, split和碎片都沒有變化.
          第三步,合并四個碎片成一個split, 長度為150.

          如果要減少map數量, 可以調大mapred.max.split.size, 否則調小即可.

          其特點是: 一個塊至多作為一個map的輸入,一個文件可能有多個塊,一個文件可能因為塊多分給做為不同map的輸入, 一個map可能處理多個塊,可能處理多個文件。

          注:CombineFileInputFormat是一個抽象類,需要編寫一個繼承類。


          import java.io.IOException;

          import org.apache.hadoop.conf.Configuration;
          import org.apache.hadoop.io.LongWritable;
          import org.apache.hadoop.io.Text;
          import org.apache.hadoop.mapred.FileSplit;
          import org.apache.hadoop.mapred.InputSplit;
          import org.apache.hadoop.mapred.JobConf;
          import org.apache.hadoop.mapred.LineRecordReader;
          import org.apache.hadoop.mapred.RecordReader;
          import org.apache.hadoop.mapred.Reporter;
          import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
          import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
          import org.apache.hadoop.mapred.lib.CombineFileSplit;

          @SuppressWarnings("deprecation")
          public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {

              @SuppressWarnings({ "unchecked", "rawtypes" })
              @Override
              public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {

                  return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class);
              }

              public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> {
                  private final LineRecordReader linerecord;

                  public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
                      FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations());
                      linerecord = new LineRecordReader(conf, filesplit);
                  }

                  @Override
                  public void close() throws IOException {
                      linerecord.close();

                  }

                  @Override
                  public LongWritable createKey() {
                      // TODO Auto-generated method stub
                      return linerecord.createKey();
                  }

                  @Override
                  public Text createValue() {
                      // TODO Auto-generated method stub
                      return linerecord.createValue();
                  }

                  @Override
                  public long getPos() throws IOException {
                      // TODO Auto-generated method stub
                      return linerecord.getPos();
                  }

                  @Override
                  public float getProgress() throws IOException {
                      // TODO Auto-generated method stub
                      return linerecord.getProgress();
                  }

                  @Override
                  public boolean next(LongWritable key, Text value) throws IOException {

                      // TODO Auto-generated method stub
                      return linerecord.next(key, value);
                  }

              }
          }


          在運行時這樣設置:

          if (argument != null) {
                          conf.set("mapred.max.split.size", argument);
                      } else {
                          conf.set("mapred.max.split.size", "134217728"); // 128 MB
                      }
          //

                      conf.setInputFormat(CombinedInputFormat.class);


          posted on 2013-08-29 16:08 paulwong 閱讀(387) 評論(0)  編輯  收藏 所屬分類: 分布式HADOOP 、云計算

          主站蜘蛛池模板: 竹北市| 拜城县| 汤阴县| 赞皇县| 辽源市| 平利县| 镇雄县| 永兴县| 顺平县| 日照市| 铁岭市| 鹤山市| 塔河县| 临桂县| 介休市| 温州市| 新田县| 象州县| 井陉县| 北辰区| 石台县| 高安市| 舞阳县| 金山区| 邯郸县| 英超| 祁连县| 新野县| 呼和浩特市| 普洱| 崇州市| 岱山县| 乌海市| 荣昌县| 江山市| 会理县| 卢湾区| 桂阳县| 双江| 壶关县| 新化县|