spark-python版本依赖与三方模块方案

spark-python版本依赖与三方模块方案

更新:2018-9-21

推翻以前的方法,新方法是在每个节点安装相同的pytho环境

更新:2018-10-25

2018-9-21 的更新中,发现还是无法使用虚拟环境,如果多个项目版本不一致的话容易出问题。 因此,修改--conf.python中的python路径,使之能引用虚拟环境。

1. 背景

公司有统一的spark大数据集群,但spark用的python版本是python2.7,项目组这边都是用python3.5,甚至有些项目用的是python3.6,对某些第三方包,有些项目用到pandas0.18,有些是pandas0.23等。相信这个问题用python的同学都遇到过,就是python的版本管理和第三包版本管理问题,一般用python虚拟环境就能解决。

针对我们遇到的spark的python版本问题,当然也是要通过python虚拟环境来解决,管理虚拟环境,当然还是conda好用。

解决方法

首先在每个节点安装相同的python版本

注意,需要在spark集群的每个节点安装相同的python版本,如果依赖第三方包,要求每个节点也安装有相同的第三方包。 确实很麻烦,比较偷懒的办法是用用ftp同步的方式,在一个节点安装后自动同步更新到其余节点

spark任务的提交参数

注意,启动spark的时候,依然要先切换到python2.7的版本,然后再提交。 为了能让python引用我们的项目代码,需要将项目代码目录添加到pythonpath环境变量中。

code_path=/usr/local/suzhenyu/da_groups  # 项目代码目录

cd ${code_path}

export PATH=/usr/bin:$PATH               # 切换到系统的python2.7 

export PYTHONPATH=${code_path}:$PYTHONPATH  # 将项目代码添加到site-package索引

python -V                                # 查看此时Python的版本
pwd

# 提交方式1:使用统一的python环境执行任务
/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit --master yarn --queue ai \ 
--name {job_name} \                                       
--conf "spark.pyspark.driver.python=/usr/bin/python3" \  # 这是重点,指定python的版本
--conf "spark.pyspark.python=/usr/bin/python3" \         # 这是重点,指定python的版本   
python_file.py

# 提交方式2:使用虚拟环境提交任务,核心是修改 driver.python 的配置项
/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit --master yarn --queue ai \ 
--name {job_name} \                                       
--conf "spark.pyspark.driver.python=/usr/local/miniconda/envs/my_project/bin/python3" \  # 这是重点,指定虚拟环境中的python的版本
--conf "spark.pyspark.python=/usr/local/miniconda/envs/my_project/bin/python3" \         # 这是重点,指定虚拟环境中的python的版本
python_file.py

python_file.py的内容如下,我们测试是否真的引用了虚拟环境的python。

import pymysql
import pandas as pd
print('\n'*3)
print('*'*100)
print(pymysql.__path__)     # 这里打印虚拟环境中的包
print(pd.__path__)          # 这里打印虚拟环境中的包
print('*'*100)
print('\n'*3)

from pyspark.sql import SparkSession
def main():
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    print('\n'*3)
    print('*'*100)
    print(pymysql.__path__) # 这里打印虚拟环境中的包
    print(pd.__path__)      # 这里打印虚拟环境中的包
    print('*'*100)
    print('\n'*3)
    spark.stop()

if __name__ == '__main__':
     main()

输出如下,我们发现真的可以使用虚拟环境执行任务了,比相对通用环境有了更多的定制和便利。

****************************************************************************************************
['/home/dm/.conda/envs/fusionpredict/lib/python3.6/site-packages/pymysql']
['/home/dm/.conda/envs/fusionpredict/lib/python3.6/site-packages/pandas']
****************************************************************************************************

可以看到,要求在每个节点在相同的位置安装相同的python还是挺麻烦的,当集群变大了之后就需要自动运维平台来管理了。

当然,下面提高将python环境打包上传到hdfs的方式,如果能成功更方便,可惜一直没成功。

注意,下面的方法是错误的,之所以有这个错觉是因为只在driver节点执行,并没有分发到worker节点,上面的才是对的。

之前的方法

2. 解决方法

原理其实很简单,就是在提交pyspark作业的时候,设定项目的python版本路径即可。

2.1. 虚拟环境安装

首先我们创建一个python3.6的虚拟环境

$ conda create -n test_pyspark python=3.6
#
# To activate this environment, use:
# > source activate test_pyspark
#
# To deactivate an active environment, use:
# > source deactivate
#

接着我们在刚才新建的虚拟环境中安装pandas包和logzero包。

$ source activate test_pyspark
$ pip install logzero
$ pip install pandas

为了测试能够引用第三方包外,还要测试能引用项目的其他代码,因此,需要将项目地址添加到python的索引目录,也就是site-package

# 进入刚才新建的虚拟环境的site-package
$ cd /usr/local/miniconda/envs/test_pyspark/lib/python3.6/site-packages/
# 创建一个 .pth 结尾的文件,内容是我们的python项目地址
$ vi my_project.pth
/home/suzhenyu/python_code    # 内容只有一行,就是这个地址

好了接下来我们新建一个需要测试的python文件,包含三个部分内容: – 从hive读数据 – 将hive读取的数据转成pandas的dataframe – 引用我们的项目代码其中的某个函数

2.2. 测试文件

test_spark.py

# -*- coding: utf-8 -*-
import pandas as pd 
from aipurchase.common.log.log import logger  # 自定义函数
from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .enableHiveSupport() \
        .getOrCreate()

# 读取hive表
df = spark.sql("""select saledate, salecount from ai.rpt_orderdetial_dailysale limit 5""")
df.show()

# 转成pandas的dataframe
df2 = df.toPandas()

# 使用自定义函数
df3 = logger.create_pretty_table(df)
print(df3)

2.3. 提交测试

需要说明的是,由于集群spark的python是2.7,因此必须将当前用户环境的python切换到2.7才能提交,在提交后根据我们设定的python路径来决定使用的python脚本。

# 切换到python2.7
export PATH=/usr/local/miniconda/envs/python27_base/bin:$PATH 

# 重要,设定pyspark的python执行器版本为3.6
export PYSPARK_PYTHON=/usr/local/miniconda/envs/test_pyspark/bin/python
export PYSPARK_DRIVER_PYTHON=/usr/local/miniconda/envs/test_pyspark/bin/python

# 提交spark作业
/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit \
--driver-memory 5g --num-executors 5 --executor-cores 1 --executor-memory 1G  \
/home/suzhenyu/test_spark.py

当然,你也可以使用--conf的方式设置python执行器版本,不过需要注意的是,--conf spark.pyspark.python--conf spark.pyspark.driver.python都是需要小写的。

# 切换到python2.7
export PATH=/usr/local/miniconda/envs/python27_base/bin:$PATH 

# 提交,将配置写在--conf中
/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit \
--driver-memory 5g --num-executors 5 --executor-cores 1 --executor-memory 1G  \
--conf spark.pyspark.python=/usr/local/miniconda/envs/test_pyspark/bin/python \
--conf spark.pyspark.driver.python=/usr/local/miniconda/envs/test_pyspark/bin/python \
/home/suzhenyu/test_spark.py

下面是执行的结果,说明是没问题的。

+------------+-----------+
|  saledate  | salecount |
+------------+-----------+
| 2017-09-09 |     3     |
| 2017-09-09 |     1     |
| 2017-09-09 |     1     |
| 2017-09-09 |     1     |
| 2017-09-09 |     5     |
+------------+-----------+

3. 提交参数问题

在spark的提交参数中,有一个 --py-files参数,对于 Python 来说,可以使用 spark-submit 的 --py-files 参数来添加 .py, .zip.egg 文件以与应用程序一起分发。如果依赖了多个 Python 文件推荐将它们打包成一个 .zip 或者 .egg 文件。 这句话的意思就是,将项目的代码打包,然后随程序一起提交到spark中。

当前,如果我们前面的配置没问题,其实这一步是没有必要的。

# 压缩项目代码
zip -r project_name.zip  project_name

# 提交spark作业
/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit \
--driver-memory 5g --num-executors 5 --executor-cores 1 --executor-memory 1G  \
--conf spark.pyspark.python=/usr/local/miniconda/envs/test_pyspark/bin/python \
--conf spark.pyspark.driver.python=/usr/local/miniconda/envs/test_pyspark/bin/python \
--py-files /home/suzhenyu/project_name.zip \
/home/suzhenyu/test_spark.py

4. 其他

网上还有其他的方法,但是试了很多,就试出这个方法是可行的,也许是我打开的方式不对,比如,将python环境打包到hdfs,执行的时候指定python压缩解压的地址,可惜没成功,要是成功了也是挺方便吧,这是地址,点击跳转

5. 参考

spark-python版本依赖与三方模块方案 首发于知乎专栏 从推公式到写代码, CSDN

欢迎大家关注点赞。spark-python版本依赖与三方模块方案

更新:2018-9-21

1. 背景

公司有统一的spark大数据集群,但spark用的python版本是python2.7,项目组这边都是用python3.5,甚至有些项目用的是python3.6,对某些第三方包,有些项目用到pandas0.18,有些是pandas0.23等。相信这个问题用python的同学都遇到过,就是python的版本管理和第三包版本管理问题,一般用python虚拟环境就能解决。

针对我们遇到的spark的python版本问题,当然也是要通过python虚拟环境来解决,管理虚拟环境,当然还是conda好用。

解决方法

首先在每个节点安装相同的python版本

注意,需要在spark集群的每个节点安装相同的python版本,如果依赖第三方包,要求每个节点也安装有相同的第三方包。 确实很麻烦,比较偷懒的办法是用用ftp同步的方式,在一个节点安装后自动同步更新到其余节点

spark任务的提交参数

注意,启动spark的时候,依然要先切换到python2.7的版本,然后再提交。 为了能让python引用我们的项目代码,需要将项目代码目录添加到pythonpath环境变量中。

code_path=/usr/local/suzhenyu/da_groups  # 项目代码目录

cd ${code_path}

export PATH=/usr/bin:$PATH               # 切换到系统的python2.7 

export PYTHONPATH=${code_path}:$PYTHONPATH  # 将项目代码添加到site-package索引

python -V                                # 查看此时Python的版本
pwd

/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit --master yarn --queue ai \ 
--name {job_name} \                                       
--conf "spark.pyspark.driver.python=/usr/bin/python3" \  # 这是重点,指定python的版本
--conf "spark.pyspark.python=/usr/bin/python3" \         # 这是重点,指定python的版本   
python_file.py

可以看到,要求在每个节点在相同的位置安装相同的python还是挺麻烦的,当集群变大了之后就需要自动运维平台来管理了。

当然,下面提高将python环境打包上传到hdfs的方式,如果能成功更方便,可惜一直没成功。

注意,下面的方法是错误的,之所以有这个错觉是因为只在driver节点执行,并没有分发到worker节点,上面的才是对的。

之前的方法

2. 解决方法

原理其实很简单,就是在提交pyspark作业的时候,设定项目的python版本路径即可。

2.1. 虚拟环境安装

首先我们创建一个python3.6的虚拟环境

$ conda create -n test_pyspark python=3.6
#
# To activate this environment, use:
# > source activate test_pyspark
#
# To deactivate an active environment, use:
# > source deactivate
#

接着我们在刚才新建的虚拟环境中安装pandas包和logzero包。

$ source activate test_pyspark
$ pip install logzero
$ pip install pandas

为了测试能够引用第三方包外,还要测试能引用项目的其他代码,因此,需要将项目地址添加到python的索引目录,也就是site-package

# 进入刚才新建的虚拟环境的site-package
$ cd /usr/local/miniconda/envs/test_pyspark/lib/python3.6/site-packages/
# 创建一个 .pth 结尾的文件,内容是我们的python项目地址
$ vi my_project.pth
/home/suzhenyu/python_code    # 内容只有一行,就是这个地址

好了接下来我们新建一个需要测试的python文件,包含三个部分内容: – 从hive读数据 – 将hive读取的数据转成pandas的dataframe – 引用我们的项目代码其中的某个函数

2.2. 测试文件

test_spark.py

# -*- coding: utf-8 -*-
import pandas as pd 
from aipurchase.common.log.log import logger  # 自定义函数
from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .enableHiveSupport() \
        .getOrCreate()

# 读取hive表
df = spark.sql("""select saledate, salecount from ai.rpt_orderdetial_dailysale limit 5""")
df.show()

# 转成pandas的dataframe
df2 = df.toPandas()

# 使用自定义函数
df3 = logger.create_pretty_table(df)
print(df3)

2.3. 提交测试

需要说明的是,由于集群spark的python是2.7,因此必须将当前用户环境的python切换到2.7才能提交,在提交后根据我们设定的python路径来决定使用的python脚本。

# 切换到python2.7
export PATH=/usr/local/miniconda/envs/python27_base/bin:$PATH 

# 重要,设定pyspark的python执行器版本为3.6
export PYSPARK_PYTHON=/usr/local/miniconda/envs/test_pyspark/bin/python
export PYSPARK_DRIVER_PYTHON=/usr/local/miniconda/envs/test_pyspark/bin/python

# 提交spark作业
/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit \
--driver-memory 5g --num-executors 5 --executor-cores 1 --executor-memory 1G  \
/home/suzhenyu/test_spark.py

当然,你也可以使用--conf的方式设置python执行器版本,不过需要注意的是,--conf spark.pyspark.python--conf spark.pyspark.driver.python都是需要小写的。

# 切换到python2.7
export PATH=/usr/local/miniconda/envs/python27_base/bin:$PATH 

# 提交,将配置写在--conf中
/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit \
--driver-memory 5g --num-executors 5 --executor-cores 1 --executor-memory 1G  \
--conf spark.pyspark.python=/usr/local/miniconda/envs/test_pyspark/bin/python \
--conf spark.pyspark.driver.python=/usr/local/miniconda/envs/test_pyspark/bin/python \
/home/suzhenyu/test_spark.py

下面是执行的结果,说明是没问题的。

+------------+-----------+
|  saledate  | salecount |
+------------+-----------+
| 2017-09-09 |     3     |
| 2017-09-09 |     1     |
| 2017-09-09 |     1     |
| 2017-09-09 |     1     |
| 2017-09-09 |     5     |
+------------+-----------+

3. 提交参数问题

在spark的提交参数中,有一个 --py-files参数,对于 Python 来说,可以使用 spark-submit 的 --py-files 参数来添加 .py, .zip.egg 文件以与应用程序一起分发。如果依赖了多个 Python 文件推荐将它们打包成一个 .zip 或者 .egg 文件。 这句话的意思就是,将项目的代码打包,然后随程序一起提交到spark中。

当前,如果我们前面的配置没问题,其实这一步是没有必要的。

# 压缩项目代码
zip -r project_name.zip  project_name

# 提交spark作业
/usr/hdp/2.6.4.0-91/spark2/bin/spark-submit \
--driver-memory 5g --num-executors 5 --executor-cores 1 --executor-memory 1G  \
--conf spark.pyspark.python=/usr/local/miniconda/envs/test_pyspark/bin/python \
--conf spark.pyspark.driver.python=/usr/local/miniconda/envs/test_pyspark/bin/python \
--py-files /home/suzhenyu/project_name.zip \
/home/suzhenyu/test_spark.py

4. 其他

网上还有其他的方法,但是试了很多,就试出这个方法是可行的,也许是我打开的方式不对,比如,将python环境打包到hdfs,执行的时候指定python压缩解压的地址,可惜没成功,要是成功了也是挺方便吧,这是地址,点击跳转

5. 参考

spark-python版本依赖与三方模块方案 首发于知乎专栏 从推公式到写代码, CSDN

欢迎大家关注点赞。

    原文作者:master苏
    原文地址: https://zhuanlan.zhihu.com/p/43434216
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞