Spark连接MongoDB指南(基于python)

(本文为对官网内容的翻译)

英文原文网址:

https://docs.mongodb.com/spark-connector/current/python-api/

为什么使用MongoDB+Spark:

http://www.mongoing.com/tj/mongodb_shanghai_spark 

以下例子的源码详见introduction.py:

https://github.com/mongodb/mongo-spark/blob/master/examples/src/test/python/introduction.py

 

目录  

1 前提

2 入门指南

  2.1 Python Spark Shell

  2.2 创建一个SparkSession对象

3 教程

  3.1 写入MongoDB

  3.2 读取MongoDB

  3.3 聚合

  3.4 过滤器和SQL

 

 

1 前提

  • MongoDBApache Spark的基本操作知识。详见MongoDB文档 Spark文档

  • 运行MongoDB2.6及以上版本)

  • Spark 2.1.x

  • Scala 2.11.x

 

2 入门指南

2.1 Python Spark Shell

本指南使用pyspark shell,但使用的代码也适用于独立的Python应用程序。

 

当使用pyspark shell时,你可以设置:

  •  –packages 选项用来下载MongoDB Spark Connector包。可用以下程序包:

      mongo-spark-connector_2.11 (与Scala 2.11.x 结合使用)

  •  –conf选项用来配置MongoDB Spark Connector。这些设置用来配置SparkConf对象。

注:当通过SparkConf设置连接器配置时,你必须为这些设置加上合适的前缀。详情以及其它MongoDB Spark Connector选项参见Configuration Options

以下示例为用命令行运行pyspark shell

./bin/pyspark –conf “spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred” \

         –conf “spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection” \

         –packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0

  • spark.mongodb.output.uri用于设置存放输出数据的MongoDB服务器地址(127.0.0.1)、连接的数据库( test)和集合(myCollection),默认连接27017端口。

  • spark.mongodb.input.uri用于设置需要读取的数据所在的MongoDB服务器地址(127.0.0.1)、连接的数据库( test)和集合(myCollection),以及读取的优先级。

本指南中将使用以上数据库和集合。

2.2 创建一个SparkSession对象

注:当运行pyspark时,会默认得到一个名为spark的SparkSession对象。在一个独立的Python应用程序中,你需要明确地创建你的SparkSession对象,如下所示。

如果你在运行pyspark时设置spark.mongodb.input.uri spark.mongodb.output.uriSparkSession对象默认使用它们。如果你想要在pyspark 里创建自己的SparkSession对象,你可以使用SparkSession.builder并且设置不同的配置项。

from pyspark.sql import SparkSession

my_spark = SparkSession \

    .builder \

    .appName(“myApp”) \

    .config(“spark.mongodb.input.uri”,”mongodb://127.0.0.1/test.coll”) \

    .config(“spark.mongodb.output.uri”,”mongodb://127.0.0.1/test.coll”) \

    .getOrCreate()

你可以使用一个SparkSession对象来向MongoDB写入数据、读取 MongoDB中的数据、创建DataFrames(数据框架)、执行 SQL操作。

 

3 教程

3.1 写入MongoDB

在创建一个DataFrame(数据框架)之前,首先要创建一个SparkSession对象,然后使用这个对象的 createDataFrame()函数。在以下示例中,createDataFrame()函数中有多个包含了名字和年龄的元组,以及由列名组成的数组。

people = spark.createDataFrame([(“Bilbo Baggins”,  50), (“Gandalf”, 1000), (“Thorin”, 195), (“Balin”, 178), (“Kili”, 77), (“Dwalin”, 169), (“Oin”, 167), (“Gloin”, 158), (“Fili”, 82), (“Bombur”, None)], [“name”, “age”])

 

想要将这个名为people”的DataFrame 写入在spark.mongodb.output.uri选项中设置的MongoDB数据库和集合,可以用write 函数:

people.write.format(“com.mongodb.spark.sql.DefaultSource”).mode(“append”).save()

以上操作将在你连接到pyspark shell时,将数据写入spark.mongodb.output.uri选项中设置的 MongoDB数据库和集合中。

 

想要查看DataFrame中的内容,可以使用show()

people.show()

 

pyspark shell中,这一步操作打印出以下内容:

+————-+—-+

|         name| age|

+————-+—-+

|Bilbo Baggins|  50|

|      Gandalf|1000|

|       Thorin| 195|

|        Balin| 178|

|         Kili|  77|

|       Dwalin| 169|

|          Oin| 167|

|        Gloin| 158|

|         Fili|  82|

|       Bombur|null|

+————-+—-+

 

printSchema()函数可以打印出DataFrame的架构:

people.printSchema()

 

pyspark shell中,这一步操作打印出以下内容:

root

 |– _id: struct (nullable = true)

 |    |– oid: string (nullable = true)

 |– age: long (nullable = true)

 |– name: string (nullable = true)

 

如果需要将数据写入MongoDB的另一个集合中,可以结合使用.option() .write()函数。

 

想要将数据写入一个名为people”的数据库中的名为“contacts ”的集合里,要在输出URI选项中设置people.contacts

people.write.format(“com.mongodb.spark.sql.DefaultSource”).mode(“append”).option(“database”,”people”).option(“collection”, “contacts”).save()

 

3.2 读取MongoDB

你可以创建一个Spark DataFrame来保存从MongoDB集合(为在 SparkSession中用spark.mongodb.input.uri选项中设置的集合)中读取的数据。

 

假设有一个包含了以下document(文档)的名为“ fruit”的集合:

{ “_id” : 1, “type” : “apple”, “qty” : 5 }

{ “_id” : 2, “type” : “orange”, “qty” : 10 }

{ “_id” : 3, “type” : “banana”, “qty” : 15 }

 

pyspark shell中,用spark.read()将集合赋值给 DataFrame

df = spark.read.format(“com.mongodb.spark.sql.DefaultSource”).load()

 

可以用spark抽取记录来推断出集合的架构:

df.printSchema()

以上操作会产生如下的shell输出:

root

 |– _id: double (nullable = true)

 |– qty: double (nullable = true)

 |– type: string (nullable = true)

 

如果你想从另一个MongoDB集合中读取数据,可以在读取数据到DataFrame时使用 .option函数。

想要读取一个名为people”的数据库中名为“contacts ”的集合里的数据,要在输入URI选项中设置people.contacts

df = spark.read.format(“com.mongodb.spark.sql.DefaultSource”).option(“uri”,

“mongodb://127.0.0.1/people.contacts”).load()

 

3.3 聚合

使用MongoDB聚合管道 可以实现将MongoDB中的数据读取到Spark中时应用过滤规则、执行聚合操作。

 

假设有一个包含了以下document(文档)的名为“ fruit”的集合:

{ “_id” : 1, “type” : “apple”, “qty” : 5 }

{ “_id” : 2, “type” : “orange”, “qty” : 10 }

{ “_id” : 3, “type” : “banana”, “qty” : 15 }

 

pyspark shell中,向spark.read()里增加 option()函数,就可以在创建DataFrame时设置一个聚合管道来使用。

pipeline = “{‘$match’: {‘type’: ‘apple’}}”

df=spark.read.format(“com.mongodb.spark.sql.DefaultSource”).option(“pipeline”, pipeline).load()

df.show()

 

pyspark shell中,这一步操作打印出以下内容:

+—+—+—–+

|_id|qty| type|

+—+—+—–+

|1.0|5.0|apple|

+—+—+—–+

 

3.4 过滤器和SQL

过滤器(Filters

 

注:当将过滤器与DataFrame或Python API结合使用时,Mongo连接器的底层代码会创建一个聚合管道,用来在数据送往Spark之前就在MongoDB中过滤数据。

filter()来读取MongoDB集合中的一个子集。

 

假设有一个包含了以下document(文档)的名为“ fruit”的集合:

{ “_id” : 1, “type” : “apple”, “qty” : 5 }

{ “_id” : 2, “type” : “orange”, “qty” : 10 }

{ “_id” : 3, “type” : “banana”, “qty” : 15 }

 

首先,创建一个dataframe来连接默认的 MongoDB数据源:

df = spark.read.format(“com.mongodb.spark.sql.DefaultSource”).load()

 

以下示例只包含qty字段大于等于10的记录

df.filter(df[‘qty’] >= 10).show()

 

这一步操作打印出以下结果:

+—+—-+——+

|_id| qty|  type|

+—+—-+——+

|2.0|10.0|orange|

|3.0|15.0|banana|

+—+—-+——+

 

SQL

 

在运行SQL查询DataFrame之前, 你需要注册一个临时表。

以下示例注册了一个名为temp”的临时表,然后用SQL 来查询type字段含有字母e的记录:

df.createOrReplaceTempView(“temp”)

some_fruit = spark.sql(“SELECT type, qty FROM temp WHERE type LIKE ‘%e%'”)

some_fruit.show()

 

pyspark shell中,这步操作打印出如下结果:

+——+—-+

|  type| qty|

+——+—-+

| apple| 5.0|

|orange|10.0|

+——+—-+

“阅读原文”查看原汁原味的英文文档以及相关链接~ 

    原文作者:SQL
    原文地址: https://juejin.im/entry/59b160d95188253902147e6a
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞