HBase与MapReduce集成

即HBase作为MapReduce的数据来源,MapReduce 分析,输出数据存储在HBase表中

CLASSPATH

HBase, MapReduce, and the CLASSPATH
By default, MapReduce jobs deployed to a MapReduce cluster do not have access to either the HBase configuration under $HBASE_CONF_DIR or the HBase classes.
官网bb了很多,意思是说,mapReduce 默认是没有添加HBase的依赖包的,你可以通过添加HBase-site这个配置文件到hadoop配置目录下,但是这样要复制到整个集群;或者你可以编辑Hadoop的CLASSPATH,但这样又会使得你的Hadoop环境受到污染。而且需要重启Hadoop集群才能生效。
因此,最好的方法是让HBase自己添加自己的依赖包到Hadoop的CLASSPATH,然后再使用程序。

1.输出MapReduce与HBase集成时候需要的HBase依赖包

bin/hbase mapredcp

2.于是我们可以,通过以下方法执行程序

#先将HBase的依赖包告诉世界 (空格) 然后执行mapreduce程序
$ HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp` $HADOOP_HOME/bin/hadoop jar $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.12.0.jar

工具包hbase-server-VERSION.jar含了以下几个功能(超级有用)

# 统计Cell数目
CellCounter: Count cells in HBase table.

# 
WALPlayer: Replay WAL files.

# ******大量的数据加载******重中之重,把TSV、CSV格式的文件通过 MapReduce 直接存储成 hfile(以块存储的HBase文件) 然后加载(移动)到表中去,不走正常的路径一条条插入
completebulkload: Complete a bulk data load.

# 从一个集群拷贝到另一个集群
copytable: Export a table from local cluster to peer cluster.

# 导入导出数据从HBase >    HDFS 
export: Write table data to HDFS.
exportsnapshot: Export the specific snapshot to a given FileSystem.
import: Import data written by Export.
# TSV table分隔 CSV 使用逗号分隔
importtsv: Import data in TSV format.

# 统计行数
rowcounter: Count rows in HBase table.
verifyrep: Compare the data from tables in two different clusters.

MapReduce读写HBase范例程序编写(参考官网)

package com.gci.hadoop.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 需求分析,从数据表user读取info:name到新表basic:info:name
 */

// extends Configured implements Tool 实现Tool接口的run方法,真正的入口的方法
public class Table_user2basic extends Configured implements Tool {

    public static final String sourceTable = "user";
    public static final String targetTable = "basic";

    // 一.Mapper class extends TableMapper<KEYOUT输出的Key的类型, VALUEOUT输出的Value的类型>
    // 原版的Mapper程序是有输入的KV类型,和输出的KV类型四个参数,源码:extends Mapper<ImmutableBytesWritable,
    // Result, KEYOUT, VALUEOUT>
    // Put类型为hbase中定义的类型,便于作为Reducer的输入类型,根据reducer输入类型可知
    public static class ReadUserMapper extends TableMapper<Text, Put> {

        private Text mapOutputKey = new Text();

        @Override
        public void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Put>.Context context)
                throws IOException, InterruptedException {
            // get rowKey
            String rowKey = Bytes.toString(key.get());

            // set outputRowKey
            mapOutputKey.set(rowKey);

            // 通过rowKey创建put对象
            Put put = new Put(key.get());

            // 迭代以获取cell数据
            for (Cell cell : value.rawCells()) {
                // add family 详情请看HBase API 使用(让info在前,避免了空指针异常)
                if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                    // add column:name
                    if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                }
            }
        }
    }

    // 二.Reducer calss extends TableReducer<KEYIN, VALUEIN, KEYOUT>
    // 输出key 类型为ImmutableBytesWritable 实现writeableComparable的字节数组
    // 输出 value 类型为 Mutation 是 delete put increment append 的父类
    public static class WriteBasicReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {

        @Override
        public void reduce(Text key, Iterable<Put> values,
                Reducer<Text, Put, ImmutableBytesWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            // 从得到的put中得到数据
            for (Put put : values) {
                // 往外写数据
                context.write(null, put);
            }
        }

    }

    // 三.Driver
    public int run(String[] arg0) throws Exception {

        // create job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());

        // set run job class
        job.setJarByClass(this.getClass());

        // set job
        Scan scan = new Scan();
        scan.setCaching(500); // 每次获取条目数 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        // set other scan attrs

        // set input and set mapper
        TableMapReduceUtil.initTableMapperJob(sourceTable, // input table
                scan, // Scan instance to control CF and attribute selection
                ReadUserMapper.class, // mapper class
                Text.class, // mapper output key
                Put.class, // mapper output value
                job);

        // set reducer and output
        TableMapReduceUtil.initTableReducerJob(targetTable, // output table
                WriteBasicReducer.class, // reducer class
                job);
        job.setNumReduceTasks(1); // 设置Reduce个数 at least one, adjust as required

        // 提交 submit job
        Boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // get configuration
        Configuration configuration = HBaseConfiguration.create();

        // submit job 提交job
        int status = ToolRunner.run(configuration, new Table_user2basic(), args);

        // exit program 结束程序
        System.exit(status);

    }
}
    原文作者:MapReduce
    原文地址: https://www.cnblogs.com/cenzhongman/p/7367696.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞