
HBase, MapReduce, and the CLASSPATH

$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
  ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-VERSION.jar \
  org.apache.hadoop.hbase.mapreduce.RowCounter usertable

MapReduce example

package com.beifeng.senior.hadoop.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class User2BasicMapReduce extends Configured implements Tool {
    // Mapper Class
    public static class ReadUserMapper extends TableMapper<Text, Put> {

        private Text mapOutputKey = new Text();

        public void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Put>.Context context)
                        throws IOException, InterruptedException {
            // get rowkey
            String rowkey = Bytes.toString(key.get());

            // set

            // --------------------------------------------------------
            Put put = new Put(key.get());

            // iterator
            for (Cell cell : value.rawCells()) {
                // add family : info
                if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                    // add column: name
                    if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    // add column : age
                    if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {

            // context write
            context.write(mapOutputKey, put);


    // Reducer Class
    public static class WriteBasicReducer extends TableReducer<Text, Put, //
    ImmutableBytesWritable> {

        public void reduce(Text key, Iterable<Put> values,
                Reducer<Text, Put, ImmutableBytesWritable, Mutation>.Context context)
                        throws IOException, InterruptedException {
            for(Put put: values){
                context.write(null, put);


    // Driver
    public int run(String[] args) throws Exception {
        // create job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        // set run job class
        // set job
        Scan scan = new Scan();
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs
        // set other scan attrs

        // set input and set mapper
          "user",        // input table
          scan,               // Scan instance to control CF and attribute selection
          ReadUserMapper.class,     // mapper class
          Text.class,         // mapper output key
          Put.class,  // mapper output value
          job //
        // set reducer and output
          "basic",        // output table
          WriteBasicReducer.class,    // reducer class
        job.setNumReduceTasks(1);   // at least one, adjust as required
        // submit job
        boolean isSuccess = job.waitForCompletion(true) ;
        return isSuccess ? 0 : 1;
    public static void main(String[] args) throws Exception {
        // get configuration
        Configuration configuration = HBaseConfiguration.create();
        // submit job
        int status = ToolRunner.run(configuration,new User2BasicMapReduce(),args) ;
        // exit program

    原文地址: https://www.jianshu.com/p/da671b5ba574