概述
HBase 是一款基于 Hadoop 的 key-value 数据库,它提供了对 HDFS 上数据的高效随机读写服务,完美地填补了 Hadoop MapReduce 仅适于批处理的缺陷,正在被越来越多的用户使用。作为 HBase 的一项重要特性,Coprocessor 在 HBase 0.92 版本中被加入,并广受欢迎
利用协处理器,用户可以编写运行在 HBase Server 端的代码。HBase 支持两种类型的协处理器,Endpoint 和 Observer。Endpoint 协处理器类似传统数据库中的存储过程,客户端可以调用这些 Endpoint 协处理器执行一段 Server 端代码,并将 Server 端代码的结果返回给客户端进一步处理,最常见的用法就是进行聚集操作。如果没有协处理器,当用户需要找出一张表中的最大数据,即 max 聚合操作,就必须进行全表扫描,在客户端代码内遍历扫描结果,并执行求最大值的操作。这样的方法无法利用底层集群的并发能力,而将所有计算都集中到 Client 端统一执行,势必效率低下。利用 Coprocessor,用户可以将求最大值的代码部署到 HBase Server 端,HBase 将利用底层 cluster 的多个节点并发执行求最大值的操作。即在每个 Region 范围内执行求最大值的代码,将每个 Region 的最大值在 Region Server 端计算出,仅仅将该 max 值返回给客户端。在客户端进一步将多个 Region 的最大值进一步处理而找到其中的最大值。这样整体的执行效率就会提高很多。
另外一种协处理器叫做 Observer Coprocessor,这种协处理器类似于传统数据库中的触发器,当发生某些事件的时候这类协处理器会被 Server 端调用。Observer Coprocessor 就是一些散布在 HBase Server 端代码中的 hook 钩子,在固定的事件发生时被调用。比如:put 操作之前有钩子函数 prePut,该函数在 put 操作执行前会被 Region Server 调用;在 put 操作之后则有 postPut 钩子函数。
开发环境
- maven-3.3.9
- jdk 1.7
- cdh-hbase-1.2.0
- myeclipse 10
hbase协处理器加载
进入hbase命令行
# hbase shell
hbase(main):> disable 'test'
hbase(main):> alter 'test',CONFIGURATION => {'hbase.table.sanity.checks'=>'false'} //-----》建立表后,执行一次就行
hbase(main):> alter 'test','coprocessor'=>'hdfs:///code/jars/regionObserver-put5.jar|com.hbase.observer.App|1001' //----》加载jar包
hbase(main):> alter 'test', METHOD => 'table_att_unset', NAME => 'coprocessor$1' //--------》卸载jar包
hbase(main):> desc 'test' //-------》查看表的属性描述
hbase(main):> enable 'test'
完整工程代码
package com.hbase.observer;
/**
* hbase 二级索引
* @author wing
* @createTime 2017-4-7
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
public class App extends BaseRegionObserver {
private HTablePool pool = null;
private final static String SOURCE_TABLE = "test";
@Override
public void start(CoprocessorEnvironment env) throws IOException {
pool = new HTablePool(env.getConfiguration(), 10);
}
@Override
public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> c,
Get get, List<Cell> results) throws IOException {
HTableInterface table = pool.getTable(Bytes.toBytes(SOURCE_TABLE));
String newRowkey = Bytes.toString(get.getRow());
String pre = newRowkey.substring(0, 1);
if (pre.equals("t")) {
String[] splits = newRowkey.split("_");
String prepre = splits[0].substring(1, 3);
String timestamp = splits[0].substring(3);
String uid = splits[1];
String mid = "";
for (int i = 2; i < splits.length; i++) {
mid += splits[i];
mid += "_";
}
mid = mid.substring(0, mid.length() - 1);
String rowkey = prepre + uid + "_" + timestamp + "_" + mid;
System.out.println(rowkey);
Get realget = new Get(rowkey.getBytes());
Result result = table.get(realget);
List<Cell> cells = result.listCells();
results.clear();
for (Cell cell : cells) {
results.add(cell);
}
}
}
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
Put put, WALEdit edit, Durability durability) throws IOException {
try {
String rowkey = Bytes.toString(put.getRow());
HTableInterface table = pool.getTable(Bytes.toBytes(SOURCE_TABLE));
String pre = rowkey.substring(0, 2);
if (pre.equals("aa") || pre.equals("ab") || pre.equals("ac")
|| pre.equals("ba") || pre.equals("bb") || pre.equals("bc")
|| pre.equals("ca") || pre.equals("cb") || pre.equals("cc")) {
String[] splits = rowkey.split("_");
String uid = splits[0].substring(2);
String timestamp = splits[1];
String mid = "";
for (int i = 2; i < splits.length; i++) {
mid += splits[i];
mid += "_";
}
mid = mid.substring(0, mid.length() - 1);
String newRowkey = "t" + pre + timestamp + "_" + uid + "_"
+ mid;
System.out.println(newRowkey);
Put indexput2 = new Put(newRowkey.getBytes());
indexput2.addColumn("relation".getBytes(),
"column10".getBytes(), "45".getBytes());
table.put(indexput2);
}
table.close();
} catch (Exception ex) {
}
}
@Override
public boolean postScannerNext(
ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s,
List<Result> results, int limit, boolean hasMore)
throws IOException {
HTableInterface table = pool.getTable(Bytes.toBytes(SOURCE_TABLE));
List<Result> newresults = new ArrayList<Result>();
for (Result result : results) {
String newRowkey = Bytes.toString(result.getRow());
String pre = newRowkey.substring(0, 1);
if (pre.equals("t")) {
String[] splits = newRowkey.split("_");
String prepre = splits[0].substring(1, 3);
String timestamp = splits[0].substring(3);
String uid = splits[1];
String mid = "";
for (int i = 2; i < splits.length; i++) {
mid += splits[i];
mid += "_";
}
mid = mid.substring(0, mid.length() - 1);
String rowkey = prepre + uid + "_" + timestamp + "_" + mid;
Get realget = new Get(rowkey.getBytes());
Result newresult = table.get(realget);
newresults.add(newresult);
}
}
results.clear();
for (Result result : newresults) {
results.add(result);
}
return hasMore;
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
pool.close();
}
}
通过maven工程打包后上传到hdfs相应目录,再通过命令加载jar包。
即可完成二级索引。
- 当用户put操作时,会将原rowkey,转换为新的rowkey,再存一份索引。
- 当用户get操作时,会将rowkey映射为实际的rowkey,再根据实际的rowkey获取实际的结果。
- 当用户执行scanner操作时,会将scanner的结果映射为实际rowkey的结果,返回给用户。
通过hbase的BaseRegionObserver 协处理器,可以封装处理很多hbase操作。
BaseRegionObserver的java接口(注意hbase版本)
https://hbase.apache.org/1.2/apidocs/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.html