Hbase协处理器

一、协处理器的介绍

Hbase可以让用户的部分逻辑在数据存放端及Hbase服务端进行计算的机制(框架)。协处理器允许用户在Hbase服务端上运行自己的代码。

二、协处理器的分类

加载角度分类—— 系统协处理器、 表协处理器(用户可以指定某一张表使用协处理器 )
功能角度分类—— Observer协处理器(相当于关系型数据库中的触发器 )、Endpoint协处理器(动态终端,类似于一个存储过程 )

2.1Observer分类

RegionObserver协处理器——允许处理region上的事件
RegionServerObserver协处理器——处理RegionServer上的事件
MasterObserver协处理器——专门处理HMaster上的一些事件,比如创建删除表等操作。
WalObserver协处理器——允许处理日志上的事件

2.2 Observer的执行流程

《Hbase协处理器》 Observer的执行流程图.png

2.3Endpoint协处理器

实现代码被部署在HBase服务器服务端,需要自己写一个客户端,去调用服务端上的实现。

《Hbase协处理器》 Endpoint.png

三、演示endpoint服务端编写

1.创建endpoint.proto文件,生成java文件

option java_pakage = “com.jkb.coprocessor.endpoint”;
option java_outer_classname = “Sum”;  
option java_generic_service = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message SumRequence{
required string family = 1;
required string column = 2;
}
message SumResponse{
required int64 sum = 1 [default = 0]; 
}
//定义rpc服务器类的定义
service SumService{
rpc getSum(SumRequest)
returns (SumResponse);
}

2.执行命令,将.proto文件生成java代码
protoc endpoint.proto –java_out=./
3.将java文件放入eclipse中相应的工程中相应的目录下,然后加载Hbase中的jar包
4.编写SumEndPoint.java

Package com.jkb.coprocessor.endpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

import edu.endpoint.Sum.SumRequest;
import edu.endpoint.Sum.SumResponse;
import edu.endpoint.Sum.SumService;

public class SumEndPoint extends SumService implements Coprocessor,CoprocessorService{
       private RegionCoprocessorEnvironment env;

    @Override
    public void getSum(RpcController controller, SumRequest request,
            RpcCallback<SumResponse> done) {
       Scan scan = new Scan(); 
       scan.addFamily(Bytes.toBytes(request.getFamily()));  
     scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
       SumResponse response = null;
       InternalScanner scanner = null;
       try{
           scanner = env.getRegion().getScanner(scan);
           List<Cell> results = new ArrayList<Cell>(); 
           boolean hasMore = false;
           Long sum = 0L;
           do{
               hasMore = scanner.next(results);
               for(Cell cell: results){
                   sum += Long.parseLong(new String(CellUtil.cloneValue(cell)));
               }
               results.clear();
           }while(hasMore);
           response = SumResponse.newBuilder().setSum(sum).build();
       }catch(IOException e){
           ResponseConverter.setControllerException(controller, e);
       }finally{
           if(scanner!=null){
               try {
                scanner.close();
            } catch (IOException e) {
            }
           }
       }
        done.run(response);
    }

    @Override
    public Service getService() {
        return this;
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if(env instanceof RegionCoprocessorEnvironment){
            this.env = (RegionCoprocessorEnvironment)env;
        }else{
            throw new CoprocessorException("no load region");
        }

    }

    @Override
    public void stop(CoprocessorEnvironment arg0) throws IOException {

    }

}
    原文作者:ahzhaojj
    原文地址: https://www.jianshu.com/p/9646a5bb5469
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞