posts - 2,  comments - 0,  trackbacks - 0
          main:
           1 package com.aamend.hadoop.MapReduce;
           2  
           3 import java.io.IOException;
           4  
           5 import org.apache.hadoop.conf.Configuration;
           6 import org.apache.hadoop.fs.FileSystem;
           7 import org.apache.hadoop.fs.Path;
           8 import org.apache.hadoop.io.IntWritable;
           9 import org.apache.hadoop.io.Text;
          10 import org.apache.hadoop.mapreduce.Job;
          11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
          12 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
          13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
          14 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
          15  
          16 public class WordCount {
          17  
          18     public static void main(String[] args) throws IOException,
          19             InterruptedException, ClassNotFoundException {
          20  
          21         Path inputPath = new Path(args[0]);
          22         Path outputDir = new Path(args[1]);
          23  
          24         // Create configuration
          25         Configuration conf = new Configuration(true);
          26  
          27         // Create job
          28         Job job = new Job(conf, "WordCount");
          29         job.setJarByClass(WordCountMapper.class);
          30  
          31         // Setup MapReduce
          32         job.setMapperClass(WordCountMapper.class);
          33         job.setReducerClass(WordCountReducer.class);
          34         job.setNumReduceTasks(1);
          35  
          36         // Specify key / value
          37         job.setOutputKeyClass(Text.class);
          38         job.setOutputValueClass(IntWritable.class);
          39  
          40         // Input
          41         FileInputFormat.addInputPath(job, inputPath);
          42         job.setInputFormatClass(TextInputFormat.class);
          43  
          44         // Output
          45         FileOutputFormat.setOutputPath(job, outputDir);
          46         job.setOutputFormatClass(TextOutputFormat.class);
          47  
          48         // Delete output if exists
          49         FileSystem hdfs = FileSystem.get(conf);
          50         if (hdfs.exists(outputDir))
          51             hdfs.delete(outputDir, true);
          52  
          53         // Execute job
          54         int code = job.waitForCompletion(true? 0 : 1;
          55         System.exit(code);
          56  
          57     }
          58  
          59 }

          mapper class:
           1 package com.aamend.hadoop.MapReduce;
           2  
           3 import java.io.IOException;
           4  
           5 import org.apache.hadoop.io.IntWritable;
           6 import org.apache.hadoop.io.Text;
           7 import org.apache.hadoop.mapreduce.Mapper;
           8  
           9 public class WordCountMapper extends
          10         Mapper<Object, Text, Text, IntWritable> {
          11  
          12     private final IntWritable ONE = new IntWritable(1);
          13     private Text word = new Text();
          14  
          15     public void map(Object key, Text value, Context context)
          16             throws IOException, InterruptedException {
          17  
          18         String[] csv = value.toString().split(",");
          19         for (String str : csv) {
          20             word.set(str);
          21             context.write(word, ONE);
          22         }
          23     }
          24 }

          reducer class:
           1 package com.aamend.hadoop.MapReduce;
           2  
           3 import java.io.IOException;
           4  
           5 import org.apache.hadoop.io.IntWritable;
           6 import org.apache.hadoop.io.Text;
           7 import org.apache.hadoop.mapreduce.Reducer;
           8  
           9 public class WordCountReducer extends
          10         Reducer<Text, IntWritable, Text, IntWritable> {
          11  
          12     public void reduce(Text text, Iterable<IntWritable> values, Context context)
          13             throws IOException, InterruptedException {
          14         int sum = 0;
          15         for (IntWritable value : values) {
          16             sum += value.get();
          17         }
          18         context.write(text, new IntWritable(sum));
          19     }
          20 }




          posted on 2014-09-24 21:48 hqjma 閱讀(87) 評論(0)  編輯  收藏

          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          <2025年7月>
          293012345
          6789101112
          13141516171819
          20212223242526
          272829303112
          3456789

          常用鏈接

          留言簿

          隨筆分類

          隨筆檔案

          文章分類

          文章檔案

          搜索

          •  

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 平邑县| 武宁县| 万宁市| 鹤庆县| 衡阳县| 张北县| 新安县| 班玛县| 东至县| 昭觉县| 葵青区| 乳源| 沙雅县| 若羌县| 秦安县| 湟中县| 福贡县| 日喀则市| 清涧县| 临夏县| 绵竹市| 凉城县| 托克逊县| 三亚市| 五大连池市| 从化市| 屏边| 松原市| 灌阳县| 正定县| 滁州市| 巴东县| 华池县| 汉沽区| 图们市| 枞阳县| 苏州市| 新巴尔虎右旗| 香河县| 土默特左旗| 岳西县|