概述
Spark SQL是用于结构化数据处理的Spark模块。它提供了一个称为DataFrames的编程抽象,也可以作为分布式SQL查询引擎。
Spark SQL也可用于从现有的Hive安装中读取数据。有关如何配置此功能的更多信息,请参阅Hive Tables部分。
DataFrames
DataFrame是组织成命名列的数据的分布式集合。它在概念上等同于关系数据库中的表或R / Python中的数据框架,但是在更加优化的范围内。DataFrames可以从各种来源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。
The DataFrame API is available in Scala, Java, Python, and R.
此页面上的所有示例都使用Spark分发中包含的示例数据,并且可以在spark-shell,pyspark shell或sparkR shell中运行。
Starting Point: SQLContext
Spark SQL中所有功能的入口点是SQLContext类或其后代。要创建一个基本的SQLContext,您只需要一个SparkContext。
JavaSparkContext sc = ...; // An existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
除了基本的SQLContext之外,您还可以创建一个HiveContext,它提供了由基本SQLContext提供的功能的超集。其他功能包括使用更完整的HiveQL解析器编写查询,访问Hive UDF以及从Hive表读取数据的功能。要使用HiveContext,您不需要具有现有的Hive设置,SQLContext可用的所有数据源仍然可用。HiveContext只是单独打包,以避免在默认的Spark构建中包含Hive的所有依赖项。如果这些依赖关系对您的应用程序不是问题,那么对于1.3版本的Spark,建议使用HiveContext。未来的发行版将重点将SQLContext与HiveContext进行特征匹配。
用于解析查询的SQL的具体变体也可以使用spark.sql.dialect选项进行选择。可以使用SQLContext上的setConf方法或在SQL中使用SET key = value命令来更改此参数。对于SQLContext,唯一可用的方言是“sql”,它使用Spark SQL提供的简单SQL解析器。在HiveContext中,默认为“hiveql”,虽然“sql”也可用。由于HiveQL解析器更完整,因此对于大多数用例来说都是如此。
创建DataFrames
使用SQLContext,应用程序可以从现有的RDD,Hive表或数据源创建DataFrames。
这里我们包括使用DataFrames的结构化数据处理的一些基本示例:
JavaSparkContext sc = ...; // An existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); // Displays the content of the DataFrame to stdout df.show();
DataFrame操作
DataFrames为Scala,Java和Python中的结构化数据操作提供了一个域专用语言。这里我们包括使用DataFrames的结构化数据处理的一些基本示例:
JavaSparkContext sc // An existing SparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc) // Create the DataFrame DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); // Show the content of the DataFrame df.show(); // age name // null Michael // 30 Andy // 19 Justin // Print the schema in a tree format df.printSchema(); // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show(); // name // Michael // Andy // Justin // Select everybody, but increment the age by 1 df.select(df.col("name"), df.col("age").plus(1)).show(); // name (age + 1) // Michael null // Andy 31 // Justin 20 // Select people older than 21 df.filter(df.col("age").gt(21)).show(); // age name // 30 Andy // Count people by age df.groupBy("age").count().show(); // age count // null 1 // 19 1 // 30 1
有关可以在DataFrame上执行的操作类型的完整列表,请参阅API Documentation.
除了简单的列引用和表达式,DataFrames还具有丰富的函数库,包括字符串操作,日期算术,常用的数学运算等。完整列表可在DataFrame Function Reference.
以编程方式运行SQL查询
SQLContext上的sql函数使应用程序以编程方式运行SQL查询,并将结果作为DataFrame返回。
SQLContext sqlContext = ... // An existing SQLContext DataFrame df = sqlContext.sql("SELECT * FROM table")
Data Sources
Spark SQL支持通过DataFrame界面对各种数据源进行操作。DataFrame可以作为普通RDD操作,也可以注册为临时表。将DataFrame注册为表可以让您对其数据运行SQL查询。本节介绍使用Spark数据源加载和保存数据的一般方法,然后介绍可用于内置数据源的特定选项。
通用加载/保存功能
在最简单的形式中,默认数据源(parquet
,除非由spark.sql.sources.default另有配置)将用于所有操作。
DataFrame df = sqlContext.read().load("examples/src/main/resources/users.parquet");
df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
手动指定选项
您还可以手动指定将要使用的数据源以及要传递给数据源的任何其他选项。数据源由其完全限定名称(即org.apache.spark.sql.parquet)指定,但对于内置源,您也可以使用其简称(json,parquet,jdbc)。可以使用此语法将任何类型的DataFrames转换为其他类型。
DataFrame df = sqlContext.read().format("json").load("examples/src/main/resources/people.json");
df.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
保存模式
保存操作可以选择使用SaveMode,它指定如何处理现有数据(如果存在)。重要的是要意识到这些保存模式不会使用任何锁定,也不是原子的。另外,当执行覆盖时,数据将在写出新数据之前被删除。
Scala/Java | Any Language | Meaning |
---|---|---|
SaveMode.ErrorIfExists (default) | "error" (default) | 将DataFrame保存到数据源时,如果数据已经存在,则会抛出异常。 |
SaveMode.Append | "append" | 将DataFrame保存到数据源时,如果数据/表已存在,则DataFrame的内容将被附加到现有数据。 |
SaveMode.Overwrite | "overwrite" | 覆盖模式意味着当将DataFrame保存到数据源时,如果数据/表已经存在,则现有数据将被DataFrame的内容覆盖。 |
SaveMode.Ignore | "ignore" | 忽略模式意味着将DataFrame保存到数据源时,如果数据已经存在,则保存操作不会保存DataFrame的内容,并且不更改现有数据。这与SQL中的CREATE TABLE IF NOT EXISTS类似。 |
保存到持久表
当使用HiveContext时,DataFrames也可以使用saveAsTable命令保存为持久表。与registerTempTable命令不同,saveAsTable将实现数据帧的内容,并创建一个指向HiveMetastore中数据的指针。只要您保持与同一个转移点的连接,即使在您的Spark程序重新启动后,持久性表仍然存在。可以通过调用具有表名称的SQLContext上的表方法来创建持久表的DataFrame。
默认情况下,saveAsTable将创建一个“托管表”,这意味着数据的位置将由转移控制。托管表还会在删除表时自动删除其数据。
Parquet Files【列式存储文件】
Parquet是许多其他数据处理系统支持的柱状格式。Spark SQL支持读取和写入Parquet文件,可自动保留原始数据的模式。
以编程方式加载数据
使用上面的例子中的数据:
// sqlContext from the previous example is used in this example. DataFrame schemaPeople = ... // The DataFrame from the previous example. // DataFrames can be saved as Parquet files, maintaining the schema information. schemaPeople.write().parquet("people.parquet"); // Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a parquet file is also a DataFrame. DataFrame parquetFile = sqlContext.read().parquet("people.parquet"); // Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerTempTable("parquetFile"); DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() { public String call(Row row) { return "Name: " + row.getString(0); } }).collect();
分区发现
表分区是一种常见的优化方法,用于像Hive这样的系统。在分区表中,数据通常存储在不同的目录中,分区列值编码在每个分区目录的路径中。Parquet数据源现在能够自动发现和推断分区信息。例如,我们可以使用以下目录结构将所有以前使用的人口数据存储到分区表中,其中包含两个额外的列:性别和国家/地区作为分区列:
path └── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...
通过将path / to / table传递给SQLContext.read.parquet或SQLContext.read.load,Spark SQL将自动从路径中提取分区信息。现在返回的DataFrame的模式变为:
root |-- name: string (nullable = true) |-- age: long (nullable = true) |-- gender: string (nullable = true) |-- country: string (nullable = true)
请注意,会自动推断分区列的数据类型。目前,支持数字数据类型和字符串类型。有些用户可能不想自动推断分区列的数据类型。对于这些用例,自动类型推断可以由spark.sql.sources.partitionColumnTypeInference.enabled配置,默认值为true。当禁用类型推断时,字符串类型将用于分区列。
模式合并
像ProtocolBuffer,Avro和Thrift一样,Parquet也支持模式演进。用户可以从简单的架构开始,并根据需要逐渐向架构添加更多的列。这样,用户可能会使用多个Parquet文件,而不同但相互兼容的模式。Parquet数据源现在能够自动检测这种情况并合并所有这些文件的模式。
由于模式合并是一个相对昂贵的操作,并且在大多数情况下不是必需的,所以默认情况下从1.5.0开始。你可以启用它
1在阅读Parquet文件时,将数据源选项mergeSchema设置为true(如下面的示例所示)或
2将全局SQL选项spark.sql.parquet.mergeSchema设置为true。
Hive Tables
Spark SQL还支持读取和写入存储在Apache Hive中的数据。但是,由于Hive具有大量的依赖关系,因此它不包含在默认的Spark程序集中。通过向Spark的构建添加-Phive和-Phive-thriftserver标志来启用Hive支持。此命令构建一个包含Hive的新的程序集jar。请注意,此Hive组装jar也必须存在于所有工作节点上,因为它们将需要访问Hive序列化和反序列化库(SerDes),以访问Hive中存储的数据。
配置Hive是通过将hive-site.xml文件放在Spark的conf /目录中完成的。请注意,在YARN群集(yarn-cluster 模式)上运行查询时,lib_managed / jars目录下的datanucleus jar和conf /目录下的hive-site.xml需要在驱动程序和YARN启动的所有执行程序中可用簇。执行此操作的便捷方法是通过spark-submit命令的–jars选项和–file选项来添加它们。
当使用Hive时,必须构建一个继承自SQLContext的HiveContext,并添加了在MetaStore中查找表并使用HiveQL编写查询的支持。除了sql方法之外,HiveContext还提供了一个hql方法,它允许在HiveQL中表达查询。
// sc is an existing JavaSparkContext. HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc); sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL. Row[] results = sqlContext.sql("FROM src SELECT key, value").collect();
与不同版本的Hive Metastore进行交互
Spark SQL的Hive支持最重要的部分之一是与Hive metastore进行交互,这使得Spark SQL能够访问Hive表的元数据。从Spark 1.4.0开始,可单独构建Spark SQL的二进制,通过下面描述的配置来查询不同版本的Hive。请注意,独立于用于与转移对话的Hive版本,内部Spark SQL将针对Hive 1.2.1进行编译,并使用这些类进行内部执行(serdes,UDF,UDAF等)。
以下选项可用于配置用于检索元数据的Hive版本:
Property Name | Default | Meaning |
---|---|---|
spark.sql.hive.metastore.version | 1.2.1 | Version of the Hive metastore. Available options are 0.12.0 through 1.2.1 . |
spark.sql.hive.metastore.jars | builtin | Location of the jars that should be used to instantiate the HiveMetastoreClient. This property can be one of three options:
Use Hive 1.2.1, which is bundled with the Spark assembly jar when
Use Hive jars of specified version downloaded from Maven repositories. This configuration is not generally recommended for production deployments. |
spark.sql.hive.metastore.sharedPrefixes | com.mysql.jdbc, | A comma separated list of class prefixes that should be loaded using the classloader that is shared between Spark SQL and a specific version of Hive. An example of classes that should be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need to be shared are those that interact with classes that are already shared. For example, custom appenders that are used by log4j. |
spark.sql.hive.metastore.barrierPrefixes | (empty) | A comma separated list of class prefixes that should explicitly be reloaded for each version of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a prefix that typically would be shared (i.e. |