Requirement: import data from files on HDFS into HBase.
There are two ways to do this: write a custom MapReduce job, or use the Import tool that ships with HBase.
1. The data in HDFS looks like this
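Judging from how the mapper and reducer below split each line, the file is tab-separated: the row key comes first, followed by NodeCode, NodeType, NodeName, and IsWarehouse. A made-up sample in that layout (values are purely illustrative):

N0001	BM01	01	NodeA	0
N0002	BM02	02	NodeB	1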
Create the table in HBase first:
create 'NNTB','info'
2. Custom MapReduce
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * Reads data from HDFS and writes it into HBase.
 * The table must be created in HBase beforehand: create 'NNTB','info'
 */
public class HdfsToHBase {
    public static void main(String[] args) throws Exception {
        // Only needed when running locally; points to the local Hadoop home
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.6");
        Configuration conf = HBaseConfiguration.create();
        // Placeholder IP; ZooKeeper listens on the default port 2181
        conf.set("hbase.zookeeper.quorum", "202.168.27.196:2181");
        conf.set(TableOutputFormat.OUTPUT_TABLE, "NNTB");

        Job job = Job.getInstance(conf, HdfsToHBase.class.getSimpleName());
        TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(HdfsToHBase.class);

        job.setMapperClass(HdfsToHBaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(HdfsToHBaseReducer.class);

        FileInputFormat.addInputPath(job, new Path("hdfs://202.168.27.196:9000/user/hadoop/gznt/gznt_bmda/*"));
        job.setOutputFormatClass(TableOutputFormat.class);
        job.waitForCompletion(true);
    }

    public static class HdfsToHBaseMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Input lines are tab-separated: first field is the row key, the rest become column values
            String[] splits = value.toString().split("\t");
            outKey.set(splits[0]);
            outValue.set(splits[1] + "\t" + splits[2] + "\t" + splits[3] + "\t" + splits[4]);
            context.write(outKey, outValue);
        }
    }

    // Table created beforehand with: create 'NNTB','info'
    public static class HdfsToHBaseReducer extends TableReducer<Text, Text, NullWritable> {
        @Override
        protected void reduce(Text k2, Iterable<Text> v2s, Context context) throws IOException, InterruptedException {
            // Use Bytes.toBytes(k2.toString()) rather than k2.getBytes():
            // Text.getBytes() returns the backing buffer, which may contain stale trailing bytes
            Put put = new Put(Bytes.toBytes(k2.toString()));
            for (Text v2 : v2s) {
                String[] splits = v2.toString().split("\t");
                // "info" is the HBase column family name
                if (splits[0] != null && !"NULL".equals(splits[0])) {
                    put.addColumn("info".getBytes(), "NodeCode".getBytes(), splits[0].getBytes());
                }
                if (splits[1] != null && !"NULL".equals(splits[1])) {
                    put.addColumn("info".getBytes(), "NodeType".getBytes(), splits[1].getBytes());
                }
                if (splits[2] != null && !"NULL".equals(splits[2])) {
                    put.addColumn("info".getBytes(), "NodeName".getBytes(), splits[2].getBytes());
                }
                if (splits[3] != null && !"NULL".equals(splits[3])) {
                    put.addColumn("info".getBytes(), "IsWarehouse".getBytes(), splits[3].getBytes());
                }
            }
            context.write(NullWritable.get(), put);
        }
    }
}
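One way to submit the job, as a rough sketch: build it into a jar (the jar name below is a placeholder for your own build artifact) and put the HBase client jars on the classpath with hbase mapredcp before calling hadoop jar:

export HADOOP_CLASSPATH=$(hbase mapredcp)
# hdfs-to-hbase.jar is a hypothetical name for the jar produced by your build
hadoop jar hdfs-to-hbase.jar HdfsToHBase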
Adapted from: HBase从hdfs导入数据 (importing data into HBase from HDFS)
The reference also introduces HBase's built-in Import tool:
(my_python_env)[root@hadoop26 ~]# hbase org.apache.hadoop.hbase.mapreduce.Import
ERROR: Wrong number of arguments: 0
Usage: Import [options] <tablename> <inputdir>
By default Import will load data directly into HBase. To instead generate
HFiles of data to prepare for a bulk data load, pass the option:
  -Dimport.bulk.output=/path/for/output
Run the import from the command line:
hbase org.apache.hadoop.hbase.mapreduce.Import table2 /t2
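Note that the Import tool reads the SequenceFiles written by its companion Export tool, so data typically makes a round trip like the following (table2 and /t2 follow the example above; to produce HFiles for bulk loading instead, add the -Dimport.bulk.output option shown in the usage text):

hbase org.apache.hadoop.hbase.mapreduce.Export table2 /t2
hbase org.apache.hadoop.hbase.mapreduce.Import table2 /t2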