一直有一个问题,今天调查了一下源码算是明白了。
===问题===
通过java api(如下代码所示)在创建表的时候,可以通过setMemStoreFlushSize函数来指定memstore的大小,
在集群配置文件中,也可以通过配置hbase.hregion.memstore.flush.size来指定memstore大小。
这两个地方指定的memestore的有什么区别和关联?
★参考代码
package api; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.regionserver.BloomType; public class create_table_sample1 { public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "192.168.1.80,192.168.1.81,192.168.1.82"); Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("t1")); desc.setMemStoreFlushSize(2097152L); //2M(默认128M) desc.setMaxFileSize(10485760L); //10M(默认10G) HColumnDescriptor family1 = new HColumnDescriptor(constants.COLUMN_FAMILY_DF.getBytes()); family1.setTimeToLive(2 * 60 * 60 * 24); //过期时间 family1.setMaxVersions(2); //版本数 desc.addFamily(family1); HColumnDescriptor family2 = new HColumnDescriptor(constants.COLUMN_FAMILY_EX.getBytes()); family2.setTimeToLive(3 * 60 * 60 * 24); //过期时间 family2.setMinVersions(2); //最小版本数 family2.setMaxVersions(3); //版本数 family2.setBloomFilterType(BloomType.ROW); //布隆过滤方式 desc.addFamily(family2); admin.createTable(desc); admin.close(); connection.close(); } }
===解答===
源码位置:hbase-1.3.1\hbase-server\src\main\java\org\apache\hadoop\hbase\regionserver\
文件名:HRegion.java
函数名:setHTableSpecificConf
调用位置:HRegion类的构造函数
函数内容:
void setHTableSpecificConf() { if (this.htableDescriptor == null) return; long flushSize = this.htableDescriptor.getMemStoreFlushSize(); if (flushSize <= 0) { flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE); } this.memstoreFlushSize = flushSize; this.blockingMemStoreSize = this.memstoreFlushSize * conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER); }
从上面的源代码中可以得到如下结论:
1、HRegion(每个Table会分为很多个HRegion分布在不同的HRegionServer中)对象在创建时,会初始化memstoreFlushSize。
2、它的计算首先是由Table决定的,即每个表可以设定自己的memstoreFlushSize。
通过关键字MEMSTORE_FLUSHSIZE来设定,或通过HTableDescriptor类中的setMemStoreFlushSize()方法来设定。
3、如果表中未设定,则通过集群参数hbase.hregion.memstore.flush.size来初始化。
4、如果集群参数也未配置的话,则默认为1024*1024*128L,即128M。
所以,可以为不同的表配置不同的MemStore大小。需要在创建表的时候指定。
如果表未单独配置,则采用集群的统一配置。默认128M。
===扩展===
上面setHTableSpecificConf的源代码中,还进行了blockingMemStoreSize的初期化,这个参数是什么呢?
从代码中可以看到,这个参数来源于集群配置项hbase.hregion.memstore.block.multiplier。这个参数与hbase.hregion.memstore.flush.size息息相关。
参数作用:
当一个HRegion中的MemStore的总大小(包含多个Store)超过阈值后,会出发flush请求。
该参数是个倍数,表示一个HRegion的MemStore的总大小最大可以是“hbase.hregion.memstore.flush.size”的几倍。
如果超过这个值,则会阻塞该HRegion的写请求,等待flush。
HRegion.java中的put方法。调用了checkResources()
@Override public void put(Put put) throws IOException { checkReadOnly(); // Do a rough check that we have resources to accept a write. The check is // 'rough' in that between the resource check and the call to obtain a // read lock, resources may run out. For now, the thought is that this // will be extremely rare; we'll deal with it when it happens. checkResources(); startRegionOperation(Operation.PUT); try { // All edits for the given row (across all column families) must happen atomically. doBatchMutate(put); } finally { closeRegionOperation(Operation.PUT); } }
checkResources()方法内容如下:
/* * Check if resources to support an update. * * We throw RegionTooBusyException if above memstore limit * and expect client to retry using some kind of backoff */ private void checkResources() throws RegionTooBusyException { // If catalog region, do not impose resource constraints or block updates. if (this.getRegionInfo().isMetaRegion()) return; if (this.memstoreSize.get() > this.blockingMemStoreSize) { blockedRequestsCount.increment(); requestFlush(); throw new RegionTooBusyException("Above memstore limit, " + "regionName=" + (this.getRegionInfo() == null ? "unknown" : this.getRegionInfo().getRegionNameAsString()) + ", server=" + (this.getRegionServerServices() == null ? "unknown" : this.getRegionServerServices().getServerName()) + ", memstoreSize=" + memstoreSize.get() + ", blockingMemStoreSize=" + blockingMemStoreSize); } }
–END–