Spark SQL(三)DataSource

使用Spark SQL的DataFrame接口,用户可以方便快速的从多种不同数据源(json/parquet/rdbms等),经过混合处理(比如json join parquet),再将处理结果以特定的格式(比如json/parquet等)写回到指定的系统(比如HDFS/S3)上去。

从Spark SQL 1.2引入了外部数据源的概念,有外部肯定有内部,内部数据源指的就是Spark SQL内置支持的数据源,包括json, parquet, jdbc, orc, libsvm, csv, text等

  • 内置数据源

比如读取json格式的数据:

val peopleDF = spark.read.format("json").load("E:/ATempFile/people.json") //这是标准写法

parquet格式是Spark SQL默认处理数据格式,所以可以简写成如下形式:

val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet") //只读取两个字段,并写入新的parquet文件

关于默认处理parquet格式的数据,底层代码是这么写的:

val DEFAULT_DATA_SOURCE_NAME = SQLConfigBuilder("spark.sql.sources.default")
    .doc("The default data source to use in input/output.")
    .stringConf
    .createWithDefault("parquet")

此外,如果你不想把文件转换成DataFrame进行操作,可以直接使用SQL查询文件,像这样:

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

下边以parquet和jdbc数据源举例介绍如何操作:

  1. 处理parquet文件
    可以像上面说的一样:
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet") //只读取两个字段,并 写入新的parquet文件

也可以在spark-sql –master local[2] 中:

CREATE TEMPORARY VIEW parquetTable1
USING org.apache.spark.sql.parquet
OPTIONS (
  path "/home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet"
)
SELECT * FROM parquetTable

注意上边USING的用法

  1. 处理MYSQL数据
spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/hive").option("dbtable", "hive.TBLS").option("user", "root").option("password", "root").option("driver", "com.mysql.jdbc.Driver").load()
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "root")
connectionProperties.put("driver", "com.mysql.jdbc.Driver")

val jdbcDF2 = spark.read.jdbc("jdbc:mysql://localhost:3306", "hive.TBLS", connectionProperties)

也可以使用spark-sql命令行:

CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://localhost:3306",
  dbtable "hive.TBLS",
  user 'root',
  password 'root',
  driver 'com.mysql.jdbc.Driver'
)
  • 外部数据源

Spark SQL 1.2引入了外部数据源,开发人员并不需要把外部数据源代码合并到spark中,而是可以通过–jars指定相关jar包即可,这样读取数据可以更加多种多样,使用也更加方便,具体有哪些外部数据源,可以打开这个网站:https://spark-packages.org,点击Data Source后可以看到支持的各种数据源。
比如操作avro文件,开发环境中只需引入(假设使用maven管理jar吧,sbt的同理):

<dependencies>
  <!-- list of dependencies -->
  <dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>4.0.0</version>
  </dependency>
</dependencies>

如果是在生产环境,可以使用–packages

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-avro_2.11:4.0.0
  • Save Modes

不管是什么类型的数据源,涉及到写操作的时候都有一个Save Modes,意思是要写入的文件已存在时该如何处理,下图截自官网,一目了然:

《Spark SQL(三)DataSource》 save modes

  • Saving to Persistent Tables

DataFrame 可以通过saveAsTable操作将数据作为持久表保存到Hive的元数据中。使用这个功能不一定需要Hive的部署。Spark将创建一个默认的本地的Hive的元数据保存(通过用Derby(一种数据库))。不同于createOrReplaceTempView,saveAsTable将实现DataFrame内容和创建一个指向这个Hive元数据的指针。持久表在你的spark程序重启后仍然存在,只要你保存你和元数据存储的连接。可以通过SparkSession调用table这个方法,来将DataFrame保存为一个持久表。

通过默认的saveAsTable 将会创建一个“管理表”,意思是数据的位置将被元数据控制。在数据表被删除的时候管理表也会被删除。

    原文作者:Sx_Ren
    原文地址: https://www.jianshu.com/p/a717c988129d
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞