Skynet

          ---------- ---------- 我的新 blog : liukaiyi.cublog.cn ---------- ----------

            BlogJava :: 首頁 :: 聯系 :: 聚合  :: 管理
            112 Posts :: 1 Stories :: 49 Comments :: 0 Trackbacks

          Java 代碼:
          package com.xunjie.dmsp.olduser;

          import java.util.Properties;

          import cascading.flow.Flow;
          import cascading.flow.FlowConnector;
          import cascading.operation.regex.RegexSplitter;
          import cascading.pipe.Each;
          import cascading.pipe.Pipe;
          import cascading.scheme.TextLine;
          import cascading.tap.Hfs;
          import cascading.tap.Tap;
          import cascading.tuple.Fields;

          /**
           * test.txt: 
           * 1    a
           * 2    b
           * 3    c
           * 
           * /data/hadoop/hadoop/bin/hadoop jar 
           *         dmsp_test_jar-1.0-SNAPSHOT-dependencies.jar 
           *             hdfs:/user/hadoop/test/lky/test.txt
           *             file:///data/hadoop/test/lky/output
           
          */
          public class Test2 {
              
          public static void main(String[] args) {
                  
                  
          //設定輸入文件
                  String sourcePath= args[0];
                  
          //設置輸出文件夾
                  String sinkPath = args[1];

                  
          //定義讀取列
                  Fields inputfields = new Fields("num""value");
                  
          //定義分解正則,默認 \t
                  RegexSplitter spliter = new RegexSplitter(inputfields);
                  
                  
                  
          //管道定義
                  Pipe p1 = new Pipe( "test" );
                  
          //管道嵌套:
                  
          //分解日志源文件,輸出給定字段
                  p1 = new Each(p1,new Fields("line") ,spliter);
                  
                  
                  
          //設定輸入和輸出 ,使用 泛型Hfs
                  Tap source = new Hfs( new TextLine(),  sourcePath );
                  Tap sink 
          = new Hfs( new TextLine() , sinkPath );
                  
                  
                  
                  
          //配置job
                  Properties properties = new Properties();
                  properties.setProperty(
          "hadoop.job.ugi""hadoop,hadoop");
                  
                  FlowConnector.setApplicationJarClass( properties, Main.
          class );
                  FlowConnector flowConnector 
          = new FlowConnector(properties);
                  
                  Flow importFlow 
          = flowConnector.connect( "import flow", source,sink,p1);
                  
                  importFlow.start();
                  importFlow.complete();
                  

              }
          }




          整理 www.aygfsteel.com/Good-Game
          posted on 2009-07-22 10:01 劉凱毅 閱讀(681) 評論(0)  編輯  收藏 所屬分類: 集群開發
          主站蜘蛛池模板: 南开区| 潞城市| 克山县| 林州市| 新津县| 江阴市| 驻马店市| 新绛县| 潜山县| 莆田市| 南投市| 沁水县| 吴旗县| 平塘县| 荥阳市| 松溪县| 乌兰浩特市| 叙永县| 新蔡县| 房产| 修文县| 阿拉善右旗| 额济纳旗| 南投县| 武城县| 东光县| 简阳市| 仙游县| 和平县| 银川市| 江阴市| 北流市| 吉木乃县| 西华县| 涟源市| 钟山县| 双桥区| 景泰县| 中西区| 鲁甸县| 开阳县|