That is, HBase serves as the data source for MapReduce, MapReduce does the analysis, and the output is stored back into an HBase table.
HBase, MapReduce, and the CLASSPATH
By default, MapReduce jobs deployed to a MapReduce cluster do not have access to either the HBase configuration under $HBASE_CONF_DIR or the HBase classes.
The official documentation goes on at length about this. The gist: MapReduce jobs do not pick up the HBase dependencies by default. You can copy hbase-site.xml into the Hadoop configuration directory, but then it has to be replicated across the whole cluster; alternatively, you can edit Hadoop's CLASSPATH, but that pollutes the Hadoop environment and requires restarting the Hadoop cluster to take effect.
The best approach is therefore to let HBase add its own dependencies to Hadoop's CLASSPATH at launch time, and then run the program.
1. Print the HBase dependencies needed when integrating MapReduce with HBase
bin/hbase mapredcp
2. We can then run the program as follows
# First expose the HBase dependencies via HADOOP_CLASSPATH, then (after a space) run the MapReduce program
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.12.0.jar
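Run with no program name, the command above simply prints the list of bundled tools shown below. To actually run one of them, append the tool name and its arguments; for example (assuming a table named user already exists in your cluster), the following counts its rows:
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.12.0.jar rowcounter user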
The tool jar hbase-server-VERSION.jar bundles the following utilities (extremely useful):
# Count the number of cells
CellCounter: Count cells in HBase table.
# Replay write-ahead log (WAL) files
WALPlayer: Replay WAL files.
# ****** Bulk data loading ****** The most important one: use MapReduce to turn TSV/CSV files directly into HFiles (HBase's block-format storage files) and then load (move) them into the table, instead of inserting row by row through the normal write path (see the example after this list)
completebulkload: Complete a bulk data load.
# Copy a table from one cluster to another
copytable: Export a table from local cluster to peer cluster.
# Export/import data between HBase and HDFS
export: Write table data to HDFS.
exportsnapshot: Export the specific snapshot to a given FileSystem.
import: Import data written by Export.
# TSV files are tab-separated; CSV files are comma-separated
importtsv: Import data in TSV format.
# Count rows
rowcounter: Count rows in HBase table.
verifyrep: Compare the data from tables in two different clusters.
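A sketch of the bulk-load path flagged above, assuming a hypothetical table user with column family info and made-up HDFS paths: importtsv first writes the TSV file out as HFiles under an output directory, and completebulkload then moves those HFiles into the table's regions.
# 1) Generate HFiles from a TSV file (columns: row key, then info:name); for CSV add -Dimporttsv.separator=,
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.12.0.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name -Dimporttsv.bulk.output=/tmp/user_hfiles user /input/user.tsv
# 2) Load (move) the generated HFiles into the table
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.12.0.jar completebulkload /tmp/user_hfiles user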
Writing a sample MapReduce program that reads from and writes to HBase (based on the official documentation)
package com.gci.hadoop.hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* Requirement: read info:name from the source table 'user' and write it into the new table 'basic' (column family info, qualifier name).
*/
// extends Configured implements Tool: the Tool interface's run() method is the real entry point
public class Table_user2basic extends Configured implements Tool {
public static final String sourceTable = "user";
public static final String targetTable = "basic";
// 1. Mapper class: extends TableMapper<KEYOUT, VALUEOUT> (the types of the output key and output value)
// The generic Mapper has four type parameters (input KV and output KV); TableMapper fixes the input types:
// extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT>
// Put is an HBase type, used as the output value here so it can serve directly as the Reducer's input value type
public static class ReadUserMapper extends TableMapper<Text, Put> {
private Text mapOutputKey = new Text();
@Override
public void map(ImmutableBytesWritable key, Result value,
Mapper<ImmutableBytesWritable, Result, Text, Put>.Context context)
throws IOException, InterruptedException {
// get rowKey
String rowKey = Bytes.toString(key.get());
// set outputRowKey
mapOutputKey.set(rowKey);
// Create a Put object from the row key
Put put = new Put(key.get());
// Iterate over the cells of this row
for (Cell cell : value.rawCells()) {
// Check the family first ('info' before the qualifier, which avoids a NullPointerException); see the HBase API usage notes
if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
// add column:name
if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
put.add(cell);
}
}
}
// Emit the row key and the assembled Put; skip rows without info:name so we never write an empty Put
if (!put.isEmpty()) {
context.write(mapOutputKey, put);
}
}
}
// 2. Reducer class: extends TableReducer<KEYIN, VALUEIN, KEYOUT>
// The output key type is ImmutableBytesWritable, a byte array implementing WritableComparable
// The output value type is Mutation, the parent class of Delete, Put, Increment, and Append
public static class WriteBasicReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {
@Override
public void reduce(Text key, Iterable<Put> values,
Reducer<Text, Put, ImmutableBytesWritable, Mutation>.Context context)
throws IOException, InterruptedException {
// Take each Put produced by the Mapper
for (Put put : values) {
// Write it out to the target table
context.write(null, put);
}
}
}
// 3. Driver
public int run(String[] arg0) throws Exception {
// create job
Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
// set run job class
job.setJarByClass(this.getClass());
// set job
Scan scan = new Scan();
scan.setCaching(500); // rows fetched per RPC; the Scan default of 1 would be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
// set input and set mapper
TableMapReduceUtil.initTableMapperJob(sourceTable, // input table
scan, // Scan instance to control CF and attribute selection
ReadUserMapper.class, // mapper class
Text.class, // mapper output key
Put.class, // mapper output value
job);
// set reducer and output
TableMapReduceUtil.initTableReducerJob(targetTable, // output table
WriteBasicReducer.class, // reducer class
job);
job.setNumReduceTasks(1); // number of reduce tasks: at least one, adjust as required
// submit job
boolean isSuccess = job.waitForCompletion(true);
return isSuccess ? 0 : 1;
}
public static void main(String[] args) throws Exception {
// get configuration
Configuration configuration = HBaseConfiguration.create();
// submit the job
int status = ToolRunner.run(configuration, new Table_user2basic(), args);
// exit the program
System.exit(status);
}
}
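A hedged sketch of how this job might be packaged and launched; the jar name hbase-mr-example.jar is an assumption, so substitute the artifact produced by your own build. Note that both the source table user and the target table basic (with column family info) must already exist, since the job does not create them.
# Package the class (e.g. mvn package), then launch it with the HBase jars on the classpath
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar hbase-mr-example.jar com.gci.hadoop.hbase.Table_user2basic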