Spark SQL

1 概述

    Spark SQL 是Spark的一个组件,用于结构化数据的计算。Spark SQL 提供一个称为DataFrames的编程对象,DataFrams 可以充当分布式sql查询引擎。

2 DataFrames

 DataFrame 是一个分布式的数据集合,该数据结合以命名列的方式进行整合,DataFrame 可以理解为关系数据库中的一张表。也可以理解为R/Python中的一个data frame .DataFrames 可以通过多种数据构造,例如 结构化的数据文件。hive 中的表,外部数据库,Spark 计算过程中生成的RDD等.

2.1 入口:SQLContext(Starting point: SQLContext)

Spark SQL 程序的主入口是SQLContext 类或它的子类。创建一个基本的SQLContext 只需要SparkContext Scala 代码如下:

val sc:SparkContext

val sqlConext=new org.apache.spark.sql.SQLConetext(sc)

除了基本的SQLContext 也可以创建HiveContext.SQLContext和HiveContext 区别联系为

1.SQLContext现在只支持SQL语法解析器(SQL-92语法)

2.Hive Context现在只支持SQL语法解析器和HiveSQL语法解析器。默认为HiveSQL语法解析器,用户可以通过配置切换成SQL语法解析器,来运行HiveSQL不支持的语法

3. 使用HiveContext可以使用Hive 的UDF ,读写Hive表数据等Hive操作。SQLContext不可以对hive操作

HiveContext 包装了HIVE的依赖包。把HiveContext单独拿出来,可以在部署基本的Spark的时候就不需要Hive的依赖包,需要使用HiveContext 时再把Hive的各种依赖包加进来

2.2创建DataFrames

使用SQLContext ,Spark 应用程序。可以通过RDD,Hive 表,JSON 格式数据等数据源创建DataFrames .代码如下 Scala编写

val sc:SparkConetxt

val sqlContext=new org.apache.spark.SQLContext(sc)

val df=sqlContext.read.json(“file:///path”)

df.show()

2.3DataFrame操作  Scala  代码

val sc:SparkContext

val sqlContext=new org.apache.spark.sql.SQLContext(sc)

val df=sqlContext.read.json(“path”)

df.show()

df.printSchema()

df.select(“name”).show()

df.select(df(“name”),df(“age”)+1).show()

2.4 运行SQL查询程序

Spark Application可以使用SQLContext的sql()方法执行SQL查询操作,sql()方法返回的查询结果为DataFrame格式。scala 代码如下:

val  sqlContext = …// An existing SQLContext

val   df =sqlContext.sql(“SELECT * FROM table”)

2.5 DataFrames与RDDs的相互转换

Spark SQL支持两种RDDs转换为DataFrames的方式:

1.使用反射获取RDD内的Schema

        当已知类的Schema的时候,使用这种基于反射的方法会让代码更加简洁而且效果也很好。

2.通过编程接口指定Schema

         通过Spark SQL的接口创建RDD的Schema,这种方式会让代码比较冗长。

这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成Schema

2.5.1 使用反射获取Schema

Spark  SQL支持将JavaBean的RDD自动转换成DataFrame。通过反射获取Bean的基本信息,依据Bean的信息定义Schema。当前Spark

SQL不支持嵌套的JavaBeans和复杂数据类型(如:List、Array)。创建一个实现Serializable接口包含所有属性getters和setters的类来创建一个JavaBean。通过调用createDataFrame并提供JavaBean的Class

object,指定一个Schema给一个RDD。示例如下:

《Spark SQL》 person 类与接口Serializable 建立连接

《Spark SQL》 RDD与Spark SQL转换

3 数据源

Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作。修改配置项spark.sql.sources.default,可修改默认数据源格式。读取Parquet文件示例如下:

val df=sqlContext.read.load(“path”)

df.select(“name”,favorite_color).write.save(“name.parquet”)

3.1.1 手动指定选项

当数据源格式不是parquet格式文件时,需要手动指定数据源的格式。数据源格式需要指定全名(例如:org.apache.spark.sql.parquet),如果数据源格式为内置格式,则只需要指定简称(json,parquet,jdbc)。通过指定的数据源格式名,可以对DataFrames进行类型转换操作。示例如下:

val    df = sqlContext.read.format(“json”).load(“examples/src/main/resources/people.json”)

df.select(“name”,”age”).write.format(“parquet”).save(“namesAndAges.parquet”)

3.1.2 存储模式

可以采用SaveMode执行存储操作,SaveMode定义了对数据的处理模式。需要注意的是,这些保存模式不使用任何锁定,不是原子操作。此外,当使用Overwrite方式执行时,在输出新数据之前原数据就已经被删除。SaveMode详细介绍如下表:

《Spark SQL》 如图

3.1.3 持久化到表

当使用HiveContext时,可以通过saveAsTable方法将DataFrames存储到表中。与registerTempTable方法不同的是,saveAsTable将DataFrame中的内容持久化到表中,并在HiveMetastore中存储元数据。存储一个DataFrame,可以使用SQLContext的table方法。table先创建一个表,方法参数为要创建的表的表名,然后将DataFrame持久化到这个表中。

默认的saveAsTable方法将创建一个“managed table”,表示数据的位置可以通过metastore获得。当存储数据的表被删除时,managed table也将自动删除。

3.2 Parquet文件

Parquet是一种支持多种数据处理系统的柱状的数据格式,Parquet文件中保留了原始数据的模式。Spark SQL提供了Parquet文件的读写功能。

3.2.1 读取Parquet文件

读取Parquet文件示例如下:

《Spark SQL》 读取Parquet文件

3.2.2 解析分区信息

对表进行分区是对数据进行优化的方式之一。在分区的表内,数据通过分区列将数据存储在不同的目录下。Parquet数据源现在能够自动发现并解析分区信息。例如,对人口数据进行分区存储,分区列为gender和country,使用下面的目录结构:

《Spark SQL》 目录结构

3.3 JSON数据集

Spark SQL能自动解析JSON数据集的Schema,读取JSON数据集为DataFrame格式。读取JSON数据集方法为SQLContext.read().json()。该方法将String格式的RDD或JSON文件转换为DataFrame。

需要注意的是,这里的JSON文件不是常规的JSON格式。JSON文件每一行必须包含一个独立的、自满足有效的JSON对象。如果用多行描述一个JSON对象,会导致读取出错。读取JSON数据集示例如下:

《Spark SQL》 json

3.4 Hive表
Spark SQL 支持对hive的读写操作,需要注意的是HIVE所依赖的包,没包含在Spark assembly 包中,增加Hive时候。需要在需要在Spark的build中添加 -Phive 和

-Phivethriftserver配置。这两个配置将build一个新的assembly包,这个assembly包含了Hive的依赖包。注意,必须上这个心的assembly包到所有的worker节点上。因为worker节点在访问Hive中数据时,会调用Hive的serialization and deserialization libraries(SerDes),此时将用到Hive的依赖包

Hive的配置文件为conf/目录下的hive-site.xml文件。在YARN上执行查询命令之前,lib_managed/jars目录下的datanucleus包和conf/目录下的hive-site.xml必须可以被driverhe和所有的executors所访问。确保被访问,最方便的方式就是在spark-submit命令中通过–jars选项和–file选项指定。

操作Hive时,必须创建一个HiveContext对象,HiveContext继承了SQLContext,并增加了对MetaStore和HiveQL的支持。除了sql方法,HiveContext还提供了一个hql方法,hql方法可以执行HiveQL语法的查询语句。示例如下:

《Spark SQL》 hive

    原文作者:起个什么呢称呢
    原文地址: https://www.jianshu.com/p/82079d6b8c9c
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞