HBase Scan Flow Analysis
After our cluster was upgraded from 0.94.6 to 0.98.6-cdh5.2.0, the HBase scan jobs that used to run fine started hitting a number of problems.
Symptoms:
setBatch() turned out to be incompatible with filters, so the code had to be changed (setBatch() removed).
Scans became slower and frequently timed out, and mappers threw OutOfOrderScannerNextException, hurting job throughput.
So it is worth understanding the scan flow before trying to optimize it.
From the application side, that means understanding a few Scan settings, namely setCaching() and setBatch(), and how a scan actually proceeds.
The other question is how filters work, and at which step they take effect.
1. HBase MR
- HBase MR main flow code
Configuration conf = getConf();
Scan scan = buildScan(conf);
// initialize credentials
TableMapReduceUtil.initCredentials(job);
// run job
TableMapReduceUtil.initTableMapperJob(tableName, scan, HbaseSearchCheckerMapper.class, Text.class, Text.class, job);
- Create the Scan instance
Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false); // no read cache
scan.addFamily(Bytes.toBytes("t"));
- Define the mapper class and its map function, just as in an ordinary MR program
public void map(ImmutableBytesWritable key, Result value, Context context)
    throws IOException, InterruptedException {
  String rowKey = null;
  for (Cell kv : value.rawCells()) {
    if (rowKey == null) {
      rowKey = Bytes.toString(CellUtil.cloneRow(kv));
    }
    count++; // count is a field of the mapper; reported/emitted elsewhere
  }
}
That is the whole HBase MR program; it is very simple.
During execution, however, the settings on the Scan instance determine the final performance, and in most cases a filter is the main lever for optimizing a scan (an example follows below).
So it is worth understanding how a scan flows through the system.
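As an example of such a filter optimization: for a counting job like the mapper above, a FirstKeyOnlyFilter combined with a KeyOnlyFilter keeps the RegionServer from shipping every cell of every row back to the client. A minimal sketch, assuming buildScan(conf) above is where the Scan gets assembled (not taken from our actual job):

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

// Count-friendly Scan: only the first cell of each row is returned, with its
// value stripped, so far less data crosses the wire and less is read per row.
Scan scan = new Scan();
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.addFamily(Bytes.toBytes("t"));
scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,
    new FirstKeyOnlyFilter(), new KeyOnlyFilter()));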
2. Scan Flow
HBase structure
HBase manages data through Region Servers.
As a column-oriented store, HBase groups different kinds of data into column families (CFs), each kept in its own directory; the data itself is persisted in HFiles, and in-memory data sits in the MemStore until it is flushed.
HBase row key structure: row keys are sorted.
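Because row keys are sorted, the cheapest optimization of all is usually to narrow the scan to a row-key range, so only the regions that overlap the range are touched. A small sketch (the key prefixes are made-up examples):

// Only rows in [user_123, user_124) are scanned; regions outside the range are skipped.
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("user_123")); // inclusive
scan.setStopRow(Bytes.toBytes("user_124"));  // exclusive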
Scan client flow
- The Scan class
Lives in hbase-client, package org.apache.hadoop.hbase.client.
It is essentially a configuration class: it captures the conditions of the user's scan, i.e. whatever was set when building the Scan (buildScan above).
For example, the columns the user wants to scan are converted into the following structure:
Map<byte[], NavigableSet<byte[]>> fams = scan.getFamilyMap();
Note these two methods:
public Scan addFamily(byte[] family) {
  familyMap.remove(family);
  familyMap.put(family, null);
  return this;
}
public Scan addColumn(byte[] family, byte[] qualifier) {
  NavigableSet<byte[]> set = familyMap.get(family);
  if (set == null) {
    set = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
  }
  if (qualifier == null) {
    qualifier = HConstants.EMPTY_BYTE_ARRAY;
  }
  set.add(qualifier);
  familyMap.put(family, set);
  return this;
}
There is a gotcha here: if you call addFamily and then addColumn on the same family, the addFamily call is effectively discarded, because addColumn swaps the family's null entry (which means "the whole family") for a set containing only that qualifier. Watch out for this.
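A minimal illustration of the pitfall (family and qualifier names are made up):

Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("t"));                      // "give me the whole family t"
scan.addColumn(Bytes.toBytes("t"), Bytes.toBytes("q1")); // replaces the null set above
// Net effect: only t:q1 is scanned; the earlier addFamily("t") is silently discarded.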
- The ClientScanner class
This class wraps the scan request and the returned results.
When it is constructed, nextScanner is called, which builds:
ScannerCallable callable = getScannerCallable(localStartKey, nbRows);
The scan request is then issued in callable.call():
this.scannerId = openScanner();
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
response = getStub().scan(controller, request);
// Results are returned via controller
CellScanner cellScanner = controller.cellScanner();
rrs = ResponseConverter.getResults(cellScanner, response);
The client issues the scan request and receives the returned results.
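From the application's point of view all of this is hidden behind ResultScanner; roughly (a sketch against the 0.98 client API, with conf and tableName as in the MR driver above):

HTableInterface table = new HTable(conf, tableName); // or obtained from an HConnection
ResultScanner rs = table.getScanner(scan);           // constructs a ClientScanner underneath
try {
  for (Result r : rs) {  // next() triggers a new scan RPC whenever the local cache runs dry
    // process r
  }
} finally {
  rs.close();            // releases the server-side scanner
  table.close();
}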
Scan server flow
- Scanner structure on the server
1) The request reaches the RegionServer, which builds a RegionScanner.
2) The RegionServer manages a set of column families; for each one a store-level scanner is built, consisting of StoreFileScanners plus a MemStoreScanner.
3) The store files are HFiles, and an HFileScanner is built for each; this is where data is actually read.
This raises a question: at which step do filters get executed?
- RegionScanner
It is actually constructed inside HRegion, as a RegionScannerImpl.
HRegion:
protected RegionScanner getScanner(Scan scan,
    List<KeyValueScanner> additionalScanners) throws IOException {
  startRegionOperation(Operation.SCAN);
  try {
    // Verify families are all valid
    prepareScanner(scan);
    if (scan.hasFamilies()) {
      for (byte[] family : scan.getFamilyMap().keySet()) {
        checkFamily(family);
      }
    }
    return instantiateRegionScanner(scan, additionalScanners);
  } finally {
    closeRegionOperation(Operation.SCAN);
  }
}
// inside instantiateRegionScanner:
return new RegionScannerImpl(scan, additionalScanners, this);
- The StoreScanner class:
RegionScannerImpl builds one StoreScanner per requested column family:
for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
    scan.getFamilyMap().entrySet()) {
  Store store = stores.get(entry.getKey());
  KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
  if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
      || this.filter.isFamilyEssential(entry.getKey())) {
    scanners.add(scanner);
  } else {
    joinedScanners.add(scanner);
  }
}
- Server-side scan handling
1) HRegionServer receives the scan RPC request
As seen on the client side:
response = getStub().scan(controller, request);
getStub() returns a ClientService.BlockingInterface.
HRegionServer is declared (in part) as:
public class HRegionServer implements ClientProtos.ClientService.BlockingInterface,
So HRegionServer in fact implements a protobuf-generated service interface, which defines the scan endpoint and makes exchanging data straightforward; the call is presumably dispatched via serialization (and reflection). More on that later.
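For reference, the generated blocking interface exposes scan roughly like this (a paraphrase of the protoc-generated code, not copied verbatim; in hbase-protocol's Client.proto the service declares rpc Scan(ScanRequest) returns (ScanResponse)):

// ClientProtos.ClientService.BlockingInterface, roughly:
ClientProtos.ScanResponse scan(RpcController controller,
    ClientProtos.ScanRequest request) throws ServiceException;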
2) The scan flow
// initialization
region = getRegion(request.getRegion());
ClientProtos.Scan protoScan = request.getScan();
Scan scan = ProtobufUtil.toScan(protoScan);
region.prepareScanner(scan);
RegionScanner scanner = region.getScanner(scan);
// inside the scan function:
region.startRegionOperation(Operation.SCAN);
while (i < rows) {
// Stop collecting results if maxScannerResultSize is set and we have exceeded it
if ((maxScannerResultSize < Long.MAX_VALUE) &&
(currentScanResultSize >= maxResultSize)) {
break;
}
// Collect values to be returned here
boolean moreRows = scanner.nextRaw(values);
if (!values.isEmpty()) {
for (Cell cell : values) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
currentScanResultSize += kv.heapSize();
totalKvSize += kv.getLength();
}
results.add(Result.create(values));
i++;
}
if (!moreRows) {
break;
}
values.clear();
}
Note that rows above is exactly what scan.setCaching(rows) set.
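This is also the knob that interacts with the timeout / OutOfOrderScannerNextException problems mentioned at the beginning: if the mapper needs longer to process one batch of caching rows than the scanner lease allows, the next RPC fails. A hedged sketch of the usual mitigations (the config key is the 0.98 name; the values are examples only):

// Fewer rows per RPC, so each next() batch is processed well within the lease.
scan.setCaching(100);
// Or raise the client-side scanner timeout (milliseconds).
conf.setLong("hbase.client.scanner.timeout.period", 120000L);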
The core of the work is the nextRaw method implemented by RegionScannerImpl.
nextRaw calls nextInternal:
while (true) {
// First, check if we are at a stop row. If so, there are no more results.
if (stopRow) {
if (filter != null && filter.hasFilterRow()) {
filter.filterRowCells(results);
}
return false;
}
// Check if rowkey filter wants to exclude this row. If so, loop to next.
// Technically, if we hit limits before on this row, we don't need this call.
if (filterRowKey(currentRow, offset, length)) {
boolean moreRows = nextRow(currentRow, offset, length);
if (!moreRows) return false;
results.clear();
continue;
}
KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
length);
// Ok, we are good, let's try to get some results from the main heap.
if (nextKv == KV_LIMIT) {
if (this.filter != null && filter.hasFilterRow()) {
throw new IncompatibleFilterException(
"Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
}
return true; // We hit the limit.
}
// (elided in this excerpt: stopRow is recomputed from nextKv)
// save whether the row was empty before filters were applied to it
final boolean isEmptyRow = results.isEmpty();
FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
if (filter != null && filter.hasFilterRow()) {
ret = filter.filterRowCellsWithRet(results);
}
if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
results.clear();
boolean moreRows = nextRow(currentRow, offset, length);
if (!moreRows) return false;
// This row was totally filtered out, if this is NOT the last row,
// we should continue on. Otherwise, nothing else to do.
if (!stopRow) continue;
return false;
}
// ... (elided: joinedHeap handling; when populating from the joined heap was
// stopped by limits on an earlier call, it resumes here)
populateFromJoinedHeap(results, limit);
}
// the nextRow code:
protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
KeyValue next;
while ((next = this.storeHeap.peek()) != null &&
next.matchingRow(currentRow, offset, length)) {
this.storeHeap.next(MOCKED_LIST);
}
resetFilters();
// Calling the hook in CP which allows it to do a fast forward
return this.region.getCoprocessorHost() == null
|| this.region.getCoprocessorHost()
.postScannerFilterRow(this, currentRow, offset, length);
}
Note that the limit here is what setBatch(N) sets.
In other words, when the scan uses a filter whose hasFilterRow() returns true, this limit must not be set, otherwise the IncompatibleFilterException above is thrown.
But if you do not set it, the scan reads every column of each row it visits, which gets very slow for rows with a large number of columns.
The code above also makes it plain where filters run: filterRowKey() is applied per row before its cells are read, filterKeyValue() runs per cell inside ScanQueryMatcher.match() (see the StoreScanner code below), and filterRowCells()/filterRow() run after a row's cells have been collected.
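This is exactly the incompatibility we hit after the upgrade. A sketch of the failing combination (the filter choice is just an example of one whose hasFilterRow() returns true; the filter classes live in org.apache.hadoop.hbase.filter):

Scan scan = new Scan();
scan.setBatch(100);  // becomes the per-next() cell limit shown above
scan.setFilter(new SingleColumnValueFilter(Bytes.toBytes("t"), Bytes.toBytes("q"),
    CompareFilter.CompareOp.EQUAL, Bytes.toBytes("v")));
// Server side: once populateResult() returns KV_LIMIT with such a filter installed,
// nextInternal() throws IncompatibleFilterException ("Filter whose hasFilterRow() returns
// true is incompatible with scan with limit!"), which is why we had to drop setBatch().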
So where is the StoreScanner mentioned earlier, i.e. the place that actually reads the HFiles?
Here is the answer:
// scanners are the per-CF scanners built earlier
this.storeHeap = new KeyValueHeap(scanners, region.comparator);
// this.storeHeap.peek():
As noted, a StoreScanner manages many HFiles underneath, so this is essentially a multi-way merge that pulls data from all of them in sorted order (a standalone sketch of the idea follows after the next code block).
The call chain here is fairly involved, but look at StoreScanner's next() method.
At its core it is a loop over the heap:
LOOP: while((kv = this.heap.peek()) != null) {
if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
checkScanOrder(prevKV, kv, comparator);
prevKV = kv;
ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
switch(qcode) {
case INCLUDE:
case INCLUDE_AND_SEEK_NEXT_ROW:
case INCLUDE_AND_SEEK_NEXT_COL:
this.countPerRow++;
if (storeLimit > -1 &&
this.countPerRow > (storeLimit + storeOffset)) {
// do what SEEK_NEXT_ROW does.
if (!matcher.moreRowsMayExistAfter(kv)) {
return false;
}
seekToNextRow(kv);
break LOOP;
}
// add to results only if we have skipped #storeOffset kvs
// also update metric accordingly
if (this.countPerRow > storeOffset) {
outResult.add(kv);
count++;
}
if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
if (!matcher.moreRowsMayExistAfter(kv)) {
return false;
}
seekToNextRow(kv);
} else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
seekAsDirection(matcher.getKeyForNextColumn(kv));
} else {
this.heap.next();
}
if (limit > 0 && (count == limit)) {
break LOOP;
}
continue;
// ... (SKIP, SEEK_NEXT_ROW, SEEK_NEXT_COL and the other MatchCode cases are elided here)
default:
throw new RuntimeException("UNEXPECTED");
}
}
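To make the multi-way merge idea concrete, here is a minimal, self-contained sketch of the same pattern with a plain PriorityQueue. It is not HBase code, just the algorithm KeyValueHeap applies to the MemStoreScanner and the per-HFile StoreFileScanners:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMerge {
  // One cursor per sorted source, analogous to one KeyValueScanner per HFile/MemStore.
  static final class Cursor {
    final List<String> src;
    int pos;
    Cursor(List<String> src) { this.src = src; }
    String peek() { return src.get(pos); }
  }

  // Merge k sorted lists into one sorted list, the way KeyValueHeap merges its scanners.
  public static List<String> merge(List<List<String>> sortedSources) {
    PriorityQueue<Cursor> heap =
        new PriorityQueue<Cursor>(Comparator.comparing(Cursor::peek));
    for (List<String> s : sortedSources) {
      if (!s.isEmpty()) heap.add(new Cursor(s));
    }
    List<String> out = new ArrayList<String>();
    while (!heap.isEmpty()) {
      Cursor top = heap.poll();      // like storeHeap.peek()/next(): smallest head wins
      out.add(top.peek());
      if (++top.pos < top.src.size()) heap.add(top); // re-insert with the cursor advanced
    }
    return out;
  }
}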
Back in next(), note the storeLimit variable; it is initialized from the Scan:
// set storeLimit
this.storeLimit = scan.getMaxResultsPerColumnFamily();
In other words, this setting lets the scan cap what it reads from rows that have a very large number of columns, instead of reading every cell of every row.
That gives an approximate but noticeably faster count.
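A hedged usage sketch (the value 1 is just an example): as long as an approximate count is acceptable, capping how many cells are read per row per column family keeps wide rows from dominating the scan:

Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("t"));
scan.setCaching(1000);
scan.setCacheBlocks(false);
// Read at most one cell per row per column family; this is the storeLimit above,
// enough for counting rows, and wide rows no longer force a full-row read.
scan.setMaxResultsPerColumnFamily(1);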