posts - 495,comments - 227,trackbacks - 0
          http://www.rigongyizu.com/use-multiinputformat-read-different-files-in-one-job/

          hadoop中提供了 MultiOutputFormat 能將結(jié)果數(shù)據(jù)輸出到不同的目錄,也提供了 FileInputFormat 來一次讀取多個(gè)目錄的數(shù)據(jù),但是默認(rèn)一個(gè)job只能使用 job.setInputFormatClass 設(shè)置使用一個(gè)inputfomat處理一種格式的數(shù)據(jù)。如果需要實(shí)現(xiàn) 在一個(gè)job中同時(shí)讀取來自不同目錄的不同格式文件 的功能,就需要自己實(shí)現(xiàn)一個(gè) MultiInputFormat 來讀取不同格式的文件了(原來已經(jīng)提供了MultipleInputs)。

          例如:有一個(gè)mapreduce job需要同時(shí)讀取兩種格式的數(shù)據(jù),一種格式是普通的文本文件,用 LineRecordReader 一行一行讀取;另外一種文件是偽XML文件,用自定義的AJoinRecordReader讀取。

          自己實(shí)現(xiàn)了一個(gè)簡單的 MultiInputFormat 如下:

          import org.apache.hadoop.io.LongWritable;
          import org.apache.hadoop.io.Text;
          import org.apache.hadoop.mapreduce.InputSplit;
          import org.apache.hadoop.mapreduce.RecordReader;
          import org.apache.hadoop.mapreduce.TaskAttemptContext;
          import org.apache.hadoop.mapreduce.lib.input.FileSplit;
          import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
          import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
           
          public class MultiInputFormat extends TextInputFormat {
           
              @Override
              public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
                  RecordReader reader = null;
                  try {
                      String inputfile = ((FileSplit) split).getPath().toString();
                      String xmlpath = context.getConfiguration().get("xml_prefix");
                      String textpath = context.getConfiguration().get("text_prefix");
           
                      if (-1 != inputfile.indexOf(xmlpath)) {
                          reader = new AJoinRecordReader();
                      } else if (-1 != inputfile.indexOf(textpath)) {
                          reader = new LineRecordReader();
                      } else {
                          reader = new LineRecordReader();
                      }
                  } catch (IOException e) {
                      // do something ...
                  }
           
                  return reader;
              }
          }

          其實(shí)原理很簡單,就是在 createRecordReader 的時(shí)候,通過 ((FileSplit) split).getPath().toString() 獲取到當(dāng)前要處理的文件名,然后根據(jù)特征匹配,選取對(duì)應(yīng)的 RecordReader 即可。xml_prefix和text_prefix可以在程序啟動(dòng)時(shí)通過 -D 傳給Configuration。

          比如某次執(zhí)行打印的值如下:

          inputfile=hdfs://test042092.sqa.cm4:9000/test/input_xml/common-part-00068
          xmlpath_prefix=hdfs://test042092.sqa.cm4:9000/test/input_xml
          textpath_prefix=hdfs://test042092.sqa.cm4:9000/test/input_txt

          這里只是通過簡單的文件路徑和標(biāo)示符匹配來做,也可以采用更復(fù)雜的方法,比如文件名、文件后綴等。

          接著在map類中,也同樣可以根據(jù)不同的文件名特征進(jìn)行不同的處理:

          @Override
          public void map(LongWritable offset, Text inValue, Context context)
                  throws IOException {
           
              String inputfile = ((FileSplit) context.getInputSplit()).getPath()
                      .toString();
           
              if (-1 != inputfile.indexOf(textpath)) {
                  ......
              } else if (-1 != inputfile.indexOf(xmlpath)) {
                  ......
              } else {
                  ......
              }
          }

          這種方式太土了,原來hadoop里面已經(jīng)提供了 MultipleInputs 來實(shí)現(xiàn)對(duì)一個(gè)目錄指定一個(gè)inputformat和對(duì)應(yīng)的map處理類。

          MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
             MapClass.class);
          MultipleInputs.addInputPath(conf, new Path("/bar"),
             KeyValueTextInputFormat.class, MapClass2.class);
          posted on 2014-09-16 09:27 SIMONE 閱讀(2755) 評(píng)論(0)  編輯  收藏 所屬分類: hadoop
          主站蜘蛛池模板: 屏山县| 沁源县| 三台县| 滕州市| 东方市| 安新县| 灵寿县| 普安县| 同江市| 天津市| 靖安县| 大姚县| 民丰县| 游戏| 镇江市| 弋阳县| 乐清市| 孟连| 图们市| 开鲁县| 宁化县| 法库县| 丽江市| 嘉禾县| 静安区| 天祝| 辰溪县| 维西| 平南县| 五寨县| 衡南县| 玛纳斯县| 广水市| 文安县| 义乌市| 雷山县| 泸西县| 廊坊市| 南皮县| 遂昌县| 平阳县|