PySpark使用小结(二)

工作中用PySpark更多的是做数据处理的工作,PySpark提供了很多对Spark DataFrame(RDD)操作的函数,有点类似Pandas,但这种函数的缺点是可读性比较差,尤其是代码达到几百行的时候(捂脸)。所以推荐尽量使用SQL模块,让代码具有很高的可读性。如果很难用SQL进行处理的时候,再考虑使用函数。如果存在大量的SQL嵌套,建议像C语言一样使用多层缩进让代码逻辑清晰可见。

  • 配置spark context
sc = SparkSession.builder \
.appName("Spark_CLOB_Split") \
.config("hive.metastore.sasl.enabled", "true") \
.enableHiveSupport() \
.getOrCreate()
  • 清空已有表的数据:试了SQL最常用的delete * from 和delete from都不管用,竟然支持的是”truncate table <表名>”这种次常用的语法
  • 查询数据:用标准的sql语句即可,注意不支持having语法,另外表连接只支持等值连接
sc.sql("""select some columns, sum(any solumn) as col_name from your_table1 a
left join your_table2 b
on a.key = b.key
where a.col_name > 0
group by some columns
  • 插入表:假设有一个名为df的spark frame
df.registerTempTable("tmp")
spark.sql("insert into your_table select * from tmp")
  • 新生成一列常量:需要使用lit函数
from pyspark.sql.functions import lit
df.withColumn('your_col_name' ,lit(your_const_var))
  • 新生成一列:利用自定义函数对某一列进行运算,生成新的一列
from pyspark.sql.functions import udf,col
from pyspark.sql.types import StringType
def func(s):
    return s[:3]
my_func = udf(func, StringType())
df = df.withColumn('new_col_name', my_func('col_name'))
  • 表之间Cross Join笛卡尔积问题:
sc.conf.set("spark.sql.crossJoin.enabled", True)

  • 创建临时视图:
df.createOrReplaceTempView("view_name")

  • 创建dataframe
data = [(0, 1)]
col_names = ['col1', 'col2']
spark.createDataFrame(, col_names)
  • 聚合运算
df = df.groupby(['col_group']).agg({'col1':'sum', 'col2':'count'})
  • 合并RDD
df = df.union(df_tmp)
  • 对RDD重命名
df = df.withColumnRenamed(old_colname, new_colname)

  • python的timestamp转为spark的date格式
df = df.withColumn('col', lit(col.date()))
  • 表连接
cols = ['col1', 'col2']
df = df.join(df2, cols, how = 'left')
  • 列合并
df = df.withColumn('col', concat('col1', 'col2'))
# concat_ws can use separator, concat can't
df = claim_group_info.withColumn('col', concat_ws('-','col1', 'col2'))

    原文作者:刘子慕
    原文地址: https://zhuanlan.zhihu.com/p/31134940
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞