pyspark学习--dataframe操作

参考文章:master苏:pyspark系列–dataframe基础

1、连接本地spark

import pandas as pd
from pyspark.sql import SparkSession

# Create the SparkSession that every example below uses via the name `spark`.
spark = (
    SparkSession.builder
    .appName('my_first_app_name')
    .getOrCreate()
)

2、创建dataframe

# Build a small pandas table (colour name + length of that name),
# then turn it into a Spark DataFrame called color_df.
colors = ['white', 'green', 'yellow', 'red', 'brown', 'pink']
color_pdf = pd.DataFrame({'color': colors})
color_pdf['length'] = color_pdf['color'].str.len()

color_df = spark.createDataFrame(color_pdf)
color_df.show()

(图:color_df.show() 的输出结果)

# Column names with their types (same attribute name as in pandas)
color_df.dtypes
# [('color', 'string'), ('length', 'bigint')]

# Column names (same as pandas)
color_df.columns
# ['color', 'length']

# Row count: unlike pandas, Spark gets it from the count() method
color_df.count()

# Renaming columns
# pandas equivalent (kept as a comment: no pandas `df` is defined at this point):
# df = df.rename(columns={'a': 'aa'})

# Spark - method 1
# Choose the column names when the DataFrame is created
data = spark.createDataFrame(data=[("Alberto", 2), ("Dakota", 2)],
                             schema=['name', 'length'])
data.show()
data.printSchema()

# Spark - method 2
# selectExpr with SQL "as" aliases
color_df2 = color_df.selectExpr('color as color2', 'length as length2')
color_df2.show()

# Spark - method 3
# withColumnRenamed, one call per renamed column
color_df2 = color_df.withColumnRenamed('color', 'color2') \
                    .withColumnRenamed('length', 'length2')
color_df2.show()

# Spark - method 4
# alias inside select (only the selected column appears in the result)
color_df.select(color_df.color.alias('color2')).show()

3、 选择和切片筛选

# 1. Selecting a single column
# Unlike pandas, where df['col'] gives the data directly, a Spark column
# reference is used inside operators such as select / filter.
color_df.select('length').show()
color_df.select(color_df.length).show()
color_df.select(color_df[0]).show()        # by position: column 0 is 'color'
color_df.select(color_df['length']).show()
color_df.filter(color_df['length'] >= 4).show()   # filter keeps matching rows

# 2. Selecting several columns
color_df.select('length', 'color').show()
# pandas equivalent (kept as a comment: no pandas `df` is defined at this point):
# df[['length', 'color']]

# 3. Select several columns, then keep only the matching rows.
# filter() is required here: select(condition) would return a boolean
# "(length > 4)" column instead of removing rows.
color_df.select('length', 'color') \
        .filter(color_df['length'] > 4).show()

# 4. between(lower, upper) range filter
color_df.filter(color_df.length.between(4, 5)) \
        .select(color_df.color.alias('mid_length')).show()


# 5. Combined filters
# One condition uses attribute access (color_df.length), the other
# positional access (color_df[0], i.e. the 'color' column)
color_df.filter(color_df.length > 4) \
        .filter(color_df[0] != 'white').show()

# 6. filter also accepts SQL-style condition strings
color_df.filter("color='green'").show()

color_df.filter("color like 'b%'").show()

# 7. where also accepts SQL-style condition strings
color_df.where("color like '%yellow%'").show()

# 8. Plain SQL
# Register the DataFrame as a temporary view first, then query it by name.
color_df.createOrReplaceTempView("color_df")
spark.sql("select count(1) from color_df").show()

4、增加删除列

# pandas: drop a column
# df.drop(columns='length')

# Spark: drop a column. The result is only displayed, not assigned, so
# color_df itself keeps its 'length' column for the later examples.
color_df.drop('length').show()

# Add a column whose value is the constant 0 (lit supplies the literal)
from pyspark.sql.functions import lit
color_df.withColumn('newCol', lit(0)).show()

# Convert rows to JSON strings; toJSON returns an RDD, first() takes one element
color_df.toJSON().first()

5、排序

# pandas equivalent (kept as a comment: no pandas `df` is defined at this point):
# df.sort_values(by='b')

# Spark: sort by one column, descending
color_df.sort('color', ascending=False).show()

# Sort by several columns (one direction for all of them)
color_df.filter(color_df['length'] >= 4) \
        .sort('length', 'color', ascending=False).show()

# Mixed directions: desc()/asc() per column
color_df.sort(color_df.length.desc(), color_df.color.asc()).show()

# orderBy also sorts and returns a DataFrame; take(4) then returns the
# first 4 rows as a list of Row objects
color_df.orderBy('length', 'color').take(4)

6、处理缺失值

# 1. Test data: a 5x5 table of random integers 0-9 with one missing value
import numpy as np
import pandas as pd

# Truncating rand()*10 gives the same integers as int(x*10); int64 keeps the
# column types identical to the original applymap version (deprecated in
# pandas 2.1 in favour of DataFrame.map).
df = pd.DataFrame((np.random.rand(5, 5) * 10).astype(np.int64),
                  columns=['a', 'b', 'c', 'd', 'e'])
# Column 'c' must be float to hold NaN; converting it explicitly makes the
# dtype change visible instead of relying on an implicit upcast on assignment.
df['c'] = df['c'].astype(float)
df.iloc[2, 2] = np.nan

spark_df = spark.createDataFrame(df)
spark_df.show()

# 2. Drop the rows that contain missing values
df2 = spark_df.dropna()
df2.show()

# 3. The same operation through the .na accessor
spark_df = spark_df.na.drop()

另外,一个常见需求是:如果 col1 为空则用 col2 填补,否则返回 col1。

这类似 pandas 的 where 或者 combine_first 方法。注意在 PySpark 中,nanvl 只处理浮点数的 NaN;如果要处理 None(null),应使用 coalesce。

# pandas equivalents (kept as comments: a, b, df1 and df2 are not defined here)
# where works like if-else: take b where a is null, otherwise a
# np.where(pd.isnull(a), b, a)

# combine_first: fill the missing values of a with the values of b
# a[:-2].combine_first(b[2:])

# combine_first "patches" df1: its missing values are filled from df2
# df1.combine_first(df2)


# pyspark
# nanvl(col1, col2): col1 unless it is NaN, in which case col2 (NaN only)
from pyspark.sql.functions import nanvl
df = spark.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
df.select(nanvl("a", "b").alias("r1"), nanvl(df.a, df.b).alias("r2")).show()

# coalesce(col1, col2): first non-null value -- the counterpart for None/null
from pyspark.sql.functions import coalesce
df_null = spark.createDataFrame([(1.0, None), (None, 2.0)], ("a", "b"))
df_null.select(coalesce("a", "b").alias("r1"),
               coalesce(df_null.a, df_null.b).alias("r2")).show()

7、分组统计

# Grouped aggregation 1: number of rows for each length value
length_counts = color_df.groupBy('length').count()
length_counts.show()

# Grouped aggregation 2: several aggregate functions in one agg() call
import pyspark.sql.functions as func
per_color = color_df.groupBy("color").agg(
    func.max("length"),
    func.sum("length"),
)
per_color.show()

8、join操作

# 1. Test data: 13 employees, but salary / department rows exist only for
#    emp_id 1-4, so the left joins below leave missing values for emp_id 5-13.
employees = [(1, "John", 25), (2, "Ray", 35), (3,"Mike", 24), (4, "Jane", 28), 
             (5, "Kevin", 26), 
             (6, "Vincent", 35), (7,"James", 38), (8, "Shane", 32), 
             (9, "Larry", 29), (10, "Kimberly", 29),
             (11, "Alex", 28), (12, "Garry", 25), (13, "Max",31)]
employees=spark.createDataFrame(employees, schema=["emp_id","name","age"])
employees.show()

salary=[(1,1000),(2,2000),(3,3000),(4,4000)]
salary=spark.createDataFrame(salary, schema=["emp_id","salary"])
salary.show()

# NOTE(review): placeholder data -- the values mirror the salary table and the
# column name is spelled "departement".
department=[(1,1000),(2,2000),(3,3000),(4,4000)]
department=spark.createDataFrame(department, schema=["emp_id","departement"])
department.show()

# 2. Joining with an explicit condition
# The first join is a left join; the second passes no `how`, so it uses the
# default (inner) join. With a condition expression the result keeps every
# emp_id column, so the column name is duplicated. pandas merge would add
# _x/_y suffixes to duplicated names; Spark does not.

final_data = employees.join(salary, employees.emp_id == salary.emp_id, 
                            how='left')\
    .join(department, employees.emp_id==department.emp_id)
final_data.show()

# 3. When the key has the same name on both sides, pass it via `on`
#    to avoid the duplicated key column
final_data = employees.join(salary, on='emp_id', how='left')\
    .join(department, on='emp_id', how='left')
final_data.show()

在上面的 left join 中,只有 emp_id 为 1–4 的员工在 salary 和部门表中有记录,其余员工的 salary 等列为空(null),因此我们得到一个带有缺失值的 dataframe。接下来对这个 dataframe 进行缺失值处理。

# 1. Drop every row that has at least one missing value
clean_data = final_data.na.drop()
clean_data.show()

# 2. Replace missing salaries with the mean salary
import math
from pyspark.sql import functions as func  # Spark built-in functions
# collect() returns the one-row result to the driver as a list of Row objects;
# [0][0] is the first field of the first Row, i.e. the mean itself.
mean_salary = final_data.select(func.mean('salary')).collect()[0][0]
clean_data = final_data.na.fill({'salary': mean_salary})
clean_data.show()

# 3. thresh=2 keeps only rows with at least 2 non-null values,
#    i.e. it drops rows that have fewer than 2 non-null values
final_data.na.drop(thresh=2).show()

# Sample data for the fill examples below (same authors table as in section 11),
# built here so that this section runs without depending on later code.
authors = [['Thomas', 'Hardy', 'June 2,1840'],
           ['Thomas', 'Hardy', 'June 2,1840'],
           ['Thomas', 'H', None],
           ['Jane', 'Austen', '16 December 1775'],
           ['Emily', None, None]]
df1 = spark.createDataFrame(authors, schema=["FirstName", "LastName", "Dob"])

# 4. Fill missing values in every column with the same value
df1.na.fill('unknown').show()

# 5. Use a different fill value for each column
df1.na.fill({'LastName': '--', 'Dob': 'unknown'}).show()

9、空值判断

空值有两种:一种是浮点数类型中的 NaN,另一种是普通的 None(在 Spark 中表现为 null),分别用 isnan 和 isnull 来判断。

# Counterpart of pandas.isnull

from pyspark.sql.functions import isnull, isnan

# 1. Check for None (null). r1 passes a column name, r2 a Column object;
#    both forms are accepted.
df = spark.createDataFrame([(1, None), (None, 2)], ("a", "b"))
df.select(isnull("a").alias("r1"), isnull(df.a).alias("r2")).show()

# 2. Check for NaN in float columns (again by name and by Column object)
df = spark.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
df.select(isnan("a").alias("r1"), isnan(df.a).alias("r2")).show()

10、离群点

# Note: column computations are always written inside select().

# 1. Mean salary (the aggregate skips null salaries)
mean_salary = final_data.select(func.mean('salary')).collect()[0][0]
# 2. Squared deviation of each salary from the mean (one value per row)
devs = final_data.select(((final_data.salary - mean_salary) ** 2).alias('deviation'))
# 3. Standard deviation: square root of the average squared deviation
#    (population formula), rounded down to an integer by math.floor
stddev = math.floor(math.sqrt(devs.groupBy().avg('deviation').first()[0]))

# 4. Replace outliers -- salaries outside mean +/- 2*stddev -- with the mean.
#    Salaries inside the range are kept. A null salary makes the condition
#    null, so those rows also fall through to otherwise() and get the mean.
no_outlier = final_data.select(
    final_data.emp_id, final_data.name, final_data.age, final_data.salary,
    func.when(final_data.salary.between(mean_salary - 2 * stddev, mean_salary + 2 * stddev),
              final_data.salary)
        .otherwise(mean_salary)
        .alias("updated_salary")
    )
no_outlier.show()


# func provides ready-made statistics, which is more convenient
# 1. Mean
mean_salary = final_data.select(func.mean('salary')).collect()[0][0]
# 2. Standard deviation. func.stddev is the *sample* standard deviation;
#    func.stddev_pop uses the population formula applied manually above
#    (before the floor).
stddev_sample = final_data.select(func.stddev('salary')).collect()[0][0]
stddev_population = final_data.select(func.stddev_pop('salary')).collect()[0][0]
# Replacing the outliers then works exactly as above.

11、去重

# Handling duplicate rows -- very similar to pandas
authors = [['Thomas','Hardy','June 2,1840'],
            ['Thomas','Hardy','June 2,1840'],
            ['Thomas','H',None],
            ['Jane','Austen','16 December 1775'],
            ['Emily',None,None]]

df1 = spark.createDataFrame(authors,schema=["FirstName","LastName","Dob"])
df1.show()

# Drop rows that are identical in every column (the two "Thomas Hardy" rows)
df1.dropDuplicates().show()

# Deduplicate on a subset of columns: rows sharing the same FirstName
# collapse to a single row
df1.dropDuplicates(subset=['FirstName']).show()


# pandas equivalent
df=pd.DataFrame(authors, columns=["FirstName","LastName","Dob"])
df.drop_duplicates(subset=['FirstName'])

12、 生成新列

# Derive new columns from existing ones (column-to-column computations).
# Note how the user-defined function is called.

# 0. Define a UDF. Without a returnType, udf() produces strings (StringType),
#    which is what this function returns; stating it explicitly documents
#    the result type instead of relying on the default.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def _concat_name_age(name, age):
    """Return "<name>_<age>", or None when either input is null."""
    # A null cell arrives as None; `name + '_'` would raise TypeError on it.
    if name is None or age is None:
        return None
    return name + '_' + str(age)


concat_func = udf(_concat_name_age, StringType())

# 1. Apply the UDF
concat_df = final_data.withColumn("name_age",
                                  concat_func(final_data.name, final_data.age))
concat_df.show()

# 2. Derive a column from another with a plain column expression (no UDF)
data_new = concat_df.withColumn("age_incremented", concat_df.age + 1)
data_new.show()

# 3. Columns have common helper methods built in, e.g. substr(start, length)
#    -- here the first letter of LastName
df1.withColumn('Initial', df1.LastName.substr(1, 1)).show()

# 4. Add a constant column as well
from pyspark.sql.functions import lit
df1.withColumn('newCol', lit(0)).show()

13、行的最大最小值

# Test data: emp_id 1-4, each with salary = emp_id * 1000
df = spark.createDataFrame([(i, i * 1000) for i in range(1, 5)],
                           schema=["emp_id", "salary"])
df.show()

# Row-wise maximum / minimum: greatest / least compare the listed columns
# within each row
from pyspark.sql.functions import greatest, least
row_extremes = [
    greatest('emp_id', 'salary').alias('greatest'),
    least('emp_id', 'salary').alias('least'),
]
df.select(*row_extremes).show()

14、when操作

from pyspark.sql.functions import when

# Test data with an "age" column (the emp_id/salary DataFrame above has none)
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])

# 1. CASE WHEN age = 2 THEN 3 ELSE 4 END
df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).show()

# 2. CASE WHEN age = 2 THEN age + 1 END
#    (no otherwise(): rows that do not match get null)
df.select(when(df.age == 2, df.age + 1).alias("age")).show()

    原文作者:刘叔
    原文地址: https://zhuanlan.zhihu.com/p/57792897
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞