莊周夢蝶

          生活、程序、未來
             :: 首頁 ::  ::  :: 聚合  :: 管理

          第一個MapReduce任務

          Posted on 2008-08-23 11:08 dennis 閱讀(2666) 評論(0)  編輯  收藏 所屬分類: javaHadoop與分布式
              前兩天在公司內網上搭了個2個節點hadoop集群,暫時沒有多大實際意義,僅用作自己的測試。遇到的問題在阿里巴巴這位仁兄的《Hadoop集群配置和使用技巧》都有提到的。也遇到了reduce任務卡住的問題,只需要在每個節點的/etc/hosts將集群中的機器都配置上即可解決。
             今天將一個日志統計任務用Hadoop MapReduce框架重新實現了一次,數據量并不大,每天分析一個2G多的日志文件罷了。先前是用Ruby配合cat、grep命令搞定,運行一次在50多秒左右,如果純粹采用Ruby的話CPU占用率非常高而且慢的無法忍受,利用IO.popen調用linux的cat、grep命令先期處理就好多了。看看這個MapReduce任務:
          public class GameCount extends Configured implements
                  org.apache.hadoop.util.Tool {
              
          public static class MapClass extends MapReduceBase implements
                      Mapper
          <LongWritable, Text, Text, IntWritable> {

                  
          private Pattern pattern;

                  
          public void configure(JobConf job) {
                      String gameName 
          = job.get("mapred.mapper.game");
                      pattern 
          = Pattern.compile("play\\sgame\\s" + gameName
                              
          + ".*uid=(\\d+),score=(-?\\d+),money=(-?\\d+)");
                  }

                  @Override
                  
          public void map(LongWritable key, Text value,
                          OutputCollector
          <Text, IntWritable> output, Reporter reporter)
                          
          throws IOException {
                      String text 
          = value.toString();
                      Matcher matcher 
          = pattern.matcher(text);
                      
          int total = 0// 總次數
                      while (matcher.find()) {
                          
          int record = Integer.parseInt(matcher.group(2));
                          output.collect(
          new Text(matcher.group(1)), new IntWritable(
                                  record));
                          total 
          += 1;
                      }
                      output.collect(
          new Text("total"), new IntWritable(total));
                  }
              }

              
          public static class ReduceClass extends MapReduceBase implements
                      Reducer
          <Text, IntWritable, Text, IntWritable> {

                  @Override
                  
          public void reduce(Text key, Iterator<IntWritable> values,
                          OutputCollector
          <Text, IntWritable> output, Reporter reporter)
                          
          throws IOException {
                      
          int sum = 0;
                      
          while (values.hasNext()) {
                          sum 
          += values.next().get();
                      }
                      output.collect(key, 
          new IntWritable(sum));
                  }

              }

              
          static int printUsage() {
                  System.out
                          .println(
          "gamecount [-m <maps>] [-r <reduces>] <input> <output> <gamename>");
                  ToolRunner.printGenericCommandUsage(System.out);
                  
          return -1;
              }

             
          public int run(String[] args) throws Exception {
                  JobConf conf 
          = new JobConf(getConf(), GameCount.class);
                  conf.setJobName(
          "gamecount");

                
          conf.setOutputKeyClass(Text.class);
                  conf.setOutputValueClass(IntWritable.class);

                  conf.setMapperClass(MapClass.
          class);
                  conf.setCombinerClass(ReduceClass.
          class);
                  conf.setReducerClass(ReduceClass.
          class);

                  List
          <String> other_args = new ArrayList<String>();
                  
          for (int i = 0; i < args.length; ++i) {
                      
          try {
                          
          if ("-m".equals(args[i])) {
                              conf.setNumMapTasks(Integer.parseInt(args[
          ++i]));
                          } 
          else if ("-r".equals(args[i])) {
                              conf.setNumReduceTasks(Integer.parseInt(args[
          ++i]));
                          } 
          else {
                              other_args.add(args[i]);
                          }
                      } 
          catch (NumberFormatException except) {
                          System.out.println(
          "ERROR: Integer expected instead of "
                                  
          + args[i]);
                          
          return printUsage();
                      } 
          catch (ArrayIndexOutOfBoundsException except) {
                          System.out.println(
          "ERROR: Required parameter missing from "
                                  
          + args[i - 1]);
                          
          return printUsage();
                      }
                  }
                  
          // Make sure there are exactly 2 parameters left.
                  if (other_args.size() != 3) {
                      System.out.println(
          "ERROR: Wrong number of parameters: "
                              
          + other_args.size() + " instead of 2.");
                      
          return printUsage();
                  }
                  FileInputFormat.setInputPaths(conf, other_args.get(
          0));
                  FileOutputFormat.setOutputPath(conf, 
          new Path(other_args.get(1)));
                  conf.set(
          "mapred.mapper.game", args[2]);
                  JobClient.runJob(conf);
                  
          return 0;
              }

              
          public static void main(String[] args) throws Exception {
                  
          long start = System.nanoTime();
                  
          int res = ToolRunner.run(new Configuration(), new GameCount(), args);
                  System.out.println(
          "running time:" + (System.nanoTime() - start)
                          
          / 1000000 + " ms");
                  System.exit(res);
              }

          }
              代碼沒啥好解釋的,就是分析類似"play game DouDiZhu result:uid=1871653,score=-720,money=0"這樣的字符串,分析每天玩家玩游戲的次數、分數等。打包成GameCount.jar,執行:
          hadoop jar GameCount.jar test.GameCount /usr/logs/test.log /usr/output GameName

             統計的運行時間在100多秒,適當增加map和reduce任務個數沒有多大改善,不過CPU占用率還是挺高的。


          主站蜘蛛池模板: 济阳县| 桃源县| 沂水县| 鹿泉市| 长岭县| 瓮安县| 拜泉县| 迁西县| 德江县| 桓仁| 栖霞市| 婺源县| 巴林左旗| 黑龙江省| 巨鹿县| 嘉义市| 通州市| 西林县| 怀宁县| 旌德县| 巨鹿县| 凤翔县| 哈尔滨市| 鸡泽县| 儋州市| 泰顺县| 乌恰县| 大渡口区| 海门市| 柳江县| 屯门区| 龙口市| 崇州市| 台北县| 巴彦县| 福建省| 株洲县| 沁水县| 繁昌县| 巴东县| 渝中区|