pyspark底层浅析
pyspark简介
pyspark是Spark官方提供的API接口,同时pyspark也是Spark中的一个程序。
在terminal中输入pyspark指令,可以打开python的shell,同时其中默认初始化了SparkConf和SparkContext.
在编写Spark应用的.py文件时,可以通过import pyspark引入该模块,并通过SparkConf对Spark的启动参数进行设置。不过,如果你仅完成了Spark的安装,直接用python指令运行py文件并不能检索到pyspark模块。你可以通过pip等包管理工具安装该模块,也可以直接使用pyspark(新版本已不支持)或spark-submit直接提交.py文件的作业。
pyspark program
这里指的是spark中的bin/pyspark,github地址 。
实际上pyspark只不过解析了命令行中的参数,并进行了python方面的设置,然后调用spark-submit
exec "${SPARK_HOME}"/bin/spark-submit pyspark-shell-main --name "PySparkShell" "$@"
在较新一些的版本如Spark2.2中,已经不支持用pyspark运行py脚本文件,一切spark作业都应该使用spark-submit提交。
pyspark module
Spark是用scala编写的框架,不过考虑到主要是机器学习的应用场景,Spark官方提供了可以用python的API。但是,一方面,python的API是不全的,即不是所有的scala的函数都可以用pyspark调用到,虽然新的API也在随着版本迭代不断开放;另一方面,pyspark模块,对于很多复杂算法,是通过反射机制调用的Spark中JVM里正在运行的scala编写的类、方法。所以,如果你将频繁应用spark于业务或研究,建议学习直接使用scala语言编写程序,而不是python。
这篇博客并不会讲述如何去使用pyspark来编写python的spark应用。各类API以及模块如何使用,你完全可以前往官方文档查看。这里的链接是最新版pyspark的文档,如果你的机器上的spark不是最新版,请去找对应版本的pyspark文档。因为正如我上面所说,不同版本的pyspark逐步开放了新的API并有对旧API进行改进,你在最新版本看到的类、函数,不一定能在旧版本使用。这里一提,对于大部分机器学习算法,你都会看到ml模块与mllib模块都提供了接口,它们的区别在于ml模块接受DataFrame格式的数据而mllib模块接受RDD格式的数据。
关于pyspark底层,这里主要探索两个地方。一个是其初始化时的工作,一个是其对JVM中scala代码的调用
SparkContext
SparkContext类在pyspark/context.py中,在python代码里通过初试化该类的实例来完成Spark的启动与初始化。这个类的__init__方法中执行了下面几行代码
self._callsite = first_spark_call() or CallSite(None, None, None)
SparkContext._ensure_initialized(self, gateway=gateway)
try:
self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
conf, jsc, profiler_cls)
except:
# If an error occurs, clean up in order to allow future SparkContext creation:
self.stop()
raise
first_spark_call和CallSite方法都是用来获取JAVA虚拟机中的堆栈,它们在pyspark/traceback_util.py中。
之后调用了类函数_ensure_initialized函数,对Spark的Java的gate_way和jvm进行设置。
最后调用了类中的_do_init_函数,从函数就可以看出是对内部类成员SparkConf的实例_conf函数进行设置,判断各参数值是否为None,非空的话就进行设置,并读取一些本地的python环境参数,启动Spark。
调用JVM类与方法
以mllib库为例,主要逻辑都在pyspark/mllib/common.py中。你去查看mllib模块中机器学习算法的类与函数,你会发现基本都是使用self.call或者callMLlibFunc,将函数名与参数传入。
各类模型的Model类都继承自common.JavaModelWrapper,这个类代码很短:
class JavaModelWrapper(object):
"""
Wrapper for the model in JVM
"""
def __init__(self, java_model):
self._sc = SparkContext._active_spark_context
self._java_model = java_model
def __del__(self):
self._sc._gateway.detach(self._java_model)
def call(self, name, *a):
"""Call method of java_model"""
return callJavaFunc(self._sc, getattr(self._java_model, name), *a)
_java_model是来自Java或Scala的类的实例,在调用对应的训练算法时由对应的scala代码在末尾将这些类初始化并返回,其关键的类方法call,同callMLLibFunc方法一样,都是调用了callJavaFunc的方法。对于调用某一类的方法,是运用python的getattr函数,将类实例与方法名传入,使用反射机制获取函数;而对于调用一些不属于类的方法,即使用callMLLibFunc时,是传入的PythonMLLibAPI类的实例以及方法名,来获取函数:
def callMLlibFunc(name, *args):
""" Call API in PythonMLLibAPI """
sc = SparkContext.getOrCreate()
api = getattr(sc._jvm.PythonMLLibAPI(), name)
return callJavaFunc(sc, api, *args)
最终callJavaFunc做的也很简单,将python的参数*a,使用_py2java方法转换为java的数据类型,并执行函数,再将结果使用_java2py方法转换为python的数据类型返回:
def callJavaFunc(sc, func, *args):
""" Call Java Function """
args = [_py2java(sc, a) for a in args]
return _java2py(sc, func(*args))
这里的_java2py,对很多数据格式的支持不是很好,所以当你尝试用底层的call方法调用一些pyspark尚未支持但scala中已经有的函数时,可能在scala部分可以执行,但是python的返回结果却不尽如人意。
ml模块的调用机制与mllib的机制有些许的不同,但本质上都还是去调用在Spark的JVM中scala代码的class。
总结
本篇博客其实说的非常简单,pyspark即使是不涉及具体算法的部分,也还有很多内容尚未讨论。这里仅是对pyspark产生一个初步的认识,同时简单分析了一下底层对scala的调用过程。
你兴许会有这样的疑问–“去看这些源代码有什么用呢?好像就算知道这些,实际使用时不还是用一下API就好了吗?”。
实际上,看源代码首先的就是满足一下好奇心,对Spark有一个更充分的了解;其次关于具体用途,我举个例子,很多情况你使用的集群可能不是最新版本的,因为复杂的配置导致一般而言也不可能有一个新版本就更新一次,这时你想用新版本的API怎么办?看了这篇博客想必你也会有一些“大胆的想法”。后一篇博客会举例说明我在实际工作中相关的一个问题,以及如何利用这些源码去解决的。