我需要在for循环中执行一组不同的hive查询.
hc=HiveContext(sc)
queryList=[set of queries]
for i in range(0,X):
hc.sql(queryList[i])
sparkDF.write.saveAsTable('hiveTable', mode='append')
虽然此代码的作用类似于较小X值的魅力,但当X> 100时会导致问题.每个saveAsTable作业之间的延迟呈指数级增长,但每个作业或多或少需要大约5秒.
The things I tried to rectify this without any luck:
- Add a gc.collect() inside the for loop once (i%100==0). But this breaks the FOR loop
- Close the current Spark and Hive context once (i%100==0) and create a new ones – this still doesn’t solve the problem
- Use yarn-cluster instead of yarn-client – no luck!
是否有类似的选项,我创建一个连接到hive并在每次调用saveAsTable函数时关闭它?或者清理司机?
最佳答案 发生这种情况是因为您正在使用for循环,它在火花驱动程序模式下执行而不是在集群工作程序节点上分布意味着它不使用并行功能或不在工作节点上执行.尝试使用与分区并行化来创建RDD,这将有助于在工作节点上生成作业
或者如果您只想处理hiveContext,您可以创建全局HiveContext,如val hiveCtx = new HiveContext(sc),并在循环内重用.
您还可以在群集上运行作业时更改/优化执行程序的数量,以提高性能