march alex's blog
          hello,I am march alex
          posts - 52,comments - 7,trackbacks - 0
          MapReduce是一種可用于數據處理的變成模型。
          這里主要講述Java語言實現MapReduce。
          一個投票模型:
          Jobs很喜歡給女生打分,好看的打100分,難看的打0分。有一次他給Lucy打了0分,結果被Lucy痛打了一頓。
          還有一次,Jobs給兩個美女打分,給美女Alice打了99分,給美女Candice打了98分。
          這個時候Alex就看不下去了,他于是站起來說:“明明是Candice比較好看嘛!”。
          兩人于是爭執起來,為了證明自己的觀點,結果爆發了一場大戰!什么降龍十八掌啊,黯然銷魂掌啊,他們都不會。
          那么怎么才能讓對方輸的心服口服呢?他們想到“群眾的眼睛是雪亮的”!于是他們發動了班上的20名同學進行投票。
          結果出來了,Alice的平均分是98.5,Candice的平均分是99.7,以壓倒性的優勢獲得了勝利。
          但是Jobs不服,于是把班上每個女生的照片放到了網上,讓大家來評分,最后他得到了好多文件,用自己的電腦算根本忙不過來,于是他想到了用Hadoop寫一個MapReduce程序。
          一直輸入文件的格式是:"[user]\t[girlname]\t[point]".例:
          alex    alice   88
          alex    candice 100
          jobs    alice   100
          john    lucy    89

          在這里,我們假設每個人的評分都為0到100的整數,最終的結果向下取整。那么MapReduce程序可以寫成如下:

          我們需要三樣東西:一個map函數,一個reduce函數,和一些用來運行作業的代碼。
          map函數由Mapper接口實現來表示,后者聲明了一個map()方法。
          AverageMapper.java

          import java.io.IOException;

          import org.apache.hadoop.io.IntWritable;
          import org.apache.hadoop.io.Text;
          import org.apache.hadoop.mapred.MapReduceBase;
          import org.apache.hadoop.mapred.Mapper;
          import org.apache.hadoop.mapred.OutputCollector;
          import org.apache.hadoop.mapred.Reporter;

          public class AverageMapper extends MapReduceBase
            implements Mapper<IntWritable, Text, Text, IntWritable> {
                  
                  public void map(IntWritable key, Text value,
                                  OutputCollector<Text, IntWritable> output, Reporter reporter)
                                  throws IOException {
                          
                          String s = value.toString();
                          String name = new String();
                          int point = 0;
                          int i;
                          for(i=0;i<s.length() && s.charAt(i)!='\t';i++);
                          for(i++;i<s.length() && s.charAt(i)!='\t';i++) {
                                  name += s.charAt(i);
                          }
                          for(i++;i<s.length();i++) {
                                  point = point * 10 + (s.charAt(i) - '0');
                          }
                          if(name.length() != 0 && point >=0 && point <= 100) {
                                  output.collect(new Text(name), new IntWritable(point));
                          }
                  }
          }

          該Mapper接口是一個泛型類型,他有四個形參類型,分別指定map函數的輸入鍵、輸入值、輸出鍵、輸出值的類型。
          以示例來說,輸入鍵是一個整形偏移量(表示行號),輸入值是一行文本,輸出鍵是美女的姓名,輸出值是美女的得分。
          Hadoop自身提供一套可優化網絡序列化傳輸的基本類型,而不直接使用Java內嵌的類型。這些類型均可在org.apache.hadoop.io包中找到。
          這里我們使用IntWritable類型(相當于Java中的Integer類型)和Text類型(想到與Java中的String類型)。
          map()方法還提供了OutputCollector示例用于輸出內容的寫入。
          我們只在輸入內容格式正確的時候才將其寫入輸出記錄中。

          reduce函數通過Reducer進行類似的定義。
          AverageReducer.java

          import java.io.IOException;
          import java.util.Iterator;

          import org.apache.hadoop.io.IntWritable;
          import org.apache.hadoop.io.Text;
          import org.apache.hadoop.mapred.MapReduceBase;
          import org.apache.hadoop.mapred.OutputCollector;
          import org.apache.hadoop.mapred.Reducer;
          import org.apache.hadoop.mapred.Reporter;


          public class AverageReducer extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
                  
                  public void reduce(Text key, Iterator<IntWritable> values,
                                  OutputCollector<Text, IntWritable> output, Reporter reporter)
                                  throws IOException {
                          
                          long tot_point = 0, num = 0;
                          while(values.hasNext()) {
                                  tot_point += values.next().get();
                                  num ++;
                          }
                          int ave_point = (int)(tot_point/num);
                          output.collect(key, new IntWritable(ave_point));
                  }
          }

          第三部分代碼負責運行MapReduce作業。
          Average.java
          import java.io.IOException;

          import org.apache.hadoop.fs.Path;
          import org.apache.hadoop.io.IntWritable;
          import org.apache.hadoop.io.Text;
          import org.apache.hadoop.mapred.FileInputFormat;
          import org.apache.hadoop.mapred.FileOutputFormat;
          import org.apache.hadoop.mapred.JobConf;


          public class Average {
                  public static void main(String[] args) throws IOException {
                          if(args.length != 2) {
                                  System.err.println("Usage: Average <input path> <output path>");
                                  System.exit(-1);
                          }
                          JobConf conf = new JobConf(Average.class);
                          conf.setJobName("Average");
                          FileInputFormat.addInputPath(conf, new Path(args[0]));
                          FileOutputFormat.setOutputPath(conf, new Path(args[1]));
                          conf.setMapperClass(AverageMapper.class);
                          conf.setReducerClass(AverageReducer.class);
                          conf.setOutputKeyClass(Text.class);
                          conf.setOutputValueClass(IntWritable.class);
                  }
          }

          JobConf對象指定了作業執行規范。我們可以用它來控制整個作業的運行。
          在Hadoop作業上運行著寫作業時,我們需要將代碼打包成一個JAR文件(Hadoop會在集群上分發這些文件)。
          我們無需明確指定JAR文件的名稱,而只需在JobConf的構造函數中傳遞一個類,Hadoop將通過該類查找JAR文件進而找到相關的JAR文件。
          構造JobCOnf對象之后,需要指定輸入和輸出數據的路徑。
          調用FileInputFormat類的靜態方法addInputPath()來定義輸入數據的路徑。
          可以多次調用addInputOath()實現多路徑的輸入。
          調用FileOutputFormat類的靜態函數SetOutputPath()來指定輸出路徑。
          該函數指定了reduce函數輸出文件的寫入目錄。
          在運行任務前該目錄不應該存在,否則Hadoop會報錯并拒絕運行該任務。
          這種預防措施是為了防止數據丟失。
          接著,通過SetMapperClass()和SetReducerClass()指定map和reduce類型。
          輸入類型通過InputFormat類來控制,我們的例子中沒有設置,因為使用的是默認的TextInputFormat(文本輸入格式)。

          新增的Java MapReduce API


          新的Hadoop在版本0.20.0包含有一個新的Java MapReduce API,又是也稱為"上下文對象"(context object),旨在使API在今后更容易擴展。
          新特性:
          傾向于使用虛類,而不是接口;
          新的API放在org.apache.hadoop.mapreduce包中(舊版本org.apache.hadoop.mapred包);
          新的API充分使用上下文對象,使用戶代碼能與MapReduce通信。例如:MapContext基本具備了JobConf,OutputCollector和Reporter的功能;
          新的API同時支持“推”(push)和“拉”(pull)的迭代。這兩類API,均可以將鍵/值對記錄推給mapper,但除此之外,新的API也允許把記錄從map()方法中拉出、對reducer來說是一樣的。拉式處理的好處是可以實現批量處理,而非逐條記錄的處理。
          新的API實現了配置的統一。所有作業的配置均通過Configuration來完成。(區別于舊API的JobConf)。
          新API中作業控制由Job類實現,而非JobClient類,新API中刪除了JobClient類。
          輸出文件的命名稍有不同。

          用新上下文對象來重寫Average應用

          import java.io.IOException;

          import org.apache.hadoop.fs.Path;
          import org.apache.hadoop.io.IntWritable;
          import org.apache.hadoop.io.Text;
          import org.apache.hadoop.mapreduce.Job;
          import org.apache.hadoop.mapreduce.Mapper;
          import org.apache.hadoop.mapreduce.Reducer;
          import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
          import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

          public class NewAverage {
                  
                  static class NewAverageMapper
                    extends Mapper<IntWritable, Text, Text, IntWritable> {
                          
                          public void map(IntWritable key, Text value, Context context) 
                                  throws IOException, InterruptedException {
                                  
                                  String s = value.toString();
                                  String name = new String();
                                  int point = 0;
                                  int i;
                                  for(i=0;i<s.length() && s.charAt(i)!='\t';i++);
                                  for(i++;i<s.length() && s.charAt(i)!='\t';i++) {
                                          name += s.charAt(i);
                                  }
                                  for(i++;i<s.length();i++) {
                                          point = point * 10 + (s.charAt(i) - '0');
                                  }
                                  if(name.length() != 0 && point >=0 && point <= 100) {
                                          context.write(new Text(name), new IntWritable(point));
                                  }
                          }
                  }
                  
                  static class NewAverageReducer
                    extends Reducer<Text, IntWritable, Text, IntWritable> {
                          
                          public void reduce(Text key, Iterable<IntWritable> values,
                                          Context context)
                                          throws IOException, InterruptedException {
                                  
                                  long tot_point = 0, num = 0;
                                  for(IntWritable value : values) {
                                          tot_point += value.get();
                                          num ++;
                                  }
                                  int ave_point = (int)(tot_point/num);
                                  context.write(key, new IntWritable(ave_point));
                          }
                  }
                  
                  public static void main(String[] args) throws Exception {
                          if(args.length != 2) {
                                  System.err.println("Usage: NewAverage <input path> <output path>");
                                  System.exit(-1);
                          }
                          Job job = new Job();
                          job.setJarByClass(NewAverage.class);
                          
                          FileInputFormat.addInputPath(job, new Path(args[0]));
                          FileOutputFormat.setOutputPath(job, new Path(args[1]));
                          
                          job.setMapperClass(NewAverageMapper.class);
                          job.setReducerClass(NewAverageReducer.class);
                          job.setOutputKeyClass(Text.class);
                          job.setOutputValueClass(IntWritable.class);
                          
                          System.exit(job.waitForCompletion(true) ? 0 : 1);
                  }
          }
          posted on 2015-03-08 13:27 marchalex 閱讀(1384) 評論(0)  編輯  收藏 所屬分類: java小程序
          主站蜘蛛池模板: 舟山市| 广汉市| 四平市| 青神县| 平南县| 延庆县| 贵阳市| 长春市| 宁陵县| 三门峡市| 虹口区| 阳新县| 保德县| 宜兰县| 闽清县| 西峡县| 包头市| 聊城市| 资中县| 陇西县| 高淳县| 涿鹿县| 东乡县| 静安区| 南投市| 安化县| 丰宁| 焦作市| 普陀区| 蒙阴县| 北京市| 常德市| 平顶山市| 双牌县| 札达县| 安徽省| 北京市| 广河县| 刚察县| 台南市| 阿拉善右旗|