Spark 使用笔记

pyspark 使用总结

配置

SparkContext

from pyspark import SparkContext
from pyspark import SparkConf

# SparkContext配置初始化
conf = SparkConf()
conf.set('spark.app.name', 'your app mark label')


sc = SparkContext(conf)

SparkSession配置初始化

ss = SparkSession.builder \
    .master("yarn") \
    .appName("your app mark label") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

导入

sc从文本导入

# python2 读出是一整行字符串,而且是unicode类型
rdd = sc.textFile(yourfilepath)
pickle_rdd= sc.pickleFile(yourfilepath)

从csv导入

1.1版本:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('xxx.csv')

这种方式需要在运行的时候添加--packages com.databricks:spark-csv_2.10:1.3.0
2.0+版本:

spark\
.read\
.option("header", "true")\
.options(delimiter='|')\
.csv(basic_info_dir)

序列

rdd = sc.parallelize([..., ..., ...])

数据对象转换

rdd、DataFrame的相互转换

rdd to dataframe

df = rdd.toDF([columns])

dataframe to rdd

rdd = df.rdd

sparksession 获取sc

spark.sparkContext

转换操作

map

对各个元素进行操作,常用
rdd.map(lambda x: fun(x)) 或者 rdd.map(lambda k, v: fun(x, y))
注:上述常用,但由于x或者k或者v可能是包含多个数据的元组,如果写成lambda x: x[0] ...不易维护,所以可以将每个项写明用意,比如lambda word, count: fun(word, count)

filter

选择指定的元素,需要lambda的函数返回True或False
rdd.filter(lambda x: fun(x))

group by

rdd.groupByKey().mapValues(lambda x: fun(grouped))

join

需要将rdd变成k, v格式,然后join,join后v变为两个rdd的tuple

reduceByKey

对于只需要算数字的,最好不要用groupByKey而用reduceByKey,比如rdd.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])

持久化操作

打印

# 查看条数
rdd.count()
# 查看前几条
rdd.take(3)
# 查看所有
rdd.collect()

存储文件

# 这种会存储成(k, v)文本
rdd.saveAsTextFile(dir)
# 如果希望一个个字段,可以flatten后再执行。
def flat(lst):
    return reduce(
        lambda l, e: (
            not isinstance(e, str) and
            (isinstance(e, list) or isinstance(e, tuple))
        ) and l + flat(e) or l + (e,), lst, ())


def generate_line(raw, operator=u'|'):
    if not isinstance(raw, tuple) and not isinstance(raw, list):
        return unicode(raw).encode('utf-8')
    return operator.join([unicode(i) for i in flat(raw)]).encode('utf-8')

dataframe存储csv

spark 1.1:

df.write
.format('com.databricks.spark.csv')
.options(header='true')
.options(delimiter='|')
.save(dir)

spark1.2:

df\
.write\
.options(header='true')\
.csv(dir)

调用

集群运行

spark2-submit \
    --master yarn \
    --driver-cores 2 \
    --driver-memory 2G \
    --num-executors 10 \
    --executor-cores 4 \
    --executor-memory 4G \
    pyfile params

调优

参考: 美团点评SPARK分享

    原文作者:slowrabbit
    原文地址: https://www.jianshu.com/p/f027d2fbd8b8
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞