Dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-mapreduce</artifactId>
        <version>2.0.2</version>
    </dependency>
</dependencies>
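These three HBase 2.0.2 artifacts cover the client API, the server-side classes, and the MapReduce integration; in HBase 2.x, TableMapReduceUtil and TableReducer live in the separate hbase-mapreduce module, which is why it appears here. If the Hadoop jars are not already provided by the cluster classpath, a matching hadoop-client can be added as well; a minimal sketch, assuming a Hadoop 2.7.x cluster (adjust the version to your own):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.7</version>
</dependency>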
Mapper
package cn.hbase.mapreduce.hdfs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author Tele
 *
 * Input key: the byte offset of the line in the HDFS text file.
 * Input value: the line of text.
 * Output key: the HBase row key.
 * Output value: a Put carrying the row to insert into HBase (built around that row key).
 */
public class ReadFruitFromHdfsMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line
        String line = value.toString();

        // Split on tabs; expected layout:
        // 1001    apple       red
        // 1002    pear        yellow
        // 1003    pineapple   yellow
        String[] fields = line.split("\t");

        // Each column family maps to several columns
        Map<String, List<String>> map = new HashMap<>();

        // Register the columns of the "info" family
        List<String> infoCNList = new ArrayList<>();
        infoCNList.add("name");   // value comes from fields[1]
        infoCNList.add("color");  // value comes from fields[2]
        map.put("info", infoCNList);

        String row = fields[0];

        // Build the Put for this row key
        Put put = new Put(Bytes.toBytes(row));

        // Walk the map and add each column of each family
        for (Entry<String, List<String>> entry : map.entrySet()) {
            String cf = entry.getKey();
            List<String> cnList = entry.getValue();
            for (int i = 0; i < cnList.size(); i++) {
                put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cnList.get(i)), Bytes.toBytes(fields[i + 1]));
            }
        }

        // Row key
        ImmutableBytesWritable rowKey = new ImmutableBytesWritable(Bytes.toBytes(row));

        // Emit
        context.write(rowKey, put);
    }
}
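For reference, the runner below reads /input_fruit from HDFS, so that directory should hold tab-separated lines in the layout shown in the comment above (row key, name, color):

1001	apple	red
1002	pear	yellow
1003	pineapple	yellow

Note that a line with fewer than three fields would throw an ArrayIndexOutOfBoundsException at fields[i + 1], since the mapper assumes one value per registered column.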
Reducer
package cn.hbase.mapreduce.hdfs;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

/**
 * @author Tele
 *
 * To write to a table in HBase, it is enough to extend TableReducer.
 */
public class WriteFruitReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        // Forward every Put for this row key to the table unchanged
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}
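Only three type parameters appear on WriteFruitReducer because TableReducer already pins the output value type to Mutation (Put is a Mutation). Its declaration in HBase 2.x is essentially:

public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
        extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
}

This is also why the runner never sets the job's output key/value classes by hand; initTableReducerJob takes care of the table-writing side.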
Runner
package cn.hbase.mapreduce.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author Tele
 */
public class FruitRunner extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        // Create the job
        Job job = Job.getInstance(this.getConf());

        // Set the jar by the driver class
        job.setJarByClass(FruitRunner.class);

        // Wire up the mapper
        job.setMapperClass(ReadFruitFromHdfsMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Input location on HDFS
        FileInputFormat.addInputPath(job, new Path("/input_fruit"));

        // Wire up the reducer; this also binds the job to the "fruit_mr" table
        TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitReducer.class, job);

        // Use a single reduce task
        job.setNumReduceTasks(1);

        // Submit and wait for completion
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Hand the HBase-aware configuration to the tool so run() actually sees it
        int status = ToolRunner.run(conf, new FruitRunner(), args);
        System.exit(status);
    }
}
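HBaseConfiguration.create() picks up hbase-site.xml from the classpath. When the job is launched from a machine that does not carry the cluster configuration, the ZooKeeper quorum can be set explicitly instead; a minimal sketch of an alternative main (imports as in FruitRunner above, host names are placeholders for your own):

public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Assumption: replace with your cluster's ZooKeeper hosts and client port
    conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    System.exit(ToolRunner.run(conf, new FruitRunner(), args));
}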
PS: the target table must be created in advance, e.g. from the HBase shell: create 'fruit_mr', 'info'