Hbase API 详解

启用Hbase

  1. 下载hbase,下载.tar.gz文件,不下载src.tar.gz文件
  2. 在安装hbase前需要安装java环境,hbase-env.sh文件,可以直接在头行加上
export JAVA_HOME=///
standalone
  1. conf/hbase-site.xml问hbase的主要配置文件,需要在文件内写明hbase的主目录,zookeeper的主目录。
<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>file:///home/testuser/hbase</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/home/testuser/zookeeper</value>
  </property>
  <property>
    <name>hbase.unsafe.stream.capability.enforce</name>
    <value>false</value>
    <description>
      Controls whether HBase will check for stream capabilities (hflush/hsync).

      Disable this if you intend to run on LocalFileSystem, denoted by a rootdir
      with the 'file://' scheme, but be mindful of the NOTE below.

      WARNING: Setting this to false blinds you to potential data loss and
      inconsistent system state in the event of process and/or node failures. If
      HBase is complaining of an inability to use hsync or hflush it's most
      likely not a false positive.
    </description>
  </property>
</configuration>

standalone模式下hbase的主目录配置为本地的目录‘file://’
使用./start-hbase.sh启动hbase可以在jps看到一个HMaster,一个HReginServer,以及Zookeeper的线程。

Pseudo-distributed

Pseudo-Distributed 伪分布模式,在standalone模式下所有的进程(HMaster,….)运行在一个JVM里

<property>
  <name>hbase.cluster.distributed</name>
  <value>true</value>
</property>

配置分布式,让每一个进程一个jvm,改变根目录位于hdfs

<property>
  <name>hbase.rootdir</name>
  <value>hdfs://localhost:8020/hbase</value>
</property>

同时移除 hbase.unsafe.stream.capability.enforce 属性或者设置为 true

HMaster控制着Hbase-cluster 启用./bin/local-master-backup.sh start 2 3 hbase Master的备用进程,2,3为偏移量,16012 16013为备用端口(源端口为16010)

local-regionservers.sh start 2 3 4 5

启动多个RegionServer

fully distribution

首先在完全分布下,要关闭防火墙 no firewall rule,出现no route to host问题,注意防火墙问题。同时要配置机器之间的免密访问。

  1. 在HMaster节点配置
<property>
  <name>hbase.zookeeper.quorum</name>
  <value>node-a.example.com,node-b.example.com,node-c.example.com</value>
</property>
<property>
  <name>hbase.zookeeper.property.dataDir</name>
  <value>/usr/local/zookeeper</value>
</property>

hbase 配置

Hbase 2.0 支持Jdk8,Hbase1.0支持Jdk7,8

  1. hbase-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>example1,example2,example3</value>
    <description>The directory shared by RegionServers.
    </description>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/export/zookeeper</value>
    <description>Property from ZooKeeper config zoo.cfg.
    The directory where the snapshot is stored.
    </description>
  </property>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://example0:8020/hbase</value>
    <description>The directory shared by RegionServers.
    </description>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
    <description>The mode the cluster will be in. Possible values are
      false: standalone and pseudo-distributed setups with managed ZooKeeper
      true: fully-distributed with unmanaged ZooKeeper Quorum (see hbase-env.sh)
    </description>
  </property>
</configuration>

配值zookeeper信息,配值hbase的根目录,集群是否开启

  1. 配值regionservers信息
    配值regionserver节点
  2. 配值hbase-env.sh
    配值java_home等前提条件信息

动态更改配置文件,不需要重启服务,直接在shell中 update_config and update_all_config即可更新配置文件。

hbase.ipc.server.fallback-to-simple-auth-allowed

hbase.cleaner.scan.dir.concurrent.size

hbase.regionserver.thread.compaction.large

hbase.regionserver.thread.compaction.small

hbase.regionserver.thread.split

hbase.regionserver.throughput.controller

hbase.regionserver.thread.hfilecleaner.throttle

hbase.regionserver.hfilecleaner.large.queue.size
...
...

hbase shell

可以使用linux命令调用hbase shell

echo "describe 'test'" | ./hbase shell -n > /dev/null 2>&1

HBase shell使用返回0成功命令的值的标准约定,以及失败命令的一些非零值。
Bash将命令的返回值存储在一个名为的特殊环境变量中$?。

#!/bin/bash

echo "describe 'test'" | ./hbase shell -n > /dev/null 2>&1
status=$?
echo "The status was " $status
if ($status == 0); then
    echo "The command succeeded"
else
    echo "The command may have failed."
fi
return $status

快速查看hbase的配置值

@shell.hbase.configuration.get("hbase.zookeeper.quorum")

也可以进行设置覆盖
同时可以编写简单代码,HBase Shell实际上是一个Ruby环境,因此您可以使用简单的Ruby脚本来算法计算拆分

hbase(main):021:0> import java.text.SimpleDateFormat
hbase(main):022:0> import java.text.ParsePosition
hbase(main):023:0> SimpleDateFormat.new(“yy / MM / dd HH:mm:ss”)。parse(“08/08/16 20:56:29”,ParsePosition.new(0))。 getTime()=> 1218920189000
走向另一个方向:

hbase(main):021:0> import java.util.Date
hbase(main):022:0> Date.new(1218920189000).toString()=>“Sat Aug 16 20:56:29 UTC 2008”

也可以开启debug开关

 ./bin/hbase shell -d
 debug <RETURN>
加快统计方法
count '<tablename>', CACHE => 1000

数据模型

概念视图中空白cell在物理上是不存储的,因为根本没有必要存储。因此若一个请求为要获取t8时间的contents:html,他的结果就是空。相似的,若请求为获取t9时间的anchor:my.look.ca,结果也是空。但是,如果不指明时间,将会返回最新时间的行,每个最新的都会返回。

在物理上,一个的列族成员在文件系统上都是存储在一起。因为存储优化都是针对列族级别的,这就意味着,一个colimn family的所有成员的是用相同的方式访问的.

尽量在你的应用中使用一个列族。只有你的所有查询操作只访问一个列族的时候,可以引入第二个和第三个列族.例如,你有两个列族,但你查询的时候总是访问其中的一个,从来不会两个一起访问。

namespace是类比与传统数据库中的数据库的,hbase命名空间和default命名空间。

删除会写一个逻辑删除,只有在下一次主要压缩运行后才会消失。假设你删除了所有内容⇐T。在此之后你做了一个带有时间戳⇐T的新put。这个put,即使它发生在删除之后,也会被删除墓碑掩盖

所有数据模型操作HBase按排序顺序返回数据。首先是行,然后是ColumnFamily,后面是列限定符,最后是时间戳(反向排序,因此首先返回最新的记录)。

表的架构设计

一张表目标存储1050G的数据,超过的建议存储HDFS,一个表13个列族,50~100个连续区间,Java堆应该是32GB(20G区域,128M存储器,其余默认值)

如果ColumnFamilyA有100万行而ColumnFamilyB有10亿行,则ColumnFamilyA的数据可能会分布在许多区域(和RegionServers)中。这使得ColumnFamilyA的大规模扫描效率降低。

ColumnFamilies可以设置TTL长度(以秒为单位),HBase将在到达到期时间后自动删除行。

如果时间范围非常广(例如,长达一年的报告)并且数据量很大,则汇总表是一种常用方法。这些将通过MapReduce作业生成到另一个表中。

配置hbase

hbase.ipc.server.callqueue.read.ratio(hbase.ipc.server.callqueue.read.share在0.98中)将呼叫队列分成读写队列:

0.5 意味着将有相同数量的读写队列

< 0.5 更多阅读而非写作

> 0.5 更多写入而非阅读
hbase mapReduce 读写示例
Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummary");
job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,        // input table
  scan,               // Scan instance to control CF and attribute selection
  MyMapper.class,     // mapper class
  Text.class,         // mapper output key
  IntWritable.class,  // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,        // output table
  MyTableReducer.class,    // reducer class
  job);
job.setNumReduceTasks(1);   // at least one, adjust as required

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
在此示例映射器中,将选择具有String-value的列作为要汇总的值。此值用作从映射器发出的键,而a IntWritable表示实例计数器。

public static class MyMapper extends TableMapper<Text, IntWritable>  {
  public static final byte[] CF = "cf".getBytes();
  public static final byte[] ATTR1 = "attr1".getBytes();

  private final IntWritable ONE = new IntWritable(1);
  private Text text = new Text();

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    String val = new String(value.getValue(CF, ATTR1));
    text.set(val);     // we can only emit Writables...
    context.write(text, ONE);
  }
}
在reducer中,计算“ones”(就像执行此操作的任何其他MR示例一样),然后发出一个Put。

public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>  {
  public static final byte[] CF = "cf".getBytes();
  public static final byte[] COUNT = "count".getBytes();

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.add(CF, COUNT, Bytes.toBytes(i));

    context.write(null, put);
  }
}

hbase 安全

设置hbase.ssl.enabled请true,hbase将仅仅提供http服务

可以通过使用hbase-site.xml中的hbase.security.authentication.ui 属性配置SPNEGO来启用对HBase Web UI的Kerberos身份验证。启用此身份验证要求HBase还配置为对RPC使用Kerberos身份验证。
hbase.security.authentication kerberos

在运行Thrift网关的每个群集节点的hbase-site.xml中,将该属性设置hbase.thrift.security.qop为以下三个值之一:

privacy - 身份验证,完整性和机密性检查。

integrity - 身份验证和完整性检查

authentication - 仅验证身份验证

安全HBase需要安全的ZooKeeper和HDFS,以便用户无法访问和/或修改HBase下的元数据和数据。HBase使用HDFS(或配置文件系统)来保存其数据文件以及预写日志(WAL)和其他数据。HBase使用ZooKeeper存储一些操作元数据(主地址,表锁,恢复状态等)。

连接Hbase

Representational State Transfer (REST)

# Foreground
$ bin/hbase rest start -p <port>

# Background, logging to a file in $HBASE_LOGS_DIR
$ bin/hbase-daemon.sh start rest -p <port>

$ bin/hbase-daemon.sh stop rest

JAVA

public class HBaseExample {
  public static void main(String[] args) throws Exception {
    AbstractHBaseDBO dbo = new HBaseDBOImpl();

    //*drop if table is already exist.*
    if(dbo.isTableExist("user")){
     dbo.deleteTable("user");
    }

    //*create table*
    dbo.createTableIfNotExist("user",HBaseOrder.DESC,"account");
    //dbo.createTableIfNotExist("user",HBaseOrder.ASC,"account");

    //create index.
    String[] cols={"id","name"};
    dbo.addIndexExistingTable("user","account",cols);

    //insert
    InsertQuery insert = dbo.createInsertQuery("user");
    UserBean bean = new UserBean();
    bean.setFamily("account");
    bean.setAge(20);
    bean.setEmail("ncanis@gmail.com");
    bean.setId("ncanis");
    bean.setName("ncanis");
    bean.setPassword("1111");
    insert.insert(bean);

    //select 1 row
    SelectQuery select = dbo.createSelectQuery("user");
    UserBean resultBean = (UserBean)select.select(bean.getRow(),UserBean.class);

    // select column value.
    String value = (String)select.selectColumn(bean.getRow(),"account","id",String.class);

    // search with option (QSearch has EQUAL, NOT_EQUAL, LIKE)
    // select id,password,name,email from account where id='ncanis' limit startRow,20
    HBaseParam param = new HBaseParam();
    param.setPage(bean.getRow(),20);
    param.addColumn("id","password","name","email");
    param.addSearchOption("id","ncanis",QSearch.EQUAL);
    select.search("account", param, UserBean.class);

    // search column value is existing.
    boolean isExist = select.existColumnValue("account","id","ncanis".getBytes());

    // update password.
    UpdateQuery update = dbo.createUpdateQuery("user");
    Hashtable<String, byte[]> colsTable = new Hashtable<String, byte[]>();
    colsTable.put("password","2222".getBytes());
    update.update(bean.getRow(),"account",colsTable);

    //delete
    DeleteQuery delete = dbo.createDeleteQuery("user");
    delete.deleteRow(resultBean.getRow());

    ////////////////////////////////////
    // etc

    // HTable pool with apache commons pool
    // borrow and release. HBasePoolManager(maxActive, minIdle etc..)
    IndexedTable table = dbo.getPool().borrow("user");
    dbo.getPool().release(table);

    // upload bigFile by hadoop directly.
    HBaseBigFile bigFile = new HBaseBigFile();
    File file = new File("doc/movie.avi");
    FileInputStream fis = new FileInputStream(file);
    Path rootPath = new Path("/files/");
    String filename = "movie.avi";
    bigFile.uploadFile(rootPath,filename,fis,true);

    // receive file stream from hadoop.
    Path p = new Path(rootPath,filename);
    InputStream is = bigFile.path2Stream(p,4096);

  }
}

SCALA

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection,ConnectionFactory,HBaseAdmin,HTable,Put,Get}
import org.apache.hadoop.hbase.util.Bytes


val conf = new HBaseConfiguration()
val connection = ConnectionFactory.createConnection(conf);
val admin = connection.getAdmin();

// list the tables
val listtables=admin.listTables()
listtables.foreach(println)

// let's insert some data in 'mytable' and get the row

val table = new HTable(conf, "mytable")

val theput= new Put(Bytes.toBytes("rowkey1"))

theput.add(Bytes.toBytes("ids"),Bytes.toBytes("id1"),Bytes.toBytes("one"))
table.put(theput)

val theget= new Get(Bytes.toBytes("rowkey1"))
val result=table.get(theget)
val value=result.value()
println(Bytes.toString(value))

PYTHON

import java.lang
from org.apache.hadoop.hbase import HBaseConfiguration, HTableDescriptor, HColumnDescriptor, TableName
from org.apache.hadoop.hbase.client import Admin, Connection, ConnectionFactory, Get, Put, Result, Table
from org.apache.hadoop.conf import Configuration

# First get a conf object.  This will read in the configuration
# that is out in your hbase-*.xml files such as location of the
# hbase master node.
conf = HBaseConfiguration.create()
connection = ConnectionFactory.createConnection(conf)
admin = connection.getAdmin()

# Create a table named 'test' that has a column family
# named 'content'.
tableName = TableName.valueOf("test")
table = connection.getTable(tableName)

desc = HTableDescriptor(tableName)
desc.addFamily(HColumnDescriptor("content"))

# Drop and recreate if it exists
if admin.tableExists(tableName):
    admin.disableTable(tableName)
    admin.deleteTable(tableName)

admin.createTable(desc)

# Add content to 'column:' on a row named 'row_x'
row = 'row_x'
put = Put(row)
put.addColumn("content", "qual", "some content")
table.put(put)

# Now fetch the content just added, returns a byte[]
get = Get(row)

result = table.get(get)
data = java.lang.String(result.getValue("content", "qual"), "UTF8")

print "The fetched row contains the value '%s'" % data

hbase和spark

spark和hbase的基本集成

所有Spark和HBase集成的根源都是HBaseContext。HBaseContext接受HBase配置并将它们推送到Spark执行程序。这允许我们在静态位置为每个Spark Executor建立一个HBase连接。

scala

val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

...

val hbaseContext = new HBaseContext(sc, config)

rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
 val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
 it.foreach((putRecord) => {
. val put = new Put(putRecord._1)
. putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
. bufferedMutator.mutate(put)
 })
 bufferedMutator.flush()
 bufferedMutator.close()
})

java

JavaSparkContext jsc = new JavaSparkContext(sparkConf);

try {
  List<byte[]> list = new ArrayList<>();
  list.add(Bytes.toBytes("1"));
  ...
  list.add(Bytes.toBytes("5"));

  JavaRDD<byte[]> rdd = jsc.parallelize(list);
  Configuration conf = HBaseConfiguration.create();

  JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

  hbaseContext.foreachPartition(rdd,
      new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
   public void call(Tuple2<Iterator<byte[]>, Connection> t)
        throws Exception {
    Table table = t._2().getTable(TableName.valueOf(tableName));
    BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
    while (t._1().hasNext()) {
      byte[] b = t._1().next();
      Result r = table.get(new Get(b));
      if (r.getExists()) {
       mutator.mutate(new Put(b));
      }
    }

    mutator.flush();
    mutator.close();
    table.close();
   }
  });
} finally {
  jsc.stop();
}
spark Streaming
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)
val ssc = new StreamingContext(sc, Milliseconds(200))

val rdd1 = ...
val rdd2 = ...

val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
    Array[Byte], Array[Byte])])]]()

queue += rdd1
queue += rdd2

val dStream = ssc.queueStream(queue)

dStream.hbaseBulkPut(
  hbaseContext,
  TableName.valueOf(tableName),
  (putRecord) => {
   val put = new Put(putRecord._1)
   putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
   put
  })
bulk load
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)

val stagingFolder = ...
val rdd = sc.parallelize(Array(
      (Bytes.toBytes("1"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
      (Bytes.toBytes("3"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...

rdd.hbaseBulkLoad(TableName.valueOf(tableName),
  t => {
   val rowKey = t._1
   val family:Array[Byte] = t._2(0)._1
   val qualifier = t._2(0)._2
   val value = t._2(0)._3

   val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)

   Seq((keyFamilyQualifier, value)).iterator
  },
  stagingFolder.getPath)

val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
 conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
SparkSQL/DataFrames
def catalog = s"""{
       |"table":{"namespace":"default", "name":"table1"},
       |"rowkey":"key",
       |"columns":{
         |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
         |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
         |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
         |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
         |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
         |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
         |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
         |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
         |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
       |}
     |}""".stripMargin

def withCatalog(cat: String): DataFrame = {
  sqlContext
  .read
  .options(Map(HBaseTableCatalog.tableCatalog->cat))
  .format("org.apache.hadoop.hbase.spark")
  .load()
}
val df = withCatalog(catalog)
    原文作者:张晓天a
    原文地址: https://www.jianshu.com/p/457a3e257c0c
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞