I get a StackOverflowError from the createDataFrame call in this example. It originates in the Scala code that performs Java type inference, which calls itself in an infinite loop.
final EventParser parser = new EventParser();
// Parse each input line into an Avro-generated Event.
JavaRDD<Event> eventRDD = sc.textFile(path)
    .map(new Function<String, Event>()
    {
        public Event call(String line) throws Exception
        {
            Event event = parser.parse(line);
            log.info("event: " + event.toString());
            return event;
        }
    });
log.info("eventRDD:" + eventRDD.toDebugString());
// The StackOverflowError is thrown here, while Spark infers the schema from Event.class.
DataFrame df = sqlContext.createDataFrame(eventRDD, Event.class);
df.show();
The bottom of the stack trace looks like this:
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:104)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
This looks similar to the bug reported at http://apache-spark-developers-list.1001551.n3.nabble.com/Stackoverflow-in-createDataFrame-td11791.html, but I'm using Spark 1.4.1, which was released after that bug was fixed.
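The Event class is generated by Avro from the avsc below. It does contain double and long fields, which have been reported to cause problems, but replacing double with string doesn't change the symptoms.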
{
  "namespace": "mynamespace",
  "type": "record",
  "name": "Event",
  "fields": [
    { "name": "ts", "type": "double", "doc": "Timestamp"},
    { "name": "uid", "type": "string", "doc": "Unique ID of Connection"},
    { "name": "idorigh", "type": "string", "doc": "Originating endpoint’s IP address (AKA ORIG)"},
    { "name": "idorigp", "type": "int", "doc": "Originating endpoint’s TCP/UDP port (or ICMP code)"},
    { "name": "idresph", "type": "string", "doc": "Responding endpoint’s IP address (AKA RESP)"},
    { "name": "idrespp", "type": "int", "doc": "Responding endpoint’s TCP/UDP port (or ICMP code)"},
    { "name": "proto", "type": "string", "doc": "Transport layer protocol of connection"},
    { "name": "service", "type": "string", "doc": "Dynamically detected application protocol, if any"},
    { "name": "duration", "type": "double", "doc": "Time of last packet seen – time of first packet seen"},
    { "name": "origbytes", "type": "int", "doc": "Originator payload bytes; from sequence numbers if TCP"},
    { "name": "respbytes", "type": "int", "doc": "Responder payload bytes; from sequence numbers if TCP"},
    { "name": "connstate", "type": "string", "doc": "Connection state (see conn.log:conn_state table)"},
    { "name": "localorig", "type": "boolean", "doc": "If conn originated locally T; if remotely F."},
    { "name": "localresp", "type": "boolean", "doc": "empty, always unset"},
    { "name": "missedbytes", "type": "int", "doc": "Number of missing bytes in content gaps"},
    { "name": "history", "type": "string", "doc": "Connection state history (see conn.log:history table)"},
    { "name": "origpkts", "type": [ "int", "null"], "doc": "Number of ORIG packets"},
    { "name": "origipbytes", "type": [ "int", "null"], "doc": "Number of RESP IP bytes (via IP total_length header field)"},
    { "name": "resppkts", "type": [ "int", "null"], "doc": "Number of RESP packets"},
    { "name": "respipbytes", "type": [ "int", "null"], "doc": "Number of RESP IP bytes (via IP total_length header field)"},
    { "name": "tunnelparents", "type": [ "string", "null"], "doc": "If tunneled, connection UID of encapsulating parent (s)"},
    { "name": "origcc", "type": ["string", "null"], "doc": "ORIG GeoIP Country Code"},
    { "name": "respcc", "type": ["string", "null"], "doc": "RESP GeoIP Country Code"}
  ]
}
Can anyone advise? Thanks!
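Best answer:
Work is under way in the spark-avro project to address this issue, see:
https://github.com/databricks/spark-avro/pull/217 and
https://github.com/databricks/spark-avro/pull/216
Once these are merged, there should be a function that converts an RDD of Avro objects into a DataSet (a DataSet of Rows is equivalent to a DataFrame) without hitting the circular reference problem caused by the getSchema() function in the generated class.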
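
To make the circular reference concrete, here is a minimal sketch in plain Java with no Spark or Avro dependency. It assumes that the type inference walks bean getters recursively, which is what the JavaTypeInference frames in the stack trace suggest. FakeSchema, FakeEvent and PlainEvent are hypothetical stand-ins, not the real Avro or Spark classes: FakeEvent mimics a generated record whose getSchema() returns a type that has a getter returning its own type, and PlainEvent is a flat bean without such a getter.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.HashSet;
import java.util.Set;

public class BeanCycleDemo {

    // Hypothetical stand-in for a schema class: one getter returns its own type.
    public static class FakeSchema {
        public FakeSchema getElementType() { return null; }
        public String getName() { return "fake"; }
    }

    // Hypothetical stand-in for an Avro-generated record with a getSchema() getter.
    public static class FakeEvent {
        public double getTs() { return 0.0; }
        public String getUid() { return ""; }
        public FakeSchema getSchema() { return new FakeSchema(); }
    }

    // Flat bean carrying only data fields.
    public static class PlainEvent {
        public double getTs() { return 0.0; }
        public String getUid() { return ""; }
    }

    /** Returns true if following bean getters from cls revisits a class already on the current path. */
    public static boolean hasCycle(Class<?> cls) {
        return hasCycle(cls, new HashSet<Class<?>>());
    }

    private static boolean hasCycle(Class<?> cls, Set<Class<?>> path) {
        // Leaf types: nothing to descend into.
        if (cls == null || cls.isPrimitive() || cls.isArray() || cls == String.class
                || cls == Boolean.class || Number.class.isAssignableFrom(cls)) {
            return false;
        }
        // Seeing the same class twice on one path means a naive recursion would never end.
        if (!path.add(cls)) {
            return true;
        }
        try {
            // Object.class as stop class excludes the getClass() pseudo-property.
            BeanInfo info = Introspector.getBeanInfo(cls, Object.class);
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() == null) {
                    continue; // no getter, so not a readable property
                }
                if (hasCycle(pd.getPropertyType(), path)) {
                    return true;
                }
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
        path.remove(cls);
        return false;
    }

    public static void main(String[] args) {
        System.out.println("FakeEvent cyclic: " + hasCycle(FakeEvent.class));
        System.out.println("PlainEvent cyclic: " + hasCycle(PlainEvent.class));
    }
}
```

A recursive walk without the path check would recurse forever on FakeEvent, which matches the symptom in the question. Until the pull requests above are merged, one possible workaround (an assumption, not something the answer confirms) is to copy each Event into a flat bean like PlainEvent before calling createDataFrame.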