连接spark
- 1. 连接spark
- 1.1. 简单连接spark
- 1.2. 连接spark集群
- 1.3. 集群python环境
- 1.4. config参数
- 2. 提交作业
1. 连接spark
1.1. 简单连接spark
from pyspark.sql import SparkSession
spark=SparkSession \
.builder \
.appName('my_first_app_name') \
.getOrCreate()
1.2. 连接spark集群
# 使支持hive
spark = SparkSession \
.builder \
.enableHiveSupport() \
.master("172.31.100.170:7077") \
.appName("my_first_app_name") \
.getOrCreate()
1.3. 集群python环境
如果需要在集群中使用指定的python版本(系统默认是2.6),如python3.5,那么就需要在每个节点都安装python3.5,而且将python3.5的目录增加到spark的环境变量中。
或者在python程序中指定也行。
import os
os.environ['SPARK_HOME'] = '/usr/local/workspace/spark-2.1.0-bin-hadoop2.7'
os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3.5'
os.environ['PYSPARK_DRIVER_PYTHON']='python3'
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.enableHiveSupport()\
.master("172.31.100.170:7077")\
.appName("my_first_app_name")\
.getOrCreate()
1.4. config参数
# 在连接spark的时候,还可以添加其他参数,用config
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.enableHiveSupport()\
.master("172.31.100.170:7077")\
.appName("my_first_app_name")\
.config('spark.some.config.option','value') \
.config('spark.some.config.option','value') \
...
.getOrCreate()
2. 提交作业
提交作业可以通过上面的方法,先连接spark,然后使用spark session做各种操作。另一种就是通过submit方式将.py文件提交到spark集群。
特别的,如果使用python虚拟环境,可以通过PYSPARK_DRIVER_PYTHON
,PYSPARK_PYTHON
指定。
# 提交spark作业
PYSPARK_DRIVER_PYTHON=/opt/anaconda3/envs/xxljob/bin/python \
PYSPARK_PYTHON=/opt/anaconda3/envs/xxljob/bin/python \
/usr/local/workspace/spark-2.1.0-bin-hadoop2.7/bin/spark-submit \
--master yarn \ #也可以是 yarn-client,yarn-cluster
--queue ai \
--num-executors 12 \
--driver-memory 30g \
--executor-cores 4 \
--executor-memory 32G
/tmp/test_spark.py