编写MapReduce程序,集成HBase对表进行读取和写入数据

参考地址:http://hbase.apache.org/boot.html#mapreduce

导入:import org.apache.hadoop.conf.Configured;
导入:import org.apache.hadoop.util.Tool;。

《编写MapReduce程序,集成HBase对表进行读取和写入数据》

三要素

创建Mapper Class
创建Reducer Class
创建Driver

创建Mapper Class

在map方法中,代码思路步骤如下:
  • 获取rowkey:首先我们要得到rowkey来生成Put对象,通过String rowkey = (Bytes.toString(key.get());key是一个形参,类型是ImmutableBytesWritable, 当 集成时读HBase这张表的时候,得到的key就是rowkey。
  • 创建一个Put对象:Put put = new Put(key.get());然后用Cell对象来迭代Result对象的value.rawCells()来获取值,使用put.add(cell)组装好Put对象。判断列族是否相同,若相同判断name是否存在,判断age是否存在,最后使用put。
  • 最后使用context.write(mapOutPutKey,put )。
class内代码片段
//读取user表中的数据  ImmutableBytesWritable:key   Put:一列数据
    public static class ReadUserMapper extends TableMapper<ImmutableBytesWritable, Put> {

        @Override
        protected void map(ImmutableBytesWritable row, Result value,
                Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context context)
                        throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            context.write(row, resultToPut(row, value));
        }
        //和命令:put 'user','10001','info:address','shanghai'  相同
        private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
            Put put = new Put(key.get());
            for (KeyValue kv : result.raw()) {
                put.add(kv);
            }
            return put;
        }
    }

创建Reducer Class

  • 迭代Iterable<Put>的值
  • 使用context.write(null,put)进行写
  • class内部代码段如下:
public static class WriteBasicReducer extends TableReducer<ImmutableBytesWritable, Put,ImmutableBytesWritable>{

        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Put> values,
                Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, Mutation>.Context context)
                        throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            for(Put put:values){
                context.write(key, put);
            }
        }
        
    }

创建Driver

  • 导入包
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  • 创建一个Job
Job job = Job.getInstance(getConf(), this.getClass().getName());
  • 设置Job运行的class
job.setJarByClass(this.getClass());
  • 设置Job
  • 创建Scan对象
Scan scan = new Scan();
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs

Scan表示全表扫描,setCaching方法表示一次抓取多少条数据,而setCacheBlock 方法表示是否设置缓存,mapReduce千万不设置缓存,所以设置为false。

  • 设置input和mapper
    利用TableMapReduceUtil.initTableMapperJob()方法设置input对象和设置映射关系。
TableMapReduceUtil.initTableMapperJob(
                  "user",        // input HBase table name
                  scan,             // Scan instance to control CF and attribute selection
                  ReadUserMapper.class,   // mapper
                  Text.class,             // mapper output key
                  Put.class,             // mapper output value
                  job);
  • 设置reducer和output,利用TableMapReduceUtil的initTableReducerJob( )方法来设置输出对象和相应的类等信息。
    TableMapReduceUtil.initTableReducerJob(
                  "basic",      // output table
                  WriteBasicReducer.class,             // reducer class
                  job);

+设置Reduce的任务为1个

job.setNumReduceTasks(0);
  • 提交job
boolean b = job.waitForCompletion(true);
        
  • run()方法中的代码示例如下:
public int run(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Job job = Job.getInstance(getConf(), this.getClass().getName());
        job.setJarByClass(this.getClass());
        
        Scan scan = new Scan();
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs
        
        
        TableMapReduceUtil.initTableMapperJob(
                  "user",        // input HBase table name
                  scan,             // Scan instance to control CF and attribute selection
                  ReadUserMapper.class,   // mapper
                  Text.class,             // mapper output key
                  Put.class,             // mapper output value
                  job);
        
        
        TableMapReduceUtil.initTableReducerJob(
                  "basic",      // output table
                  WriteBasicReducer.class,             // reducer class
                  job);
        
        job.setNumReduceTasks(0);
        
        boolean b = job.waitForCompletion(true);
        
        return b?0:1;
    }

写main方法

  • 得到Configuration对象
Configuration configuration = HBaseConfiguration.create();
  • 运行Job
        int status = ToolRunner.run(configuration, new User2BasicMapReduce(), args); 
  • 程序结束代码
System.exit(status);
  • 主函数代码如下:
public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int status = ToolRunner.run(configuration, new User2BasicMapReduce(), args);
        System.exit(status);
    }

验证

  • 导出jar包

    《编写MapReduce程序,集成HBase对表进行读取和写入数据》
    《编写MapReduce程序,集成HBase对表进行读取和写入数据》

  • 导出上传到linux系统目录下

    《编写MapReduce程序,集成HBase对表进行读取和写入数据》

  • 然后导出jar包,运行该jar包,上传到FileZilla里面然后输入以下命令,验证mapreduce程序的正确性。
export HBASE_HOME=/opt/sofewares/hbase/hbase-0.98.6-hadoop2
export HADOOP_HOME=/opt/cdh5.3.6/hadoop-2.5.0-cdh5.3.6
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`  $HADOOP_HOME/bin/yarn jar  $HADOOP_HOME/jars/hbase-mr-user2basic.jar
  • 正确后,测试basic表格中的数据,得到以下结果,证明以上操作是正确的。

    《编写MapReduce程序,集成HBase对表进行读取和写入数据》 image.png

    原文作者:志辉聊码
    原文地址: https://www.jianshu.com/p/8dafc24c6537
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞