001
Spark SQL 概述
Spark SQL is Apache Spark’s module for working with structured data.
集成
Spark Sql 可以使用 SQL 或者所熟悉的 DataFrame API 在 spark 程序中查询结构化的数据。
统一的数据访问
Spark Sql 提供了访问各种数据源的常用方法,包括 Hive,Avro,Parquet,ORC,JSON 和 JDBC。
兼容 Hive
Spark SQL 支持 HiveQL 语法以及 Hive SerDes 和 UDF,允许您访问现有的 Hive 仓库。
标准连接
Spark SQL 可以通过 JDBC 或 ODBC 连接外部的BI工具。
Spark SQL 架构
首先,Spark SQL 对外提供了多种访问方式,我们可以通过 Hive Hql、Spark 编程的方式(SQL 或者 DataFram/Dataset API)、Streaming SQL 的方式提交执行程序。
然后,会生成一个未完全解析的逻辑执行计划,再集合内部 schema 信息生生一个逻辑执行计划,最后再经过优化,最终生成一个优化后的逻辑执行计划,而这一切就是 Spark SQL 的核心 —— Catalyst 来完成。
最后,将优化后的逻辑执行计划交由 Spark Engine 来翻译执行我们提交的作业。
002
DataFrame&Dateset
DataFrame 产生背景:
DataFrame 并不是 Spark SQL 提出的,而是早起在 R/Pandas 语言就已经存在,但由于 R/Pandas 只能满足单机上的一些数据处理需求,无法完成一些大数据量的任务,但是 Spark SQL 作为 Spark 的模块,可以借助 Spark 的大数据处理性能,完全胜任这些大数据量的处理任务,另外由于 DataFrame 早期已存在,因此在编写 Spark SQL 程序的时候,无疑降低了不小的门槛。
DataFrame 概述:
以列的形式构成的分布式数据集,按照列赋予不同的名称(相当于加上了 schema 的 RDD)。可以看做是一个经过优化后的一个数据表(table)。
提供了类 SQL 的 API 如:select/filter/aggregation/where 操作结构化的数据。将 R/Pandas 处理小数据的经验复用到分布式的大数据上,应为它的灵感来自 R/Pandas。
DataFrame 基本 API:
- Create DataFrame
- printSchema
- show
- select
- filter
- …
样例数据(people.json):
{"name":"张三", "age":18, "sex":"man"}
{"name":"李四", "age":28, "sex":"female"}
{"name":"王五", "sex":"man"}
Spark SQL 编程:
val spark = SparkSession.builder().conf(sparkConf).getOrCreate()// 创建DataFrame 将json文件加载为DataFrameval peopleDF = spark.read.format("json").load("/data/people.json")// 输出Schema信息peopleDF.printSchema()// 展示结果---默认展示20条peopleDF.show()// 查询某一列peopleDF.select(peopleDF("name"))// 过滤出大于20岁的人peopleDF.filter("age > 20").show()
输出结果如下:
// 输出Schema信息
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
|-- sex: string (nullable = true)// 展示结果---默认展示20条+----+----+------+| age|name| sex|
+----+----+------+| 18| 张三| man|
| 28| 李四|female|
|null| 王五| man|
+----+----+------+// 查询某一列
+----+|name|
+----+| 张三|
| 李四|
| 王五|
+----+// 过滤出大于20岁的人
+---+----+------+|age|name| sex|
+---+----+------+| 28| 李四|female|
+---+----+------+
003
DataFrame & RDD 互操作
反射的方式:
此种方式需要定义一个 object 类,在 spark 中可以直接使用 case class 来实现:
编程代码:
// 定义object
case class people(id: Long, name: String, sex: String)
val info = spark.sparkContext.textFile("/data/info.txt") // 此处一定要导入隐式转换,否则无法使用toDF()方法
import spark.implicits._
val infoDF = info.map(_.split(",")).map(line => people(line(0).toLong, line(1), line(2)))
.toDF()
infoDF.show()
输出结果:
+---+--------+------+| id| name| sex|+---+--------+------+| 1|zhangsan| man|| 2| lisi|female|| 3| wangwu| man|+---+--------+------+
编程方式:
- 将一个基本的 RDD 转换为一个 Row RDD;
- 定义 Schema 信息;
- 调用 SparkSession createDataFrame 方法创建 DataFrame;
编程代码:
// 将RDD转换为Row RDDval info = spark.sparkContext.textFile("/data/info.txt")
val infoRDD = info.map(_.split(",")).map(line => Row(line(0).toLong, line(1), line(2)))// 定义StrctTypeval structType = StructType(Array(
StructField("id", LongType, true),
StructField("name", StringType, true),
StructField("sex", StringType, true)
))// 调用方法创建DataFrameval infoDF = spark.createDataFrame(infoRDD, structType)// 输出结果infoDF.show()
输出结果:
+---+--------+------+| id| name| sex|+---+--------+------+| 1|zhangsan| man|| 2| lisi|female|| 3| wangwu| man|+---+--------+------+
反射 & 编程方式对比小结
- 如果已知数据结构,可以使用方式一反射的方式将 rdd 转换为 DataFrame,但是此种方式依赖于 case class,spark 中关于样例类的属性是有限制的,当字段过多的时候反射这种方式就无法使用。
- 在数据结构未知时,可以使用第二种方式,此种方式编程较第一种方式稍过繁琐,但使用范围更广一些。
今日抄书:
贤者弗拉巴米尔·彭的格言,
“你最热爱的歌曲,其实他们都是在骗你,他们并不是为你唱,只是希望你开心。”