spark-python版本依赖与三方模块方案
更新:2018-9-21
推翻以前的方法,新方法是在每个节点安装相同的pytho环境
更新:2018-10-25
2018-9-21 的更新中,发现还是无法使用虚拟环境,如果多个项目版本不一致的话容易出问题。 因此,修改--conf.python
中的python路径,使之能引用虚拟环境。
1. 背景
公司有统一的spark大数据集群,但spark用的python版本是python2.7,项目组这边都是用python3.5,甚至有些项目用的是python3.6,对某些第三方包,有些项目用到pandas0.18,有些是pandas0.23等。相信这个问题用python的同学都遇到过,就是python的版本管理和第三包版本管理问题,一般用python虚拟环境就能解决。
针对我们遇到的spark的python版本问题,当然也是要通过python虚拟环境来解决,管理虚拟环境,当然还是conda好用。
解决方法
首先在每个节点安装相同的python版本
注意,需要在spark集群的每个节点安装相同的python版本,如果依赖第三方包,要求每个节点也安装有相同的第三方包。 确实很麻烦,比较偷懒的办法是用用ftp同步的方式,在一个节点安装后自动同步更新到其余节点
spark任务的提交参数
注意,启动spark的时候,依然要先切换到python2.7的版本,然后再提交。 为了能让python引用我们的项目代码,需要将项目代码目录添加到pythonpath环境变量中。
code_path=/usr/local/suzhenyu/da_groups # 项目代码目录
cd ${code_path}
export PATH=/usr/bin:$PATH # 切换到系统的python2.7
export PYTHONPATH=${code_path}:$PYTHONPATH # 将项目代码添加到site-package索引
python -V # 查看此时Python的版本
pwd
# 提交方式1:使用统一的python环境执行任务
/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit --master yarn --queue ai \
--name {job_name} \
--conf "spark.pyspark.driver.python=/usr/bin/python3" \ # 这是重点,指定python的版本
--conf "spark.pyspark.python=/usr/bin/python3" \ # 这是重点,指定python的版本
python_file.py
# 提交方式2:使用虚拟环境提交任务,核心是修改 driver.python 的配置项
/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit --master yarn --queue ai \
--name {job_name} \
--conf "spark.pyspark.driver.python=/usr/local/miniconda/envs/my_project/bin/python3" \ # 这是重点,指定虚拟环境中的python的版本
--conf "spark.pyspark.python=/usr/local/miniconda/envs/my_project/bin/python3" \ # 这是重点,指定虚拟环境中的python的版本
python_file.py
python_file.py
的内容如下,我们测试是否真的引用了虚拟环境的python。
import pymysql
import pandas as pd
print('\n'*3)
print('*'*100)
print(pymysql.__path__) # 这里打印虚拟环境中的包
print(pd.__path__) # 这里打印虚拟环境中的包
print('*'*100)
print('\n'*3)
from pyspark.sql import SparkSession
def main():
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
print('\n'*3)
print('*'*100)
print(pymysql.__path__) # 这里打印虚拟环境中的包
print(pd.__path__) # 这里打印虚拟环境中的包
print('*'*100)
print('\n'*3)
spark.stop()
if __name__ == '__main__':
main()
输出如下,我们发现真的可以使用虚拟环境执行任务了,比相对通用环境有了更多的定制和便利。
****************************************************************************************************
['/home/dm/.conda/envs/fusionpredict/lib/python3.6/site-packages/pymysql']
['/home/dm/.conda/envs/fusionpredict/lib/python3.6/site-packages/pandas']
****************************************************************************************************
可以看到,要求在每个节点在相同的位置安装相同的python还是挺麻烦的,当集群变大了之后就需要自动运维平台来管理了。
当然,下面提高将python环境打包上传到hdfs的方式,如果能成功更方便,可惜一直没成功。
注意,下面的方法是错误的,之所以有这个错觉是因为只在driver节点执行,并没有分发到worker节点,上面的才是对的。
之前的方法
2. 解决方法
原理其实很简单,就是在提交pyspark作业的时候,设定项目的python版本路径即可。
2.1. 虚拟环境安装
首先我们创建一个python3.6的虚拟环境
$ conda create -n test_pyspark python=3.6
#
# To activate this environment, use:
# > source activate test_pyspark
#
# To deactivate an active environment, use:
# > source deactivate
#
接着我们在刚才新建的虚拟环境中安装pandas包和logzero包。
$ source activate test_pyspark
$ pip install logzero
$ pip install pandas
为了测试能够引用第三方包外,还要测试能引用项目的其他代码,因此,需要将项目地址添加到python的索引目录,也就是site-package
# 进入刚才新建的虚拟环境的site-package
$ cd /usr/local/miniconda/envs/test_pyspark/lib/python3.6/site-packages/
# 创建一个 .pth 结尾的文件,内容是我们的python项目地址
$ vi my_project.pth
/home/suzhenyu/python_code # 内容只有一行,就是这个地址
好了接下来我们新建一个需要测试的python文件,包含三个部分内容: – 从hive读数据 – 将hive读取的数据转成pandas的dataframe – 引用我们的项目代码其中的某个函数
2.2. 测试文件
test_spark.py
# -*- coding: utf-8 -*-
import pandas as pd
from aipurchase.common.log.log import logger # 自定义函数
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.enableHiveSupport() \
.getOrCreate()
# 读取hive表
df = spark.sql("""select saledate, salecount from ai.rpt_orderdetial_dailysale limit 5""")
df.show()
# 转成pandas的dataframe
df2 = df.toPandas()
# 使用自定义函数
df3 = logger.create_pretty_table(df)
print(df3)
2.3. 提交测试
需要说明的是,由于集群spark的python是2.7,因此必须将当前用户环境的python切换到2.7才能提交,在提交后根据我们设定的python路径来决定使用的python脚本。
# 切换到python2.7
export PATH=/usr/local/miniconda/envs/python27_base/bin:$PATH
# 重要,设定pyspark的python执行器版本为3.6
export PYSPARK_PYTHON=/usr/local/miniconda/envs/test_pyspark/bin/python
export PYSPARK_DRIVER_PYTHON=/usr/local/miniconda/envs/test_pyspark/bin/python
# 提交spark作业
/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit \
--driver-memory 5g --num-executors 5 --executor-cores 1 --executor-memory 1G \
/home/suzhenyu/test_spark.py
当然,你也可以使用--conf
的方式设置python执行器版本,不过需要注意的是,--conf spark.pyspark.python
和--conf spark.pyspark.driver.python
都是需要小写的。
# 切换到python2.7
export PATH=/usr/local/miniconda/envs/python27_base/bin:$PATH
# 提交,将配置写在--conf中
/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit \
--driver-memory 5g --num-executors 5 --executor-cores 1 --executor-memory 1G \
--conf spark.pyspark.python=/usr/local/miniconda/envs/test_pyspark/bin/python \
--conf spark.pyspark.driver.python=/usr/local/miniconda/envs/test_pyspark/bin/python \
/home/suzhenyu/test_spark.py
下面是执行的结果,说明是没问题的。
+------------+-----------+
| saledate | salecount |
+------------+-----------+
| 2017-09-09 | 3 |
| 2017-09-09 | 1 |
| 2017-09-09 | 1 |
| 2017-09-09 | 1 |
| 2017-09-09 | 5 |
+------------+-----------+
3. 提交参数问题
在spark的提交参数中,有一个 --py-files
参数,对于 Python 来说,可以使用 spark-submit 的 --py-files
参数来添加 .py, .zip
和 .egg
文件以与应用程序一起分发。如果依赖了多个 Python 文件推荐将它们打包成一个 .zip
或者 .egg
文件。 这句话的意思就是,将项目的代码打包,然后随程序一起提交到spark中。
当前,如果我们前面的配置没问题,其实这一步是没有必要的。
# 压缩项目代码
zip -r project_name.zip project_name
# 提交spark作业
/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit \
--driver-memory 5g --num-executors 5 --executor-cores 1 --executor-memory 1G \
--conf spark.pyspark.python=/usr/local/miniconda/envs/test_pyspark/bin/python \
--conf spark.pyspark.driver.python=/usr/local/miniconda/envs/test_pyspark/bin/python \
--py-files /home/suzhenyu/project_name.zip \
/home/suzhenyu/test_spark.py
4. 其他
网上还有其他的方法,但是试了很多,就试出这个方法是可行的,也许是我打开的方式不对,比如,将python环境打包到hdfs,执行的时候指定python压缩解压的地址,可惜没成功,要是成功了也是挺方便吧,这是地址,点击跳转。
5. 参考
- pyspark使用anaconda后spark-submit方法
- How do I set the driver’s python version in spark
- 基于YARN集群构建运行PySpark Application
- Python Environments for PySpark, Part 1: Using Condas
- Conda env && Spark Jobs
- Making Python on Apache Hadoop Easier with Anaconda and CDH
- Zeppelin %python.conda and %python.sql interpreters do not work without adding Anaconda libraries to %PATH
spark-python版本依赖与三方模块方案 首发于知乎专栏 从推公式到写代码, CSDN
欢迎大家关注点赞。spark-python版本依赖与三方模块方案
更新:2018-9-21
1. 背景
公司有统一的spark大数据集群,但spark用的python版本是python2.7,项目组这边都是用python3.5,甚至有些项目用的是python3.6,对某些第三方包,有些项目用到pandas0.18,有些是pandas0.23等。相信这个问题用python的同学都遇到过,就是python的版本管理和第三包版本管理问题,一般用python虚拟环境就能解决。
针对我们遇到的spark的python版本问题,当然也是要通过python虚拟环境来解决,管理虚拟环境,当然还是conda好用。
解决方法
首先在每个节点安装相同的python版本
注意,需要在spark集群的每个节点安装相同的python版本,如果依赖第三方包,要求每个节点也安装有相同的第三方包。 确实很麻烦,比较偷懒的办法是用用ftp同步的方式,在一个节点安装后自动同步更新到其余节点
spark任务的提交参数
注意,启动spark的时候,依然要先切换到python2.7的版本,然后再提交。 为了能让python引用我们的项目代码,需要将项目代码目录添加到pythonpath环境变量中。
code_path=/usr/local/suzhenyu/da_groups # 项目代码目录
cd ${code_path}
export PATH=/usr/bin:$PATH # 切换到系统的python2.7
export PYTHONPATH=${code_path}:$PYTHONPATH # 将项目代码添加到site-package索引
python -V # 查看此时Python的版本
pwd
/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit --master yarn --queue ai \
--name {job_name} \
--conf "spark.pyspark.driver.python=/usr/bin/python3" \ # 这是重点,指定python的版本
--conf "spark.pyspark.python=/usr/bin/python3" \ # 这是重点,指定python的版本
python_file.py
可以看到,要求在每个节点在相同的位置安装相同的python还是挺麻烦的,当集群变大了之后就需要自动运维平台来管理了。
当然,下面提高将python环境打包上传到hdfs的方式,如果能成功更方便,可惜一直没成功。
注意,下面的方法是错误的,之所以有这个错觉是因为只在driver节点执行,并没有分发到worker节点,上面的才是对的。
之前的方法
2. 解决方法
原理其实很简单,就是在提交pyspark作业的时候,设定项目的python版本路径即可。
2.1. 虚拟环境安装
首先我们创建一个python3.6的虚拟环境
$ conda create -n test_pyspark python=3.6
#
# To activate this environment, use:
# > source activate test_pyspark
#
# To deactivate an active environment, use:
# > source deactivate
#
接着我们在刚才新建的虚拟环境中安装pandas包和logzero包。
$ source activate test_pyspark
$ pip install logzero
$ pip install pandas
为了测试能够引用第三方包外,还要测试能引用项目的其他代码,因此,需要将项目地址添加到python的索引目录,也就是site-package
# 进入刚才新建的虚拟环境的site-package
$ cd /usr/local/miniconda/envs/test_pyspark/lib/python3.6/site-packages/
# 创建一个 .pth 结尾的文件,内容是我们的python项目地址
$ vi my_project.pth
/home/suzhenyu/python_code # 内容只有一行,就是这个地址
好了接下来我们新建一个需要测试的python文件,包含三个部分内容: – 从hive读数据 – 将hive读取的数据转成pandas的dataframe – 引用我们的项目代码其中的某个函数
2.2. 测试文件
test_spark.py
# -*- coding: utf-8 -*-
import pandas as pd
from aipurchase.common.log.log import logger # 自定义函数
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.enableHiveSupport() \
.getOrCreate()
# 读取hive表
df = spark.sql("""select saledate, salecount from ai.rpt_orderdetial_dailysale limit 5""")
df.show()
# 转成pandas的dataframe
df2 = df.toPandas()
# 使用自定义函数
df3 = logger.create_pretty_table(df)
print(df3)
2.3. 提交测试
需要说明的是,由于集群spark的python是2.7,因此必须将当前用户环境的python切换到2.7才能提交,在提交后根据我们设定的python路径来决定使用的python脚本。
# 切换到python2.7
export PATH=/usr/local/miniconda/envs/python27_base/bin:$PATH
# 重要,设定pyspark的python执行器版本为3.6
export PYSPARK_PYTHON=/usr/local/miniconda/envs/test_pyspark/bin/python
export PYSPARK_DRIVER_PYTHON=/usr/local/miniconda/envs/test_pyspark/bin/python
# 提交spark作业
/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit \
--driver-memory 5g --num-executors 5 --executor-cores 1 --executor-memory 1G \
/home/suzhenyu/test_spark.py
当然,你也可以使用--conf
的方式设置python执行器版本,不过需要注意的是,--conf spark.pyspark.python
和--conf spark.pyspark.driver.python
都是需要小写的。
# 切换到python2.7
export PATH=/usr/local/miniconda/envs/python27_base/bin:$PATH
# 提交,将配置写在--conf中
/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit \
--driver-memory 5g --num-executors 5 --executor-cores 1 --executor-memory 1G \
--conf spark.pyspark.python=/usr/local/miniconda/envs/test_pyspark/bin/python \
--conf spark.pyspark.driver.python=/usr/local/miniconda/envs/test_pyspark/bin/python \
/home/suzhenyu/test_spark.py
下面是执行的结果,说明是没问题的。
+------------+-----------+
| saledate | salecount |
+------------+-----------+
| 2017-09-09 | 3 |
| 2017-09-09 | 1 |
| 2017-09-09 | 1 |
| 2017-09-09 | 1 |
| 2017-09-09 | 5 |
+------------+-----------+
3. 提交参数问题
在spark的提交参数中,有一个 --py-files
参数,对于 Python 来说,可以使用 spark-submit 的 --py-files
参数来添加 .py, .zip
和 .egg
文件以与应用程序一起分发。如果依赖了多个 Python 文件推荐将它们打包成一个 .zip
或者 .egg
文件。 这句话的意思就是,将项目的代码打包,然后随程序一起提交到spark中。
当前,如果我们前面的配置没问题,其实这一步是没有必要的。
# 压缩项目代码
zip -r project_name.zip project_name
# 提交spark作业
/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit \
--driver-memory 5g --num-executors 5 --executor-cores 1 --executor-memory 1G \
--conf spark.pyspark.python=/usr/local/miniconda/envs/test_pyspark/bin/python \
--conf spark.pyspark.driver.python=/usr/local/miniconda/envs/test_pyspark/bin/python \
--py-files /home/suzhenyu/project_name.zip \
/home/suzhenyu/test_spark.py
4. 其他
网上还有其他的方法,但是试了很多,就试出这个方法是可行的,也许是我打开的方式不对,比如,将python环境打包到hdfs,执行的时候指定python压缩解压的地址,可惜没成功,要是成功了也是挺方便吧,这是地址,点击跳转。
5. 参考
- pyspark使用anaconda后spark-submit方法
- How do I set the driver’s python version in spark
- 基于YARN集群构建运行PySpark Application
- Python Environments for PySpark, Part 1: Using Condas
- Conda env && Spark Jobs
- Making Python on Apache Hadoop Easier with Anaconda and CDH
- Zeppelin %python.conda and %python.sql interpreters do not work without adding Anaconda libraries to %PATH
spark-python版本依赖与三方模块方案 首发于知乎专栏 从推公式到写代码, CSDN
欢迎大家关注点赞。