我一直在试图弄清楚当我循环遍历镶木地板文件和几个后处理功能时,由于内存问题导致Spark不会崩溃.对于大量的文本感到抱歉,但这不是一个特定的错误(我正在使用PySpark.)如果这打破了正确的Stack Overflow表单,请道歉!
基本伪代码是:
#fileNums are the file name partitions in the parquet file
#I read each one in as a separate file from its "=" subdirectory
for counter in fileNums:
sparkDataFrame = sqlContext.read.parquet(counter)
summaryReportOne = sqlContext.sql.("SELECT.....")
summaryReportOne.write.partition("id").parquet("/")
summaryReportTwo = sqlContext.sql.("SELECT....")
summaryReportTwo.write.partition("id").parquet("/")
#several more queries, several involving joins, etc....
这段代码使用了spark SQL查询,所以我一直没有成功创建一个包含所有SQL查询/函数的包装器函数并将其传递给foreach(它不能将sparkContext或sqlQuery作为输入)而不是标准环.
从技术上讲,这是一个有分区的大型镶木地板文件,但是一次性读取并查询它是很大的;我需要在每个分区上运行这些功能.所以我只是在PySpark中运行一个常规的python循环,在每个循环中,我处理一个镶木地板分区(子目录)并编写相关的输出报告.
是否由于整个镶木地板文件的大小而不能确定是否将大型mapPartition()周围的所有代码包装起来?
但经过几次循环后,脚本因内存错误而崩溃 – 特别是Java堆错误. (我已经确认循环崩溃的文件没什么特别之处;它发生在第二个或第三个循环中读入的任何随机文件.)
Caused by: com.google.protobuf.ServiceException:
java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:244)
at com.sun.proxy.$Proxy9.delete(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:526)
... 42 more
Caused by: java.lang.OutOfMemoryError: Java heap space
我意识到Spark并不意味着在循环中运行,但是这些SQL查询对于标准的Spark SQL打包函数来说有点过于复杂,我们在不同的聚合统计信息上为每个文件写出多个摘要报告.
有没有办法在每个循环索引结束时基本清除内存?使用sqlContext.dropTempTable()删除任何已注册的临时表并使用sqlContext.clearCache()清除缓存没有帮助.如果我尝试停止sparkContext并在每个循环中重新启动它,我也会得到错误,因为有些进程还没有“包裹起来”(看起来你曾经能够“优雅地”停止上下文但是我在当前的PySpark文档中找不到这个.)
我还应该注意到,在我完成它之后,我没有在循环中的数据帧上调用unpersist(),但我也没有在它们上调用persist();我只是重写每个循环中的数据帧(这可能是问题的一部分).
我正在与我们的工程团队一起调整内存设置,但我们知道我们已经分配了足够的内存来完成这个脚本的一个循环(并且一个循环运行没有任何错误).
任何建议都会有所帮助 – 包括可能比Spark更好用于此用例的工具.我使用Spark版本1.6.1.
最佳答案 如果可以,请尝试升级到新发布的spark 2.0.
我遇到了与java堆空间非常相似的问题.通过简单地重复创建数据帧的过程并使用spark 1.6.2一次又一次地调用,我能够超过4G的堆空间.
使用SparkSession的spark 2.0,同一个程序只有1.2 GB的堆空间,并且内存使用率非常一致,正如我对我运行的程序所期望的那样.