paulwong

          Submitting a MapReduce Job from Eclipse on Windows to a Remote HBase Cluster

          1. Assume the remote Hadoop host is named ubuntu. Add the following entry to the local hosts file: 192.168.58.130       ubuntu
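
          On Windows the hosts file normally lives at C:\Windows\System32\drivers\etc\hosts; the entry simply maps the hostname used throughout the Hadoop/HBase configuration to the cluster's IP, for example:

            # C:\Windows\System32\drivers\etc\hosts
            192.168.58.130       ubuntu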


          2. Create a new Maven project and add the corresponding configuration.
            pom.xml
            <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation
            ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
              <modelVersion>4.0.0</modelVersion>

              <groupId>com.cloudputing</groupId>
              <artifactId>bigdata</artifactId>
              <version>1.0</version>
              <packaging>jar</packaging>

              <name>bigdata</name>
              <url>http://maven.apache.org</url>

              <properties>
                <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
              </properties>

                <dependencies>
                    <dependency>
                        <groupId>junit</groupId>
                        <artifactId>junit</artifactId>
                        <version>3.8.1</version>
                        <scope>test</scope>
                    </dependency>
                    <dependency>
                        <groupId>org.springframework.data</groupId>
                        <artifactId>spring-data-hadoop</artifactId>
                        <version>0.9.0.RELEASE</version>
                    </dependency>
                    <dependency>
                        <groupId>org.apache.hbase</groupId>
                        <artifactId>hbase</artifactId>
                        <version>0.94.1</version>
                    </dependency>
                    
                    <!-- <dependency>
                        <groupId>org.apache.hbase</groupId>
                        <artifactId>hbase</artifactId>
                        <version>0.90.2</version>
                    </dependency> -->
                    <dependency>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-core</artifactId>
                        <version>1.0.3</version>
                    </dependency>
                    <dependency>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-test</artifactId>
                        <version>3.0.5.RELEASE</version>
                    </dependency>
                </dependencies>
            </project>
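
          Running mvn clean package on this project builds target/bigdata-1.0.jar, which is the jar that addTmpJar() in step 5 ships to the cluster:

            mvn clean package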


          3. hbase-site.xml
            <?xml version="1.0"?>
            <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
            <!--
            /**
             * Copyright 2010 The Apache Software Foundation
             *
             * Licensed to the Apache Software Foundation (ASF) under one
             * or more contributor license agreements.  See the NOTICE file
             * distributed with this work for additional information
             * regarding copyright ownership.  The ASF licenses this file
             * to you under the Apache License, Version 2.0 (the
             * "License"); you may not use this file except in compliance
             * with the License.  You may obtain a copy of the License at
             *
             *     http://www.apache.org/licenses/LICENSE-2.0
             *
             * Unless required by applicable law or agreed to in writing, software
             * distributed under the License is distributed on an "AS IS" BASIS,
             * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
             * See the License for the specific language governing permissions and
             * limitations under the License.
             */
            -->
            <configuration>

                <property>
                    <name>hbase.rootdir</name>
                    <value>hdfs://ubuntu:9000/hbase</value>
                </property>

                <!-- When the job is set up, a new directory is created to stage the required files.
                     If this property is omitted, the local environment is assumed to be Linux and
                     Linux commands are used, which fails on Windows. -->
                <property>
                    <name>mapred.job.tracker</name>
                    <value>ubuntu:9001</value>
                </property>
                
                <property>
                    <name>hbase.cluster.distributed</name>
                    <value>true</value>
                </property>
                
                <!-- ZooKeeper is consulted here for the available IP of the JobTracker -->
                <property>
                    <name>hbase.zookeeper.quorum</name>
                    <value>ubuntu</value>
                </property>
                <property skipInDoc="true">
                    <name>hbase.defaults.for.version</name>
                    <value>0.94.1</value>
                </property>

            </configuration>
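
          Because hbase-site.xml is on the classpath (e.g. src/main/resources), HBaseConfiguration.create() picks it up automatically. Before submitting a job it is worth checking that the remote cluster is actually reachable from the Windows machine; the following is a minimal sketch against the HBase 0.94 client API (the class name HBaseConnectionCheck is my own, not part of the original project):

            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.hbase.HBaseConfiguration;
            import org.apache.hadoop.hbase.client.HBaseAdmin;

            public class HBaseConnectionCheck {

                public static void main(String[] args) throws Exception {
                    // Loads hbase-default.xml and hbase-site.xml from the classpath
                    Configuration config = HBaseConfiguration.create();

                    // Throws MasterNotRunningException / ZooKeeperConnectionException
                    // if the cluster cannot be reached
                    HBaseAdmin.checkHBaseAvailable(config);
                    System.out.println("HBase cluster is reachable");
                }
            }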


          4. Test class: MapreduceTest.java
            package com.cloudputing.mapreduce;

            import java.io.IOException;

            import junit.framework.TestCase;

            public class MapreduceTest extends TestCase{
                
                public void testReadJob() throws IOException, InterruptedException, ClassNotFoundException
                {
                    MapreduceRead.read();
                }

            }
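
          The test can be run from Eclipse as a plain JUnit test, or from the command line through the Maven Surefire plugin:

            mvn test -Dtest=MapreduceTest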


          5. MapreduceRead.java
            package com.cloudputing.mapreduce;

            import java.io.IOException;

            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.fs.FileSystem;
            import org.apache.hadoop.fs.Path;
            import org.apache.hadoop.hbase.HBaseConfiguration;
            import org.apache.hadoop.hbase.client.Result;
            import org.apache.hadoop.hbase.client.Scan;
            import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
            import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
            import org.apache.hadoop.hbase.mapreduce.TableMapper;
            import org.apache.hadoop.hbase.util.Bytes;
            import org.apache.hadoop.io.Text;
            import org.apache.hadoop.mapreduce.Job;
            import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

            public class MapreduceRead {
                
                public static void read() throws IOException, InterruptedException, ClassNotFoundException
                {
                    // Add these statements. XXX
            //        File jarFile = EJob.createTempJar("target/classes");
            //        EJob.addClasspath("D:/PAUL/WORK/WORK-SPACES/TEST1/cloudputing/src/main/resources");
            //        ClassLoader classLoader = EJob.getClassLoader();
            //        Thread.currentThread().setContextClassLoader(classLoader);

                    Configuration config = HBaseConfiguration.create();
                    addTmpJar("file:/D:/PAUL/WORK/WORK-SPACES/TEST1/cloudputing/target/bigdata-1.0.jar",config);
                    
                    Job job = new Job(config, "ExampleRead");
                    // And add this statement. XXX
            //        ((JobConf) job.getConfiguration()).setJar(jarFile.toString());

            //        TableMapReduceUtil.addDependencyJars(job);
            //        TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
            //                MapreduceRead.class,MyMapper.class);
                    
                    job.setJarByClass(MapreduceRead.class);     // class that contains mapper
                    
                    Scan scan = new Scan();
                    scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
                    scan.setCacheBlocks(false);  // don't set to true for MR jobs
                    
            // set other scan attrs
                    
                    TableMapReduceUtil.initTableMapperJob(
                            "wiki",        // input HBase table name
                            scan,             // Scan instance to control CF and attribute selection
                            MapreduceRead.MyMapper.class,   // mapper
                            null,             // mapper output key 
                            null,             // mapper output value
                            job);
                    job.setOutputFormatClass(NullOutputFormat.class);   // because we aren't emitting anything from mapper
                    
            //        DistributedCache.addFileToClassPath(new Path("hdfs://node.tracker1:9000/user/root/lib/stat-analysis-mapred-1.0-SNAPSHOT.jar"),job.getConfiguration());
                    
                    boolean b = job.waitForCompletion(true);
                    if (!b) {
                        throw new IOException("error with job!");
                    }
                    
                }
                
                /**
                 * Adds a third-party jar to the MapReduce job via the tmpjars property.
                 *
                 * @param jarPath
                 *            e.g. D:/Java/new_java_workspace/scm/lib/guava-r08.jar
                 * @param conf
                 * @throws IOException
                 */
                public static void addTmpJar(String jarPath, Configuration conf) throws IOException {
                    System.setProperty("path.separator", ":");
                    FileSystem fs = FileSystem.getLocal(conf);
                    String newJarPath = new Path(jarPath).makeQualified(fs).toString();
                    String tmpjars = conf.get("tmpjars");
                    if (tmpjars == null || tmpjars.length() == 0) {
                        conf.set("tmpjars", newJarPath);
                    } else {
                        conf.set("tmpjars", tmpjars + ":" + newJarPath);
                    }
                }
                
                public static class MyMapper extends TableMapper<Text, Text> {

                    public void map(ImmutableBytesWritable row, Result value,
                            Context context) throws InterruptedException, IOException {
                        String val1 = getValue(value.getValue(Bytes.toBytes("text"), Bytes.toBytes("qual1")));
                        String val2 = getValue(value.getValue(Bytes.toBytes("text"), Bytes.toBytes("qual2")));
                        System.out.println(val1 + " -- " + val2);
                    }
                    
                    private String getValue(byte [] value)
                    {
                        return value == null? "null" : new String(value);
                    }
                } 

            }
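
          The read job expects an HBase table named wiki with a column family text holding the qualifiers qual1 and qual2 that MyMapper prints. Below is a minimal sketch for creating that table and inserting one sample row with the HBase 0.94 client API (the class name PrepareWikiTable and the sample values are mine, not part of the original post):

            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.hbase.HBaseConfiguration;
            import org.apache.hadoop.hbase.HColumnDescriptor;
            import org.apache.hadoop.hbase.HTableDescriptor;
            import org.apache.hadoop.hbase.client.HBaseAdmin;
            import org.apache.hadoop.hbase.client.HTable;
            import org.apache.hadoop.hbase.client.Put;
            import org.apache.hadoop.hbase.util.Bytes;

            public class PrepareWikiTable {

                public static void main(String[] args) throws Exception {
                    Configuration config = HBaseConfiguration.create();

                    // Create the "wiki" table with a "text" column family if it does not exist yet
                    HBaseAdmin admin = new HBaseAdmin(config);
                    if (!admin.tableExists("wiki")) {
                        HTableDescriptor desc = new HTableDescriptor("wiki");
                        desc.addFamily(new HColumnDescriptor("text"));
                        admin.createTable(desc);
                    }
                    admin.close();

                    // Insert one sample row so the read job has something to print
                    HTable table = new HTable(config, "wiki");
                    Put put = new Put(Bytes.toBytes("row1"));
                    put.add(Bytes.toBytes("text"), Bytes.toBytes("qual1"), Bytes.toBytes("value1"));
                    put.add(Bytes.toBytes("text"), Bytes.toBytes("qual2"), Bytes.toBytes("value2"));
                    table.put(put);
                    table.close();
                }
            }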

          posted on 2013-01-29 00:19 paulwong
