Skynet

          ---------- ---------- 我的新 blog : liukaiyi.cublog.cn ---------- ----------

            BlogJava :: 首頁(yè) :: 聯(lián)系 :: 聚合  :: 管理
            112 Posts :: 1 Stories :: 49 Comments :: 0 Trackbacks

          Java 代碼:
          package com.xunjie.dmsp.olduser;

          import java.util.Properties;

          import cascading.flow.Flow;
          import cascading.flow.FlowConnector;
          import cascading.operation.regex.RegexSplitter;
          import cascading.pipe.Each;
          import cascading.pipe.Pipe;
          import cascading.scheme.TextLine;
          import cascading.tap.Hfs;
          import cascading.tap.Tap;
          import cascading.tuple.Fields;

          /**
           * test.txt: 
           * 1    a
           * 2    b
           * 3    c
           * 
           * /data/hadoop/hadoop/bin/hadoop jar 
           *         dmsp_test_jar-1.0-SNAPSHOT-dependencies.jar 
           *             hdfs:/user/hadoop/test/lky/test.txt
           *             file:///data/hadoop/test/lky/output
           
          */
          public class Test2 {
              
          public static void main(String[] args) {
                  
                  
          //設(shè)定輸入文件
                  String sourcePath= args[0];
                  
          //設(shè)置輸出文件夾
                  String sinkPath = args[1];

                  
          //定義讀取列
                  Fields inputfields = new Fields("num""value");
                  
          //定義分解正則,默認(rèn) \t
                  RegexSplitter spliter = new RegexSplitter(inputfields);
                  
                  
                  
          //管道定義
                  Pipe p1 = new Pipe( "test" );
                  
          //管道嵌套:
                  
          //分解日志源文件,輸出給定字段
                  p1 = new Each(p1,new Fields("line") ,spliter);
                  
                  
                  
          //設(shè)定輸入和輸出 ,使用 泛型Hfs
                  Tap source = new Hfs( new TextLine(),  sourcePath );
                  Tap sink 
          = new Hfs( new TextLine() , sinkPath );
                  
                  
                  
                  
          //配置job
                  Properties properties = new Properties();
                  properties.setProperty(
          "hadoop.job.ugi""hadoop,hadoop");
                  
                  FlowConnector.setApplicationJarClass( properties, Main.
          class );
                  FlowConnector flowConnector 
          = new FlowConnector(properties);
                  
                  Flow importFlow 
          = flowConnector.connect( "import flow", source,sink,p1);
                  
                  importFlow.start();
                  importFlow.complete();
                  

              }
          }




          整理 www.aygfsteel.com/Good-Game
          posted on 2009-07-22 10:01 劉凱毅 閱讀(674) 評(píng)論(0)  編輯  收藏 所屬分類: 集群開發(fā)
          主站蜘蛛池模板: 芜湖县| 唐海县| 连云港市| 繁昌县| 米林县| 隆林| 龙胜| 金塔县| 化州市| 环江| 榆林市| 吴堡县| 松溪县| 嘉祥县| 栾城县| 尼木县| 青阳县| 侯马市| 威远县| 宁都县| 麟游县| 云和县| 北川| 瑞昌市| 达尔| 太仓市| 陇川县| 通州区| 阜新市| 肥东县| 无棣县| 宁远县| 营山县| 彭州市| 来凤县| 雷州市| 射阳县| 梨树县| 洛川县| 宁阳县| 木里|