Submitting a MapReduce Job from Eclipse on Windows to a Remote HBase Cluster
- Assuming the remote Hadoop host is named ubuntu, add the entry 192.168.58.130 ubuntu to the Windows hosts file (C:\Windows\System32\drivers\etc\hosts).
- Create a new Maven project and add the configuration files below.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cloudputing</groupId>
    <artifactId>bigdata</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>
    <name>bigdata</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop</artifactId>
            <version>0.9.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>0.94.1</version>
        </dependency>
        <!--
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>0.90.2</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>3.0.5.RELEASE</version>
        </dependency>
    </dependencies>
</project>
hbase-site.xml (place it in src/main/resources so it ends up on the classpath, where HBaseConfiguration.create() will pick it up)
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<configuration>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://ubuntu:9000/hbase</value>
    </property>
    <!-- When the job is set up, a staging directory is created for the files it
         needs. If this property is omitted, the local environment is assumed to
         be Linux and Linux commands are used for that step, which fails on
         Windows. -->
    <property>
        <name>mapred.job.tracker</name>
        <value>ubuntu:9001</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <!-- The client asks this ZooKeeper quorum for the addresses of the HBase
         cluster. -->
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>ubuntu</value>
    </property>
    <property skipInDoc="true">
        <name>hbase.defaults.for.version</name>
        <value>0.94.1</value>
    </property>
</configuration>

Test file: MapreduceTest.java
package com.cloudputing.mapreduce;

import java.io.IOException;

import junit.framework.TestCase;

public class MapreduceTest extends TestCase {

    public void testReadJob() throws IOException, InterruptedException, ClassNotFoundException {
        MapreduceRead.read();
    }
}
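The job below scans an HBase table named wiki and reads the columns text:qual1 and text:qual2, so that table must exist before the test runs. As a minimal sketch (the class name WikiTableSetup, the row key, and the values are made up for illustration), the table could be created and populated with the HBase 0.94 client API like this:

package com.cloudputing.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class WikiTableSetup {

    public static void main(String[] args) throws IOException {
        Configuration config = HBaseConfiguration.create();

        // Create the 'wiki' table with a 'text' column family if it is missing.
        HBaseAdmin admin = new HBaseAdmin(config);
        if (!admin.tableExists("wiki")) {
            HTableDescriptor desc = new HTableDescriptor("wiki");
            desc.addFamily(new HColumnDescriptor("text"));
            admin.createTable(desc);
        }
        admin.close();

        // Insert one sample row; 'row1' and the values are made-up test data.
        HTable table = new HTable(config, "wiki");
        Put put = new Put(Bytes.toBytes("row1"));
        put.add(Bytes.toBytes("text"), Bytes.toBytes("qual1"), Bytes.toBytes("value1"));
        put.add(Bytes.toBytes("text"), Bytes.toBytes("qual2"), Bytes.toBytes("value2"));
        table.put(put);
        table.close();
    }
}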
MapreduceRead.java

package com.cloudputing.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class MapreduceRead {

    public static void read() throws IOException, InterruptedException, ClassNotFoundException {
        // Alternative approach: build a temp jar from target/classes at runtime.
        // File jarFile = EJob.createTempJar("target/classes");
        // EJob.addClasspath("D:/PAUL/WORK/WORK-SPACES/TEST1/cloudputing/src/main/resources");
        // ClassLoader classLoader = EJob.getClassLoader();
        // Thread.currentThread().setContextClassLoader(classLoader);

        Configuration config = HBaseConfiguration.create();
        // Ship the packaged project jar to the cluster along with the job.
        addTmpJar("file:/D:/PAUL/WORK/WORK-SPACES/TEST1/cloudputing/target/bigdata-1.0.jar", config);

        Job job = new Job(config, "ExampleRead");
        // When using the EJob approach above, set the jar explicitly instead:
        // ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
        // TableMapReduceUtil.addDependencyJars(job);
        // TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
        //         MapreduceRead.class, MyMapper.class);
        job.setJarByClass(MapreduceRead.class); // class that contains the mapper

        Scan scan = new Scan();
        scan.setCaching(500);       // 1 is the default in Scan, which is bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        // set other scan attributes here

        TableMapReduceUtil.initTableMapperJob(
                "wiki",                       // input HBase table name
                scan,                         // Scan instance to control CF and attribute selection
                MapreduceRead.MyMapper.class, // mapper
                null,                         // mapper output key
                null,                         // mapper output value
                job);
        job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from the mapper

        // DistributedCache.addFileToClassPath(
        //         new Path("hdfs://node.tracker1:9000/user/root/lib/stat-analysis-mapred-1.0-SNAPSHOT.jar"),
        //         job.getConfiguration());

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!");
        }
    }

    /**
     * Adds a third-party jar to the MapReduce job's classpath.
     *
     * @param jarPath
     *            e.g. D:/Java/new_java_workspace/scm/lib/guava-r08.jar
     * @param conf
     * @throws IOException
     */
    public static void addTmpJar(String jarPath, Configuration conf) throws IOException {
        // Hadoop joins tmpjars entries with the local path separator; force ":"
        // so a job submitted from Windows (where it is ";") still works.
        System.setProperty("path.separator", ":");
        FileSystem fs = FileSystem.getLocal(conf);
        String newJarPath = new Path(jarPath).makeQualified(fs).toString();
        String tmpjars = conf.get("tmpjars");
        if (tmpjars == null || tmpjars.length() == 0) {
            conf.set("tmpjars", newJarPath);
        } else {
            conf.set("tmpjars", tmpjars + ":" + newJarPath);
        }
    }

    public static class MyMapper extends TableMapper<Text, Text> {

        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws InterruptedException, IOException {
            String val1 = getValue(value.getValue(Bytes.toBytes("text"), Bytes.toBytes("qual1")));
            String val2 = getValue(value.getValue(Bytes.toBytes("text"), Bytes.toBytes("qual2")));
            System.out.println(val1 + " -- " + val2);
        }

        private String getValue(byte[] value) {
            return value == null ? "null" : new String(value);
        }
    }
}
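The commented-out lines in read() refer to an EJob helper class that is not part of Hadoop or HBase. As a rough sketch of what such a helper might look like (only createTempJar is shown; addClasspath and getClassLoader are omitted), assuming the compiled classes live under target/classes:

package com.cloudputing.mapreduce;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

// Hypothetical stand-in for the EJob helper referenced above: packs a
// compiled-classes directory into a temporary jar so the JobTracker can
// ship it to the task nodes.
public class EJob {

    public static File createTempJar(String classesDir) throws IOException {
        File jarFile = File.createTempFile("EJob-", ".jar");
        jarFile.deleteOnExit();
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
        JarOutputStream out = new JarOutputStream(new FileOutputStream(jarFile), manifest);
        addDir(out, new File(classesDir), "");
        out.close();
        return jarFile;
    }

    // Recursively add the contents of dir to the jar under the given prefix.
    private static void addDir(JarOutputStream out, File dir, String prefix) throws IOException {
        File[] files = dir.listFiles();
        if (files == null) {
            return;
        }
        for (File f : files) {
            if (f.isDirectory()) {
                addDir(out, f, prefix + f.getName() + "/");
            } else {
                out.putNextEntry(new JarEntry(prefix + f.getName()));
                FileInputStream in = new FileInputStream(f);
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) > 0) {
                    out.write(buf, 0, n);
                }
                in.close();
                out.closeEntry();
            }
        }
    }
}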