apache-spark – 在多个核心上运行时,Spark作业中的hadoop bz2库失败

我目前在使用Spark和读取bz2文件时遇到问题.我正在使用Spark 1.2.0(preoilt for hadoop 2.4,但文件目前只在本地读取).对于测试,有大约1500个文件,每个文件大小约为50KB.

以下脚本count_loglines.py说明了问题:

 from pyspark import SparkConf, SparkContext
 spark_conf = SparkConf().setAppName("SparkTest")
 sc = SparkContext(conf=spark_conf)

 overall_log_lines = sc.textFile('/files/bzipped/*.log.bz2')
 line_count = overall_log_lines.count()
 print line_count

在一个核心上本地运行脚本,它按预期工作.

spark/bin/spark-submit --master local[1] count_log_lines.py

使用2在2核上运行脚本

spark/bin/spark-submit --master local[2] count_log_lines.py

结束于hadoop bzip2库的错误消息,例如

 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 60 in stage 0.0 failed 1 times, most recent failure: Lost task 60.0 in stage 0.0 (TID 60, localhost): java.io.IOException: unexpected end of stream
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.bsGetBit(CBZip2InputStream.java:626)

当我事先解压缩文件,并读取未压缩的日志文件而不是bzip压缩文件,即sc.textFile(‘/ files / unzipped / * .log’)时,脚本按预期工作,也在多个核心上工作.

我的问题:这里有什么问题?如果在多个核心上运行,为什么Spark作业不能正确读取bz2文件?

谢谢你的帮助!

最佳答案 我不确定文本文件是否支持bz2文件.

您可以查看pyspark newAPIHadoopFile或hadoopfile API.如果拆分的bz2文件包含Text(例如log),您可以使用:

stdout = sc.newAPIHadoopFile(path="/HDFSpath/to/folder/containing/bz2/", inputFormatClass="org.apache.hadoop.mapreduce.lib.input.TextInputFormat", keyClass="org.apache.hadoop.io.Text", valueClass="org.apache.hadoop.io.Text", keyConverter=None, valueConverter=None, conf=None, batchSize=5)

资料来源:http://spark.apache.org/docs/1.2.0/api/python/pyspark.html

hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

使用HDFS中的任意键和值类,本地文件系统(在所有节点上都可用)或任何支持Hadoop的文件系统URI读取“旧”Hadoop InputFormat.该机制与sc.sequenceFile相同.

Hadoop配置可以作为Python dict传入.这将转换为Java中的配置.

参数:
path – Hadoop文件的路径
inputFormatClass – Hadoop InputFormat的完全限定类名(例如“org.apache.hadoop.mapred.TextInputFormat”)
keyClass – 密钥可写类的完全限定类名(例如“org.apache.hadoop.io.Text”)
valueClass – 值的完全限定类名可写类(例如“org.apache.hadoop.io.LongWritable”)
keyConverter – (默认为None)
valueConverter – (默认为None)
conf – Hadoop配置,作为dict传入(默认为None)
batchSize – 表示为单个Java对象的Python对象的数量. (默认为0,自动选择batchSize)

要么

newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

使用HDFS中的任意键和值类,本地文件系统(在所有节点上都可用)或任何支持Hadoop的文件系统URI读取“新API”Hadoop InputFormat.该机制与sc.sequenceFile相同.

Hadoop配置可以作为Python dict传入.这将转换为Java中的配置

参数:
path – Hadoop文件的路径
inputFormatClass – Hadoop InputFormat的完全限定类名(例如“org.apache.hadoop.mapreduce.lib.input.TextInputFormat”)
keyClass – 密钥可写类的完全限定类名(例如“org.apache.hadoop.io.Text”)
valueClass – 值的完全限定类名可写类(例如“org.apache.hadoop.io.LongWritable”)
keyConverter – (默认为None)
valueConverter – (默认为None)
conf – Hadoop配置,作为dict传入(默认为None)
batchSize – 表示为单个Java对象的Python对象的数量. (默认为0,自动选择batchSize)

RGS,

ķ

点赞