参考这篇文章:master苏:pyspark系列–连接spark
- 1.连接spark集群
如果需要在集群中使用指定的python版本(系统默认是2.6),如python3.5,那么就需要在每个节点都安装python3.5,而且将python3.5的目录增加到spark的环境变量中。
或者在python程序中指定也行。
import os
os.environ['SPARK_HOME'] = '/usr/local/workspace/spark-2.1.0-bin-hadoop2.7'
os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3.5'
os.environ['PYSPARK_DRIVER_PYTHON']='python3'
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.enableHiveSupport() \
.appName("my_first_app_name") \
.getOrCreate()
spark.sql.SparkSession
: DataFrame和SQL功能的主要入口点。
A SparkSession can be used create DataFrame
, register DataFrame
as tables, execute SQL over tables, cache tables, and read parquet files.
要创建SparkSession,请使用以下构建器模式:
spark = SparkSession.builder \
... .master("local") \
... .appName("Word Count") \
... .config("spark.some.config.option", "some-value") \
... .enableHiveSupport() \
... .getOrCreate()
- 2. 提交作业
提交作业可以通过上面的方法,先连接spark,然后使用spark session做各种操作。另一种就是通过submit方式将.py文件提交到spark集群。
特别的,如果使用python虚拟环境,可以通过PYSPARK_DRIVER_PYTHON
,PYSPARK_PYTHON
指定。
# 提交spark作业
PYSPARK_DRIVER_PYTHON=/opt/anaconda3/envs/xxljob/bin/python \
PYSPARK_PYTHON=/opt/anaconda3/envs/xxljob/bin/python \
/usr/local/workspace/spark-2.1.0-bin-hadoop2.7/bin/spark-submit \
--master yarn \ #也可以是 yarn-client,yarn-cluster
--queue ai \
--num-executors 12 \
--driver-memory 30g \
--executor-cores 4 \
--executor-memory 32G
/tmp/test_spark.py