Spark SQL入门

1、SQL结合spark有两条线:

Spark SQL和Hive on Spark(还在开发状态,不稳定,暂时不建议使用)。

#Hive on Spark是在Hive中的,使用Spark作为hive的执行引擎,只需要在hive中修改一个参数即可:

# set hive.execution.engine=spark

 

2、Spark SQL

a.概述:

        Spark SQL是Spark处理数据的一个模块,跟基本的Spark RDD的API不同,Spark SQL中提供的接口将会提供给Spark更多关于结构化数据和计算的信息。其本质是,Spark SQL使用这些额外的信息去执行额外的优化,这儿有几种和Spark SQL进行交互的方法,包括SQL和Dataset API,当使用相同的执行引擎时,API或其它语言对于计算的表达都是相互独立的,这种统一意味着开发人员可以轻松地在不同的API之间进行切换。

b.SQL:

        Spark SQL的一大用处就是执行SQL查询语句,Spark SQL也可以用来从Hive中读取数据,当我们使用其它编程语言来运行一个SQL语句,结果返回的是一个Dataset或者DataFrame.你可以使用命令行,JDBC或者ODBC的方式来与SQL进行交互。

c.Dataset和DataFrame

        Dataset是一个分布式数据集合。Dataset是一个在Spark 1.6版本之后才引入的新接口,它既拥有了RDD的优点(强类型、能够使用强大的lambda函数),又拥有Spark SQL的优点(用来一个经过优化的执行引擎)。你可以将一个JVM对象构造成一个Dataset,之后就可以使用一些transformations操作啦。我们可以使用scala,java来访问Dataset API,不支持python哦,当然,由于python的动态特性,很多的Dataset API是可以使用的,R语言也是一样哦。

        DataFrame是Dataset中一个有名字的列。从概念上,它等价于关系型数据库中的一张表,或者等价于R/Python中的Data Frame,但它在底层做了更好的优化。构造DataFrame的数据源很多:结构化的数据文件、hive表、外部数据库、已经存在的RDD。DataFrame 的API支持java,scal.python,R

 

3、面试题

RDD  VS  DataFrame

esgd 

《Spark SQL入门》

a.基于RDD的编程,不同语言性能是不一样的,而DataFrame是一样的,因为底层会有一个优化器先将代码进行优化。

b.对于RDD,暴露给执行引擎的信息只有数据的类型,如RDD[Student]装的是Student,而对于DataFrame,对于外部可见的信息有字段类型,字段key,字段value等。

c.RDD是一个数组,DataFrame是一个列式表。

 

4、Spark SQL愿景

a.写更少的代码 

b.读更少的数据(压缩,存储格式,列裁剪)

c.对于不同语言的应用程序让优化器自动进行优化

 

5、Spark SQL架构

《Spark SQL入门》

客户端->未解析的逻辑执行计划(Schema Catalog 将schema作用在数据上)->逻辑执行计划->优化过后的逻辑执行计划->物理执行计划->Spark引擎。

#Spark SQL 要使用hive中的表,需要将hive-site.xml加入spark的配置文件目录。

 

6、执行计划(Hive 或Spark SQL)

explain extended +查询语句

 

7、SparkSession

添加依赖:

<dependency>

    <groupId>org.spark.apache</groupId>

    <artifactId>spark-sql_2.11</artifactId>    ##2.11位scala版本

    <version>${spark.version}</version>

</dependency>

        Spark中所有功能的入口点是SparkSession类,我们可以使用SparkSession.builder()来创建一个SparkSession,具体如下(scala):

 

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

           可以在spark repo下的“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 路径下找到所有例子的代码。

        在Spark 2.0之后,SparkSession内置了对于hive特性的支持,允许使用HiveQL来书写查询语句访问UDF,以及从Hive表中读取数据。使用这些特性,你不需要进行任何Hive的设置。

 

8、创建DataFrame

        通过SparkSession,应用程序可以从一个现有的RDD、Hive表、Spark数据源来创建一个DataFrame。

        以下创建DataFrame是基于JSON格式的文件:

 

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

         可以在spark repo下的“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 路径下找到所有例子的代码。

 

 

 

9、无类型的Dataset操作(又称DataFrame 操作)

      上面提到的,在Spark 2.0时,在java或者scala API中,DataFrame是Dataset的行,这些操作也被称为“非类型转换”,与“类型化转换”相比,具有强类型的Scala/Java Dataset。

        这儿包括一些使用Dataset处理结构化数据的例子:

 

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()   //groupBy返回一个Dataset,count返回一个DataFrame.
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

可以在spark repo下的“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 路径下找到所有例子的代码。

对于可以在数据集上执行的操作类型的完整列表,请参阅API Documentation

除了简单的列引用和表达式之外,数据集还拥有丰富的函数库,包括字符串操作、日期算术、常见的数学运算等等。完整列表查看 DataFrame Function Reference.

 

10、以编程方式运行SQL查询语句

        SparkSession中的SQL函数可以让应用程序以编程的方式运行SQL查询语句,让结果返回一个DataFrame。

 

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

可以在spark repo下的“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 路径下找到所有例子的代码。

 

11、全局临时视图

        Spark SQL中的临时视图作用域仅仅在于创建该视图的会话窗口,如果窗口关闭,该视图也终止。如果你想要一个在所有会话中都生效的临时视图,并且即使应用程序终止该视图仍然存活,你可以创建一个全局临时视图。 全局临时视图与系统保存数据库global_temp相关联,我们必须使用规范的名字来定义它,比如:SELECT * FROM global_temp.view1.

 

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

可以在spark repo下的“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 路径下找到所有例子的代码。

 

12、创建Dataset

        Dataset有点像RDD,但它并不是使用java或Kryo这样的序列化方式,而是使用专用的编码器将对象进行序列化,以便于在网络上进行处理和传输。虽然编码器和标准的序列化都可以将对象转成字节,但编码器产生动态的代码,它使用的格式允许Spark在不执行反序列化的情况下去执行像过滤、排序、哈希等许许多多的操作。

    

case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

 

 

 

    原文作者:Mr_249
    原文地址: https://blog.csdn.net/pengzonglu7292/article/details/79772800
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞