Skynet

          ---------- ---------- 我的新 blog : liukaiyi.cublog.cn ---------- ----------

            BlogJava :: 首頁 :: 聯系 :: 聚合  :: 管理
            112 Posts :: 1 Stories :: 49 Comments :: 0 Trackbacks

          Java 代碼:
          package com.xunjie.dmsp.olduser;

          import java.util.Properties;

          import cascading.flow.Flow;
          import cascading.flow.FlowConnector;
          import cascading.operation.regex.RegexSplitter;
          import cascading.pipe.Each;
          import cascading.pipe.Pipe;
          import cascading.scheme.TextLine;
          import cascading.tap.Hfs;
          import cascading.tap.Tap;
          import cascading.tuple.Fields;

          /**
           * test.txt: 
           * 1    a
           * 2    b
           * 3    c
           * 
           * /data/hadoop/hadoop/bin/hadoop jar 
           *         dmsp_test_jar-1.0-SNAPSHOT-dependencies.jar 
           *             hdfs:/user/hadoop/test/lky/test.txt
           *             file:///data/hadoop/test/lky/output
           
          */
          public class Test2 {
              
          public static void main(String[] args) {
                  
                  
          //設定輸入文件
                  String sourcePath= args[0];
                  
          //設置輸出文件夾
                  String sinkPath = args[1];

                  
          //定義讀取列
                  Fields inputfields = new Fields("num""value");
                  
          //定義分解正則,默認 \t
                  RegexSplitter spliter = new RegexSplitter(inputfields);
                  
                  
                  
          //管道定義
                  Pipe p1 = new Pipe( "test" );
                  
          //管道嵌套:
                  
          //分解日志源文件,輸出給定字段
                  p1 = new Each(p1,new Fields("line") ,spliter);
                  
                  
                  
          //設定輸入和輸出 ,使用 泛型Hfs
                  Tap source = new Hfs( new TextLine(),  sourcePath );
                  Tap sink 
          = new Hfs( new TextLine() , sinkPath );
                  
                  
                  
                  
          //配置job
                  Properties properties = new Properties();
                  properties.setProperty(
          "hadoop.job.ugi""hadoop,hadoop");
                  
                  FlowConnector.setApplicationJarClass( properties, Main.
          class );
                  FlowConnector flowConnector 
          = new FlowConnector(properties);
                  
                  Flow importFlow 
          = flowConnector.connect( "import flow", source,sink,p1);
                  
                  importFlow.start();
                  importFlow.complete();
                  

              }
          }




          整理 www.aygfsteel.com/Good-Game
          posted on 2009-07-22 10:01 劉凱毅 閱讀(681) 評論(0)  編輯  收藏 所屬分類: 集群開發
          主站蜘蛛池模板: 玉龙| 茌平县| 马龙县| 沙田区| 平泉县| 甘孜县| 开封市| 扶绥县| 化隆| 广丰县| 柳林县| 成武县| 田阳县| 永清县| 福州市| 泰宁县| 锡林郭勒盟| 勐海县| 洛南县| 比如县| 白水县| 蒲城县| 漳州市| 长白| 独山县| 阳朔县| 贵南县| 青川县| 南丰县| 长白| 郯城县| 新竹市| 雷州市| 怀来县| 左贡县| 天水市| 长泰县| 宣城市| 工布江达县| 凤阳县| 原阳县|