工作中用PySpark更多的是做数据处理的工作,PySpark提供了很多对Spark DataFrame(RDD)操作的函数,有点类似Pandas,但这种函数的缺点是可读性比较差,尤其是代码达到几百行的时候(捂脸)。所以推荐尽量使用SQL模块,让代码具有很高的可读性。如果很难用SQL进行处理的时候,再考虑使用函数。如果存在大量的SQL嵌套,建议像C语言一样使用多层缩进让代码逻辑清晰可见。
- 配置spark context
sc = SparkSession.builder \
.appName("Spark_CLOB_Split") \
.config("hive.metastore.sasl.enabled", "true") \
.enableHiveSupport() \
.getOrCreate()
- 清空已有表的数据:试了SQL最常用的delete * from 和delete from都不管用,竟然支持的是”truncate table <表名>”这种次常用的语法
- 查询数据:用标准的sql语句即可,注意不支持having语法,另外表连接只支持等值连接
sc.sql("""select some columns, sum(any solumn) as col_name from your_table1 a
left join your_table2 b
on a.key = b.key
where a.col_name > 0
group by some columns
- 插入表:假设有一个名为df的spark frame
df.registerTempTable("tmp")
spark.sql("insert into your_table select * from tmp")
- 新生成一列常量:需要使用lit函数
from pyspark.sql.functions import lit
df.withColumn('your_col_name' ,lit(your_const_var))
- 新生成一列:利用自定义函数对某一列进行运算,生成新的一列
from pyspark.sql.functions import udf,col
from pyspark.sql.types import StringType
def func(s):
return s[:3]
my_func = udf(func, StringType())
df = df.withColumn('new_col_name', my_func('col_name'))
- 表之间Cross Join笛卡尔积问题:
sc.conf.set("spark.sql.crossJoin.enabled", True)
- 创建临时视图:
df.createOrReplaceTempView("view_name")
- 创建dataframe
data = [(0, 1)]
col_names = ['col1', 'col2']
spark.createDataFrame(, col_names)
- 聚合运算
df = df.groupby(['col_group']).agg({'col1':'sum', 'col2':'count'})
- 合并RDD
df = df.union(df_tmp)
- 对RDD重命名
df = df.withColumnRenamed(old_colname, new_colname)
- python的timestamp转为spark的date格式
df = df.withColumn('col', lit(col.date()))
- 表连接
cols = ['col1', 'col2']
df = df.join(df2, cols, how = 'left')
- 列合并
df = df.withColumn('col', concat('col1', 'col2'))
# concat_ws can use separator, concat can't
df = claim_group_info.withColumn('col', concat_ws('-','col1', 'col2'))