HBase协处理器统计表数据量

1.Java代码实现

import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.coprocessor.AggregateImplementation;

/**
* <p>
* 协处理器统计HBase表数据量
* </p>
* 
*/
public class HBaseRecordsCounter {

/**
* HBase API添加协处理器
* */
public static void addCoprocessor(Configuration conf, String tableName) {
try {

  byte[] tableNameBytes = Bytes.toBytes(tableName);
  HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
  HTableDescriptor htd = hbaseAdmin.getTableDescriptor(tableNameBytes);
  if (!htd.hasCoprocessor(AggregateImplementation.class.getName())) {
    hbaseAdmin.disableTable(tableNameBytes);
    htd.addCoprocessor(AggregateImplementation.class.getName());
    hbaseAdmin.modifyTable(tableNameBytes, htd);
    hbaseAdmin.enableTable(tableNameBytes);
  }

  hbaseAdmin.close();

} catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } /** * 统计表数量 * */
public static void exeCount(Configuration conf, String tableName, String family) { try {  // 使用hbase提供的聚合coprocessor
  AggregationClient aggregationClient = new AggregationClient(conf); Scan scan = new Scan();  // 指定扫描列族,唯一值
 scan.addFamily(Bytes.toBytes(family));  long start = System.currentTimeMillis();  long rowCount = aggregationClient.rowCount(TableName.valueOf(tableName), new LongColumnInterpreter(), scan); System.out .println("Row count: " + rowCount + "; time cost: " + (System.currentTimeMillis() - start) + "ms"); } catch (Throwable e) { e.printStackTrace(); } } public static void main(String[] args) { String tableName = "test"; Configuration conf = new Configuration(); conf.set("hbase.zookeeper.quorum", "host1,host2,host3"); conf.set("hbase.rootdir", "hdfs://host:8020/hbase");  // 提高RPC通信时长
  conf.setLong("hbase.rpc.timeout", 600000);  // 设置Scan缓存
  conf.setLong("hbase.client.scanner.caching", 1000);  addCoprocessor(conf, tableName); exeCount(conf, tableName, "info"); } }

2. 启用协处理器

启用协处理器方法1.

启动全局aggregation,能够操纵所有的表上的数据。通过修改hbase-site.xml这个文件来实现,只需要添加如下配置:

<property>
   <name>hbase.coprocessor.user.region.classes</name>
   <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
 </property>

启用协处理器方法2.

hbase shell添加coprocessor:

disable 'member'
alter 'member', METHOD => 'table_att', 'coprocessor' => 'hdfs://master24:9000/user/hadoop/jars/test.jar|mycoprocessor.SampleCoprocessor|1001|'
enable 'member'

hbase shell 删除coprocessor:

disable 'member'
alter 'member', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
enable 'member'

    原文作者:hbase
    原文地址: https://www.cnblogs.com/warmingsun/p/4916606.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞