DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能。Spark能够轻松实现从MySQL到DataFrame的转化,并且支持SQL查询。
people.json文件的内容如下:
{“name”:”Michael”}
{“name”:”Andy”, “age”:30}
{“name”:”Justin”, “age”:19}
people.txt文件的内容如下:
Michael, 29
Andy, 30
Justin, 19
创建这两个文件,进入pyspark。
1.从people.json文件中读取数据并生成DataFrame并显示数据。
>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark.read.json('file:///usr/local/spark/python_code/people.json')
>>> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
常见的DataFrame操作
选择 ; 过滤 ; 分组 ; 排序 ; 别名
df.printSchema():打印图表信息
df.select(df.name,df.age + 1).show():选择多列
df.filter(df.age > 20 ).show():条件过滤
df.groupBy("age").count().show():分组聚合
df.sort(df.age.desc()).show():排序
df.sort(df.age.desc(), df.name.asc()).show():多列排序
df.select(df.name.alias("username"),df.age).show():对列进行重命名
// 打印模式信息
>>> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
// 选择多列
>>> df.select(df.name,df.age + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
// 条件过滤
>>> df.filter(df.age > 20 ).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
// 分组聚合
>>> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
// 排序
>>> df.sort(df.age.desc()).show()
+----+-------+
| age| name|
+----+-------+
| 30| Andy|
| 19| Justin|
|null|Michael|
+----+-------+
//多列排序
>>> df.sort(df.age.desc(), df.name.asc()).show()
+----+-------+
| age| name|
+----+-------+
| 30| Andy|
| 19| Justin|
|null|Michael|
+----+-------+
//对列进行重命名
>>> df.select(df.name.alias("username"),df.age).show()
+--------+----+
|username| age|
+--------+----+
| Michael|null|
| Andy| 30|
| Justin| 19|
+--------+----+
2. 从people.txt文件中读取数据并生成DataFrame并显示数据。
Spark官网提供了两种方法来实现从RDD转换得到DataFrame,第一种方法是,利用反射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换;第二种方法是,使用编程接口,构造一个schema并将其应用在已知的RDD上。
(一) 利用反射机制推断RDD模式
在利用反射机制推断RDD模式时,我们会用到toDF()方法
>>> from pyspark.sql.types import Row
>>> def f(x):
... rel = {}
... rel['name'] = x[0]
... rel['age'] = x[1]
... return rel
...
>>> peopleDF = sc.textFile("file:///usr/local/spark/python_code/people.txt").map(lambda line : line.split(',')).map(lambda x: Row(**f(x))).toDF()
>>> peopleDF.createOrReplaceTempView("people") //必须注册为临时表才能供下面的查询使用
>>> personsDF = spark.sql("select * from people")
>>> personsDF.rdd.map(lambda t : "Name:"+t[0]+","+"Age:"+t[1]).foreach(print)
Name: 19,Age:Justin
Name: 29,Age:Michael
Name: 30,Age:Andy
(二)使用编程方式定义RDD模式
使用createDataFrame(rdd, schema)编程方式定义RDD模式。
>>> from pyspark.sql.types import Row
>>> from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
//生成 RDD
>>> peopleRDD = sc.textFile("file:///usr/local/spark/python_code/people.txt")
//定义一个模式字符串
>>> schemaString = "name age"
//根据模式字符串生成模式
>>> fields = list(map( lambda fieldName : StructField(fieldName, StringType(), nullable = True), schemaString.split(" ")))
>>> schema = StructType(fields)
//从上面信息可以看出,schema描述了模式信息,模式中包含name和age两个字段
>>> rowRDD = peopleRDD.map(lambda line : line.split(',')).map(lambda attributes : Row(attributes[0], attributes[1]))
>>> peopleDF = spark.createDataFrame(rowRDD, schema)//必须注册为临时表才能供下面查询使用
>>> results = spark.sql("SELECT * FROM people")
>>> results.rdd.map( lambda attributes : "name: " + attributes[0]+","+"age:"+attributes[1]).foreach(print)
name: Michael,age: 29
name: Andy,age: 30
name: Justin,age: 19