posts - 495,comments - 227,trackbacks - 0
          http://www.cnblogs.com/spork/archive/2010/04/21/1717592.html

            經過上一篇的分析,我們知道了Hadoop的作業提交目標是Cluster還是Local,與conf文件夾內的配置文件參數有著密切關系,不僅如此,其它的很多類都跟conf有關,所以提交作業時切記把conf放到你的classpath中。

            因為Configuration是利用當前線程上下文的類加載器來加載資源和文件的,所以這里我們采用動態載入的方式,先添加好對應的依賴庫和資源,然后再構建一個URLClassLoader作為當前線程上下文的類加載器。

          復制代碼
          public static ClassLoader getClassLoader() {
          ClassLoader parent
          = Thread.currentThread().getContextClassLoader();
          if (parent == null) {
          parent
          = EJob.class.getClassLoader();
          }
          if (parent == null) {
          parent
          = ClassLoader.getSystemClassLoader();
          }
          return new URLClassLoader(classPath.toArray(new URL[0]), parent);
          }
          復制代碼

            代碼很簡單,廢話就不多說了。調用例子如下:

          EJob.addClasspath("/usr/lib/hadoop-0.20/conf");
          ClassLoader classLoader
          = EJob.getClassLoader();
          Thread.currentThread().setContextClassLoader(classLoader);

            設置好了類加載器,下面還有一步就是要打包Jar文件,就是讓Project自打包自己的class為一個Jar包,我這里以標準Eclipse工程文件夾布局為例,打包的就是bin文件夾里的class。

          復制代碼
          public static File createTempJar(String root) throws IOException {
          if (!new File(root).exists()) {
          return null;
          }
          Manifest manifest
          = new Manifest();
          manifest.getMainAttributes().putValue(
          "Manifest-Version", "1.0");
          final File jarFile = File.createTempFile("EJob-", ".jar", new File(System
          .getProperty(
          "java.io.tmpdir")));

          Runtime.getRuntime().addShutdownHook(
          new Thread() {
          public void run() {
          jarFile.delete();
          }
          });

          JarOutputStream out
          = new JarOutputStream(new FileOutputStream(jarFile),
          manifest);
          createTempJarInner(out,
          new File(root), "");
          out.flush();
          out.close();
          return jarFile;
          }

          private static void createTempJarInner(JarOutputStream out, File f,
          String base)
          throws IOException {
          if (f.isDirectory()) {
          File[] fl
          = f.listFiles();
          if (base.length() > 0) {
          base
          = base + "/";
          }
          for (int i = 0; i < fl.length; i++) {
          createTempJarInner(out, fl[i], base
          + fl[i].getName());
          }
          }
          else {
          out.putNextEntry(
          new JarEntry(base));
          FileInputStream in
          = new FileInputStream(f);
          byte[] buffer = new byte[1024];
          int n = in.read(buffer);
          while (n != -1) {
          out.write(buffer,
          0, n);
          n
          = in.read(buffer);
          }
          in.close();
          }
          }
          復制代碼

            這里的對外接口是createTempJar,接收參數為需要打包的文件夾根路徑,支持子文件夾打包。使用遞歸處理法,依次把文件夾里的結構和 文件打包到Jar里。很簡單,就是基本的文件流操作,陌生一點的就是Manifest和JarOutputStream,查查API就明了。

            好,萬事具備,只欠東風了,我們來實踐一下試試。還是拿WordCount來舉例:

          復制代碼
          // Add these statements. XXX
          File jarFile = EJob.createTempJar("bin");
          EJob.addClasspath("/usr/lib/hadoop-0.20/conf");
          ClassLoader classLoader =
          EJob.getClassLoader();
          Thread.currentThread().setContextClassLoader(classLoader);


          Configuration conf
          = new Configuration();
          String[] otherArgs
          = new GenericOptionsParser(conf, args)
          .getRemainingArgs();
          if (otherArgs.length != 2) {
          System.err.println(
          "Usage: wordcount <in> <out>");
          System.exit(
          2);
          }

          Job job
          = new Job(conf, "word count");
          job.setJarByClass(WordCountTest.
          class);
          job.setMapperClass(TokenizerMapper.
          class);
          job.setCombinerClass(IntSumReducer.
          class);
          job.setReducerClass(IntSumReducer.
          class);
          job.setOutputKeyClass(Text.
          class);
          job.setOutputValueClass(IntWritable.
          class);
          FileInputFormat.addInputPath(job,
          new Path(otherArgs[0]));
          FileOutputFormat.setOutputPath(job,
          new Path(otherArgs[1]));
          System.exit(job.waitForCompletion(
          true) ? 0 : 1);
          復制代碼

            Run as Java Application。。。!!!No job jar file set...異常,看來job.setJarByClass(WordCountTest.class)這個語句設置作業Jar包沒有成功。這是為什么呢?

          因為這個方法使用了WordCount.class的類加載器來尋找包含該類的Jar包,然后設置該Jar包為作業所用的Jar包。但是我們的作業 Jar包是在程序運行時才打包的,而WordCount.class的類加載器是AppClassLoader,運行后我們無法改變它的搜索路徑,所以使 用setJarByClass是無法設置作業Jar包的。我們必須使用JobConf里的setJar來直接設置作業Jar包,像下面一樣:

          ((JobConf)job.getConfiguration()).setJar(jarFile);

            好,我們對上面的例子再做下修改,加上上面這條語句。

          Job job = new Job(conf, "word count");
          // And add this statement. XXX
          ((JobConf) job.getConfiguration()).setJar(jarFile.toString());

            再Run as Java Application,終于OK了~~

            該種方法的Run on Hadoop使用簡單,兼容性好,推薦一試。:)

            本例子由于時間關系,只在Ubuntu上做了偽分布式測試,但理論上是可以用到真實分布式上去的。

               >>點我下載<<

           

            The end.

          posted on 2013-02-22 14:05 SIMONE 閱讀(800) 評論(0)  編輯  收藏 所屬分類: hbase
          主站蜘蛛池模板: 江安县| 古田县| 紫阳县| 新晃| 通江县| 靖边县| 长春市| 舒城县| 益阳市| 嘉禾县| 神农架林区| 重庆市| 雅江县| 岳池县| 临沂市| 任丘市| 长阳| 桃源县| 吉林省| 彭州市| 普兰店市| 彩票| 荣成市| 子洲县| 张家口市| 沅江市| 志丹县| 辽宁省| 两当县| 保德县| 台山市| 阿图什市| 昔阳县| 汨罗市| 兴宁市| 乳山市| 蒙自县| 北流市| 东源县| 紫金县| 兴国县|