MongoDB Spark Connector – 聚合很慢

我正在使用Spark应用程序和Mongos控制台运行相同的聚合管道.在控制台上,数据是在眨眼之间获取的,只需要第二次使用“it”来检索所有预期的数据.

然而,根据Spark WebUI,Spark应用程序大约需要两分钟.

《MongoDB Spark Connector – 聚合很慢》

如您所见,正在启动242个任务以获取结果.我不确定为什么启动如此大量的任务,而MongoDB聚合只返回了40个文档.看起来有很高的开销.

我在Mongos控制台上运行的查询:

db.data.aggregate([
   {
      $match:{
         signals:{
            $elemMatch:{
               signal:"SomeSignal",
               value:{
                  $gt:0,
                  $lte:100
               }
            }
         }
      }
   },
   {
      $group:{
         _id:"$root_document",
         firstTimestamp:{
            $min:"$ts"
         },
         lastTimestamp:{
            $max:"$ts"
         },
         count:{
            $sum:1
         }
      }
   }
])

Spark应用程序代码

    JavaMongoRDD<Document> rdd = MongoSpark.load(sc);

    JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
            Document.parse(
                    "{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
            Document.parse(
                    "{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} , count: { $sum: 1 } } }")));

    JavaRDD<String> outputRdd = aggregatedRdd.map(new Function<Document, String>() {
        @Override
        public String call(Document arg0) throws Exception {
            String output = String.format("%s;%s;%s;%s", arg0.get("_id").toString(),
                    arg0.get("firstTimestamp").toString(), arg0.get("lastTimestamp").toString(),
                    arg0.get("count").toString());
            return output;
        }
    });

    outputRdd.saveAsTextFile("/user/spark/output");

之后,我使用hdfs dfs -getmerge / user / spark / output / output.csv并比较结果.

为什么聚合这么慢?不是调用withPipeline意味着减少需要转移到Spark的数据量吗?看起来它没有像Mongos控制台那样进行相同的聚合.在Mongos控制台上,它非常快速.我使用Spark 1.6.1和mongo-spark-connector_2.10版本1.1.0.

编辑:我想知道的另一件事是两个执行器被启动(因为我使用默认执行设置atm),但只有一个执行器完成所有工作.为什么第二执行者不做任何工作?

《MongoDB Spark Connector – 聚合很慢》

编辑2:当使用不同的聚合管道并调用.count()而不是saveAsTextFile(..)时,还会创建242个任务.这次将返回65.000份文件.
《MongoDB Spark Connector – 聚合很慢》

最佳答案 大量任务是由默认的Mongo Spark分区程序策略引起的.它在计算分区时忽略聚合管道,主要有两个原因:

>它降低了计算分区的成本
>确保分片和非分片分区器的行为相同

但是,正如您发现它们可以生成空分区,在您的情况下,这些分区成本很高.

修复的选择可以是:

>更改分区策略

用于选择替代分区程序以减少分区数.例如,PaginateByCount将数据库拆分为一定数量的分区.

创建自己的分区程序 – 只需实现特征,您就可以应用聚合管道并对结果进行分区.有关示例,请参见HalfwayPartitionercustom partitioner test.
>使用$out将结果汇总到集合中并从那里读取.
>使用coalesce(N)将分区合并在一起并减少分区数.
>增加spark.mongodb.input.partitionerOptions.partitionSizeMB配置以生成更少的分区.

自定义分区程序应该提供最佳解决方案,但有一些方法可以更好地利用可用的默认分区程序.

如果您认为应该有一个使用聚合管道来计算分区的默认分区程序,那么请向MongoDB Spark Jira project添加一个票证.

点赞