Skynet

          ---------- ---------- 我的新 blog : liukaiyi.cublog.cn ---------- ----------

            BlogJava :: 首頁 :: 聯系 :: 聚合  :: 管理
            112 Posts :: 1 Stories :: 49 Comments :: 0 Trackbacks

          Java 代碼:
          package com.xunjie.dmsp.olduser;

          import java.util.Properties;

          import cascading.flow.Flow;
          import cascading.flow.FlowConnector;
          import cascading.operation.regex.RegexSplitter;
          import cascading.pipe.Each;
          import cascading.pipe.Pipe;
          import cascading.scheme.TextLine;
          import cascading.tap.Hfs;
          import cascading.tap.Tap;
          import cascading.tuple.Fields;

          /**
           * test.txt: 
           * 1    a
           * 2    b
           * 3    c
           * 
           * /data/hadoop/hadoop/bin/hadoop jar 
           *         dmsp_test_jar-1.0-SNAPSHOT-dependencies.jar 
           *             hdfs:/user/hadoop/test/lky/test.txt
           *             file:///data/hadoop/test/lky/output
           
          */
          public class Test2 {
              
          public static void main(String[] args) {
                  
                  
          //設定輸入文件
                  String sourcePath= args[0];
                  
          //設置輸出文件夾
                  String sinkPath = args[1];

                  
          //定義讀取列
                  Fields inputfields = new Fields("num""value");
                  
          //定義分解正則,默認 \t
                  RegexSplitter spliter = new RegexSplitter(inputfields);
                  
                  
                  
          //管道定義
                  Pipe p1 = new Pipe( "test" );
                  
          //管道嵌套:
                  
          //分解日志源文件,輸出給定字段
                  p1 = new Each(p1,new Fields("line") ,spliter);
                  
                  
                  
          //設定輸入和輸出 ,使用 泛型Hfs
                  Tap source = new Hfs( new TextLine(),  sourcePath );
                  Tap sink 
          = new Hfs( new TextLine() , sinkPath );
                  
                  
                  
                  
          //配置job
                  Properties properties = new Properties();
                  properties.setProperty(
          "hadoop.job.ugi""hadoop,hadoop");
                  
                  FlowConnector.setApplicationJarClass( properties, Main.
          class );
                  FlowConnector flowConnector 
          = new FlowConnector(properties);
                  
                  Flow importFlow 
          = flowConnector.connect( "import flow", source,sink,p1);
                  
                  importFlow.start();
                  importFlow.complete();
                  

              }
          }




          整理 www.aygfsteel.com/Good-Game
          posted on 2009-07-22 10:01 劉凱毅 閱讀(681) 評論(0)  編輯  收藏 所屬分類: 集群開發
          主站蜘蛛池模板: 辰溪县| 金昌市| 海城市| 西乌珠穆沁旗| 苍山县| 桂阳县| 泰和县| 四子王旗| 卓尼县| 西安市| 香港| 肥城市| 合阳县| 阳泉市| 乡宁县| 正定县| 永吉县| 北辰区| 苗栗市| 武乡县| 托里县| 漠河县| 繁峙县| 峨边| 古浪县| 于田县| 灵台县| 濮阳县| 河南省| 大庆市| 甘德县| 诏安县| 原平市| 隆昌县| 昌江| 通城县| 临漳县| 土默特左旗| 镇原县| 宁晋县| 盘山县|