spark读取mongo数据(python)

使用mongo官方提供的spark connector可以很方便的让spark读写mongo中的数据。

示例:

from pyspark.sql import SparkSession
from pyspark import SparkConf


if __name__=='__main__':
    myconf = SparkConf()
    myconf.setAppName("test").setMaster("yarn")
    myconf.set('spark.executor.instances','4')
    myconf.set('spark.executor.memory','4G')
    myconf.set('spark.executor.cores','4')
    myconf.set('spark.task.cpus','4')    

    # 指定连接器对应的spark-package
    myconf.set("spark.jars.packages","org.mongodb.spark:mongo-spark-connector_2.11:2.2.0")
    # 指定mongo地址,需要每个工作节点都能访问到
    myconf.set("spark.mongodb.input.uri","mongodb://192.168.1.15:27017/")
    # 设置要读取的dbs名和collection名
    myconf.set("spark.mongodb.input.database","db_name")
    myconf.set("spark.mongodb.input.collection","collection_name")
    # 指定分区方式
    myconf.set("spark.mongodb.input.partitioner","MongoSplitVectorPartitioner")

    spark = SparkSession.builder.config(conf=myconf).getOrCreate()
    # 使用指定格式读取
    mg_data = spark.read.format("com.mongodb.spark.sql").load()
    mg_data.createOrReplaceTempView("tmp_table")
    mydf = spark.sql("select _id, trackName from tmp_table limit 4")

    print(mydf.rdd.collect())
    spark.stop()

有几个问题需要注意,有一些我自己也没搞清楚。

1. spark的py脚本提交到yarn上,有这样几种方式:

    • 使用spark-submit提交
    • 使用python提交
    • 之前还可以使用pyspark提交,但是spark2.3已经不支持了

使用第一种方式提交,原则上相关参数的传入有三种方式:一种是在脚本中设置,就像上面这样;一种是提交的时候传入参数;还有一种是将参数设置写在文件中,通过文件传入。在Spark文档中有详细介绍。

mongo-spark连接器通过‘spark.jars.packages’这个参数设置,如果是提交时传入对应的参数是‘–packages’。spark的这些“工具包”(参考spark-packages.org),感觉上类似python中import导入的工具包。

这里的第一个问题是:如果使用spark-submit提交脚本,package的参数只能在提交时传入;像实例这样在脚本中设置会出一些问题:java.lang.ClassNotFoundException: Failed to find data source: com.mongodb.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html

2. 从spark文档来看,spark有意在弱化rdd,而强调DataFrame。因此spark程序的主要入口也从SparkContext转移到SparkSession。Dataframe这种格式支持sql,可以在map、reduce等计算之前对数据做一些预处理。

from pyspark import SparkConf
from pyspark.sql import SparkSession

if __name__=='__main__':
   myconf = SparkConf().setMaster('yarn')
   spark = SparkSession.builder.config(conf=myconf).getOrCreate()
   # 读取各种格式的数据,并返回dataframe
   mydata = spark.read.json('...')  # json格式文件
   mydata = spark.read.csv('...')
   mydata = spark.read.text('...')
   mydata = spark.read.format('..').load() # 自定义格式 读取mongo数据就是用的这种方式

这里的第二个问题是:这种方式读mongo中的表,好像是把整个表都读出来,因为读大表的时候明显感觉到比较慢。虽然读出来之后,可以使用sql语句做一些过滤操作。但是能不能读的时候就根据过滤条件只读一部分呢?

第二个问题的答案:可以在读mongo时使用filter或pipline,相关语句会传给mongo执行。使用sql的方式是将所有数据读入集群,然后并行的执行sql语句。两种方式适合不同的场景,可以参考这个链接

    df = spark.read.format("com.mongodb.spark.sql") \
        .option("uri", "mongodb://127.0.0.1:27017/dbname") \
        .option("collection", "collection_name") \
        .option("pipeline", "[{'$limit':100},{'$project':{'myfield':1}}]") \
        .load()

3.使用Dataframe做sql操作有两种方式。一种是直接使用Dataframe这种数据类型的方法,另一种是使用spark.sql方法

#使用Dataframe方法
newdata = mydata.filter("col_name > 3").limit(1000)
newdata = newdata.select(col_name1,col_name2).orderBy(...).limit(10)

#使用spark.sql方法
mydata.createOrReplaceTempView('tmp_name')
newdata = spark.sql('select * from tmp_name where ...')

第三个问题:不清楚上面两种方法各有什么优缺点,或者两者等价?

    原文作者:拿破仑
    原文地址: https://zhuanlan.zhihu.com/p/35651182
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞