7.Spark学习(Python版本):Spark SQL中的DataFrame的操作

DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能。Spark能够轻松实现从MySQL到DataFrame的转化,并且支持SQL查询。

people.json文件的内容如下:

{“name”:”Michael”}
{“name”:”Andy”, “age”:30}
{“name”:”Justin”, “age”:19}

people.txt文件的内容如下:

Michael, 29
Andy, 30
Justin, 19

创建这两个文件,进入pyspark。

1.从people.json文件中读取数据并生成DataFrame并显示数据。
>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark.read.json('file:///usr/local/spark/python_code/people.json')
>>> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
常见的DataFrame操作

选择 ; 过滤 ; 分组 ; 排序 ; 别名

  • df.printSchema():打印图表信息
  • df.select(df.name,df.age + 1).show():选择多列
  • df.filter(df.age > 20 ).show():条件过滤
  • df.groupBy("age").count().show():分组聚合
  • df.sort(df.age.desc()).show():排序
  • df.sort(df.age.desc(), df.name.asc()).show():多列排序
  • df.select(df.name.alias("username"),df.age).show():对列进行重命名
// 打印模式信息
>>> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 
// 选择多列
>>> df.select(df.name,df.age + 1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
 
// 条件过滤
>>> df.filter(df.age > 20 ).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
 
// 分组聚合
>>> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+
 
// 排序
>>> df.sort(df.age.desc()).show()
+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|  19| Justin|
|null|Michael|
+----+-------+
 
//多列排序
>>> df.sort(df.age.desc(), df.name.asc()).show()
+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|  19| Justin|
|null|Michael|
+----+-------+
 
//对列进行重命名
>>> df.select(df.name.alias("username"),df.age).show()
+--------+----+
|username| age|
+--------+----+
| Michael|null|
|    Andy|  30|
|  Justin|  19|
+--------+----+
2. 从people.txt文件中读取数据并生成DataFrame并显示数据。

Spark官网提供了两种方法来实现从RDD转换得到DataFrame,第一种方法是,利用反射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换;第二种方法是,使用编程接口,构造一个schema并将其应用在已知的RDD上。

(一) 利用反射机制推断RDD模式

在利用反射机制推断RDD模式时,我们会用到toDF()方法

>>> from pyspark.sql.types import Row
>>> def f(x):
...     rel = {}
...     rel['name'] = x[0]
...     rel['age'] = x[1]
...     return rel
... 
>>> peopleDF = sc.textFile("file:///usr/local/spark/python_code/people.txt").map(lambda line : line.split(',')).map(lambda x: Row(**f(x))).toDF()
>>> peopleDF.createOrReplaceTempView("people")  //必须注册为临时表才能供下面的查询使用
 
>>> personsDF = spark.sql("select * from people")
>>> personsDF.rdd.map(lambda t : "Name:"+t[0]+","+"Age:"+t[1]).foreach(print)
 
Name: 19,Age:Justin
Name: 29,Age:Michael
Name: 30,Age:Andy
(二)使用编程方式定义RDD模式

使用createDataFrame(rdd, schema)编程方式定义RDD模式。

>>>  from pyspark.sql.types import Row
>>>  from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
 
//生成 RDD
>>> peopleRDD = sc.textFile("file:///usr/local/spark/python_code/people.txt")
 
//定义一个模式字符串
>>> schemaString = "name age"
 
//根据模式字符串生成模式
>>> fields = list(map( lambda fieldName : StructField(fieldName, StringType(), nullable = True), schemaString.split(" ")))
>>> schema = StructType(fields)
//从上面信息可以看出,schema描述了模式信息,模式中包含name和age两个字段
 
>>> rowRDD = peopleRDD.map(lambda line : line.split(',')).map(lambda attributes : Row(attributes[0], attributes[1]))
 
>>> peopleDF = spark.createDataFrame(rowRDD, schema)//必须注册为临时表才能供下面查询使用
 
>>> results = spark.sql("SELECT * FROM people")
>>> results.rdd.map( lambda attributes : "name: " + attributes[0]+","+"age:"+attributes[1]).foreach(print)
 
name: Michael,age: 29
name: Andy,age: 30
name: Justin,age: 19
    原文作者:马淑
    原文地址: https://www.jianshu.com/p/3885e7d072dc
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞