Skynet

          ---------- ---------- 我的新 blog : liukaiyi.cublog.cn ---------- ----------

            BlogJava :: 首頁 :: 聯(lián)系 :: 聚合  :: 管理
            112 Posts :: 1 Stories :: 49 Comments :: 0 Trackbacks

          Java 代碼:
          package com.xunjie.dmsp.olduser;

          import java.util.Properties;

          import cascading.flow.Flow;
          import cascading.flow.FlowConnector;
          import cascading.operation.regex.RegexSplitter;
          import cascading.pipe.Each;
          import cascading.pipe.Pipe;
          import cascading.scheme.TextLine;
          import cascading.tap.Hfs;
          import cascading.tap.Tap;
          import cascading.tuple.Fields;

          /**
           * test.txt: 
           * 1    a
           * 2    b
           * 3    c
           * 
           * /data/hadoop/hadoop/bin/hadoop jar 
           *         dmsp_test_jar-1.0-SNAPSHOT-dependencies.jar 
           *             hdfs:/user/hadoop/test/lky/test.txt
           *             file:///data/hadoop/test/lky/output
           
          */
          public class Test2 {
              
          public static void main(String[] args) {
                  
                  
          //設(shè)定輸入文件
                  String sourcePath= args[0];
                  
          //設(shè)置輸出文件夾
                  String sinkPath = args[1];

                  
          //定義讀取列
                  Fields inputfields = new Fields("num""value");
                  
          //定義分解正則,默認(rèn) \t
                  RegexSplitter spliter = new RegexSplitter(inputfields);
                  
                  
                  
          //管道定義
                  Pipe p1 = new Pipe( "test" );
                  
          //管道嵌套:
                  
          //分解日志源文件,輸出給定字段
                  p1 = new Each(p1,new Fields("line") ,spliter);
                  
                  
                  
          //設(shè)定輸入和輸出 ,使用 泛型Hfs
                  Tap source = new Hfs( new TextLine(),  sourcePath );
                  Tap sink 
          = new Hfs( new TextLine() , sinkPath );
                  
                  
                  
                  
          //配置job
                  Properties properties = new Properties();
                  properties.setProperty(
          "hadoop.job.ugi""hadoop,hadoop");
                  
                  FlowConnector.setApplicationJarClass( properties, Main.
          class );
                  FlowConnector flowConnector 
          = new FlowConnector(properties);
                  
                  Flow importFlow 
          = flowConnector.connect( "import flow", source,sink,p1);
                  
                  importFlow.start();
                  importFlow.complete();
                  

              }
          }




          整理 www.aygfsteel.com/Good-Game
          posted on 2009-07-22 10:01 劉凱毅 閱讀(674) 評論(0)  編輯  收藏 所屬分類: 集群開發(fā)
          主站蜘蛛池模板: 团风县| 云霄县| 大新县| 凯里市| 廉江市| 南部县| 柳林县| 承德县| 甘德县| 平罗县| 县级市| 唐河县| 辉县市| 桂阳县| 岑溪市| 肥东县| 竹北市| 五指山市| 黄浦区| 凤山市| 长乐市| 泽普县| 江源县| 蓝山县| 山丹县| 东光县| 芜湖市| 云浮市| 长宁县| 秦皇岛市| 广饶县| 宁国市| 栖霞市| 建德市| 莱阳市| 奉化市| 定兴县| 沙雅县| 新泰市| 九江县| 道真|