一、概述
HBase官方提供了基于Mapreduce的批量数据导入工具:Bulk load和ImportTsv。关于Bulk load大家能够看下我还有一篇博文。
通常HBase用户会使用HBase API导数,可是假设一次性导入大批量数据,可能占用大量Regionserver资源,影响存储在该Regionserver上其它表的查询,本文将会从源代码上解析ImportTsv数据导入工具。探究怎样高效导入数据到HBase。
二、ImportTsv介绍
ImportTsv是Hbase提供的一个命令行工具。能够将存储在HDFS上的自己定义分隔符(默认\t)的数据文件。通过一条命令方便的导入到HBase表中,对于大数据量导入很有用,当中包括两种方式将数据导入到HBase表中:
第一种是使用TableOutputformat在reduce中插入数据;
另外一种是先生成HFile格式的文件,再运行一个叫做CompleteBulkLoad的命令,将文件move到HBase表空间文件夹下。同一时候提供给client查询。
三、源代码解析
本文基于CDH5 HBase0.98.1,ImportTsv的入口类是org.apache.hadoop.hbase.mapreduce.ImportTsv
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); String columns[] = conf.getStrings(COLUMNS_CONF_KEY); if (hfileOutPath != null) { if (!admin.tableExists(tableName)) { LOG.warn(format("Table '%s' does not exist.", tableName)); // TODO: this is backwards. Instead of depending on the existence of a table, // create a sane splits file for HFileOutputFormat based on data sampling. createTable(admin, tableName, columns); } HTable table = new HTable(conf, tableName); job.setReducerClass(PutSortReducer.class); Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); job.setMapOutputKeyClass(ImmutableBytesWritable.class); if (mapperClass.equals(TsvImporterTextMapper.class)) { job.setMapOutputValueClass(Text.class); job.setReducerClass(TextSortReducer.class); } else { job.setMapOutputValueClass(Put.class); job.setCombinerClass(PutCombiner.class); } HFileOutputFormat.configureIncrementalLoad(job, table); } else { if (mapperClass.equals(TsvImporterTextMapper.class)) { usage(TsvImporterTextMapper.class.toString() + " should not be used for non bulkloading case. use " + TsvImporterMapper.class.toString() + " or custom mapper whose value type is Put."); System.exit(-1); } // No reducers. Just write straight to table. Call initTableReducerJob // to set up the TableOutputFormat. TableMapReduceUtil.initTableReducerJob(tableName, null, job); job.setNumReduceTasks(0); }
从ImportTsv.createSubmittableJob方法中推断參数BULK_OUTPUT_CONF_KEY開始。这步直接影响ImportTsv的Mapreduce作业终于以哪种方式入HBase库
假设不为空而且用户没有自己定义Mapper实现类(參数importtsv.mapper.class)时,则使用PutSortReducer,当中会对Put排序,假设每行记录有非常多column,则会占用Reducer大量的内存资源进行排序。
Configuration conf = job.getConfiguration(); HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); job.setOutputFormatClass(TableOutputFormat.class);
假设为空,调用TableMapReduceUtil.initTableReducerJob初始化TableOutputformat的Reducer输出。此方式不须要使用Reducer,由于直接在mapper的Outputformat中会批量的调用Put API将数据提交到Regionserver上(相当于并行的运行HBase Put API)
四、实战
1、使用TableOutputformat的Put API上传数据,非bulk-loading
$ bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c <tablename> <hdfs-inputdir>
2、使用bulk-loading生成StoreFiles(HFile)
step1、生成Hfile
$ bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c -Dimporttsv.bulk.output=hdfs://storefile-outputdir <tablename> <hdfs-data-inputdir>
step2、完毕导入
$ bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename>
五、总结
在使用ImportTsv时。一定要注意參数importtsv.bulk.output的配置,通常来说使用Bulk output的方式对Regionserver来说更加友好一些,这样的方式载入数据差点儿不占用Regionserver的计算资源。由于仅仅是在HDFS上移动了HFile文件,然后通知HMaster将该Regionserver的一个或多个region上线。