CSV格式的文件也称为逗号分隔值(Comma-Separated Values,CSV,有时也称为字符分隔值,因为分隔字符也可以不是逗号。在本文中的CSV格式的数据就不是简单的逗号分割的),其文件以纯文本形式存表格数据(数字和文本)。CSV文件由任意数目的记录组成,记录间以某种换行符分隔;每条记录由字段组成,字段间的分隔符是其它字符或字符串,最常见的是逗号或制表符。通常,所有记录都有完全相同的字段序列。
本篇文章将介绍如何使用Spark 1.3+的外部数据源接口来自定义CSV输入格式的文件解析器。这个外部数据源接口是由databricks公司开发并开源的(地址:https://github.com/databricks/spark-csv),通过这个类库我们可以在Spark SQL中解析并查询CSV中的数据。因为用到了Spark的外部数据源接口,所以我们需要在Spark 1.3+上面使用。在使用之前,我们需要引入以下的依赖:
2 | < groupId >com.databricks</ groupId > |
3 | < artifactId >spark-csv_2.10</ artifactId > |
4 | < version >1.0.3</ version > |
目前spark-csv_2.10的最新版就是1.0.3。如果我们想在Spark shell里面使用,我们可以在--jars
选项里面加入这个依赖,如下:
1 | [iteblog@spark $] bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3 |
和《Spark SQL整合PostgreSQL》文章中用到的load函数类似,在使用CSV类库的时候,我们需要在options
中传入以下几个选项:
1、path
:看名字就知道,这个就是我们需要解析的CSV文件的路径,路径支持通配符;
2、header
:默认值是false。我们知道,CSV文件第一行一般是解释各个列的含义的名称,如果我们不需要加载这一行,我们可以将这个选项设置为true;
3、delimiter
:默认情况下,CSV是使用英文逗号分隔的,如果不是这个分隔,我们就可以设置这个选项。
4、quote
:默认情况下的引号是'”‘,我们可以通过设置这个选项来支持别的引号。
5、mode
:解析的模式。默认值是PERMISSIVE
,支持的选项有
(1)、PERMISSIVE
:尝试解析所有的行,nulls are inserted for missing tokens and extra tokens are ignored.
(2)、DROPMALFORMED
:drops lines which have fewer or more tokens than expected
(3)、FAILFAST
: aborts with a RuntimeException if encounters any malformed line
如何使用
1、在Spark SQL中使用
我们可以通过注册临时表,然后使用纯SQL方式去查询CSV文件:
2 | USING com.databricks.spark.csv |
3 | OPTIONS (path "cars.csv" , header "true" ) |
我们还可以在DDL中指定列的名字和类型,如下:
1 | CREATE TABLE cars (yearMade double , carMake string, carModel string, comments string, blank string) |
2 | USING com.databricks.spark.csv |
3 | OPTIONS (path "cars.csv" , header "true" ) |
推荐的方式是通过调用SQLContext
的load/save
函数来加载CSV数据:
1 | import org.apache.spark.sql.SQLContext |
3 | val sqlContext = new SQLContext(sc) |
4 | val df = sqlContext.load( "com.databricks.spark.csv" , Map( "path" -> "cars.csv" , "header" -> "true" )) |
5 | df.select( "year" , "model" ).save( "newcars.csv" , "com.databricks.spark.csv" ) |
当然,我们还可以使用com.databricks.spark.csv._
的隐式转换:
1 | import org.apache.spark.sql.SQLContext |
2 | import com.databricks.spark.csv. _ |
4 | val sqlContext = new SQLContext(sc) |
6 | val cars = sqlContext.csvFile( "cars.csv" ) |
7 | cars.select( "year" , "model" ).saveAsCsvFile( "newcars.tsv" ) |
3、在Java中使用
和在Scala中使用类似,我们也推荐调用SQLContext
类中 load/save
函数
07 | * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 |
08 | * 过往记忆博客微信公共帐号:iteblog_hadoop |
11 | import org.apache.spark.sql.SQLContext |
13 | SQLContext sqlContext = new SQLContext(sc); |
15 | HashMap<String, String> options = new HashMap<String, String>(); |
16 | options.put( "header" , "true" ); |
17 | options.put( "path" , "cars.csv" ); |
19 | DataFrame df = sqlContext.load( "com.databricks.spark.csv" , options); |
20 | df.select( "year" , "model" ).save( "newcars.csv" , "com.databricks.spark.csv" ); |
在Java或者是Scala中,我们可以通过CsvParser里面的函数来读取CSV文件:
1 | import com.databricks.spark.csv.CsvParser; |
2 | SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); |
4 | DataFrame cars = ( new CsvParser()).withUseHeader( true ).csvFile(sqlContext, "cars.csv" ); |
在Python中,我们也可以使用SQLContext
类中 load/save
函数来读取和保存CSV文件:
1 | from pyspark.sql import SQLContext |
2 | sqlContext = SQLContext(sc) |
4 | df = sqlContext.load(source = "com.databricks.spark.csv" , header = "true" , path = "cars.csv" ) |
5 | df.select( "year" , "model" ).save( "newcars.csv" , "com.databricks.spark.csv" ) |