Spark+Hbase 读取分片数据、深挖原理

大猪 见很多文章都写了Hbase如何设计rowkey避免热点问题,就连 大猪 的文章也写过这样的优化,但是只说到了优化的点上,那如何读取呢?刚才就有一位老朋友跟我说他的方案,他是做了16个预分区,然后就把16个分区的数据使用spark的union起来,组成16个RDD,牛批的孩子,看到他这么干,我得写篇文章出来探讨一下这个问题了。

Rowkey设计

在设计Hbase的rowkey的时候,我们往往会在高位上设置加上数字或者是Hash用来打散数据,特别是日志数据。
举个例子:

《Spark+Hbase 读取分片数据、深挖原理》

假设有8台RS,表创建的时候就要使用预分区,就像下面一样创建表。

《Spark+Hbase 读取分片数据、深挖原理》

实际的表生成rowkey范围就会像下面这样:

《Spark+Hbase 读取分片数据、深挖原理》

上面我们的三条就会根据rowkey前2个位自动选择分区
这样就达到打散的效果,热点问题就不会产生了。

但是:我们如何同一分钟的数据会打到不同的分区,我们不能预先知道数据在哪些分区,通过一个Scan是查不完的,必要把所有分区都查下遍,才知道分区中有没有我们想的数据。

《Spark+Hbase 读取分片数据、深挖原理》

Hbase给我们的 TableInputFormat API里面只有设置startend,那让我怎么去读取这些分区的数据?

我的那个老朋友的做法我已经猜到了他是怎么玩的了,因为他预分区是16个,就是0~f,也就是他去按照分区加start跟end去scan16次,每次得到一个RDD,再union起来就是他要的数据结果。

《Spark+Hbase 读取分片数据、深挖原理》

大概就是这样写,unionRdd是最后合成的一个大RDD,后面用来计算。
其实我的老朋友这样写其实也是可以的。我只想说,你真会玩。

《Spark+Hbase 读取分片数据、深挖原理》

不知道union在spark上,是会产生shuffle操作的么?

源码分析

来来来,我们来看一下TableInputFormat的源码到底是怎么处理读取Hbase的分区数据的:
我们看TableInputFormat类中,从getSplits => oneInputSplitPerRegion => 挖出这个方法

  1. 其实从前三句就可以看出来了,通admin去拿到hbase表的所有分片信息。
    返回的多个InputSplit对应上的就是Spark的多个分区,如果有Hbase16个分片就会有16个分区。

我们可以从NewHadoopRDD类中的getPartitions挖出来确实是这样子的:

《Spark+Hbase 读取分片数据、深挖原理》

  1. 看看oneInputSplitPerRegion方法上面注释重点的地方,其实就是你们写在程序conf配置上的 start = splitStartend = splitStop ,或者还有scan的各种过滤器等,再加的Hbase的 regionLocation 就组成一个分区查询了,我们的数据就是这么被Spark在每个分区上查出来的。

看到文章这里进度的小伙伴们,是不是已经想到怎么做了?

结论

既然的数据在分区上,我们重写TableInputFormat的getSplits获取分区就行了。

scala版本 TableInputFormat2

《Spark+Hbase 读取分片数据、深挖原理》

  1. 以上实现了10进制与16进制读取分区操作
  2. 也可以直接指定分区读取

要上scala的代码了

《Spark+Hbase 读取分片数据、深挖原理》

请看代码:

《Spark+Hbase 读取分片数据、深挖原理》

不会scala?

《Spark+Hbase 读取分片数据、深挖原理》 Spark 读取 Hbase 自定义读取分片数据

给你个Java版本的好了

《Spark+Hbase 读取分片数据、深挖原理》

Java程序

《Spark+Hbase 读取分片数据、深挖原理》

运行没错请告诉,反正我没运行过。

《Spark+Hbase 读取分片数据、深挖原理》
《Spark+Hbase 读取分片数据、深挖原理》

点赞

发表评论

电子邮件地址不会被公开。 必填项已用*标注