pyspark 使用总结
配置
SparkContext
from pyspark import SparkContext
from pyspark import SparkConf
# SparkContext配置初始化
conf = SparkConf()
conf.set('spark.app.name', 'your app mark label')
sc = SparkContext(conf)
SparkSession配置初始化
ss = SparkSession.builder \
.master("yarn") \
.appName("your app mark label") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
导入
sc从文本导入
# python2 读出是一整行字符串,而且是unicode类型
rdd = sc.textFile(yourfilepath)
pickle_rdd= sc.pickleFile(yourfilepath)
从csv导入
1.1版本:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('xxx.csv')
这种方式需要在运行的时候添加--packages com.databricks:spark-csv_2.10:1.3.0
2.0+版本:
spark\
.read\
.option("header", "true")\
.options(delimiter='|')\
.csv(basic_info_dir)
序列
rdd = sc.parallelize([..., ..., ...])
数据对象转换
rdd、DataFrame的相互转换
rdd to dataframe
df = rdd.toDF([columns])
dataframe to rdd
rdd = df.rdd
sparksession 获取sc
spark.sparkContext
转换操作
map
对各个元素进行操作,常用
rdd.map(lambda x: fun(x))
或者 rdd.map(lambda k, v: fun(x, y))
注:上述常用,但由于x或者k或者v可能是包含多个数据的元组,如果写成lambda x: x[0] ...
不易维护,所以可以将每个项写明用意,比如lambda word, count: fun(word, count)
filter
选择指定的元素,需要lambda的函数返回True或False
rdd.filter(lambda x: fun(x))
group by
rdd.groupByKey().mapValues(lambda x: fun(grouped))
join
需要将rdd变成k, v格式,然后join,join后v变为两个rdd的tuple
reduceByKey
对于只需要算数字的,最好不要用groupByKey而用reduceByKey,比如rdd.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])
持久化操作
打印
# 查看条数
rdd.count()
# 查看前几条
rdd.take(3)
# 查看所有
rdd.collect()
存储文件
# 这种会存储成(k, v)文本
rdd.saveAsTextFile(dir)
# 如果希望一个个字段,可以flatten后再执行。
def flat(lst):
return reduce(
lambda l, e: (
not isinstance(e, str) and
(isinstance(e, list) or isinstance(e, tuple))
) and l + flat(e) or l + (e,), lst, ())
def generate_line(raw, operator=u'|'):
if not isinstance(raw, tuple) and not isinstance(raw, list):
return unicode(raw).encode('utf-8')
return operator.join([unicode(i) for i in flat(raw)]).encode('utf-8')
dataframe存储csv
spark 1.1:
df.write
.format('com.databricks.spark.csv')
.options(header='true')
.options(delimiter='|')
.save(dir)
spark1.2:
df\
.write\
.options(header='true')\
.csv(dir)
调用
集群运行
spark2-submit \
--master yarn \
--driver-cores 2 \
--driver-memory 2G \
--num-executors 10 \
--executor-cores 4 \
--executor-memory 4G \
pyfile params
调优
参考: 美团点评SPARK分享