Today a colleague's model-training job failed with the following exception:
java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@3f9bf2d0 rejected from java.util.concurrent.ThreadPoolExecutor
The Spark log is shown above. There were also some other log entries, for example:
18/06/11 18:40:43 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /10.190.6.72:39413 is closed
Cause Analysis
The logs above are fairly telling: the thread pool was full, so new tasks were rejected, and there were still outstanding requests on a connection that had already been closed.
Looking at the colleague's code, there was a groupBy operation with the partition number explicitly set to 2000, while the job had only 60 executors. With that many shuffle partitions, the shuffle phase opens a very large number of connections between executors, roughly as in the sketch below.
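The original code is not shown in the post; the following is a minimal sketch (with hypothetical data and names) of the kind of call that pins the shuffle partition count at 2000:

    import org.apache.spark.sql.SparkSession

    // Minimal sketch: a groupByKey that forces 2000 shuffle partitions,
    // as in the failing job. The real feature/training logic is omitted.
    object GroupBySketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("groupby-sketch").getOrCreate()
        val sc = spark.sparkContext

        // Hypothetical input: (key, value) pairs.
        val pairs = sc.parallelize(1 to 1000000).map(i => (i % 10000, i.toDouble))

        // Every one of these 2000 partitions has to be fetched over the
        // network during the shuffle, so with only 60 executors each executor
        // serves and opens many concurrent shuffle connections.
        val grouped = pairs.groupByKey(numPartitions = 2000)

        println(grouped.count())
        spark.stop()
      }
    }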
Solution
We changed the partition number to 500 and the core count to 100, and also set the following parameter: spark.shuffle.io.numConnectionsPerPeer = 5.
After these changes the test run passed.
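The exact way the settings were applied is not given in the post; below is a minimal sketch of the same fix in code (the object name, app name, and data are placeholders; in practice the settings can equally be passed via --conf and the resource flags of spark-submit):

    import org.apache.spark.sql.SparkSession

    object TrainingJobSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("model-training")
          // allow up to 5 shuffle connections between each pair of hosts
          // (the default is 1)
          .config("spark.shuffle.io.numConnectionsPerPeer", "5")
          .getOrCreate()
        val sc = spark.sparkContext

        // Hypothetical input: (key, value) pairs.
        val pairs = sc.parallelize(1 to 1000000).map(i => (i % 10000, i.toDouble))

        // Shuffle into 500 partitions instead of 2000.
        val grouped = pairs.groupByKey(numPartitions = 500)
        println(grouped.count())

        spark.stop()
      }
    }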
The parameter is documented as follows:
spark.shuffle.io.numConnectionsPerPeer (default: 1): (Netty only) Connections between hosts are reused in order to reduce connection buildup for large clusters. For clusters with many hard disks and few hosts, this may result in insufficient concurrency to saturate all disks, and so users may consider increasing this value.