posts - 2,  comments - 0,  trackbacks - 0
          main:
           1 package com.aamend.hadoop.MapReduce;
           2  
           3 import java.io.IOException;
           4  
           5 import org.apache.hadoop.conf.Configuration;
           6 import org.apache.hadoop.fs.FileSystem;
           7 import org.apache.hadoop.fs.Path;
           8 import org.apache.hadoop.io.IntWritable;
           9 import org.apache.hadoop.io.Text;
          10 import org.apache.hadoop.mapreduce.Job;
          11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
          12 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
          13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
          14 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
          15  
          16 public class WordCount {
          17  
          18     public static void main(String[] args) throws IOException,
          19             InterruptedException, ClassNotFoundException {
          20  
          21         Path inputPath = new Path(args[0]);
          22         Path outputDir = new Path(args[1]);
          23  
          24         // Create configuration
          25         Configuration conf = new Configuration(true);
          26  
          27         // Create job
          28         Job job = new Job(conf, "WordCount");
          29         job.setJarByClass(WordCountMapper.class);
          30  
          31         // Setup MapReduce
          32         job.setMapperClass(WordCountMapper.class);
          33         job.setReducerClass(WordCountReducer.class);
          34         job.setNumReduceTasks(1);
          35  
          36         // Specify key / value
          37         job.setOutputKeyClass(Text.class);
          38         job.setOutputValueClass(IntWritable.class);
          39  
          40         // Input
          41         FileInputFormat.addInputPath(job, inputPath);
          42         job.setInputFormatClass(TextInputFormat.class);
          43  
          44         // Output
          45         FileOutputFormat.setOutputPath(job, outputDir);
          46         job.setOutputFormatClass(TextOutputFormat.class);
          47  
          48         // Delete output if exists
          49         FileSystem hdfs = FileSystem.get(conf);
          50         if (hdfs.exists(outputDir))
          51             hdfs.delete(outputDir, true);
          52  
          53         // Execute job
          54         int code = job.waitForCompletion(true? 0 : 1;
          55         System.exit(code);
          56  
          57     }
          58  
          59 }

          mapper class:
           1 package com.aamend.hadoop.MapReduce;
           2  
           3 import java.io.IOException;
           4  
           5 import org.apache.hadoop.io.IntWritable;
           6 import org.apache.hadoop.io.Text;
           7 import org.apache.hadoop.mapreduce.Mapper;
           8  
           9 public class WordCountMapper extends
          10         Mapper<Object, Text, Text, IntWritable> {
          11  
          12     private final IntWritable ONE = new IntWritable(1);
          13     private Text word = new Text();
          14  
          15     public void map(Object key, Text value, Context context)
          16             throws IOException, InterruptedException {
          17  
          18         String[] csv = value.toString().split(",");
          19         for (String str : csv) {
          20             word.set(str);
          21             context.write(word, ONE);
          22         }
          23     }
          24 }

          reducer class:
           1 package com.aamend.hadoop.MapReduce;
           2  
           3 import java.io.IOException;
           4  
           5 import org.apache.hadoop.io.IntWritable;
           6 import org.apache.hadoop.io.Text;
           7 import org.apache.hadoop.mapreduce.Reducer;
           8  
           9 public class WordCountReducer extends
          10         Reducer<Text, IntWritable, Text, IntWritable> {
          11  
          12     public void reduce(Text text, Iterable<IntWritable> values, Context context)
          13             throws IOException, InterruptedException {
          14         int sum = 0;
          15         for (IntWritable value : values) {
          16             sum += value.get();
          17         }
          18         context.write(text, new IntWritable(sum));
          19     }
          20 }




          posted on 2014-09-24 21:48 hqjma 閱讀(86) 評論(0)  編輯  收藏

          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導航:
           
          <2025年6月>
          25262728293031
          1234567
          891011121314
          15161718192021
          22232425262728
          293012345

          常用鏈接

          留言簿

          隨筆分類

          隨筆檔案

          文章分類

          文章檔案

          搜索

          •  

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 梁平县| 于田县| 富源县| 锦州市| 胶州市| 蛟河市| 元朗区| 化德县| 微山县| 商水县| 六盘水市| 丰原市| 土默特左旗| 冕宁县| 盐源县| 类乌齐县| 东乡族自治县| 大同市| 广宗县| 秀山| 克拉玛依市| 利辛县| 广德县| 布尔津县| 比如县| 柳江县| 东乌珠穆沁旗| 沁源县| 尖扎县| 青岛市| 水城县| 锡林浩特市| 贞丰县| 安丘市| 和田市| 五家渠市| 闽侯县| 余干县| 昌宁县| 南川市| 浪卡子县|