HBase 2.0 MapReduce

Because the HBase 2.0 API changed considerably, the HBase MapReduce integration changed with it. This article walks through a few HBase MapReduce operations using the 2.0 API.
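
As a quick illustration of the kind of change involved: the 2.0 client API is built around Connection and Table obtained from ConnectionFactory rather than the old HTable constructor, and Put.add() has been replaced by Put.addColumn(). Below is a minimal sketch of a 2.0-style write; the table, family, and qualifier names are purely illustrative and are not used in the examples that follow:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBase2ClientSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Connection/Table replace the HTable entry point that was removed in 2.0
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("demo_table"))) { // illustrative table name
            Put put = new Put(Bytes.toBytes("row1"));
            // addColumn() replaces the Put.add(family, qualifier, value) removed in 2.0
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
            table.put(put);
        }
    }
}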

First, the POM file.

Simplified version: HBase 2.1.1 + Hadoop 2.7.7

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.edu.hadoop</groupId>
    <artifactId>eduHadoop</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop.version>2.7.7</hadoop.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.jsoup</groupId>
            <artifactId>jsoup</artifactId>
            <version>1.10.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-endpoint</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-metrics-api</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-thrift</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-rest</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Detailed version: HBase 2.1.1 + Hadoop 2, compatible with Hadoop 3 (the hbase-examples module POM):

<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <!--
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-->
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <artifactId>hbase-build-configuration</artifactId>
    <groupId>org.apache.hbase</groupId>
    <version>2.1.1</version>
    <relativePath>../hbase-build-configuration</relativePath>
  </parent>
  <artifactId>hbase-examples</artifactId>
  <name>Apache HBase - Examples</name>
  <description>Examples of HBase usage</description>
  <!--REMOVE-->
  <build>
    <plugins>
      <plugin>
        <!--Make it so assembly:single does nothing in here-->
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <skipAssembly>true</skipAssembly>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-surefire-plugin</artifactId>
        <configuration>
          <!-- Have to set the groups here because we only do
                    split tests in this package, so groups on live in this module -->
          <groups>${surefire.firstPartGroups}</groups>
        </configuration>
      </plugin>
      <!-- Make a jar and put the sources in the jar -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-source-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>org.xolstice.maven.plugins</groupId>
        <artifactId>protobuf-maven-plugin</artifactId>
        <executions>
          <execution>
            <id>compile-protoc</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>net.revelc.code</groupId>
        <artifactId>warbucks-maven-plugin</artifactId>
      </plugin>
    </plugins>
    <pluginManagement>
      <plugins>
        <!--This plugin's configuration is used to store Eclipse m2e settings
             only. It has no influence on the Maven build itself.-->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-dependency-plugin</artifactId>
                    <versionRange>[2.8,)</versionRange>
                    <goals>
                      <goal>build-classpath</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action>
                    <ignore/>
                  </action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.apache.hbase.thirdparty</groupId>
      <artifactId>hbase-shaded-miscellaneous</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase.thirdparty</groupId>
      <artifactId>hbase-shaded-netty</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-common</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-protocol</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-mapreduce</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-endpoint</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-thrift</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-metrics-api</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-testing-util</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.thrift</groupId>
      <artifactId>libthrift</artifactId>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-client</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
    </dependency>
    <dependency>
      <groupId>com.github.stephenc.findbugs</groupId>
      <artifactId>findbugs-annotations</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-rest</artifactId>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <profiles>
    <!-- Skip the tests in this module -->
    <profile>
      <id>skipExamplesTests</id>
      <activation>
        <property>
          <name>skipExamplesTests</name>
        </property>
      </activation>
      <properties>
        <surefire.skipFirstPart>true</surefire.skipFirstPart>
        <surefire.skipSecondPart>true</surefire.skipSecondPart>
      </properties>
    </profile>
    <!-- Profiles for building against different hadoop versions -->
    <!-- There are a lot of common dependencies used here, should investigate
         if we can combine these profiles somehow -->
    <!-- profile for building against Hadoop 2.x.  This is the default -->
    <profile>
      <id>hadoop-2.0</id>
      <activation>
        <property>
          <!--Below formatting for dev-support/generate-hadoopX-poms.sh-->
          <!--h2-->
          <name>!hadoop.profile</name>
        </property>
      </activation>
      <dependencies>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-core</artifactId>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
        </dependency>
      </dependencies>
      <build>
        <plugins>
          <plugin>
            <artifactId>maven-dependency-plugin</artifactId>
            <executions>
              <execution>
                <id>create-mrapp-generated-classpath</id>
                <phase>generate-test-resources</phase>
                <goals>
                  <goal>build-classpath</goal>
                </goals>
                <configuration>
                  <!-- needed to run the unit test for DS to generate
                                 the required classpath that is required in the env
                                 of the launch container in the mini mr/yarn cluster
                                 -->
                  <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
                </configuration>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </profile>
    <!--
       profile for building against Hadoop 3.0.x. Activate using:
        mvn -Dhadoop.profile=3.0
     -->
    <profile>
      <id>hadoop-3.0</id>
      <activation>
        <property>
          <name>hadoop.profile</name>
          <value>3.0</value>
        </property>
      </activation>
      <properties>
        <hadoop.version>3.0-SNAPSHOT</hadoop.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-minicluster</artifactId>
        </dependency>
      </dependencies>
      <build>
        <plugins>
          <plugin>
            <artifactId>maven-dependency-plugin</artifactId>
            <executions>
              <execution>
                <id>create-mrapp-generated-classpath</id>
                <phase>generate-test-resources</phase>
                <goals>
                  <goal>build-classpath</goal>
                </goals>
                <configuration>
                  <!-- needed to run the unit test for DS to generate
                                 the required classpath that is required in the env
                                 of the launch container in the mini mr/yarn cluster
                                 -->
                  <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
                </configuration>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </profile>
  </profiles>
</project>
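
As the comments inside the profiles above note, the Hadoop 2 profile is active by default; to build against Hadoop 3 instead, activate the hadoop-3.0 profile with mvn -Dhadoop.profile=3.0.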

Now, on to the main topic.

Importing data from HDFS into HBase with MapReduce

package com.test;

import java.io.IOException;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class SampleUploader extends Configured implements Tool {

  private static final String NAME = "SampleUploader";

  static class Uploader
  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private long checkpoint = 100;
    private long count = 0;

    @Override
    public void map(LongWritable key, Text line, Context context)
    throws IOException {

      // Each map() processes a single line; the key is the byte offset of the line in the file
      // Each input line must follow the format: row,family,qualifier,value

      // Split CSV line
      String [] values = line.toString().split(",");
      if(values.length != 4) {
        return;
      }

      // Extract each value
      byte [] row = Bytes.toBytes(values[0]);
      byte [] family = Bytes.toBytes(values[1]);
      byte [] qualifier = Bytes.toBytes(values[2]);
      byte [] value = Bytes.toBytes(values[3]);

      // Create Put
      Put put = new Put(row);
      put.addColumn(family, qualifier, value);

      // Uncomment below to skip writing to the WAL (requires importing
      // org.apache.hadoop.hbase.client.Durability). This improves performance but
      // means you will lose data if a RegionServer crashes.
      // put.setDurability(Durability.SKIP_WAL);

      try {
        context.write(new ImmutableBytesWritable(row), put);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }

      // Set status every checkpoint lines
      if(++count % checkpoint == 0) {
        context.setStatus("Emitting Put " + count);
      }
    }
  }

  /**
   * Job configuration.
   */
  public static Job configureJob(Configuration conf, String[] args)
  throws IOException {
    Path inputPath = new Path(args[0]);
    String tableName = args[1];
    Job job = Job.getInstance(conf, NAME + "_" + tableName);
    job.setJarByClass(Uploader.class);
    FileInputFormat.setInputPaths(job, inputPath);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(Uploader.class);
    // No reducers.  Just write straight to table.  Call initTableReducerJob
    // because it sets up the TableOutputFormat.
    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
    job.setNumReduceTasks(0);
    return job;
  }

  /**
   * Main entry point.
   *
   * @param otherArgs  The command line parameters remaining after ToolRunner has handled the standard options.
   * @throws Exception When running the job fails.
   */
  public int run(String[] otherArgs) throws Exception {
    if(otherArgs.length != 2) {
      System.err.println("Wrong number of arguments: " + otherArgs.length);
      System.err.println("Usage: " + NAME + " <input> <tablename>");
      return -1;
    }
    Configuration conf = getConf();
    // For local debugging: point at the cluster explicitly
    conf.set("fs.defaultFS", "hdfs://192.168.0.10:9000");
    conf.set("hbase.zookeeper.quorum", "192.168.0.10");  // ZooKeeper quorum used by HBase
    conf.set("hbase.zookeeper.property.clientPort", "2181"); // ZooKeeper client port
    System.setProperty("HADOOP_USER_NAME", "root");
    Job job = configureJob(conf, otherArgs);
    return (job.waitForCompletion(true) ? 0 : 1);
  }

  public static void main(String[] args) throws Exception {
    Scanner sc = new Scanner(System.in);
    // Two values read from stdin: the HDFS path of the input file and the name of the target table
    String arg1 = sc.next();
    String arg2 = sc.next();
    int status = ToolRunner.run(HBaseConfiguration.create(), new SampleUploader(), new String[]{arg1,arg2});
    System.exit(status);
  }
}
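
Before running SampleUploader, the target table has to exist and contain every column family referenced in the CSV input, since TableOutputFormat does not create tables. Below is a minimal sketch of creating such a table with the 2.x Admin API; the table name upload_test and family name info are only illustrative, and the connection settings mirror those used in run() above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateUploadTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.0.10");       // same cluster as in run() above
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName table = TableName.valueOf("upload_test");   // illustrative table name
            if (!admin.tableExists(table)) {
                admin.createTable(TableDescriptorBuilder.newBuilder(table)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info")) // family used in the CSV
                        .build());
            }
        }
    }
}

With that table in place, an input line in HDFS such as row1,info,name,zhangsan is turned by the mapper into a Put on row row1 with column info:name = zhangsan.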

Copying data from one HBase table to another while swapping the row key and the value


package com.test;

import java.io.IOException;
import java.util.Scanner;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;


@InterfaceAudience.Private
public class IndexBuilder extends Configured implements Tool {
    /** the column family containing the indexed row key */
    public static final byte[] INDEX_COLUMN = Bytes.toBytes("INDEX");
    /** the qualifier containing the indexed row key */
    public static final byte[] INDEX_QUALIFIER = Bytes.toBytes("ROW");

    /**
     * Internal Mapper to be run by Hadoop.
     */
    public static class Map extends
            Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put> {
        private byte[] family;
        private TreeMap<byte[], ImmutableBytesWritable> indexes;

        @Override
        protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
                throws IOException, InterruptedException {
            for (java.util.Map.Entry<byte[], ImmutableBytesWritable> index : indexes.entrySet()) {
                byte[] qualifier = index.getKey();
                ImmutableBytesWritable tableName = index.getValue();
                byte[] value = result.getValue(family, qualifier);
                if (value != null) {
                    // original: row 123 attribute:phone 555-1212
                    // index: row 555-1212 INDEX:ROW 123
                    // swap the row key and the value: the attribute value becomes the index row key
                    Put put = new Put(value);
                    put.addColumn(INDEX_COLUMN, INDEX_QUALIFIER, rowKey.get());
                    context.write(tableName, put);
                }
            }
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            Configuration configuration = context.getConfiguration();
            String tableName = configuration.get("index.tablename");
            String[] fields = configuration.getStrings("index.fields");
            String familyName = configuration.get("index.familyname");
            family = Bytes.toBytes(familyName);
            indexes = new TreeMap<>(Bytes.BYTES_COMPARATOR);

            for (String field : fields) {
                // if the table is "people" and the field to index is "email", then the
                // index table will be called "people-email"
                indexes.put(Bytes.toBytes(field),
                        new ImmutableBytesWritable(Bytes.toBytes(tableName + "-" + field)));
            }
        }
    }

    /**
     * Job configuration.
     */
    public static Job configureJob(Configuration conf, String[] args)
            throws IOException {
        String tableName = args[0];
        String columnFamily = args[1];
        System.out.println("****" + tableName);
        conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));
        conf.set(TableInputFormat.INPUT_TABLE, tableName);
        conf.set("index.tablename", tableName);
        conf.set("index.familyname", columnFamily);
        String[] fields = new String[args.length - 2];
        System.arraycopy(args, 2, fields, 0, fields.length);
        conf.setStrings("index.fields", fields);
        Job job = Job.getInstance(conf, tableName);
        job.setJarByClass(IndexBuilder.class);
        job.setMapperClass(Map.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(TableInputFormat.class);
        job.setOutputFormatClass(MultiTableOutputFormat.class);
        return job;
    }

    public int run(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(getConf());
        conf.set("hbase.zookeeper.quorum", "192.168.0.10");  //hbase 服务地址
        conf.set("hbase.zookeeper.property.clientPort", "2181"); //端口号
        Scanner sc = new Scanner(System.in);
        /*if(args.length < 3) {
          // use command-line arguments instead of reading from stdin
          System.err.println("Only " + args.length + " arguments supplied, required: 3");
          System.err.println("Usage: IndexBuilder <TABLE_NAME> <COLUMN_FAMILY> <ATTR> [<ATTR> ...]");
          System.exit(-1);
        }*/
        // Read three values from stdin: the table name, the column family, and one field to index
        String arg1 = sc.next();
        String arg2 = sc.next();
        String arg3 = sc.next();

        Job job = configureJob(conf, new String[]{arg1,arg2,arg3});
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int result = ToolRunner.run(HBaseConfiguration.create(), new IndexBuilder(), args);
        System.exit(result);
    }
}
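
After the job finishes, each index table contains one row per distinct attribute value, with the original row key stored under INDEX:ROW. Note that MultiTableOutputFormat does not create tables either, so an index table such as people-email must be created with the INDEX column family before the job runs. Below is a minimal lookup sketch, assuming (purely for illustration) a people table whose phone field was indexed into a table named people-phone:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class IndexLookupSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.0.10");       // same cluster as above
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table index = conn.getTable(TableName.valueOf("people-phone"))) { // illustrative index table
            // The indexed attribute value is the row key of the index table
            Result r = index.get(new Get(Bytes.toBytes("555-1212")));
            // "INDEX"/"ROW" match INDEX_COLUMN / INDEX_QUALIFIER in IndexBuilder
            byte[] originalRow = r.getValue(Bytes.toBytes("INDEX"), Bytes.toBytes("ROW"));
            System.out.println("original row key: " + Bytes.toString(originalRow));
        }
    }
}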

    Original author: MasterXiao
    Original article: https://www.jianshu.com/p/c8295c0ff2c6
    This post is reproduced from the web purely to share knowledge; if it infringes any rights, please contact the blogger to have it removed.