使用mongo官方提供的spark connector可以很方便的让spark读写mongo中的数据。
示例:
from pyspark.sql import SparkSession
from pyspark import SparkConf
if __name__=='__main__':
myconf = SparkConf()
myconf.setAppName("test").setMaster("yarn")
myconf.set('spark.executor.instances','4')
myconf.set('spark.executor.memory','4G')
myconf.set('spark.executor.cores','4')
myconf.set('spark.task.cpus','4')
# 指定连接器对应的spark-package
myconf.set("spark.jars.packages","org.mongodb.spark:mongo-spark-connector_2.11:2.2.0")
# 指定mongo地址,需要每个工作节点都能访问到
myconf.set("spark.mongodb.input.uri","mongodb://192.168.1.15:27017/")
# 设置要读取的dbs名和collection名
myconf.set("spark.mongodb.input.database","db_name")
myconf.set("spark.mongodb.input.collection","collection_name")
# 指定分区方式
myconf.set("spark.mongodb.input.partitioner","MongoSplitVectorPartitioner")
spark = SparkSession.builder.config(conf=myconf).getOrCreate()
# 使用指定格式读取
mg_data = spark.read.format("com.mongodb.spark.sql").load()
mg_data.createOrReplaceTempView("tmp_table")
mydf = spark.sql("select _id, trackName from tmp_table limit 4")
print(mydf.rdd.collect())
spark.stop()
有几个问题需要注意,有一些我自己也没搞清楚。
1. spark的py脚本提交到yarn上,有这样几种方式:
- 使用spark-submit提交
- 使用python提交
- 之前还可以使用pyspark提交,但是spark2.3已经不支持了
使用第一种方式提交,原则上相关参数的传入有三种方式:一种是在脚本中设置,就像上面这样;一种是提交的时候传入参数;还有一种是将参数设置写在文件中,通过文件传入。在Spark文档中有详细介绍。
mongo-spark连接器通过‘spark.jars.packages’这个参数设置,如果是提交时传入对应的参数是‘–packages’。spark的这些“工具包”(参考spark-packages.org),感觉上类似python中import导入的工具包。
这里的第一个问题是:如果使用spark-submit提交脚本,package的参数只能在提交时传入;像实例这样在脚本中设置会出一些问题:java.lang.ClassNotFoundException: Failed to find data source: com.mongodb.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html
2. 从spark文档来看,spark有意在弱化rdd,而强调DataFrame。因此spark程序的主要入口也从SparkContext转移到SparkSession。Dataframe这种格式支持sql,可以在map、reduce等计算之前对数据做一些预处理。
from pyspark import SparkConf
from pyspark.sql import SparkSession
if __name__=='__main__':
myconf = SparkConf().setMaster('yarn')
spark = SparkSession.builder.config(conf=myconf).getOrCreate()
# 读取各种格式的数据,并返回dataframe
mydata = spark.read.json('...') # json格式文件
mydata = spark.read.csv('...')
mydata = spark.read.text('...')
mydata = spark.read.format('..').load() # 自定义格式 读取mongo数据就是用的这种方式
这里的第二个问题是:这种方式读mongo中的表,好像是把整个表都读出来,因为读大表的时候明显感觉到比较慢。虽然读出来之后,可以使用sql语句做一些过滤操作。但是能不能读的时候就根据过滤条件只读一部分呢?
第二个问题的答案:可以在读mongo时使用filter或pipline,相关语句会传给mongo执行。使用sql的方式是将所有数据读入集群,然后并行的执行sql语句。两种方式适合不同的场景,可以参考这个链接
df = spark.read.format("com.mongodb.spark.sql") \
.option("uri", "mongodb://127.0.0.1:27017/dbname") \
.option("collection", "collection_name") \
.option("pipeline", "[{'$limit':100},{'$project':{'myfield':1}}]") \
.load()
3.使用Dataframe做sql操作有两种方式。一种是直接使用Dataframe这种数据类型的方法,另一种是使用spark.sql
方法
#使用Dataframe方法
newdata = mydata.filter("col_name > 3").limit(1000)
newdata = newdata.select(col_name1,col_name2).orderBy(...).limit(10)
#使用spark.sql方法
mydata.createOrReplaceTempView('tmp_name')
newdata = spark.sql('select * from tmp_name where ...')
第三个问题:不清楚上面两种方法各有什么优缺点,或者两者等价?