hbase 从hdfs上读取数据到hbase中

 1 <dependencies>
 2     <dependency>
 3         <groupId>org.apache.hbase</groupId>
 4         <artifactId>hbase-client</artifactId>
 5         <version>2.0.2</version>
 6     </dependency>
 7     <dependency>
 8         <groupId>org.apache.hbase</groupId>
 9         <artifactId>hbase-server</artifactId>
10         <version>2.0.2</version>
11     </dependency>
12     <dependency>
13         <groupId>org.apache.hbase</groupId>
14         <artifactId>hbase-mapreduce</artifactId>
15         <version>2.0.2</version>
16     </dependency>
17   </dependencies>

Mapper

 1 package cn.hbase.mapreduce.hdfs;  2 
 3 import java.io.IOException;  4 import java.nio.ByteBuffer;  5 import java.util.ArrayList;  6 import java.util.HashMap;  7 import java.util.Iterator;  8 import java.util.List;  9 import java.util.Map; 10 import java.util.Map.Entry; 11 import java.util.Set; 12 
13 import org.apache.hadoop.hbase.client.Put; 14 import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 15 import org.apache.hadoop.hbase.util.Bytes; 16 import org.apache.hadoop.io.LongWritable; 17 import org.apache.hadoop.io.Text; 18 import org.apache.hadoop.mapreduce.Mapper; 19 
20 /**
21  * 22  * @author Tele 输入key hdfs上的文本的行号 输入value 文本 输出key 行键 输出value 将插入hbase的一行数据,需要行键 23  * 24  */
25 
26 public class ReadFruitFromHdfsMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { 27 
28  @Override 29     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 30         // 读取
31         String line = value.toString(); 32 
33         // 切割
34         /**
35  * 1001 apple red 1002 pear yellow 1003 pineapple yellow 36          */
37         String[] fields = line.split("\t"); 38 
39         // 每个列族对应多个列
40         Map<String, Object> map = new HashMap<String, Object>(); 41 
42         // 封装列族下需要的列
43         List<String> infoCNList = new ArrayList<String>(); 44         infoCNList.add("name");// 值对应field[1]
45         infoCNList.add("color");// 值对应field[2]
46         map.put("info", infoCNList); 47 
48         String row = fields[0]; 49 
50         // 封装
51         Put put = new Put(Bytes.toBytes(row)); 52 
53         // 遍历map,封装每个列族下的列
54         Set<Entry<String, Object>> entrySet = map.entrySet(); 55         Iterator<Entry<String, Object>> iterator = entrySet.iterator(); 56         while (iterator.hasNext()) { 57             Entry<String, Object> entry = iterator.next(); 58             String cf = entry.getKey(); 59             List<String> cnList = (List<String>) entry.getValue(); 60 
61             // 遍历list
62             for (int i = 0; i < cnList.size(); i++) { 63                 put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cnList.get(i)), Bytes.toBytes(fields[i + 1])); 64  } 65  } 66 
67         // 行键
68         ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(fields[0])); 69 
70         // 写出
71  context.write(immutableBytesWritable, put); 72 
73  } 74 
75 }

Reducer

 1 package cn.hbase.mapreduce.hdfs;  2 
 3 import java.io.IOException;  4 
 5 import org.apache.hadoop.hbase.client.Mutation;  6 import org.apache.hadoop.hbase.client.Put;  7 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  8 import org.apache.hadoop.hbase.mapreduce.TableReducer;  9 import org.apache.hadoop.io.NullWritable; 10 import org.apache.hadoop.mapreduce.Reducer; 11 
12 /** 
13  * 14  *@author Tele 15  * 16  *对hbase上的表操作,继承tablereducer即可 17  * 18  */
19 
20 public class WriteFruitReducer extends TableReducer<ImmutableBytesWritable,Put,NullWritable> { 21     
22  @Override 23     protected void reduce(ImmutableBytesWritable key, Iterable<Put> value,Context context) throws IOException, InterruptedException { 24         for (Put put : value) { 25  context.write(NullWritable.get(), put); 26  } 27  } 28     
29 }

Runner

 1 package cn.hbase.mapreduce.hdfs;  2 
 3 import org.apache.hadoop.conf.Configuration;  4 import org.apache.hadoop.conf.Configured;  5 import org.apache.hadoop.fs.Path;  6 import org.apache.hadoop.hbase.HBaseConfiguration;  7 import org.apache.hadoop.hbase.client.Put;  8 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  9 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 10 import org.apache.hadoop.mapreduce.Job; 11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 12 import org.apache.hadoop.util.Tool; 13 import org.apache.hadoop.util.ToolRunner; 14 
15 /**
16  * 17  * @author Tele 18  * 19  */
20 
21 public class FruitRunner extends Configured implements Tool { 22 
23     public int run(String[] args) throws Exception { 24         // 实例化job
25         Job job = Job.getInstance(this.getConf()); 26 
27         // 设置jar包路径
28         job.setJarByClass(FruitRunner.class); 29 
30         // 组装mapper
31         job.setMapperClass(ReadFruitFromHdfsMapper.class); 32         job.setMapOutputKeyClass(ImmutableBytesWritable.class); 33         job.setMapOutputValueClass(Put.class); 34 
35         // 设置数据来源
36         FileInputFormat.addInputPath(job, new Path("/input_fruit")); 37 
38         // 组装reducer
39         TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitReducer.class, job); 40 
41         // 设置reduce个数
42         job.setNumReduceTasks(1); 43 
44         // 提交
45 
46         return job.waitForCompletion(true) ? 0 : 1; 47  } 48 
49     public static void main(String[] args) throws Exception { 50         Configuration conf = HBaseConfiguration.create(); 51         ToolRunner.run(new FruitRunner(), args); 52 
53  } 54 
55 }

 ps:需要预先创建表

    原文作者:hbase
    原文地址: https://www.cnblogs.com/tele-share/p/9979569.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞