paulwong

          WordCount的一個變種版本…Hadoop

          統計域名(實際是host)的計數器。

          輸入:一個文件夾,其中有若干文本文件;每個文件每行一個url,每一行可以想像為數據庫中的一條記錄
          流程:提取url的domain,對domain計數+1
          輸出:域名,域名計數

          代碼如下:
          Mapper
          package com.keseek.hadoop;

          import java.io.IOException;
          import java.net.URI;

          import org.apache.hadoop.io.LongWritable;
          import org.apache.hadoop.io.Text;
          import org.apache.hadoop.mapred.JobConf;
          import org.apache.hadoop.mapred.OutputCollector;
          import org.apache.hadoop.mapred.Reporter;
          import org.apache.hadoop.mapred.Mapper;

          public class DomainCountMapper implements
                  Mapper
          <LongWritable, Text, Text, LongWritable> {

              @Override
             
          public void configure(JobConf arg0) {
                 
          // Init Text and LongWritable
                  domain = new Text();
                  one
          = new LongWritable(1);
              }


              @Override
             
          public void close() throws IOException {
                 
          // TODO Auto-generated method stub
              }


              @Override
             
          public void map(LongWritable key, Text value,
                      OutputCollector
          <Text, LongWritable> output, Reporter reporter)
                     
          throws IOException {
                 
          // Get URL
                  String url = value.toString().trim();

                 
          // URL->Domain && Collect
                  domain.set(ParseDomain(url));
                 
          if (domain.getLength() != 0) {
                      output.collect(domain, one);
                  }


              }


             
          public String ParseDomain(String url) {
                 
          try {
                      URI uri
          = URI.create(url);
                     
          return uri.getHost();
                  }
          catch (Exception e) {
                     
          return "";
                  }

              }


             
          // Shared used Text domain
              private Text domain;

             
          // One static
              private LongWritable one;

          }

          Reducer

          package com.keseek.hadoop;

          import java.io.IOException;
          import java.util.Iterator;

          import org.apache.hadoop.io.LongWritable;
          import org.apache.hadoop.io.Text;
          import org.apache.hadoop.mapred.JobConf;
          import org.apache.hadoop.mapred.OutputCollector;
          import org.apache.hadoop.mapred.Reporter;
          import org.apache.hadoop.mapred.Reducer;

          public class DomainCountReducer implements
                  Reducer
          <Text, LongWritable, Text, LongWritable> {

              @Override
             
          public void configure(JobConf arg0) {
                 
          // TODO Auto-generated method stub

              }


              @Override
             
          public void close() throws IOException {
                 
          // TODO Auto-generated method stub

              }


              @Override
             
          public void reduce(Text key, Iterator<LongWritable> values,
                      OutputCollector
          <Text, LongWritable> output, Reporter reporter)
                     
          throws IOException {
                 
          // Count the domain
                  long cnt = 0;
                 
          while (values.hasNext()) {
                      cnt
          += values.next().get();
                  }

                 
          // Output
                  output.collect(key, new LongWritable(cnt));
              }


          }

          Main

          package com.keseek.hadoop;

          import org.apache.hadoop.fs.Path;
          import org.apache.hadoop.io.LongWritable;
          import org.apache.hadoop.io.Text;
          import org.apache.hadoop.mapred.FileInputFormat;
          import org.apache.hadoop.mapred.FileOutputFormat;
          import org.apache.hadoop.mapred.JobClient;
          import org.apache.hadoop.mapred.JobConf;
          import org.apache.hadoop.mapred.RunningJob;
          import org.apache.hadoop.mapred.TextInputFormat;
          import org.apache.hadoop.mapred.TextOutputFormat;

          public class DomainCountMain {
             
          public static void main(String[] args) throws Exception {
                 
          // Param for path
                  if (args.length != 2) {
                      System.out.println(
          "Usage:");
                      System.out
                              .println(
          "DomainCountMain.jar  <Input_Path>  <Outpu_Path>");
                      System.exit(
          -1);
                  }


                 
          // Configure JobConf
                  JobConf jobconf = new JobConf(DomainCountMain.class);

                  jobconf.setJobName(
          "Domain Counter by Coder4");

                  FileInputFormat.setInputPaths(jobconf,
          new Path(args[0]));
                 FileOutputFormat.setOutputPath(jobconf,
          new Path(args[1]));

                  jobconf.setInputFormat(TextInputFormat.
          class);
                 jobconf.setOutputFormat(TextOutputFormat.
          class);

                  jobconf.setMapperClass(DomainCountMapper.
          class);
                  jobconf.setReducerClass(DomainCountReducer.
          class);
                 jobconf.setCombinerClass(DomainCountReducer.
          class);

                  jobconf.setMapOutputKeyClass(Text.
          class);
                  jobconf.setMapOutputValueClass(LongWritable.
          class);
                  jobconf.setOutputKeyClass(Text.
          class);
                  jobconf.setOutputValueClass(LongWritable.
          class);

                 
          // Run job
                  RunningJob run = JobClient.runJob(jobconf);
                  run.waitForCompletion();
                 
          if (run.isSuccessful()) {
                      System.out.println(
          "<<<DomainCount Main>>> success.");
                  }
          else {
                      System.out.println(
          "<<<DomainCount Main>>> error.");
                  }

              }

          }

          posted on 2012-09-08 15:30 paulwong 閱讀(267) 評論(0)  編輯  收藏 所屬分類: HADOOP云計算

          主站蜘蛛池模板: 南岸区| 特克斯县| 日土县| 清水河县| 安福县| 彰武县| 广宁县| 日照市| 隆昌县| 北京市| 江孜县| 紫阳县| 荃湾区| 宣武区| 大化| 连南| 板桥市| 岐山县| 巫山县| 池州市| 忻州市| 西昌市| 盐亭县| 调兵山市| 五台县| 德格县| 定远县| 兴城市| 西乌| 柘荣县| 榆林市| 丽水市| 蒲江县| 裕民县| 金门县| 广河县| 文化| 方正县| 高安市| 广丰县| 德兴市|