HBase Get Request: Source Code Analysis
The client code analyzed here comes from hbase-client-1.2.0-cdh5.11.1.jar.
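First, a quick review of how an HBase get request flows:
- Ask ZooKeeper for the RegionServer hosting the hbase:meta table. This version no longer has a -ROOT- table, as the code confirms.
- Query the RegionServer hosting hbase:meta to find which RegionServer holds the region for the given table and row. The hbase:meta information is cached in this step.
- Send the request to the RegionServer holding that table and row. The location for that table and row is cached in this step.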
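The three steps can be modeled as a toy lookup with a client-side cache. This is a minimal sketch, not the HBase client API. LookupFlowSketch, Region, and the counters are hypothetical names. Java String ordering stands in for HBase's byte ordering. The model does not cache the meta location itself, so every cache miss repeats the ZooKeeper step.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of the three-step lookup: ZooKeeper -> hbase:meta -> user region.
// All names are hypothetical; this is not the HBase client API.
class LookupFlowSketch {
    // One region as hbase:meta would describe it: rows in [startKey, endKey) live on server.
    static final class Region {
        final String startKey, endKey, server;
        Region(String startKey, String endKey, String server) {
            this.startKey = startKey;
            this.endKey = endKey;
            this.server = server;
        }
        boolean contains(String row) {
            // An empty end key means the region runs to the end of the table.
            return row.compareTo(startKey) >= 0 && (endKey.isEmpty() || row.compareTo(endKey) < 0);
        }
    }

    final String zkMetaServer;                             // what ZooKeeper would report
    final TreeMap<String, Region> meta = new TreeMap<>();  // hbase:meta rows, keyed by start key
    final TreeMap<String, Region> cache = new TreeMap<>(); // client-side region location cache
    int zkCalls = 0;
    int metaCalls = 0;

    LookupFlowSketch(String zkMetaServer) {
        this.zkMetaServer = zkMetaServer;
    }

    String locate(String row) {
        Map.Entry<String, Region> hit = cache.floorEntry(row);
        if (hit != null && hit.getValue().contains(row)) {
            return hit.getValue().server;                  // cache hit: no ZK or meta round trip
        }
        zkCalls++;                                         // step 1: where is hbase:meta?
        String metaServer = zkMetaServer;
        metaCalls++;                                       // step 2: ask metaServer for the region
        Map.Entry<String, Region> e = meta.floorEntry(row);
        if (e == null || !e.getValue().contains(row)) {
            throw new IllegalStateException("no region for row " + row + " on " + metaServer);
        }
        cache.put(e.getKey(), e.getValue());               // cache the user region location
        return e.getValue().server;                        // step 3: the Get is sent here
    }

    public static void main(String[] args) {
        LookupFlowSketch client = new LookupFlowSketch("meta-rs:60020");
        client.meta.put("", new Region("", "m", "rs1:60020"));
        client.meta.put("m", new Region("m", "", "rs2:60020"));
        System.out.println(client.locate("apple"));  // rs1:60020
        System.out.println(client.locate("banana")); // rs1:60020 (from the cache)
        System.out.println(client.locate("zebra"));  // rs2:60020
        System.out.println(client.zkCalls + " " + client.metaCalls); // 2 2
    }
}
```

The cache check verifies the end key as well as the start key. Otherwise a cached entry for an earlier region would wrongly answer for a row that belongs to an uncached later region.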
private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
  // if we are changing settings to the get, clone it.
  if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
    get = ReflectionUtils.newInstance(get.getClass(), get);
    get.setCheckExistenceOnly(checkExistenceOnly);
    if (get.getConsistency() == null){
      get.setConsistency(defaultConsistency);
    }
  }
  if (get.getConsistency() == Consistency.STRONG) {
    // Good old call.
    final Get getReq = get;
    RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
        getName(), get.getRow()) {
      @Override
      public Result call(int callTimeout) throws IOException {
        ClientProtos.GetRequest request =
            RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
        controller.setPriority(tableName);
        controller.setCallTimeout(callTimeout);
        try {
          ClientProtos.GetResponse response = getStub().get(controller, request);
          if (response == null) return null;
          return ProtobufUtil.toResult(response.getResult());
        } catch (ServiceException se) {
          throw ProtobufUtil.getRemoteException(se);
        }
      }
    };
    return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
        this.operationTimeout);
  }
  // ... the non-STRONG (TIMELINE) consistency path is omitted from this excerpt
}
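First, a Get's Consistency is STRONG by default. If no consistency is set, the Get is cloned and given the table's defaultConsistency. For HBase consistency levels, see (https://www.w3cschool.cn/hbase_doc/hbase_doc-rm8b2ruf.html)
or this article:
https://yq.aliyun.com/articles/573730
For a STRONG get, the code creates a RegionServerCallable whose call() method is the callback that actually sends the Get RPC; we skip it for now. The caller created by rpcCallerFactory then runs callWithRetries on it.
Here is that method: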
public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
    throws IOException, RuntimeException {
  List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
      new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
  this.globalStartTime = EnvironmentEdgeManager.currentTime();
  context.clear();
  for (int tries = 0;; tries++) {
    long expectedSleep;
    try {
      callable.prepare(tries != 0); // if called with false, check table status on ZK
      interceptor.intercept(context.prepare(callable, tries));
      return callable.call(getTimeout(callTimeout));
    } catch (PreemptiveFastFailException e) {
      throw e;
    } catch (Throwable t) {
      ExceptionUtil.rethrowIfInterrupt(t);
      if (tries > startLogErrorsCnt) {
        LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" +
            (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, "
            + "cancelled=" + cancelled.get() + ", msg="
            + callable.getExceptionMessageAdditionalDetail());
      }
      // translateException throws exception when should not retry: i.e. when request is bad.
      interceptor.handleFailure(context, t);
      t = translateException(t);
      callable.throwable(t, retries != 1);
      RetriesExhaustedException.ThrowableWithExtraContext qt =
          new RetriesExhaustedException.ThrowableWithExtraContext(t,
              EnvironmentEdgeManager.currentTime(), toString());
      exceptions.add(qt);
      if (tries >= retries - 1) {
        throw new RetriesExhaustedException(tries, exceptions);
      }
      // If the server is dead, we need to wait a little before retrying, to give
      // a chance to the regions to be
      // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
      expectedSleep = callable.sleep(pause, tries + 1);
      // If, after the planned sleep, there won't be enough time left, we stop now.
      long duration = singleCallDuration(expectedSleep);
      if (duration > callTimeout) {
        String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration +
            ": " + callable.getExceptionMessageAdditionalDetail();
        throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
      }
    } finally {
      interceptor.updateFailureInfo(context);
    }
    try {
      if (expectedSleep > 0) {
        synchronized (cancelled) {
          if (cancelled.get()) return null;
          cancelled.wait(expectedSleep);
        }
      }
      if (cancelled.get()) return null;
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries);
    }
  }
}
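This is a retry loop. Some exceptions end it immediately: PreemptiveFastFailException, and anything translateException marks as not retriable. Otherwise, it throws RetriesExhaustedException once the retry count is used up. It throws SocketTimeoutException if, after the planned sleep, there would not be enough time left within callTimeout. The calls that do the real work are: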
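The shape of this loop can be sketched on its own: retry, back off, and stop on exhaustion or on a projected timeout. This is a simplified model, not RpcRetryingCaller itself. RetrySketch and its BACKOFF multipliers are illustrative assumptions. The duration check only approximates singleCallDuration.

```java
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Simplified model of the callWithRetries loop: retry on failure, back off between
// attempts, stop when retries run out or the next attempt would overrun callTimeout.
// The BACKOFF multipliers are an illustrative assumption, not read from HBase.
class RetrySketch {
    static final int[] BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100};

    // Pause grows with the attempt number and is capped at the last multiplier.
    static long pauseTime(long pauseMs, int tries) {
        return pauseMs * BACKOFF[Math.min(tries, BACKOFF.length - 1)];
    }

    static <T> T callWithRetries(Callable<T> call, int retries, long pauseMs, long callTimeoutMs)
            throws IOException, InterruptedException {
        List<Exception> failures = new ArrayList<>();
        long start = System.currentTimeMillis();
        for (int tries = 0; ; tries++) {
            try {
                return call.call();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    throw (InterruptedException) e;            // like rethrowIfInterrupt
                }
                failures.add(e);
                if (tries >= retries - 1) {                    // like RetriesExhaustedException
                    throw new IOException("retries exhausted after " + (tries + 1)
                            + " attempts: " + failures);
                }
                // "tries + 1" because the counter has not been bumped yet, as in the original.
                long sleep = pauseTime(pauseMs, tries + 1);
                long duration = System.currentTimeMillis() - start + sleep;
                if (duration > callTimeoutMs) {                // like the SocketTimeoutException path
                    throw new SocketTimeoutException("callTimeout=" + callTimeoutMs
                            + ", callDuration=" + duration);
                }
                Thread.sleep(sleep);
            }
        }
    }
}
```

A call that fails twice and then succeeds returns normally on the third attempt. A call that always fails with retries=2 ends with the exhausted-retries IOException.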
callable.prepare(tries != 0); // if called with false, check table status on ZK
interceptor.intercept(context.prepare(callable, tries));
return callable.call(getTimeout(callTimeout));
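Now look at RegionServerCallable.prepare. Its job is to find the region that contains this row of the table and the host serving it, then set up the RPC stub for that server.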
@Override
public void prepare(final boolean reload) throws IOException {
  try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
    this.location = regionLocator.getRegionLocation(row, reload);
  }
  if (this.location == null) {
    throw new IOException("Failed to find location, tableName=" + tableName +
        ", row=" + Bytes.toString(row) + ", reload=" + reload);
  }
  setStub(getConnection().getClient(this.location.getServerName()));
}
After several layers of calls, this ends up in
org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation.locateRegion(org.apache.hadoop.hbase.TableName, byte[], boolean, boolean, int)
@Override
public RegionLocations locateRegion(final TableName tableName,
    final byte [] row, boolean useCache, boolean retry, int replicaId)
    throws IOException {
  if (this.closed) throw new IOException(toString() + " closed");
  if (tableName== null || tableName.getName().length == 0) {
    throw new IllegalArgumentException(
        "table name cannot be null or zero length");
  }
  if (tableName.equals(TableName.META_TABLE_NAME)) {
    if (useMetaReplicas) {
      return locateMeta(tableName, useCache, replicaId);
    } else {
      return this.registry.getMetaRegionLocation();
    }
  } else {
    // Region not in the cache - have to go to the meta RS
    return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
  }
}
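On the first call the table is a user table, so locateRegionInMeta is called. It first checks whether the cache may be used. If so, it tries to load the region location from the cache.
On a cache miss it builds a lookup key from the table name and row with HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false), which gives tableName,row,99999999999999. Row keys in hbase:meta are region names of the form tableName,startKey,regionId.encodedName., for example:
session,,1515750236668.3921e38c29c3e99f8d4b8f857447d2e7.
The trailing nines make the lookup key sort after every region row with the same table and start key. The closest meta row at or before the key is therefore the region that contains row. Meta is then queried, through ClientSmallReversedScanner when meta replicas are enabled and otherwise through ProtobufUtil.getRowOrBefore (see below). The returned row looks like this: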
session,,1515750236668.3921e38c29c3e99f8d4b8f857447d2e7. column=info:regioninfo, timestamp=1534252868373, value={ENCODED => 3921e38c29c3e99f8d4b8f857447d2e7, NAME => 'session,,1515750236668.3921e38c29c3e99f8d4b8f857447d2e7.', STARTKEY => '', ENDKEY => '2|8620485395|2'}
session,,1515750236668.3921e38c29c3e99f8d4b8f857447d2e7. column=info:seqnumDuringOpen, timestamp=1534252868373, value=\x00\x00\x00\x00\x06Oq\xE5
session,,1515750236668.3921e38c29c3e99f8d4b8f857447d2e7. column=info:server, timestamp=1534252868373, value=hb3.hdp.cn:60020
session,,1515750236668.3921e38c29c3e99f8d4b8f857447d2e7. column=info:serverstartcode, timestamp=1534252868373, value=1531762503774
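The effect of the trailing nines can be checked with a sorted map and a floor lookup. This is a sketch under stated assumptions. MetaKeySketch is hypothetical, the region IDs and encoded names in it are made up, and plain String order stands in for HBase's meta comparator. Plain String order agrees with that comparator for these sample keys, but it is not the real comparator.

```java
import java.util.TreeMap;

// Shows why the meta lookup key ends in nines. hbase:meta rows are named
// "table,startKey,regionId.encodedName." and the client looks for the last row at or
// before "table,row,99999999999999". Plain String order is a stand-in for HBase's meta
// comparator; it agrees with it for these sample keys. Region IDs/encoded names are made up.
class MetaKeySketch {
    static final String NINES = "99999999999999";

    static String metaKey(String table, String row) {
        return table + "," + row + "," + NINES;
    }

    // Two regions of table "session": ["", "2|8620485395|2") and ["2|8620485395|2", end).
    static TreeMap<String, String> sampleMeta() {
        TreeMap<String, String> meta = new TreeMap<>();
        meta.put("session,,1515750236668.aaaa.", "first region");
        meta.put("session,2|8620485395|2,1515750236669.bbbb.", "second region");
        return meta;
    }

    // Equivalent of "get row or before": the greatest meta row <= the lookup key.
    static String regionFor(String row) {
        return sampleMeta().floorEntry(metaKey("session", row)).getValue();
    }

    public static void main(String[] args) {
        System.out.println(metaKey("session", "1|123")); // session,1|123,99999999999999
        System.out.println(regionFor("1|123"));           // first region
        System.out.println(regionFor("2|8620485395|2"));  // second region (exact start key)
    }
}
```

The exact-start-key case shows why the nines matter. With a bare trailing comma instead, "session,2|8620485395|2," would sort before the second region's row, and the floor lookup would wrongly return the first region.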
locateRegionInMeta is also built around a retry loop:
private RegionLocations locateRegionInMeta(TableName tableName, byte[] row,
    boolean useCache, boolean retry, int replicaId) throws IOException {
  // If we are supposed to be using the cache, look in the cache to see if
  // we already have the region.
  if (useCache) {
    RegionLocations locations = getCachedLocation(tableName, row);
    if (locations != null && locations.getRegionLocation(replicaId) != null) {
      return locations;
    }
  }
  // build the key of the meta region we should be looking for.
  // the extra 9's on the end are necessary to allow "exact" matches
  // without knowing the precise region names.
  byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
  Scan s = null;
  if (useMetaReplicas) {
    // for CDH-5.0 compatibility, we are not going to use reverse scan
    // unless the user enabled meta replicas. In this case we know that
    // the server support reverse scan.
    s = new Scan();
    s.setReversed(true);
    s.setStartRow(metaKey);
    s.setSmall(true);
    s.setCaching(1);
    s.setConsistency(Consistency.TIMELINE);
  }
  int localNumRetries = (retry ? numTries : 1);
  for (int tries = 0; true; tries++) {
    if (tries >= localNumRetries) {
      throw new NoServerForRegionException("Unable to find region for "
          + Bytes.toStringBinary(row) + " in " + tableName +
          " after " + localNumRetries + " tries.");
    }
    if (useCache) {
      RegionLocations locations = getCachedLocation(tableName, row);
      if (locations != null && locations.getRegionLocation(replicaId) != null) {
        return locations;
      }
    } else {
      // If we are not supposed to be using the cache, delete any existing cached location
      // so it won't interfere.
      metaCache.clearCache(tableName, row);
    }
    RegionLocations metaLocation = null;
    // Query the meta region
    try {
      Result regionInfoRow = null;
      if (s != null) {
        ReversedClientScanner rcs = null;
        try {
          rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
              rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
          regionInfoRow = rcs.next();
        } finally {
          if (rcs != null) {
            rcs.close();
          }
        }
      }
      // if we are using meta replicas, we may end up with an empty row,
      // if the replica did not received the update yet.
      // In this case we fallback to the old method for safety.
      if (regionInfoRow == null) {
        // locate the meta region (compatible with versions that does not support reverse scan)
        metaLocation = locateRegion(TableName.META_TABLE_NAME, metaKey, false, false);
        // If null still, go around again.
        if (metaLocation == null) continue;
        ClientService.BlockingInterface service =
            getClient(metaLocation.getDefaultRegionLocation().getServerName());
        // This block guards against two threads trying to load the meta
        // region at the same time. The first will load the meta region and
        // the second will use the value that the first one found.
        if (useCache) {
          if (TableName.META_TABLE_NAME.equals(tableName) && getRegionCachePrefetch(tableName)) {
            // Check the cache again for a hit in case some other thread made the
            // same query while we were waiting on the lock.
            RegionLocations locations = getCachedLocation(tableName, row);
            if (locations != null && locations.getRegionLocation(replicaId) != null) {
              return locations;
            }
            // If the parent table is META, we may want to pre-fetch some
            // region info into the global region cache for this table.
            prefetchRegionCache(tableName, row);
          }
          RegionLocations locations = getCachedLocation(tableName, row);
          if (locations != null && locations.getRegionLocation(replicaId) != null) {
            return locations;
          }
        } else {
          // If we are not supposed to be using the cache, delete any existing cached location
          // so it won't interfere.
          metaCache.clearCache(tableName, row);
        }
        // Query the meta region for the location of the meta region
        regionInfoRow = ProtobufUtil.getRowOrBefore(service,
            metaLocation.getDefaultRegionLocation().getRegionInfo().getRegionName(),
            metaKey, HConstants.CATALOG_FAMILY);
      }
      if (regionInfoRow == null) {
        throw new TableNotFoundException(tableName);
      }
      // convert the row result into the HRegionLocation we need!
      RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
      if (locations == null || locations.getRegionLocation(replicaId) == null) {
        throw new IOException("HRegionInfo was null in " +
            tableName + ", row=" + regionInfoRow);
      }
      HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
      if (regionInfo == null) {
        throw new IOException("HRegionInfo was null or empty in " +
            TableName.META_TABLE_NAME + ", row=" + regionInfoRow);
      }
      // possible we got a region of a different table...
      if (!regionInfo.getTable().equals(tableName)) {
        throw new TableNotFoundException(
            "Table '" + tableName + "' was not found, got: " +
            regionInfo.getTable() + ".");
      }
      if (regionInfo.isSplit()) {
        throw new RegionOfflineException("the only available region for" +
            " the required row is a split parent," +
            " the daughters should be online soon: " +
            regionInfo.getRegionNameAsString());
      }
      if (regionInfo.isOffline()) {
        throw new RegionOfflineException("the region is offline, could" +
            " be caused by a disable table call: " +
            regionInfo.getRegionNameAsString());
      }
      ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
      if (serverName == null) {
        throw new NoServerForRegionException("No server address listed " +
            "in " + TableName.META_TABLE_NAME + " for region " +
            regionInfo.getRegionNameAsString() + " containing row " +
            Bytes.toStringBinary(row));
      }
      if (isDeadServer(serverName)){
        throw new RegionServerStoppedException("hbase:meta says the region "+
            regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
            ", but it is dead.");
      }
      // Instantiate the location
      cacheLocation(tableName, locations);
      return locations;
    } catch (TableNotFoundException e) {
      // if we got this error, probably means the table just plain doesn't
      // exist. rethrow the error immediately. this should always be coming
      // from the HTable constructor.
      throw e;
    } catch (IOException e) {
      ExceptionUtil.rethrowIfInterrupt(e);
      if (e instanceof RemoteException) {
        e = ((RemoteException)e).unwrapRemoteException();
      }
      if (tries < localNumRetries - 1) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("locateRegionInMeta parentTable=" +
              TableName.META_TABLE_NAME + ", metaLocation=" + metaLocation +
              ", attempt=" + tries + " of " +
              localNumRetries + " failed; retrying after sleep of " +
              ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
        }
      } else {
        throw e;
      }
      // Only relocate the parent region if necessary
      if(!(e instanceof RegionOfflineException ||
          e instanceof NoServerForRegionException)) {
        relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId);
      }
    }
    try{
      Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Giving up trying to location region in " +
          "meta: thread is interrupted.");
    }
  }
}
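The second half of locateRegionInMeta validates the meta row before caching it. Its catch blocks decide what happens next. The sketch below condenses those checks and that retry handling into a table-like enum. MetaRowCheckSketch, MetaRow, and Outcome are hypothetical types. The real code throws TableNotFoundException, RegionOfflineException, NoServerForRegionException, or RegionServerStoppedException instead of returning a value.

```java
import java.util.Set;

// Condenses the checks locateRegionInMeta runs on a meta row before caching it, and how
// its catch blocks treat each failure. MetaRow and Outcome are hypothetical types; the
// real code throws exceptions instead of returning an enum.
class MetaRowCheckSketch {
    enum Outcome {
        // retried: another attempt inside locateRegionInMeta (while tries remain)
        // relocateMeta: relocateRegion(hbase:meta, ...) is also called before retrying
        OK(false, false),
        TABLE_NOT_FOUND(false, false), // TableNotFoundException is rethrown at once
        REGION_OFFLINE(true, false),
        NO_SERVER(true, false),
        SERVER_DEAD(true, true);

        final boolean retried;
        final boolean relocateMeta;

        Outcome(boolean retried, boolean relocateMeta) {
            this.retried = retried;
            this.relocateMeta = relocateMeta;
        }
    }

    static final class MetaRow {
        final String table;
        final boolean split, offline;
        final String server; // null when the row lists no server
        MetaRow(String table, boolean split, boolean offline, String server) {
            this.table = table;
            this.split = split;
            this.offline = offline;
            this.server = server;
        }
    }

    static Outcome check(String wantedTable, MetaRow row, Set<String> deadServers) {
        if (row == null) return Outcome.TABLE_NOT_FOUND;                    // no meta row at all
        if (!row.table.equals(wantedTable)) return Outcome.TABLE_NOT_FOUND; // landed in another table
        if (row.split) return Outcome.REGION_OFFLINE;                       // split parent
        if (row.offline) return Outcome.REGION_OFFLINE;                     // e.g. disabled table
        if (row.server == null) return Outcome.NO_SERVER;
        if (deadServers.contains(row.server)) return Outcome.SERVER_DEAD;
        return Outcome.OK;                                                  // cacheLocation + return
    }
}
```

The order of the checks follows the excerpt above, so a split parent is reported before a missing server.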
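Because useMetaReplicas is false here (this keeps compatibility with versions that do not support reverse scan; see the official HBase documentation on reverse scan), the Scan s is null, so the code falls through to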
metaLocation = locateRegion(TableName.META_TABLE_NAME, metaKey, false, false);
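Back in locateRegion, the table passed in is now META_TABLE_NAME. With meta replicas enabled it calls locateMeta, shown below. With useMetaReplicas false, as in this walkthrough, it calls this.registry.getMetaRegionLocation() directly. That is the same ZooKeeper lookup locateMeta performs on a cache miss.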
private RegionLocations locateMeta(final TableName tableName,
    boolean useCache, int replicaId) throws IOException {
  // HBASE-10785: We cache the location of the META itself, so that we are not overloading
  // zookeeper with one request for every region lookup. We cache the META with empty row
  // key in MetaCache.
  byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
  RegionLocations locations = null;
  if (useCache) {
    locations = getCachedLocation(tableName, metaCacheKey);
    if (locations != null && locations.getRegionLocation(replicaId) != null) {
      return locations;
    }
  }
  // only one thread should do the lookup.
  synchronized (metaRegionLock) {
    // Check the cache again for a hit in case some other thread made the
    // same query while we were waiting on the lock.
    if (useCache) {
      locations = getCachedLocation(tableName, metaCacheKey);
      if (locations != null && locations.getRegionLocation(replicaId) != null) {
        return locations;
      }
    }
    // Look up from zookeeper
    locations = this.registry.getMetaRegionLocation();
    if (locations != null) {
      cacheLocation(tableName, locations);
    }
  }
  return locations;
}
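In locateMeta, the meta location is cached under the empty row key when useCache is true. On a cache miss only one thread does the lookup. It takes metaRegionLock and checks the cache again, in case another thread filled it while it waited. It then asks ZooKeeper for the meta location.
Next, look at org.apache.hadoop.hbase.client.ZooKeeperRegistry.getMetaRegionLocation: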
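The cache, lock, and re-check pattern in locateMeta can be shown in isolation. This is a minimal sketch that reduces MetaCache to one volatile field. MetaLocationCacheSketch is hypothetical, and the Supplier is an assumed stand-in for the ZooKeeper lookup.

```java
import java.util.function.Supplier;

// The cache / lock / re-check pattern from locateMeta, reduced to a single cached value.
// MetaLocationCacheSketch and the Supplier standing in for the ZooKeeper lookup are
// hypothetical; the real code keeps the meta location in MetaCache under the empty row key.
class MetaLocationCacheSketch {
    private final Object metaRegionLock = new Object();
    private volatile String cachedMetaLocation;   // null until a lookup succeeds
    private final Supplier<String> zkLookup;

    MetaLocationCacheSketch(Supplier<String> zkLookup) {
        this.zkLookup = zkLookup;
    }

    String locateMeta(boolean useCache) {
        if (useCache) {
            String loc = cachedMetaLocation;
            if (loc != null) return loc;          // fast path: no lock taken
        }
        synchronized (metaRegionLock) {           // only one thread does the lookup
            if (useCache) {
                String loc = cachedMetaLocation;  // another thread may have filled it while we waited
                if (loc != null) return loc;
            }
            String loc = zkLookup.get();          // "look up from zookeeper"
            if (loc != null) cachedMetaLocation = loc;
            return loc;
        }
    }
}
```

However many threads race on an empty cache, the re-check inside the lock means the lookup runs once. Passing useCache=false forces a fresh lookup, much like relocateRegion does.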
@Override
public RegionLocations getMetaRegionLocation() throws IOException {
  ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher();
  try {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Looking up meta region location in ZK," + " connection=" + this);
    }
    List<ServerName> servers = new MetaTableLocator().blockUntilAvailable(zkw, hci.rpcTimeout,
        hci.getConfiguration());
    if (LOG.isTraceEnabled()) {
      if (servers == null) {
        LOG.trace("Looked up meta region location, connection=" + this +
            "; servers = null");
      } else {
        StringBuilder str = new StringBuilder();
        for (ServerName s : servers) {
          str.append(s.toString());
          str.append(" ");
        }
        LOG.trace("Looked up meta region location, connection=" + this +
            "; servers = " + str.toString());
      }
    }
    if (servers == null) return null;
    HRegionLocation[] locs = new HRegionLocation[servers.size()];
    int i = 0;
    for (ServerName server : servers) {
      HRegionInfo h = RegionReplicaUtil.getRegionInfoForReplica(
          HRegionInfo.FIRST_META_REGIONINFO, i);
      if (server == null) locs[i++] = null;
      else locs[i++] = new HRegionLocation(h, server, 0);
    }
    return new RegionLocations(locs);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return null;
  } finally {
    zkw.close();
  }
}
The returned region location looks like this:
region=hbase:meta,,1.1588230740, hostname=tr4.hdp.cn,60021,1514197831557, seqNum=0
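The structure is easy to read. It says that hbase:meta is hosted on tr4.hdp.cn, port 60021, with server start code 1514197831557.
The client then queries the RegionServer hosting meta for the region information, for example: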
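The server part of that location uses the form hostname,port,startcode. The start code is the RegionServer's start time in milliseconds, so it distinguishes restarts of a server on the same host and port. The helper below only illustrates that format. ServerNameSketch is hypothetical, and the client itself uses org.apache.hadoop.hbase.ServerName.

```java
// Splits the "hostname,port,startcode" form seen in the region location above.
// ServerNameSketch is a hypothetical stand-in for org.apache.hadoop.hbase.ServerName.
final class ServerNameSketch {
    final String hostname;
    final int port;
    final long startCode; // RegionServer start time in ms; distinguishes restarts on the same host:port

    ServerNameSketch(String hostname, int port, long startCode) {
        this.hostname = hostname;
        this.port = port;
        this.startCode = startCode;
    }

    static ServerNameSketch parse(String s) {
        String[] parts = s.split(",");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected hostname,port,startcode but got: " + s);
        }
        return new ServerNameSketch(parts[0], Integer.parseInt(parts[1]), Long.parseLong(parts[2]));
    }

    @Override
    public String toString() {
        return hostname + ":" + port + " (startcode " + startCode + ")";
    }
}
```

The host:port value stored in info:server, such as hb3.hdp.cn:60020, is a different format. This helper rejects it.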
{ENCODED => c6b3becb3eddc91b4ae69d77c1c4eaf5, NAME => 'V3_UserProfile,22592009084298381312286811,1509355646476.c6b3becb3eddc91b4ae69d77c1c4eaf5.', STARTKEY => '22592009084298381312286811', ENDKEY => '34362040585555200000613694'}
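This structure is also easy to read. ENCODED is the encoded region name. NAME is the full region name: table V3_UserProfile, start key, region ID, and encoded name. STARTKEY and ENDKEY bound the rows the region serves, with the start inclusive and the end exclusive.
The meta row is then converted into a RegionLocations object (MetaTableAccessor.getRegionLocations), checked, and cached.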
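The region names in these examples can be taken apart mechanically. This is a sketch assuming the format seen in this article: table,startKey,regionId.encodedName. with no replica suffix. RegionNameSketch is a hypothetical helper, not an HBase class. HBase table names cannot contain commas, so the table ends at the first comma. The region ID starts after the last comma, so a start key that contains commas stays intact.

```java
// Splits a region name of the form "table,startKey,regionId.encodedName." as shown in the
// meta output above. Hypothetical helper, assuming the new-format names seen in this
// article (no replica suffix).
final class RegionNameSketch {
    final String table, startKey, regionId, encodedName;

    RegionNameSketch(String table, String startKey, String regionId, String encodedName) {
        this.table = table;
        this.startKey = startKey;
        this.regionId = regionId;
        this.encodedName = encodedName;
    }

    static RegionNameSketch parse(String name) {
        int first = name.indexOf(',');      // table names cannot contain commas
        int last = name.lastIndexOf(',');   // the start key may, so take the last comma here
        if (first < 0 || last == first) {
            throw new IllegalArgumentException("not a region name: " + name);
        }
        String tail = name.substring(last + 1);              // "regionId.encodedName."
        int dot = tail.indexOf('.');
        String regionId = dot < 0 ? tail : tail.substring(0, dot);
        String encoded = dot < 0 ? "" : tail.substring(dot + 1);
        if (encoded.endsWith(".")) {
            encoded = encoded.substring(0, encoded.length() - 1); // drop the trailing dot
        }
        return new RegionNameSketch(name.substring(0, first), name.substring(first + 1, last),
                regionId, encoded);
    }
}
```

For the first region of a table, the start key is empty, which is why the session example contains ",,".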
With the location in hand, prepare stores it in this.location and sets up the RPC stub for that server:
@Override
public void prepare(final boolean reload) throws IOException {
  try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
    this.location = regionLocator.getRegionLocation(row, reload);
  }
  if (this.location == null) {
    throw new IOException("Failed to find location, tableName=" + tableName +
        ", row=" + Bytes.toString(row) + ", reload=" + reload);
  }
  setStub(getConnection().getClient(this.location.getServerName()));
}
Control then returns to callWithRetries, which calls callable.call() to send the Get RPC to that RegionServer.
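callWithRetries passes tries != 0 to prepare(). The first attempt trusts the cached location, and later attempts reload it. A stale cache entry therefore costs one failed attempt rather than failing the get. This is a hypothetical model, not the HBase client. "meta" maps a row to its current server, and a call fails if it reaches any other server. The real client would see an error such as NotServingRegionException in that case.

```java
import java.util.HashMap;
import java.util.Map;

// Why callWithRetries passes "tries != 0" to prepare(): attempt 0 trusts the cached
// location, later attempts reload it from meta. Hypothetical model: "meta" maps a row to
// its current server, and a call fails if it reaches any other server.
class ReloadOnRetrySketch {
    final Map<String, String> meta = new HashMap<>();   // authoritative row -> server
    final Map<String, String> cache = new HashMap<>();  // client cache, possibly stale
    int metaReads = 0;

    // Mirrors prepare(reload): use the cache unless reload is requested or it misses.
    String prepare(String row, boolean reload) {
        if (!reload && cache.containsKey(row)) return cache.get(row);
        metaReads++;
        String server = meta.get(row);
        cache.put(row, server);
        return server;
    }

    String get(String row, int retries) {
        for (int tries = 0; tries < retries; tries++) {
            String server = prepare(row, tries != 0);
            if (server != null && server.equals(meta.get(row))) {
                return "value of " + row + " from " + server;
            }
            // Wrong or unknown server: fall through and retry with a reload.
        }
        throw new IllegalStateException("retries exhausted for row " + row);
    }
}
```

If the region has moved from rs1 to rs2 while the cache still says rs1, the first attempt fails. The second attempt reloads from meta, updates the cache, and succeeds, with only one meta read.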