一、前言
在大数据领域,海量存储与快速检索方面HBase早已有了自己的一席之地。MapReduce计算框架早已对接了HBase,以HBase作为数据源,完成批量数据的读写。而Hive默认底层以MapReduce作为计算引擎,支持 以HBase作为外部表,通过HQL对HBase中的数据进行分析,Hive On HBase 也是很好的满足在某些场景下通过SQL对HBase表中的数据进行分析。
如今即MapReduce之后,Spark在大数据领域有着举足轻重的地位,无论是跑批,流处理,甚至图计算等都有它的用武之地。因此类似于Hive On HBase这种通过SQL的方式对HBase数据做交互式分析。Spark SQL On HBase成为不少用户的需求。而截至目前Spark并未提供已HBase最为数据源。
二、Spark SQL On HBase社区相关的进展
- hortonworks: Apache HBase Connector
- 华为: Fast SQL on HBase using SparkSQL
- cloudera: SparkOnHBase
三、如何使用Spark SQL On HBase
现在市面上的Spark对接HBase的方式多种多样,根据个人感觉,hortonworks公司的不错,因此本文选择hortonworks公司开源的对接方式。
以下是使用步骤:
- 编译源码
- 在源码中找到编译出来的jar,在提交作业时指定
- 在提交作业时,所使用的HBase jar,必须与编译源码时的HBase的版本对应
- 用过HBase用户去认证
- 通过命令行提交用用程序
以spark-shell为例,提交应用程序:
spark-shell –master yarn –jars shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar
在spark-shell中先导入相关包,并引用sqlContext的命令:
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.sql.{DataFrame, SparkSession}
val sc = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
再按如下步骤依次执行:
- Define the catalog for the schema mapping:
def catalog = s"""{
|"table":{"namespace":"default","name":"table1"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin
- Prepare the data and populate the HBase table:
case class HBaseRecord(
col0: String,
col1: Boolean,
col2: Double,
col3: Float,
col4: Int,
col5: Long,
col6: Short,
col7: String,
col8: Byte
)
object HBaseRecord {
def apply(i: Int, t: String): HBaseRecord = {
val s = s"""row${"%03d".format(i)}"""
HBaseRecord(
s,
i % 2 == 0,
i.toDouble,
i.toFloat,
i,
i.toLong,
i.toShort,
s”String$i: $t”,
i.toByte)
}
}
val data = (0 to 255).map { i => HBaseRecord(i, “extra”)}
sc.parallelize(data).
toDF.
write.
options(
Map(HBaseTableCatalog.tableCatalog -> catalog,
HBaseTableCatalog.newTable -> "5"))
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()
注意:在spark-shell中使用粘贴模式(:paste)执行case class HBaseRecord
以及object HBaseRecord
。
- Load the DataFrame:
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}
val df = withCatalog(catalog)
- Language integrated query:
val s = df.filter((($"col0" <= "row050" && $"col0" > "row040") ||
$"col0" === "row005"||
$"col0" === "row020"||
$"col0" === "r20"||
$"col0" <= "row005") &&
($"col4" === 1 ||
$"col4" === 42))
.select("col0", "col1", "col4")
s.show
- SQL query:
df.registerTempTable(“table”)
sqlContext.sql(“select count(col1) from table”).show