MapReduce with HBase
Because the HBase 2.0 API changed significantly, the HBase MapReduce integration changed with it. This article walks through a few HBase MapReduce operations using the 2.0 API.
First, the POM file.
Simplified version: HBase 2.1.1 + Hadoop 2.7.7
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.edu.hadoop</groupId>
<artifactId>eduHadoop</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>2.7.7</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-endpoint</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-metrics-api</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-thrift</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-rest</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Detailed version: HBase 2.1.1 + Hadoop 2, compatible with Hadoop 3 (the hbase-examples module POM):
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>hbase-build-configuration</artifactId>
<groupId>org.apache.hbase</groupId>
<version>2.1.1</version>
<relativePath>../hbase-build-configuration</relativePath>
</parent>
<artifactId>hbase-examples</artifactId>
<name>Apache HBase - Examples</name>
<description>Examples of HBase usage</description>
<!--REMOVE-->
<build>
<plugins>
<plugin>
<!--Make it so assembly:single does nothing in here-->
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<skipAssembly>true</skipAssembly>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<!-- Have to set the groups here because we only do
split tests in this package, so the groups only live in this module -->
<groups>${surefire.firstPartGroups}</groups>
</configuration>
</plugin>
<!-- Make a jar and put the sources in the jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<executions>
<execution>
<id>compile-protoc</id>
<phase>generate-sources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.revelc.code</groupId>
<artifactId>warbucks-maven-plugin</artifactId>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!--This plugin's configuration is used to store Eclipse m2e settings
only. It has no influence on the Maven build itself.-->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<versionRange>[2.8,)</versionRange>
<goals>
<goal>build-classpath</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<dependencies>
<dependency>
<groupId>org.apache.hbase.thirdparty</groupId>
<artifactId>hbase-shaded-miscellaneous</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase.thirdparty</groupId>
<artifactId>hbase-shaded-netty</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-protocol</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-endpoint</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-thrift</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-metrics-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-testing-util</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-rest</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<!-- Skip the tests in this module -->
<profile>
<id>skipExamplesTests</id>
<activation>
<property>
<name>skipExamplesTests</name>
</property>
</activation>
<properties>
<surefire.skipFirstPart>true</surefire.skipFirstPart>
<surefire.skipSecondPart>true</surefire.skipSecondPart>
</properties>
</profile>
<!-- Profiles for building against different hadoop versions -->
<!-- There are a lot of common dependencies used here, should investigate
if we can combine these profiles somehow -->
<!-- profile for building against Hadoop 2.x. This is the default -->
<profile>
<id>hadoop-2.0</id>
<activation>
<property>
<!--Below formatting for dev-support/generate-hadoopX-poms.sh-->
<!--h2-->
<name>!hadoop.profile</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>create-mrapp-generated-classpath</id>
<phase>generate-test-resources</phase>
<goals>
<goal>build-classpath</goal>
</goals>
<configuration>
<!-- needed to run the unit test for DS to generate
the required classpath that is required in the env
of the launch container in the mini mr/yarn cluster
-->
<outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<!--
profile for building against Hadoop 3.0.x. Activate using:
mvn -Dhadoop.profile=3.0
-->
<profile>
<id>hadoop-3.0</id>
<activation>
<property>
<name>hadoop.profile</name>
<value>3.0</value>
</property>
</activation>
<properties>
<hadoop.version>3.0-SNAPSHOT</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>create-mrapp-generated-classpath</id>
<phase>generate-test-resources</phase>
<goals>
<goal>build-classpath</goal>
</goals>
<configuration>
<!-- needed to run the unit test for DS to generate
the required classpath that is required in the env
of the launch container in the mini mr/yarn cluster
-->
<outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Now, on to the main topic.
Using MapReduce to import data from HDFS into HBase:
package com.test;
import java.io.IOException;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class SampleUploader extends Configured implements Tool {
private static final String NAME = "SampleUploader";
static class Uploader
extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
private long checkpoint = 100;
private long count = 0;
@Override
public void map(LongWritable key, Text line, Context context)
throws IOException {
// Each map() call processes a single input line; the key is the byte offset of the line
// Each input line must follow the format: row,family,qualifier,value
// Split CSV line
String [] values = line.toString().split(",");
if(values.length != 4) {
return;
}
// Extract each value
byte [] row = Bytes.toBytes(values[0]);
byte [] family = Bytes.toBytes(values[1]);
byte [] qualifier = Bytes.toBytes(values[2]);
byte [] value = Bytes.toBytes(values[3]);
// Create Put
Put put = new Put(row);
put.addColumn(family, qualifier, value);
// Uncomment below to skip writing to the WAL. This improves performance but means
// data loss if a RegionServer crashes. In the HBase 2.x API this is done with:
// put.setDurability(Durability.SKIP_WAL);
try {
context.write(new ImmutableBytesWritable(row), put);
} catch (InterruptedException e) {
e.printStackTrace();
}
// Set status every checkpoint lines
if(++count % checkpoint == 0) {
context.setStatus("Emitting Put " + count);
}
}
}
/**
* Job configuration.
*/
public static Job configureJob(Configuration conf, String[] args)
throws IOException {
Path inputPath = new Path(args[0]);
String tableName = args[1];
Job job = Job.getInstance(conf, NAME + "_" + tableName);
job.setJarByClass(Uploader.class);
FileInputFormat.setInputPaths(job, inputPath);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(Uploader.class);
// No reducers. Just write straight to table. Call initTableReducerJob
// because it sets up the TableOutputFormat.
TableMapReduceUtil.initTableReducerJob(tableName, null, job);
job.setNumReduceTasks(0);
return job;
}
/**
* Main entry point.
*
* @param otherArgs The command line parameters after ToolRunner handles standard.
* @throws Exception When running the job fails.
*/
public int run(String[] otherArgs) throws Exception {
if(otherArgs.length != 2) {
System.err.println("Wrong number of arguments: " + otherArgs.length);
System.err.println("Usage: " + NAME + " <input> <tablename>");
return -1;
}
Configuration conf = getConf();
// Settings for local debugging
conf.set("fs.defaultFS", "hdfs://192.168.0.10:9000");
conf.set("hbase.zookeeper.quorum", "192.168.0.10"); // ZooKeeper quorum for HBase
conf.set("hbase.zookeeper.property.clientPort", "2181"); // ZooKeeper client port
System.setProperty("HADOOP_USER_NAME", "root");
Job job = configureJob(conf, otherArgs);
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
Scanner sc = new Scanner(System.in);
// Two arguments read from stdin: the HDFS input path and the target table name
String arg1 = sc.next();
String arg2 = sc.next();
int status = ToolRunner.run(HBaseConfiguration.create(), new SampleUploader(), new String[]{arg1,arg2});
System.exit(status);
}
}
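Before running SampleUploader, the target table and the column family referenced in the CSV lines must already exist in HBase, and the input file must be in HDFS at the path given as the first argument. After the job completes, a quick client-side read is an easy way to confirm the Puts arrived. The following is a minimal sketch, not part of the job itself: it assumes the same ZooKeeper quorum as above, a hypothetical table named test, and a hypothetical input line row1,cf,name,Tom.
package com.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class UploadCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.0.10");      // same quorum as the job above
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("test"))) { // hypothetical table name
            // Read back the cell written from the hypothetical CSV line "row1,cf,name,Tom"
            Result result = table.get(new Get(Bytes.toBytes("row1")));
            byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name"));
            System.out.println("row1/cf:name = " + (value == null ? "<missing>" : Bytes.toString(value)));
        }
    }
}
Since this version of the job reads its two arguments from standard input, it can be launched directly from an IDE; the check class above runs the same way.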
Copying data from an HBase table into another table, swapping the row key and the value:
package com.test;
import java.io.IOException;
import java.util.Scanner;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class IndexBuilder extends Configured implements Tool {
/** the column family containing the indexed row key */
public static final byte[] INDEX_COLUMN = Bytes.toBytes("INDEX");
/** the qualifier containing the indexed row key */
public static final byte[] INDEX_QUALIFIER = Bytes.toBytes("ROW");
/**
* Internal Mapper to be run by Hadoop.
*/
public static class Map extends
Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put> {
private byte[] family;
private TreeMap<byte[], ImmutableBytesWritable> indexes;
@Override
protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
throws IOException, InterruptedException {
for (java.util.Map.Entry<byte[], ImmutableBytesWritable> index : indexes.entrySet()) {
byte[] qualifier = index.getKey();
ImmutableBytesWritable tableName = index.getValue();
byte[] value = result.getValue(family, qualifier);
if (value != null) {
// original: row 123 attribute:phone 555-1212
// index: row 555-1212 INDEX:ROW 123
// swap the row key and the value
Put put = new Put(value);
put.addColumn(INDEX_COLUMN, INDEX_QUALIFIER, rowKey.get());
context.write(tableName, put);
}
}
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
Configuration configuration = context.getConfiguration();
String tableName = configuration.get("index.tablename");
String[] fields = configuration.getStrings("index.fields");
String familyName = configuration.get("index.familyname");
family = Bytes.toBytes(familyName);
indexes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (String field : fields) {
// if the table is "people" and the field to index is "email", then the
// index table will be called "people-email"
indexes.put(Bytes.toBytes(field),
new ImmutableBytesWritable(Bytes.toBytes(tableName + "-" + field)));
}
}
}
/**
* Job configuration.
*/
public static Job configureJob(Configuration conf, String[] args)
throws IOException {
String tableName = args[0];
String columnFamily = args[1];
System.out.println("****" + tableName);
conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));
conf.set(TableInputFormat.INPUT_TABLE, tableName);
conf.set("index.tablename", tableName);
conf.set("index.familyname", columnFamily);
String[] fields = new String[args.length - 2];
System.arraycopy(args, 2, fields, 0, fields.length);
conf.setStrings("index.fields", fields);
Job job = Job.getInstance(conf, tableName);
job.setJarByClass(IndexBuilder.class);
job.setMapperClass(Map.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(TableInputFormat.class);
job.setOutputFormatClass(MultiTableOutputFormat.class);
return job;
}
public int run(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create(getConf());
conf.set("hbase.zookeeper.quorum", "192.168.0.10"); //hbase 服务地址
conf.set("hbase.zookeeper.property.clientPort", "2181"); //端口号
Scanner sc = new Scanner(System.in);
/* Argument validation, for use when taking command-line arguments instead of stdin:
if(args.length < 3) {
System.err.println("Only " + args.length + " arguments supplied, required: 3");
System.err.println("Usage: IndexBuilder <TABLE_NAME> <COLUMN_FAMILY> <ATTR> [<ATTR> ...]");
System.exit(-1);
}*/
// Read three values from stdin: the source table name, the column family, and the field to index
String arg1 = sc.next();
String arg2 = sc.next();
String arg3 = sc.next();
Job job = configureJob(conf, new String[]{arg1,arg2,arg3});
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
int result = ToolRunner.run(HBaseConfiguration.create(), new IndexBuilder(), args);
System.exit(result);
}
}
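The index tables written by MultiTableOutputFormat have to exist before the job runs, and each one is named <source table>-<field>; following the comment in the Mapper, indexing the email field of a people table fills a people-email table whose row key is the original cell value, with the original row key stored under INDEX:ROW. A plain scan makes the swap visible. The sketch below is a hypothetical check under exactly those assumptions (table people-email, same ZooKeeper quorum as above):
package com.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class IndexCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.0.10");      // same quorum as the job above
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table index = connection.getTable(TableName.valueOf("people-email")); // hypothetical index table
             ResultScanner scanner = index.getScanner(new Scan())) {
            for (Result result : scanner) {
                // Index row key = the indexed value; INDEX:ROW = the original row key
                String indexedValue = Bytes.toString(result.getRow());
                String originalRow = Bytes.toString(
                    result.getValue(IndexBuilder.INDEX_COLUMN, IndexBuilder.INDEX_QUALIFIER));
                System.out.println(indexedValue + " -> " + originalRow);
            }
        }
    }
}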