HBase Get Request Source Code Analysis

The client code examined here is from hbase-client-1.2.0-cdh5.11.1.jar.
First, a quick review of how an HBase Get request is processed:

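  1. Ask ZooKeeper which RegionServer hosts the hbase:meta table. As the code shows, this version no longer has a -ROOT- table.
  2. Ask the RegionServer hosting hbase:meta which RegionServer holds the given table and row. The hbase:meta location is cached in this step.
  3. Send the request to the RegionServer that holds the table and row. The location for that table and row is cached in this step.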
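To make the three steps concrete, here is a toy in-memory model in Java. It is not HBase code. GetRoutingModel and its fields are hypothetical, ZooKeeper and hbase:meta are replaced by plain fields, and the model assumes the first region has the empty start key. It shows how the two caches stop repeated Gets from going back to ZooKeeper or hbase:meta.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model of the three-step routing of a Get. Not HBase code: every name here is hypothetical.
class GetRoutingModel {
    int zkLookups = 0;   // how often step 1 (ask ZooKeeper for hbase:meta) really ran
    int metaLookups = 0; // how often step 2 (query hbase:meta) really ran

    private final String metaServer;                 // what the ZooKeeper znode would say
    private final TreeMap<String, String> metaTable; // region start key -> hosting server

    private String cachedMetaServer;                                     // cache for step 1
    private final Map<String, String> locationCache = new HashMap<>();  // cache for step 2

    GetRoutingModel(String metaServer, TreeMap<String, String> metaTable) {
        this.metaServer = metaServer;
        this.metaTable = metaTable;
    }

    // Step 1: find the server hosting hbase:meta, asking "ZooKeeper" only once.
    String locateMetaServer() {
        if (cachedMetaServer == null) {
            zkLookups++;
            cachedMetaServer = metaServer;
        }
        return cachedMetaServer;
    }

    // Step 2: find the server for a row; step 3 would send the Get to the returned server.
    // Simplification: this cache is keyed by row, whereas HBase caches per region.
    String locateRowServer(String row) {
        String cached = locationCache.get(row);
        if (cached != null) {
            return cached;
        }
        locateMetaServer(); // a real client would now send an RPC to this server
        metaLookups++;
        // The region with the greatest start key <= row is the one holding the row.
        String server = metaTable.floorEntry(row).getValue();
        locationCache.put(row, server);
        return server;
    }
}
```

Take two regions that start at "" and "m". Gets for apple, then zebra, then apple again cause one ZooKeeper read and two meta reads.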

The entry point is the private get method of HTable:

private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
    // if we are changing settings to the get, clone it.
    if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
      get = ReflectionUtils.newInstance(get.getClass(), get);
      get.setCheckExistenceOnly(checkExistenceOnly);
      if (get.getConsistency() == null){
        get.setConsistency(defaultConsistency);
      }
    }

    if (get.getConsistency() == Consistency.STRONG) {
      // Good old call.
      final Get getReq = get;
      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
          getName(), get.getRow()) {
        @Override
        public Result call(int callTimeout) throws IOException {
          ClientProtos.GetRequest request =
            RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
          controller.setPriority(tableName);
          controller.setCallTimeout(callTimeout);
          try {
            ClientProtos.GetResponse response = getStub().get(controller, request);
            if (response == null) return null;
            return ProtobufUtil.toResult(response.getResult());
          } catch (ServiceException se) {
            throw ProtobufUtil.getRemoteException(se);
          }
        }
      };
      return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
          this.operationTimeout);
    }

    // ... (the TIMELINE consistency branch that follows is omitted from this excerpt)
  }

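A Get's Consistency defaults to STRONG. If either checkExistenceOnly or the consistency has to change, the method clones the Get first. The object passed in by the caller is therefore never modified. For background on consistency in HBase, see https://www.w3cschool.cn/hbase_doc/hbase_doc-rm8b2ruf.html and also https://yq.aliyun.com/articles/573730.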
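The clone at the top of get can be illustrated with a stripped-down stand-in for Get. MiniGet and normalize are hypothetical names, not HBase API. The sketch shows that a copy is made only when a setting has to change, for example for a checkExistenceOnly request or a Get whose consistency is null. The caller's object stays untouched either way.

```java
// Sketch of the clone-before-modify step at the top of HTable.get.
// MiniGet is a hypothetical stand-in for org.apache.hadoop.hbase.client.Get.
class ConsistencyDemo {
    enum Consistency { STRONG, TIMELINE }

    static class MiniGet {
        final String row;
        boolean checkExistenceOnly;
        Consistency consistency = Consistency.STRONG; // a Get starts out STRONG

        MiniGet(String row) { this.row = row; }

        // Copy constructor; plays the role of ReflectionUtils.newInstance(get.getClass(), get).
        MiniGet(MiniGet other) {
            this.row = other.row;
            this.checkExistenceOnly = other.checkExistenceOnly;
            this.consistency = other.consistency;
        }
    }

    // Returns the caller's object when nothing needs to change, otherwise an adjusted copy,
    // so the Get the application passed in is never mutated.
    static MiniGet normalize(MiniGet get, boolean checkExistenceOnly, Consistency defaultConsistency) {
        if (get.checkExistenceOnly != checkExistenceOnly || get.consistency == null) {
            get = new MiniGet(get);
            get.checkExistenceOnly = checkExistenceOnly;
            if (get.consistency == null) {
                get.consistency = defaultConsistency;
            }
        }
        return get;
    }
}
```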
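On the STRONG path, the code creates an anonymous RegionServerCallable. Its call() method is a callback that builds the GetRequest and sends it to the RegionServer; we skip it for now. The callable is handed to a caller created by rpcCallerFactory, which runs callWithRetries. Let's look at that method: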

public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
  throws IOException, RuntimeException {
    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
      new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
    this.globalStartTime = EnvironmentEdgeManager.currentTime();
    context.clear();
    for (int tries = 0;; tries++) {
      long expectedSleep;
      try {
        callable.prepare(tries != 0); // if called with false, check table status on ZK
        interceptor.intercept(context.prepare(callable, tries));
        return callable.call(getTimeout(callTimeout));
      } catch (PreemptiveFastFailException e) {
        throw e;
      } catch (Throwable t) {
        ExceptionUtil.rethrowIfInterrupt(t);
        if (tries > startLogErrorsCnt) {
          LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" +
              (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, "
              + "cancelled=" + cancelled.get() + ", msg="
              + callable.getExceptionMessageAdditionalDetail());
        }

        // translateException throws exception when should not retry: i.e. when request is bad.
        interceptor.handleFailure(context, t);
        t = translateException(t);
        callable.throwable(t, retries != 1);
        RetriesExhaustedException.ThrowableWithExtraContext qt =
            new RetriesExhaustedException.ThrowableWithExtraContext(t,
                EnvironmentEdgeManager.currentTime(), toString());
        exceptions.add(qt);
        if (tries >= retries - 1) {
          throw new RetriesExhaustedException(tries, exceptions);
        }
        // If the server is dead, we need to wait a little before retrying, to give
        //  a chance to the regions to be
        // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
        expectedSleep = callable.sleep(pause, tries + 1);

        // If, after the planned sleep, there won't be enough time left, we stop now.
        long duration = singleCallDuration(expectedSleep);
        if (duration > callTimeout) {
          String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration +
              ": " + callable.getExceptionMessageAdditionalDetail();
          throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
        }
      } finally {
        interceptor.updateFailureInfo(context);
      }
      try {
        if (expectedSleep > 0) {
          synchronized (cancelled) {
            if (cancelled.get()) return null;
            cancelled.wait(expectedSleep);
          }
        }
        if (cancelled.get()) return null;
      } catch (InterruptedException e) {
        throw new InterruptedIOException("Interrupted after " + tries + " tries  on " + retries);
      }
    }
  }

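This is a retry loop. When the retries are exhausted, it throws a RetriesExhaustedException. When the next sleep would push the call past callTimeout, it throws a SocketTimeoutException. The three key calls in each attempt are: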

callable.prepare(tries != 0); // if called with false, check table status on ZK
interceptor.intercept(context.prepare(callable, tries));
return callable.call(getTimeout(callTimeout));
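The shape of this loop can be reproduced in a few lines. The sketch below is a simplification, not the HBase implementation:

- RetryLoopSketch, Sleeper and pauseForTry are hypothetical names.
- Elapsed time is simulated by adding up the sleeps rather than read from a clock.
- There is no interceptor and no cancellation.

```java
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.IntToLongFunction;

// Simplified sketch of the callWithRetries control flow (not the HBase implementation).
// Time is simulated: only the sleeps are counted, and the sleeper is injectable for testing.
class RetryLoopSketch {
    // Like RetriesExhaustedException: keeps the cause of every failed attempt.
    static class RetriesExhausted extends Exception {
        final List<Throwable> causes;

        RetriesExhausted(int attempts, List<Throwable> causes) {
            super("Failed after attempts=" + attempts);
            this.causes = causes;
        }
    }

    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    static <T> T callWithRetries(Callable<T> call, int retries, long callTimeoutMs,
                                 IntToLongFunction pauseForTry, Sleeper sleeper) throws Exception {
        List<Throwable> exceptions = new ArrayList<>();
        long elapsedMs = 0;
        for (int tries = 0; ; tries++) {
            long expectedSleep;
            try {
                return call.call();
            } catch (Exception t) {
                if (t instanceof InterruptedException) {
                    throw t; // like ExceptionUtil.rethrowIfInterrupt: never retry an interrupt
                }
                exceptions.add(t);
                if (tries >= retries - 1) {
                    throw new RetriesExhausted(tries + 1, exceptions);
                }
                // "tries + 1": the loop counter has not been bumped yet.
                expectedSleep = pauseForTry.applyAsLong(tries + 1);
                // Give up early if sleeping would exceed the overall timeout.
                long duration = elapsedMs + expectedSleep;
                if (duration > callTimeoutMs) {
                    SocketTimeoutException ste = new SocketTimeoutException(
                        "callTimeout=" + callTimeoutMs + ", callDuration=" + duration);
                    ste.initCause(t);
                    throw ste;
                }
            }
            sleeper.sleep(expectedSleep);
            elapsedMs += expectedSleep;
        }
    }
}
```

Injecting the sleeper keeps the example deterministic. A call that fails twice and then succeeds sleeps pauseForTry(1), then pauseForTry(2), then returns.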

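Now look at RegionServerCallable.prepare. Its main job is to find the region that holds this row of the table and the server hosting that region. It then sets up the RPC stub for that server. Because reload is tries != 0, the first attempt may be answered from the location cache, while every retry reloads the location.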

@Override
  public void prepare(final boolean reload) throws IOException {
    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
      this.location = regionLocator.getRegionLocation(row, reload);
    }
    if (this.location == null) {
      throw new IOException("Failed to find location, tableName=" + tableName +
        ", row=" + Bytes.toString(row) + ", reload=" + reload);
    }
    setStub(getConnection().getClient(this.location.getServerName()));
  }
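The effect of reload = (tries != 0) can be shown with a small model in which the region moves between attempts. PrepareReloadSketch and its lookup function are hypothetical stand-ins for RegionLocator and the hbase:meta query. The only point is that the first attempt trusts the cache, while a retry refreshes it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of prepare(reload) with reload == (tries != 0):
// only retries bypass the location cache.
class PrepareReloadSketch {
    private final Map<String, String> cache = new HashMap<>();
    private final Function<String, String> metaLookup; // stands in for the hbase:meta query
    int metaQueries = 0;

    PrepareReloadSketch(Function<String, String> metaLookup) {
        this.metaLookup = metaLookup;
    }

    String getRegionLocation(String row, boolean reload) {
        if (!reload) {
            String cached = cache.get(row);
            if (cached != null) {
                return cached; // may be stale if the region has moved
            }
        }
        metaQueries++;
        String fresh = metaLookup.apply(row);
        cache.put(row, fresh); // refresh the cache with what meta says now
        return fresh;
    }

    // Mirrors callable.prepare(tries != 0) inside the retry loop.
    String prepare(String row, int tries) {
        return getRegionLocation(row, tries != 0);
    }
}
```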

After several layers of calls, prepare ends up in
org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation.locateRegion(org.apache.hadoop.hbase.TableName, byte[], boolean, boolean, int)

 @Override
    public RegionLocations locateRegion(final TableName tableName,
      final byte [] row, boolean useCache, boolean retry, int replicaId)
    throws IOException {
      if (this.closed) throw new IOException(toString() + " closed");
      if (tableName== null || tableName.getName().length == 0) {
        throw new IllegalArgumentException(
            "table name cannot be null or zero length");
      }
      if (tableName.equals(TableName.META_TABLE_NAME)) {
        if (useMetaReplicas) {
          return locateMeta(tableName, useCache, replicaId);
        } else {
          return this.registry.getMetaRegionLocation();
        }
      } else {
        // Region not in the cache - have to go to the meta RS
        return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
      }
    }

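On the first call the table is a user table, so locateRegionInMeta is called. It first checks whether the cache may be used. If so, it tries to load the region location from the cache.

On a cache miss, it builds a lookup key from the table name and row with HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false). The key has the form tableName,row,99999999999999.

Rows in hbase:meta are keyed by region name, in the form tableName,startKey,regionId.encodedName. For example:
session,,1515750236668.3921e38c29c3e99f8d4b8f857447d2e7.

Searching for the closest meta row at or before the lookup key therefore finds the region whose start key is the largest one not greater than row. With meta replicas enabled, this search is a ClientSmallReversedScanner; otherwise it is ProtobufUtil.getRowOrBefore (see below). The returned meta row looks like this: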

session,,1515750236668.3921e38c29c3e99f8d4b8f857447d2e7. column=info:regioninfo, timestamp=1534252868373, value={ENCODED => 3921e38c29c3e99f8d4b8f857447d2e7, NAME => 'session,,1515750236668.3921e38c29c3e99f8d4b8f857447d2e7.', STARTKEY => '', ENDKEY => '2|8620485395|2'}
session,,1515750236668.3921e38c29c3e99f8d4b8f857447d2e7. column=info:seqnumDuringOpen, timestamp=1534252868373, value=\x00\x00\x00\x00\x06Oq\xE5
session,,1515750236668.3921e38c29c3e99f8d4b8f857447d2e7. column=info:server, timestamp=1534252868373, value=hb3.hdp.cn:60020
session,,1515750236668.3921e38c29c3e99f8d4b8f857447d2e7. column=info:serverstartcode, timestamp=1534252868373, value=1531762503774
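The lookup key and the "row or before" search can be sketched with a TreeMap standing in for hbase:meta. This rests on a few assumptions:

- Keys are ASCII strings compared with String.compareTo. That matches HBase's meta ordering only when table names and row keys contain no characters that sort below ','.
- MetaKeySketch is a hypothetical name.
- The second region in the test is invented to show a region boundary.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified sketch of the hbase:meta "row or before" lookup.
// Assumption: String ordering stands in for HBase's meta comparator; this is only equivalent
// for ASCII keys that avoid characters sorting below ','.
class MetaKeySketch {
    static final String NINES = "99999999999999"; // value of HConstants.NINES

    // Same shape as HRegionInfo.createRegionName(tableName, row, NINES, false): "table,row,999..."
    static String metaKey(String table, String row) {
        return table + "," + row + "," + NINES;
    }

    // Returns the meta row key (region name) with the greatest key <= metaKey,
    // i.e. what getRowOrBefore or a one-row reversed scan would return.
    static String findRegion(TreeMap<String, String> meta, String table, String row) {
        Map.Entry<String, String> e = meta.floorEntry(metaKey(table, row));
        // Like the "possible we got a region of a different table" check in the source.
        if (e == null || !e.getKey().startsWith(table + ",")) {
            throw new IllegalStateException("Table '" + table + "' was not found");
        }
        return e.getKey();
    }
}
```

A row equal to a region's start key, such as 2|8620485395|2, lands in that region rather than the previous one. After the shared prefix, the lookup key continues with ",9..." while the region name continues with ",1515...", so the lookup key sorts after it. This is the "exact match" that the nines in the source comment are there for.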

locateRegionInMeta is also a retry loop:

 private RegionLocations locateRegionInMeta(TableName tableName, byte[] row,
                   boolean useCache, boolean retry, int replicaId) throws IOException {

      // If we are supposed to be using the cache, look in the cache to see if
      // we already have the region.
      if (useCache) {
        RegionLocations locations = getCachedLocation(tableName, row);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;
        }
      }

      // build the key of the meta region we should be looking for.
      // the extra 9's on the end are necessary to allow "exact" matches
      // without knowing the precise region names.
      byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);

      Scan s = null;
      if (useMetaReplicas) {
        // for CDH-5.0 compatibility, we are not going to use reverse scan
        // unless the user enabled meta replicas. In this case we know that
        // the server support reverse scan.
        s = new Scan();
        s.setReversed(true);
        s.setStartRow(metaKey);
        s.setSmall(true);
        s.setCaching(1);
        s.setConsistency(Consistency.TIMELINE);
      }

      int localNumRetries = (retry ? numTries : 1);

      for (int tries = 0; true; tries++) {
        if (tries >= localNumRetries) {
          throw new NoServerForRegionException("Unable to find region for "
              + Bytes.toStringBinary(row) + " in " + tableName +
              " after " + localNumRetries + " tries.");
        }
        if (useCache) {
          RegionLocations locations = getCachedLocation(tableName, row);
          if (locations != null && locations.getRegionLocation(replicaId) != null) {
            return locations;
          }
        } else {
          // If we are not supposed to be using the cache, delete any existing cached location
          // so it won't interfere.
          metaCache.clearCache(tableName, row);
        }

        RegionLocations metaLocation = null;
        // Query the meta region
        try {
          Result regionInfoRow = null;
          if (s != null) {
            ReversedClientScanner rcs = null;
            try {
              rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
                rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
              regionInfoRow = rcs.next();
            } finally {
              if (rcs != null) {
                rcs.close();
              }
            }
          }
          // if we are using meta replicas, we may end up with an empty row,
          // if the replica did not received the update yet.
          // In this case we fallback to the old method for safety.
          if (regionInfoRow == null) {
            // locate the meta region (compatible with versions that does not support reverse scan)
            metaLocation = locateRegion(TableName.META_TABLE_NAME, metaKey, false, false);
            // If null still, go around again.
            if (metaLocation == null) continue;

            ClientService.BlockingInterface service =
              getClient(metaLocation.getDefaultRegionLocation().getServerName());

            // This block guards against two threads trying to load the meta
            // region at the same time. The first will load the meta region and
            // the second will use the value that the first one found.
            if (useCache) {
              if (TableName.META_TABLE_NAME.equals(tableName) && getRegionCachePrefetch(tableName)) {
                // Check the cache again for a hit in case some other thread made the
                // same query while we were waiting on the lock.
                RegionLocations locations = getCachedLocation(tableName, row);
                if (locations != null && locations.getRegionLocation(replicaId) != null) {
                  return locations;
                }
                // If the parent table is META, we may want to pre-fetch some
                // region info into the global region cache for this table.
                prefetchRegionCache(tableName, row);
              }
              RegionLocations locations = getCachedLocation(tableName, row);
              if (locations != null && locations.getRegionLocation(replicaId) != null) {
                return locations;
              }
            } else {
              // If we are not supposed to be using the cache, delete any existing cached location
              // so it won't interfere.
              metaCache.clearCache(tableName, row);
            }

            // Query the meta region for the location of the meta region
            regionInfoRow = ProtobufUtil.getRowOrBefore(service,
              metaLocation.getDefaultRegionLocation().getRegionInfo().getRegionName(),
              metaKey, HConstants.CATALOG_FAMILY);
          }

          if (regionInfoRow == null) {
            throw new TableNotFoundException(tableName);
          }

          // convert the row result into the HRegionLocation we need!
          RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
          if (locations == null || locations.getRegionLocation(replicaId) == null) {
            throw new IOException("HRegionInfo was null in " +
              tableName + ", row=" + regionInfoRow);
          }
          HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
          if (regionInfo == null) {
            throw new IOException("HRegionInfo was null or empty in " +
              TableName.META_TABLE_NAME + ", row=" + regionInfoRow);
          }

          // possible we got a region of a different table...
          if (!regionInfo.getTable().equals(tableName)) {
            throw new TableNotFoundException(
                  "Table '" + tableName + "' was not found, got: " +
                  regionInfo.getTable() + ".");
          }
          if (regionInfo.isSplit()) {
            throw new RegionOfflineException("the only available region for" +
              " the required row is a split parent," +
              " the daughters should be online soon: " +
              regionInfo.getRegionNameAsString());
          }
          if (regionInfo.isOffline()) {
            throw new RegionOfflineException("the region is offline, could" +
              " be caused by a disable table call: " +
              regionInfo.getRegionNameAsString());
          }

          ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
          if (serverName == null) {
            throw new NoServerForRegionException("No server address listed " +
              "in " + TableName.META_TABLE_NAME + " for region " +
              regionInfo.getRegionNameAsString() + " containing row " +
              Bytes.toStringBinary(row));
          }

          if (isDeadServer(serverName)){
            throw new RegionServerStoppedException("hbase:meta says the region "+
                regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
                ", but it is dead.");
          }
          // Instantiate the location
          cacheLocation(tableName, locations);
          return locations;
        } catch (TableNotFoundException e) {
          // if we got this error, probably means the table just plain doesn't
          // exist. rethrow the error immediately. this should always be coming
          // from the HTable constructor.
          throw e;
        } catch (IOException e) {
          ExceptionUtil.rethrowIfInterrupt(e);

          if (e instanceof RemoteException) {
            e = ((RemoteException)e).unwrapRemoteException();
          }
          if (tries < localNumRetries - 1) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("locateRegionInMeta parentTable=" +
                  TableName.META_TABLE_NAME + ", metaLocation=" + metaLocation +
                ", attempt=" + tries + " of " +
                localNumRetries + " failed; retrying after sleep of " +
                ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
            }
          } else {
            throw e;
          }
          // Only relocate the parent region if necessary
          if(!(e instanceof RegionOfflineException ||
              e instanceof NoServerForRegionException)) {
            relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId);
          }
        }
        try{
          Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
        } catch (InterruptedException e) {
          throw new InterruptedIOException("Giving up trying to location region in " +
            "meta: thread is interrupted.");
        }
      }
    }
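The checks applied to the meta row, and what happens on failure, can be summarized in a small sketch. RegionRow and MetaRowChecks are hypothetical, and the exception types are reduced to message prefixes named after the HBase exceptions above.

Two helper methods encode the catch block:

- isRetried: TableNotFoundException is rethrown at once; other IOExceptions are retried after a pause while attempts remain.
- shouldRelocateMeta: hbase:meta is relocated unless the failure was RegionOfflineException or NoServerForRegionException.

```java
import java.util.Set;

// Hypothetical summary of the checks locateRegionInMeta runs on the meta row it found.
// Exception types are reduced to message prefixes named after the HBase exceptions.
class MetaRowChecks {
    static final class RegionRow {
        final String table;
        final boolean split;
        final boolean offline;
        final String server; // null when info:server is missing

        RegionRow(String table, boolean split, boolean offline, String server) {
            this.table = table;
            this.split = split;
            this.offline = offline;
            this.server = server;
        }
    }

    // Returns the server to cache, or throws for the first failed check, in source order.
    static String validate(String wantedTable, RegionRow r, Set<String> deadServers) {
        if (r == null) throw new IllegalStateException("TableNotFound: " + wantedTable);
        if (!r.table.equals(wantedTable)) throw new IllegalStateException("TableNotFound: got " + r.table);
        if (r.split) throw new IllegalStateException("RegionOffline: split parent, daughters not online yet");
        if (r.offline) throw new IllegalStateException("RegionOffline: region offline, maybe a disabled table");
        if (r.server == null) throw new IllegalStateException("NoServerForRegion: no server in meta");
        if (deadServers.contains(r.server)) throw new IllegalStateException("RegionServerStopped: " + r.server);
        return r.server;
    }

    // TableNotFound is rethrown immediately; other failures are retried while attempts remain.
    static boolean isRetried(String failure) {
        return !failure.startsWith("TableNotFound");
    }

    // Mirrors the source: hbase:meta is relocated unless the failure was RegionOffline
    // or NoServerForRegion.
    static boolean shouldRelocateMeta(String failure) {
        return !(failure.startsWith("RegionOffline") || failure.startsWith("NoServerForRegion"));
    }
}
```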

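Here useMetaReplicas is false, for compatibility with versions that do not support reverse scan (see the official HBase documentation on reverse scan). The scanner s is therefore null, and the code falls back to:
metaLocation = locateRegion(TableName.META_TABLE_NAME, metaKey, false, false);
This time the table passed to locateRegion is META_TABLE_NAME, so it takes the META branch. With meta replicas enabled it calls locateMeta, shown below; otherwise it calls registry.getMetaRegionLocation() directly. On a miss, both paths read the meta location from ZooKeeper.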

 private RegionLocations locateMeta(final TableName tableName,
        boolean useCache, int replicaId) throws IOException {
      // HBASE-10785: We cache the location of the META itself, so that we are not overloading
      // zookeeper with one request for every region lookup. We cache the META with empty row
      // key in MetaCache.
      byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
      RegionLocations locations = null;
      if (useCache) {
        locations = getCachedLocation(tableName, metaCacheKey);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;
        }
      }

      // only one thread should do the lookup.
      synchronized (metaRegionLock) {
        // Check the cache again for a hit in case some other thread made the
        // same query while we were waiting on the lock.
        if (useCache) {
          locations = getCachedLocation(tableName, metaCacheKey);
          if (locations != null && locations.getRegionLocation(replicaId) != null) {
            return locations;
          }
        }

        // Look up from zookeeper
        locations = this.registry.getMetaRegionLocation();
        if (locations != null) {
          cacheLocation(tableName, locations);
        }
      }
      return locations;
    }

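In locateMeta, the location of hbase:meta itself is cached under the empty row key (HBASE-10785), so that not every region lookup hits ZooKeeper. When useCache is true, the cache is checked first. On a miss, a single thread does the lookup under metaRegionLock. It re-checks the cache after acquiring the lock, in case another thread has already filled it, and then asks ZooKeeper for the meta location.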
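The sequence in locateMeta (check, lock, re-check, look up, cache) is a form of double-checked locking. Below is a minimal sketch that assumes a Supplier stands in for the ZooKeeper read; MetaLocationCache and zkReads are hypothetical names. A ConcurrentHashMap makes the unlocked fast-path read safe.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Simplified sketch of locateMeta's check / lock / re-check / lookup / cache pattern.
// The ZooKeeper read is replaced by a Supplier; all names here are hypothetical.
class MetaLocationCache {
    private static final String META_CACHE_KEY = ""; // meta is cached under the empty row
    private final ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
    private final Object metaRegionLock = new Object();
    private final Supplier<String> zkLookup;
    final AtomicInteger zkReads = new AtomicInteger();

    MetaLocationCache(Supplier<String> zkLookup) {
        this.zkLookup = zkLookup;
    }

    String locateMeta(boolean useCache) {
        if (useCache) {
            String cached = cache.get(META_CACHE_KEY);
            if (cached != null) return cached;          // fast path, no lock taken
        }
        synchronized (metaRegionLock) {                 // only one thread does the lookup
            if (useCache) {
                String cached = cache.get(META_CACHE_KEY); // another thread may have filled it
                if (cached != null) return cached;
            }
            zkReads.incrementAndGet();
            String loc = zkLookup.get();                // "look up from zookeeper"
            if (loc != null) cache.put(META_CACHE_KEY, loc);
            return loc;
        }
    }
}
```

Eight threads calling locateMeta(true) at the same time trigger a single lookup. A call with useCache = false always goes back to the source.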
Next, look at org.apache.hadoop.hbase.client.ZooKeeperRegistry.getMetaRegionLocation:

@Override
  public RegionLocations getMetaRegionLocation() throws IOException {
    ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher();

    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Looking up meta region location in ZK," + " connection=" + this);
      }
      List<ServerName> servers = new MetaTableLocator().blockUntilAvailable(zkw, hci.rpcTimeout,
          hci.getConfiguration());
      if (LOG.isTraceEnabled()) {
        if (servers == null) {
          LOG.trace("Looked up meta region location, connection=" + this +
            "; servers = null");
        } else {
          StringBuilder str = new StringBuilder();
          for (ServerName s : servers) {
            str.append(s.toString());
            str.append(" ");
          }
          LOG.trace("Looked up meta region location, connection=" + this +
            "; servers = " + str.toString());
        }
      }
      if (servers == null) return null;
      HRegionLocation[] locs = new HRegionLocation[servers.size()];
      int i = 0;
      for (ServerName server : servers) {
        HRegionInfo h = RegionReplicaUtil.getRegionInfoForReplica(
                HRegionInfo.FIRST_META_REGIONINFO, i);
        if (server == null) locs[i++] = null;
        else locs[i++] = new HRegionLocation(h, server, 0);
      }
      return new RegionLocations(locs);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    } finally {
      zkw.close();
    }
  }

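The returned region location looks like this:
region=hbase:meta,,1.1588230740, hostname=tr4.hdp.cn,60021,1514197831557, seqNum=0
The structure is self-explanatory. hbase:meta is hosted on the machine tr4.hdp.cn, port 60021, and 1514197831557 is that RegionServer's start code.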
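The location strings above follow the ServerName format host,port,startcode. A meta row stores the same information differently: host:port goes in info:server, and the start code goes separately in info:serverstartcode. The hypothetical helper below parses and rebuilds both forms. HBase has its own ServerName class; ServerNameSketch only illustrates the format.

```java
// Hypothetical helper showing the "host,port,startcode" format of a ServerName string.
// HBase has its own org.apache.hadoop.hbase.ServerName; this is only an illustration.
class ServerNameSketch {
    final String host;
    final int port;
    final long startCode; // RegionServer start time in epoch milliseconds

    ServerNameSketch(String host, int port, long startCode) {
        this.host = host;
        this.port = port;
        this.startCode = startCode;
    }

    // Parses e.g. "tr4.hdp.cn,60021,1514197831557".
    static ServerNameSketch parse(String s) {
        String[] parts = s.split(",");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected host,port,startcode: " + s);
        }
        return new ServerNameSketch(parts[0], Integer.parseInt(parts[1]), Long.parseLong(parts[2]));
    }

    // Combines the info:server ("host:port") and info:serverstartcode columns of a meta row.
    static ServerNameSketch fromMetaColumns(String hostAndPort, long startCode) {
        int colon = hostAndPort.lastIndexOf(':');
        return new ServerNameSketch(hostAndPort.substring(0, colon),
                Integer.parseInt(hostAndPort.substring(colon + 1)), startCode);
    }

    @Override
    public String toString() {
        return host + "," + port + "," + startCode;
    }
}
```

Because the start code is the RegionServer's start time, a server restarted on the same host and port gets a different ServerName.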
The client then queries the RegionServer hosting hbase:meta for the region information, for example:

{ENCODED => c6b3becb3eddc91b4ae69d77c1c4eaf5, NAME => 'V3_UserProfile,22592009084298381312286811,1509355646476.c6b3becb3eddc91b4ae69d77c1c4eaf5.', STARTKEY => '22592009084298381312286811', ENDKEY => '34362040585555200000613694'}

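This structure is also straightforward: the table (V3_UserProfile in this example), the region name, and the region's start and end keys.
MetaTableAccessor.getRegionLocations then converts the meta row into a RegionLocations object, which is checked and cached with cacheLocation. Back in RegionServerCallable.prepare, the result is assigned to this.location and the stub for that server is set up.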
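Whether a row belongs to a region follows from its two keys. The rule is startKey <= row < endKey, where an empty end key means there is no upper bound. Keys are compared as unsigned bytes. The sketch below is modeled on HRegionInfo.containsRow and Bytes.compareTo; KeyRangeSketch and its helpers are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

// Sketch of the region key-range test, modeled on HRegionInfo.containsRow:
// startKey <= row < endKey, where an empty endKey means "no upper bound".
class KeyRangeSketch {
    // Lexicographic comparison of unsigned bytes, like HBase's Bytes.compareTo.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) return x - y;
        }
        return a.length - b.length; // a shorter prefix sorts first
    }

    static boolean containsRow(byte[] startKey, byte[] endKey, byte[] row) {
        return compareUnsigned(row, startKey) >= 0
            && (endKey.length == 0 || compareUnsigned(row, endKey) < 0);
    }

    static byte[] b(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```

The comparison has to be unsigned. A byte such as 0xFF must sort after 'a', but Java's signed byte would treat it as -1.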

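With the location and stub in place, callWithRetries goes on to callable.call(). This builds the GetRequest, sends it through getStub().get(controller, request), and converts the response into a Result.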

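    Original author: pcqlegend
    Original URL: https://www.jianshu.com/p/5e121be1912b
    This article is reproduced from the web for knowledge sharing only; in case of infringement, please contact the blogger to have it removed.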