启用kerberos的环境下在CDH中使用Spark SQL On HBase

一、前言

在大数据领域,海量存储与快速检索方面HBase早已有了自己的一席之地。MapReduce计算框架早已对接了HBase,以HBase作为数据源,完成批量数据的读写。而Hive默认底层以MapReduce作为计算引擎,支持 以HBase作为外部表,通过HQL对HBase中的数据进行分析,Hive On HBase 也是很好的满足在某些场景下通过SQL对HBase表中的数据进行分析。
如今即MapReduce之后,Spark在大数据领域有着举足轻重的地位,无论是跑批,流处理,甚至图计算等都有它的用武之地。因此类似于Hive On HBase这种通过SQL的方式对HBase数据做交互式分析。Spark SQL On HBase成为不少用户的需求。而截至目前Spark并未提供已HBase最为数据源。

二、Spark SQL On HBase社区相关的进展

三、如何使用Spark SQL On HBase

现在市面上的Spark对接HBase的方式多种多样,根据个人感觉,hortonworks公司的不错,因此本文选择hortonworks公司开源的对接方式。
以下是使用步骤:

  1. 编译源码
  2. 在源码中找到编译出来的jar,在提交作业时指定
  3. 在提交作业时,所使用的HBase jar,必须与编译源码时的HBase的版本对应
  4. 用过HBase用户去认证
  5. 通过命令行提交用用程序

以spark-shell为例,提交应用程序:
spark-shell –master yarn –jars shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar

在spark-shell中先导入相关包,并引用sqlContext的命令:
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.sql.{DataFrame, SparkSession}
val sc = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._

再按如下步骤依次执行:

  1. Define the catalog for the schema mapping:

def catalog = s"""{
|"table":{"namespace":"default","name":"table1"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin

  1. Prepare the data and populate the HBase table:

case class HBaseRecord(
col0: String,
col1: Boolean,
col2: Double,
col3: Float,
col4: Int,
col5: Long,
col6: Short,
col7: String,
col8: Byte
)

object HBaseRecord {
def apply(i: Int, t: String): HBaseRecord = {
val s = s"""row${"%03d".format(i)}"""
HBaseRecord(
s,
i % 2 == 0,
i.toDouble,
i.toFloat,
i,
i.toLong,
i.toShort,
s”String$i: $t”,
i.toByte)
}
}

val data = (0 to 255).map { i => HBaseRecord(i, “extra”)}

sc.parallelize(data).
toDF.
write.
options(
Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()

注意:在spark-shell中使用粘贴模式(:paste)执行case class HBaseRecord以及object HBaseRecord

  1. Load the DataFrame:

def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}

val df = withCatalog(catalog)

  1. Language integrated query:

val s = df.filter((($"col0" <= "row050" && $"col0" > "row040") ||
$"col0" === "row005"||
$"col0" === "row020"||
$"col0" === "r20"||
$"col0" <= "row005") &&
($"col4" === 1 ||
$"col4" === 42))
.select("col0", "col1", "col4")

s.show

  1. SQL query:

df.registerTempTable(“table”)
sqlContext.sql(“select count(col1) from table”).show

    原文作者:和腐朽说再见
    原文地址: https://www.jianshu.com/p/74a3365734ba
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞