Hitting a Wall with Spark + MongoDB

While experimenting with Spark + MongoDB, I kept running into a `Cursor xxxxx not found` error. Adding `keep_alive_ms` and a `pipeline` option did not solve the problem.

The total data volume is currently around 10,000 documents. They are loaded from MongoDB and then handed to Spark's NaiveBayes for training.

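    # The "pipeline" option is passed as a JSON string, not a Python literal.
    # Stages run in order: $limit 5000 followed by $skip 2000 leaves at most 3000 documents.
    pipeline = '[{"$limit": 5000}, {"$skip": 2000}]'
    has_train = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("spark.mongodb.input.uri", "mongodb://mongo_and_spark_server:27017/resume_db.has_train") \
        .option("spark.mongodb.keep_alive_ms", "3600000") \
        .option("pipeline", pipeline) \
        .load()
    # Note: the connector docs describe spark.mongodb.keep_alive_ms as a JVM system
    # property, so setting it as a reader option here may have no effect.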
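
The pipeline string can also be generated instead of hand-written, which avoids quoting mistakes. The sketch below is a minimal illustration, not the connector's own API: `build_pipeline` is a hypothetical helper, and I am assuming the connector version in use accepts a JSON array for the `pipeline` option. It places `$skip` before `$limit`, so the result is a page of `limit` documents starting at offset `skip`, whereas the snippet above limits first and then skips.

```python
import json


def build_pipeline(skip, limit):
    """Return an aggregation pipeline as a JSON array string.

    Hypothetical helper: $skip runs before $limit, so the pipeline
    selects `limit` documents starting at offset `skip`.
    """
    stages = [{"$skip": skip}, {"$limit": limit}]
    return json.dumps(stages)


pipeline = build_pipeline(skip=2000, limit=5000)
print(pipeline)  # [{"$skip": 2000}, {"$limit": 5000}]
```

The resulting string can be passed to `.option("pipeline", pipeline)` in the same way as the hand-written one.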

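Before Spark 1.6, we had to deploy third-party jars by hand and specify their paths explicitly. When trying 2.1, these packages are downloaded automatically via `--packages`.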

# --files takes a single comma-separated list; if the flag is repeated, only the last value is kept.
./spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
    --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 \
    --py-files ./utility.py \
    --files ./dicts/statistic_college.txt,./dicts/degrees.txt,./dicts/diming.txt,./dicts/subjects.txt,./dicts/training_org.txt \
    naive_bayes.py
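
Since `spark-submit` expects `--files` and `--py-files` as comma-separated lists, it can be convenient to assemble the command in Python. The sketch below does that with a hypothetical helper, `build_submit_command`. It also passes `spark.mongodb.keep_alive_ms` as a JVM system property to the driver and the executors, on the assumption (taken from the connector's configuration docs) that the client cache only reads it from there. I have not verified that this resolves the cursor error.

```python
def build_submit_command(spark_submit, script, packages, py_files, files, keep_alive_ms):
    """Build a spark-submit argument list (hypothetical helper)."""
    # Assumption: the connector reads this JVM system property for its client cache.
    java_opt = "-Dspark.mongodb.keep_alive_ms={}".format(keep_alive_ms)
    return [
        spark_submit,
        "--packages", ",".join(packages),
        "--py-files", ",".join(py_files),
        "--files", ",".join(files),         # one flag, comma-separated values
        "--driver-java-options", java_opt,  # system property on the driver JVM
        "--conf", "spark.executor.extraJavaOptions=" + java_opt,  # and on the executors
        script,
    ]


cmd = build_submit_command(
    spark_submit="./spark-2.1.1-bin-hadoop2.7/bin/spark-submit",
    script="naive_bayes.py",
    packages=["org.mongodb.spark:mongo-spark-connector_2.11:2.0.0"],
    py_files=["./utility.py"],
    files=[
        "./dicts/statistic_college.txt",
        "./dicts/degrees.txt",
        "./dicts/diming.txt",
        "./dicts/subjects.txt",
        "./dicts/training_org.txt",
    ],
    keep_alive_ms=3600000,
)
print(" ".join(cmd))
```

The list form can be handed directly to `subprocess.call(cmd)`, which avoids shell quoting issues.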

Output:

# ./submit.sh 
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/home/pluto/spark/spark-2.1.1-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.mongodb.spark#mongo-spark-connector_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
    found org.mongodb.spark#mongo-spark-connector_2.11;2.0.0 in central
    found org.mongodb#mongo-java-driver;3.2.2 in central
:: resolution report :: resolve 221ms :: artifacts dl 4ms
    :: modules in use:
    org.mongodb#mongo-java-driver;3.2.2 from central in [default]
    org.mongodb.spark#mongo-spark-connector_2.11;2.0.0 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
    confs: [default]
    0 artifacts copied, 2 already retrieved (0kB/7ms)
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

As the data volume grows, the following error keeps appearing. I have not yet tracked down its root cause :(

17/05/29 01:23:16 INFO MongoClientCache: Closing MongoClient: [mongo_and_spark_server:27017]
17/05/29 01:23:16 INFO connection: Closed connection [connectionId{localValue:2, serverValue:42}] to mongo_and_spark_server:27017 because the pool has been closed.
17/05/29 01:23:16 INFO MongoClientCache: Closing MongoClient: [mongo_and_spark_server:27017]
17/05/29 01:23:16 INFO connection: Closed connection [connectionId{localValue:4, serverValue:46}] to mongo_and_spark_server:27017 because the pool has been closed.
17/05/29 01:27:56 WARN TaskSetManager: Lost task 3.0 in stage 2.0 (TID 5, mongo_and_spark_server, executor 2): com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 34611963569 not found on server mongo_and_spark_server:27017' on server mongo_and_spark_server:27017
    at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:27)
    at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:215)
    at com.mongodb.operation.QueryBatchCursor.hasNext(QueryBatchCursor.java:103)
    at com.mongodb.MongoBatchCursorAdapter.hasNext(MongoBatchCursorAdapter.java:46)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:120)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)

17/05/29 01:27:56 INFO TaskSetManager: Starting task 3.1 in stage 2.0 (TID 6, mongo_and_spark_server, executor 2, partition 3, ANY, 6787 bytes)
17/05/29 01:29:01 WARN TaskSetManager: Lost task 2.0 in stage 2.0 (TID 4, mongo_and_spark_server, executor 0): com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 34615739977 not found on server mongo_and_spark_server:27017' on server mongo_and_spark_server:27017
    at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:27)
    at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:215)
    at com.mongodb.operation.QueryBatchCursor.hasNext(QueryBatchCursor.java:103)
    at com.mongodb.MongoBatchCursorAdapter.hasNext(MongoBatchCursorAdapter.java:46)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:120)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)

17/05/29 01:29:01 INFO TaskSetManager: Starting task 2.1 in stage 2.0 (TID 7, mongo_and_spark_server, executor 2, partition 2, ANY, 6799 bytes)
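
One way to narrow this down is to measure how long after the last `Closing MongoClient` message each cursor failure occurs. The sketch below is only a diagnostic aid under my own assumptions: `cursor_failure_gaps` is a hypothetical helper, the sample lines are shortened copies of the log above, and a consistent gap would suggest a timing-related cause without proving one.

```python
import re
from datetime import datetime

LOG_TS = "%y/%m/%d %H:%M:%S"
LINE_RE = re.compile(r"^(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) (\w+) (.*)$")
CURSOR_RE = re.compile(r"Cursor (\d+) not found")


def cursor_failure_gaps(lines):
    """Return (cursor_id, seconds since the last 'Closing MongoClient' line)."""
    last_close = None
    results = []
    for line in lines:
        match = LINE_RE.match(line)
        if not match:
            continue  # stack-trace lines carry no timestamp
        ts = datetime.strptime(match.group(1), LOG_TS)
        message = match.group(3)
        if "Closing MongoClient" in message:
            last_close = ts
        cursor = CURSOR_RE.search(message)
        if cursor and last_close is not None:
            results.append((cursor.group(1), (ts - last_close).total_seconds()))
    return results


# Shortened copies of lines from the log above (trailing text cut off).
sample = [
    "17/05/29 01:23:16 INFO MongoClientCache: Closing MongoClient: [mongo_and_spark_server:27017]",
    "17/05/29 01:27:56 WARN TaskSetManager: Lost task 3.0 in stage 2.0 (TID 5, mongo_and_spark_server, executor 2): com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 34611963569 not found on server mongo_and_spark_server:27017'",
    "    at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:215)",
    "17/05/29 01:29:01 WARN TaskSetManager: Lost task 2.0 in stage 2.0 (TID 4, mongo_and_spark_server, executor 0): com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 34615739977 not found on server mongo_and_spark_server:27017'",
]

gaps = cursor_failure_gaps(sample)
for cursor_id, seconds in gaps:
    print(cursor_id, seconds)
# 34611963569 280.0
# 34615739977 345.0
```

In this log, both failures occur several minutes after the clients were closed, while the tasks are still pulling rows through the Python writer thread.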

References:

https://docs.mongodb.com/spark-connector/v2.0/configuration/#spark-input-conf
https://docs.mongodb.com/manual/core/aggregation-pipeline-optimization/
http://www.mongoing.com/tj/mongodb_shanghai_spark

    Original author: 或然子
    Original URL: https://www.jianshu.com/p/02c6b28a893f