有个在线系统,Spark1.6.3,有一个spark streaming程序定期产生一个parquet目录, 后面一个spark定期批处理检测目录下_SUCCESS文件是否生成结束,然后读入dataframe处理。
大部分情况下没有问题,但是每天总会遇到几个批次后面读取失败的,一般都是报错_metadata和_common_metadata读取的问题。
18/09/14 16:58:00 WARN TaskSetManager: Lost task 7.0 in stage 0.0 (TID 7, localhost): java.io.IOException: Could not read footer: java.lang.RuntimeException: file:/data/parquet/_common_metadata is not a Parquet file (too small)
at org.apache.parquet.hadoop.ParquetFileReader.readAllFootersInParallel(
ParquetFileReader.java:247)at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$27.apply(ParquetRelation.scala:786)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$27.apply(ParquetRelation.scala:775)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at
org.apache.spark.scheduler.Task.run(Task.scala:89)at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1149)at
java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:624)at
java.lang.Thread.run(
Thread.java:748)Caused by: java.lang.RuntimeException: file:/data/parquet/_common_metadata is not a Parquet file (too small)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(
ParquetFileReader.java:412)at
org.apache.parquet.hadoop.ParquetFileReader$2.call(
ParquetFileReader.java:237)at
org.apache.parquet.hadoop.ParquetFileReader$2.call(
ParquetFileReader.java:233)at
java.util.concurrent.FutureTask.run(
FutureTask.java:266)… 3 more
怀疑是SUCCESS比两个metadata文件先生成,导致后面开始读数据了但是读到了空的或是未完整的_metadata, _common_metadata文件了。
为了验证猜测,直接在本地spark-shell环境做个实验
import org.apache.spark.sql.{SQLContext, SaveMode}
val path = "file:///data/parquet"
case class A(id:String)
val df = Seq("aaa", "bbbbb", "abcdfd").map(A(_)).toDF()
df.write.mode(SaveMode.Append).parquet(path)
看看输出目录的情况, stat一下文件时间,证实了我的猜想。
➜ parquet ls -lt /data/parquet/*
-rw-r–r– 1 paco paco 211 9月 14 17:06 /data/parquet/_common_metadata
-rw-r–r– 1 paco paco 5844 9月 14 17:06 /data/parquet/_metadata
-rw-r–r– 1 paco paco 0 9月 14 17:06 /data/parquet/_SUCCESS
-rw-r–r– 1 paco paco 211 9月 14 17:06 /data/parquet/part-r-00063-6a9d51c6-b204-4807-9bbb-9c202f1cd131.snappy.parquet
…
➜ parquet stat _*
文件:_common_metadata
大小:211 块:8 IO 块:4096 普通文件
设备:806h/2054d Inode:8140744 硬链接:1
权限:(0644/-rw-r–r–) Uid:( 1000/ paco) Gid:( 1000/ paco)
最近访问:2018-09-14 17:06:56.836039763 +0800
最近更改:2018-09-14 17:06:56.848039475 +0800
最近改动:2018-09-14 17:06:
56.848039475 +0800创建时间:-
文件:_metadata
大小:5844 块:16 IO 块:4096 普通文件
设备:806h/2054d Inode:8140738 硬链接:1
权限:(0644/-rw-r–r–) Uid:( 1000/ paco) Gid:( 1000/ paco)
最近访问:2018-09-14 17:06:56.820040147 +0800
最近更改:2018-09-14 17:06:56.836039763 +0800
最近改动:2018-09-14 17:06:
56.836039763 +0800创建时间:-
文件:_SUCCESS
大小:0 块:0 IO 块:4096 普通空文件
设备:806h/2054d Inode:8140732 硬链接:1
权限:(0644/-rw-r–r–) Uid:( 1000/ paco) Gid:( 1000/ paco)
最近访问:2018-09-14 17:06:56.740042067 +0800
最近更改:2018-09-14 17:06:56.740042067 +0800
最近改动:2018-09-14 17:06:
56.740042067 +0800创建时间:-
实际上,这俩文件删掉的话也是可以被sqlContext正常读取parquet数据的,
sqlContext.read.parquet(path)
然而如果是内容为空或者不完成,比如删掉后,touch一个空的,上面的读取就失败了,重现了上面的Exception了。
解决办法:
治本应该是找到某些配置,让读取parquet目录的时候忽略掉这俩文件。暂时没找到,有知道的请告诉我。
work around1: 则是,在detect到_SUCCESS文件之后,sleep一个安全的时间段,比如1s之后,再开始处理, 这个略显土鳖。
work around2:在读取的时候,扫描目录,只读取*.parquet 文件名的,经过观察这些文件先于_SUCCESS生成,目前采用了这个方法,解决了问题,代码如下
//过滤得到输出目录下所有*.parquet文件列表,避免处理到空的_*metadata文件造成读取失败 val parquetFiles = hdfs.list(inPath)
.map(_.getPath.toString)
.filter(_.endsWith(".parquet"))
val df_user_pos = sqlContext.read.parquet(parquetFiles:_*)