spark读parquet目录遇到的元数据文件不完整的问题

有个在线系统,Spark1.6.3,有一个spark streaming程序定期产生一个parquet目录, 后面一个spark定期批处理检测目录下_SUCCESS文件是否生成结束,然后读入dataframe处理。

大部分情况下没有问题,但是每天总会遇到几个批次后面读取失败的,一般都是报错_metadata和_common_metadata读取的问题。

18/09/14 16:58:00 WARN TaskSetManager: Lost task 7.0 in stage 0.0 (TID 7, localhost): java.io.IOException: Could not read footer: java.lang.RuntimeException: file:/data/parquet/_common_metadata is not a Parquet file (too small)

at org.apache.parquet.hadoop.ParquetFileReader.readAllFootersInParallel(
ParquetFileReader.java:247)

at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$27.apply(ParquetRelation.scala:786)

at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$27.apply(ParquetRelation.scala:775)

at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)

at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)

at
org.apache.spark.scheduler.Task.run(Task.scala:89)

at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)

at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1149)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:624)

at
java.lang.Thread.run(
Thread.java:748)

Caused by: java.lang.RuntimeException: file:/data/parquet/_common_metadata is not a Parquet file (too small)

at org.apache.parquet.hadoop.ParquetFileReader.readFooter(
ParquetFileReader.java:412)

at
org.apache.parquet.hadoop.ParquetFileReader$2.call(
ParquetFileReader.java:237)

at
org.apache.parquet.hadoop.ParquetFileReader$2.call(
ParquetFileReader.java:233)

at
java.util.concurrent.FutureTask.run(
FutureTask.java:266)

… 3 more

怀疑是SUCCESS比两个metadata文件先生成,导致后面开始读数据了但是读到了空的或是未完整的_metadata, _common_metadata文件了。

为了验证猜测,直接在本地spark-shell环境做个实验

import org.apache.spark.sql.{SQLContext, SaveMode}
val path = "file:///data/parquet"
case class A(id:String)
val df = Seq("aaa", "bbbbb", "abcdfd").map(A(_)).toDF()
df.write.mode(SaveMode.Append).parquet(path)

看看输出目录的情况, stat一下文件时间,证实了我的猜想。

➜ parquet ls -lt /data/parquet/*

-rw-r–r– 1 paco paco 211 9月 14 17:06 /data/parquet/_common_metadata

-rw-r–r– 1 paco paco 5844 9月 14 17:06 /data/parquet/_metadata

-rw-r–r– 1 paco paco 0 9月 14 17:06 /data/parquet/_SUCCESS

-rw-r–r– 1 paco paco 211 9月 14 17:06 /data/parquet/part-r-00063-6a9d51c6-b204-4807-9bbb-9c202f1cd131.snappy.parquet

➜ parquet stat _*

文件:_common_metadata

大小:211 块:8 IO 块:4096 普通文件

设备:806h/2054d Inode:8140744 硬链接:1

权限:(0644/-rw-r–r–) Uid:( 1000/ paco) Gid:( 1000/ paco)

最近访问:2018-09-14 17:06:56.836039763 +0800

最近更改:2018-09-14 17:06:56.848039475 +0800

最近改动:2018-09-14 17:06:
56.848039475 +0800

创建时间:-

文件:_metadata

大小:5844 块:16 IO 块:4096 普通文件

设备:806h/2054d Inode:8140738 硬链接:1

权限:(0644/-rw-r–r–) Uid:( 1000/ paco) Gid:( 1000/ paco)

最近访问:2018-09-14 17:06:56.820040147 +0800

最近更改:2018-09-14 17:06:56.836039763 +0800

最近改动:2018-09-14 17:06:
56.836039763 +0800

创建时间:-

文件:_SUCCESS

大小:0 块:0 IO 块:4096 普通空文件

设备:806h/2054d Inode:8140732 硬链接:1

权限:(0644/-rw-r–r–) Uid:( 1000/ paco) Gid:( 1000/ paco)

最近访问:2018-09-14 17:06:56.740042067 +0800

最近更改:2018-09-14 17:06:56.740042067 +0800

最近改动:2018-09-14 17:06:
56.740042067 +0800

创建时间:-

实际上,这俩文件删掉的话也是可以被sqlContext正常读取parquet数据的,

sqlContext.read.parquet(path)

然而如果是内容为空或者不完成,比如删掉后,touch一个空的,上面的读取就失败了,重现了上面的Exception了。

解决办法:

治本应该是找到某些配置,让读取parquet目录的时候忽略掉这俩文件。暂时没找到,有知道的请告诉我。

work around1: 则是,在detect到_SUCCESS文件之后,sleep一个安全的时间段,比如1s之后,再开始处理, 这个略显土鳖。

work around2:在读取的时候,扫描目录,只读取*.parquet 文件名的,经过观察这些文件先于_SUCCESS生成,目前采用了这个方法,解决了问题,代码如下

//过滤得到输出目录下所有*.parquet文件列表,避免处理到空的_*metadata文件造成读取失败 val parquetFiles = hdfs.list(inPath)
         .map(_.getPath.toString)
         .filter(_.endsWith(".parquet"))
val df_user_pos = sqlContext.read.parquet(parquetFiles:_*)

    原文作者:Paco
    原文地址: https://zhuanlan.zhihu.com/p/44560572
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞