java – 我可以在Spark中动态控制map函数的并发性吗?

我正在应用一个map函数,它对我的​​数据执行一些ETL.这个函数通常非常快,并且由于数据分布良好,因此创建了足够多的任务,因此我获得了良好且统一的利用率.

问题是,在某些数据组合上,map函数将成为I / O绑定.发生的情况通常是触发数据将出现在单个块上(它们按顺序到达),因此由单个节点/任务拾取.然后发生的是100GB的处理需要5-6秒,并且单个块处理(MapR中的256mb)需要20分钟,因为它由单个线程执行.

有没有办法增加这个块的并行化?
在这种情况下,通常会做什么?

到目前为止我已经确定的选项(我将其描述为解决方法)是:

> spark.default.parallelism:这将影响全局执行并导致次优总体时间.尽管文档声明这是shuffle操作的并行性,但我观察到它也会影响地图并行性.你能详细说明内部会发生什么吗?这会覆盖块的处理方式吗?
> spark.task.cpus:这太粗糙了,它会再次影响全局执行特性.
>在map函数中使用fork / join,并在检测到I / O绑定延迟时委托给ExecutorService:这会使事情变得复杂,并从框架中获取资源控制,这将在难以解决的恶劣情况下实现.
> sc.textFile(“theFile.txt”,100):这将影响我的主RDD(100GB)以及整个集合的后续转换/动作.更好一点,但仍然不理想(从pzecevic的答案更新)

最佳答案 您可以在要应用地图转换的RDD上设置并行度.

rdd.repartition(100)

我不知道你是如何创建RDD的,但有时你可以在RDD创建时指定并行性:

sc.textFile("theFile.txt", 100)

这将直接影响映射任务的数量(在这种情况下为100).

点赞