While experimenting with Spark + MongoDB I keep hitting "Cursor xxxxx not found" errors; adding keep_alive_ms and a pipeline did not solve the problem.
The total data volume is currently only around 10,000 documents, which are loaded from MongoDB and handed to Spark's NaiveBayes training (a sketch of that step follows the load code below).
# The pipeline is passed to the connector as a JSON array string
pipeline = "[{ $limit: 5000 }, { $skip: 2000 }]"
has_train = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
.option("spark.mongodb.input.uri", "mongodb://mongo_and_spark_server:27017/resume_db.has_train") \
.option("spark.mongodb.keep_alive_ms", "3600000") \
.option("pipeline", pipeline) \
.load()
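For reference, the loaded DataFrame eventually feeds the NaiveBayes training mentioned above. A minimal sketch of that step, assuming the documents have already been converted into a "features" vector column and a numeric "label" column (hypothetical column names; the real feature extraction lives in naive_bayes.py / utility.py):

from pyspark.ml.classification import NaiveBayes

# Assumed columns: "features" (Vector) and "label" (double); feature extraction not shown.
train_df, test_df = has_train.randomSplit([0.8, 0.2], seed=42)
nb = NaiveBayes(smoothing=1.0, featuresCol="features", labelCol="label")
model = nb.fit(train_df)
predictions = model.transform(test_df)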
Before 1.6 we had to deploy the third-party jars manually and tell Spark where to load them from; when experimenting with 2.1, these packages are downloaded automatically (a programmatic alternative is sketched after the submit output below).
./spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
--packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 \
--py-files ./utility.py \
--files ./dicts/statistic_college.txt,./dicts/degrees.txt,./dicts/diming.txt,./dicts/subjects.txt,./dicts/training_org.txt \
naive_bayes.py
Output:
# ./submit.sh
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/home/pluto/spark/spark-2.1.1-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.mongodb.spark#mongo-spark-connector_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found org.mongodb.spark#mongo-spark-connector_2.11;2.0.0 in central
found org.mongodb#mongo-java-driver;3.2.2 in central
:: resolution report :: resolve 221ms :: artifacts dl 4ms
:: modules in use:
org.mongodb#mongo-java-driver;3.2.2 from central in [default]
org.mongodb.spark#mongo-spark-connector_2.11;2.0.0 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 2 | 0 | 0 | 0 || 2 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 2 already retrieved (0kB/7ms)
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
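As referenced above, the dependency and connection settings can also be supplied programmatically when the SparkSession is built, instead of through spark-submit flags. A rough sketch, assuming the same server, database, and collection as above; note that spark.jars.packages is only honored when the session (and its JVM) is created fresh, e.g. when the script is launched with plain python rather than spark-submit:

from pyspark.sql import SparkSession

spark_session = SparkSession.builder \
    .appName("naive_bayes") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.0.0") \
    .config("spark.mongodb.input.uri", "mongodb://mongo_and_spark_server:27017/resume_db.has_train") \
    .getOrCreate()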
When the data volume grows, the following error always shows up, and I have not yet tracked down the root cause :(
17/05/29 01:23:16 INFO MongoClientCache: Closing MongoClient: [mongo_and_spark_server:27017]
17/05/29 01:23:16 INFO connection: Closed connection [connectionId{localValue:2, serverValue:42}] to mongo_and_spark_server:27017 because the pool has been closed.
17/05/29 01:23:16 INFO MongoClientCache: Closing MongoClient: [mongo_and_spark_server:27017]
17/05/29 01:23:16 INFO connection: Closed connection [connectionId{localValue:4, serverValue:46}] to mongo_and_spark_server:27017 because the pool has been closed.
17/05/29 01:27:56 WARN TaskSetManager: Lost task 3.0 in stage 2.0 (TID 5, mongo_and_spark_server, executor 2): com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 34611963569 not found on server mongo_and_spark_server:27017' on server mongo_and_spark_server:27017
at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:27)
at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:215)
at com.mongodb.operation.QueryBatchCursor.hasNext(QueryBatchCursor.java:103)
at com.mongodb.MongoBatchCursorAdapter.hasNext(MongoBatchCursorAdapter.java:46)
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:120)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
17/05/29 01:27:56 INFO TaskSetManager: Starting task 3.1 in stage 2.0 (TID 6, mongo_and_spark_server, executor 2, partition 3, ANY, 6787 bytes)
17/05/29 01:29:01 WARN TaskSetManager: Lost task 2.0 in stage 2.0 (TID 4, mongo_and_spark_server, executor 0): com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 34615739977 not found on server mongo_and_spark_server:27017' on server mongo_and_spark_server:27017
at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:27)
at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:215)
at com.mongodb.operation.QueryBatchCursor.hasNext(QueryBatchCursor.java:103)
at com.mongodb.MongoBatchCursorAdapter.hasNext(MongoBatchCursorAdapter.java:46)
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:120)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
17/05/29 01:29:01 INFO TaskSetManager: Starting task 2.1 in stage 2.0 (TID 7, mongo_and_spark_server, executor 2, partition 2, ANY, 6799 bytes)
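Not a confirmed fix, but since the stack trace shows the cursor timing out while the rows are still being streamed to the Python worker (PythonRunner WriterThread), one workaround worth trying is to materialize the data up front so the MongoDB cursors are drained quickly instead of being held open during slow downstream work. A minimal sketch, assuming the cached data fits in executor memory/disk:

from pyspark import StorageLevel

# Persist and force an action so the Mongo cursors are read to completion up front;
# later stages (e.g. the NaiveBayes fit) then read from Spark's own storage.
has_train_cached = has_train.persist(StorageLevel.MEMORY_AND_DISK)
print(has_train_cached.count())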
References:
https://docs.mongodb.com/spark-connector/v2.0/configuration/#spark-input-conf
https://docs.mongodb.com/manual/core/aggregation-pipeline-optimization/
http://www.mongoing.com/tj/mongodb_shanghai_spark