(本文为对官网内容的翻译)
英文原文网址:
https://docs.mongodb.com/spark-connector/current/python-api/
为什么使用MongoDB+Spark:
http://www.mongoing.com/tj/mongodb_shanghai_spark
以下例子的源码详见introduction.py:
https://github.com/mongodb/mongo-spark/blob/master/examples/src/test/python/introduction.py
目录 1 前提 2 入门指南 2.1 Python Spark Shell 2.2 创建一个SparkSession对象 3 教程 3.1 写入MongoDB 3.2 读取MongoDB 3.3 聚合 3.4 过滤器和SQL
|
1 前提
MongoDB和Apache Spark的基本操作知识。详见MongoDB文档 和Spark文档
运行MongoDB(2.6及以上版本)
Spark 2.1.x
Scala 2.11.x
2 入门指南
2.1 Python Spark Shell
本指南使用pyspark shell,但使用的代码也适用于独立的Python应用程序。
当使用pyspark shell时,你可以设置:
–packages 选项用来下载MongoDB Spark Connector包。可用以下程序包:
mongo-spark-connector_2.11 (与Scala 2.11.x 结合使用)
–conf选项用来配置MongoDB Spark Connector。这些设置用来配置SparkConf对象。
注:当通过SparkConf设置连接器配置时,你必须为这些设置加上合适的前缀。详情以及其它MongoDB Spark Connector选项参见Configuration Options。 |
以下示例为用命令行运行pyspark shell:
./bin/pyspark –conf “spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred” \ –conf “spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection” \ –packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0 |
spark.mongodb.output.uri用于设置存放输出数据的MongoDB服务器地址(127.0.0.1)、连接的数据库( test)和集合(myCollection),默认连接27017端口。
spark.mongodb.input.uri用于设置需要读取的数据所在的MongoDB服务器地址(127.0.0.1)、连接的数据库( test)和集合(myCollection),以及读取的优先级。
本指南中将使用以上数据库和集合。
2.2 创建一个SparkSession对象
注:当运行pyspark时,会默认得到一个名为spark的SparkSession对象。在一个独立的Python应用程序中,你需要明确地创建你的SparkSession对象,如下所示。 |
如果你在运行pyspark时设置spark.mongodb.input.uri和 spark.mongodb.output.uri,SparkSession对象默认使用它们。如果你想要在pyspark 里创建自己的SparkSession对象,你可以使用SparkSession.builder并且设置不同的配置项。
from pyspark.sql import SparkSession my_spark = SparkSession \ .builder \ .appName(“myApp”) \ .config(“spark.mongodb.input.uri”,”mongodb://127.0.0.1/test.coll”) \ .config(“spark.mongodb.output.uri”,”mongodb://127.0.0.1/test.coll”) \ .getOrCreate() |
你可以使用一个SparkSession对象来向MongoDB写入数据、读取 MongoDB中的数据、创建DataFrames(数据框架)、执行 SQL操作。
3 教程
3.1 写入MongoDB
在创建一个DataFrame(数据框架)之前,首先要创建一个SparkSession对象,然后使用这个对象的 createDataFrame()函数。在以下示例中,createDataFrame()函数中有多个包含了名字和年龄的元组,以及由列名组成的数组。
people = spark.createDataFrame([(“Bilbo Baggins”, 50), (“Gandalf”, 1000), (“Thorin”, 195), (“Balin”, 178), (“Kili”, 77), (“Dwalin”, 169), (“Oin”, 167), (“Gloin”, 158), (“Fili”, 82), (“Bombur”, None)], [“name”, “age”]) |
想要将这个名为“people”的DataFrame 写入在spark.mongodb.output.uri选项中设置的MongoDB数据库和集合,可以用write 函数:
people.write.format(“com.mongodb.spark.sql.DefaultSource”).mode(“append”).save() |
以上操作将在你连接到pyspark shell时,将数据写入spark.mongodb.output.uri选项中设置的 MongoDB数据库和集合中。
想要查看DataFrame中的内容,可以使用show()
people.show() |
在pyspark shell中,这一步操作打印出以下内容:
+————-+—-+ | name| age| +————-+—-+ |Bilbo Baggins| 50| | Gandalf|1000| | Thorin| 195| | Balin| 178| | Kili| 77| | Dwalin| 169| | Oin| 167| | Gloin| 158| | Fili| 82| | Bombur|null| +————-+—-+ |
用printSchema()函数可以打印出DataFrame的架构:
people.printSchema() |
在pyspark shell中,这一步操作打印出以下内容:
root |– _id: struct (nullable = true) | |– oid: string (nullable = true) |– age: long (nullable = true) |– name: string (nullable = true) |
如果需要将数据写入MongoDB的另一个集合中,可以结合使用.option()和 .write()函数。
想要将数据写入一个名为“people”的数据库中的名为“contacts ”的集合里,要在输出URI选项中设置people.contacts。
people.write.format(“com.mongodb.spark.sql.DefaultSource”).mode(“append”).option(“database”,”people”).option(“collection”, “contacts”).save() |
3.2 读取MongoDB
你可以创建一个Spark DataFrame来保存从MongoDB集合(为在 SparkSession中用spark.mongodb.input.uri选项中设置的集合)中读取的数据。
假设有一个包含了以下document(文档)的名为“ fruit”的集合:
{ “_id” : 1, “type” : “apple”, “qty” : 5 } { “_id” : 2, “type” : “orange”, “qty” : 10 } { “_id” : 3, “type” : “banana”, “qty” : 15 } |
在pyspark shell中,用spark.read()将集合赋值给 DataFrame
df = spark.read.format(“com.mongodb.spark.sql.DefaultSource”).load() |
可以用spark抽取记录来推断出集合的架构:
df.printSchema() |
以上操作会产生如下的shell输出:
root |– _id: double (nullable = true) |– qty: double (nullable = true) |– type: string (nullable = true) |
如果你想从另一个MongoDB集合中读取数据,可以在读取数据到DataFrame时使用 .option函数。
想要读取一个名为“people”的数据库中名为“contacts ”的集合里的数据,要在输入URI选项中设置people.contacts。
df = spark.read.format(“com.mongodb.spark.sql.DefaultSource”).option(“uri”, “mongodb://127.0.0.1/people.contacts”).load() |
3.3 聚合
使用MongoDB的聚合管道 可以实现将MongoDB中的数据读取到Spark中时应用过滤规则、执行聚合操作。
假设有一个包含了以下document(文档)的名为“ fruit”的集合:
{ “_id” : 1, “type” : “apple”, “qty” : 5 } { “_id” : 2, “type” : “orange”, “qty” : 10 } { “_id” : 3, “type” : “banana”, “qty” : 15 } |
在pyspark shell中,向spark.read()里增加 option()函数,就可以在创建DataFrame时设置一个聚合管道来使用。
pipeline = “{‘$match’: {‘type’: ‘apple’}}” df=spark.read.format(“com.mongodb.spark.sql.DefaultSource”).option(“pipeline”, pipeline).load() df.show() |
在pyspark shell中,这一步操作打印出以下内容:
+—+—+—–+ |_id|qty| type| +—+—+—–+ |1.0|5.0|apple| +—+—+—–+ |
3.4 过滤器和SQL
过滤器(Filters)
注:当将过滤器与DataFrame或Python API结合使用时,Mongo连接器的底层代码会创建一个聚合管道,用来在数据送往Spark之前就在MongoDB中过滤数据。 |
用filter()来读取MongoDB集合中的一个子集。
假设有一个包含了以下document(文档)的名为“ fruit”的集合:
{ “_id” : 1, “type” : “apple”, “qty” : 5 } { “_id” : 2, “type” : “orange”, “qty” : 10 } { “_id” : 3, “type” : “banana”, “qty” : 15 } |
首先,创建一个dataframe来连接默认的 MongoDB数据源:
df = spark.read.format(“com.mongodb.spark.sql.DefaultSource”).load() |
以下示例只包含qty字段大于等于10的记录
df.filter(df[‘qty’] >= 10).show() |
这一步操作打印出以下结果:
+—+—-+——+ |_id| qty| type| +—+—-+——+ |2.0|10.0|orange| |3.0|15.0|banana| +—+—-+——+ |
SQL
在运行SQL查询DataFrame之前, 你需要注册一个临时表。
以下示例注册了一个名为“temp”的临时表,然后用SQL 来查询type字段含有字母e的记录:
df.createOrReplaceTempView(“temp”) some_fruit = spark.sql(“SELECT type, qty FROM temp WHERE type LIKE ‘%e%'”) some_fruit.show() |
在pyspark shell中,这步操作打印出如下结果:
+——+—-+ | type| qty| +——+—-+ | apple| 5.0| |orange|10.0| +——+—-+ |
“阅读原文”查看原汁原味的英文文档以及相关链接~