Spark 1.6.1版
创建边缘和顶点RDD
val vertices_raw = sqlContext.read.json("vertices.json.gz")
val vertices = vertices_raw.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[String]("index")))
val verticesRDD: RDD[(VertexId, String)] = vertices
val edges_raw = sqlContext.read.json("edges.json.gz")
val edgesRDD = edges_raw.rdd.map(row=>(Edge(row.getAs[String]("positiveNode").stripPrefix("osgb").toLong, row.getAs[String]("negativeNode").stripPrefix("osgb").toLong, row.getAs[Double]("length"))))
我有一个可以检查的EdgesRDD
[IN] edgesRDD
res10: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = MapPartitionsRDD[19] at map at <console>:38
[IN] edgesRDD.foreach(println)
Edge(5000005125036254,5000005125036231,42.26548472559799)
Edge(5000005125651333,5000005125651330,29.557979625165135)
Edge(5000005125651329,5000005125651330,81.9310872300414)
我有一个顶点RDD
[IN] verticesRDD
res12: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[9] at map at <console>:38
[IN] verticesRDD.foreach(println)
(5000005125651331,343722)
(5000005125651332,343723)
(5000005125651333,343724)
我将这些结合起来创建一个图表.
[IN] val graph: Graph[(String),Double] = Graph(verticesRDD, edgesRDD)
graph: org.apache.spark.graphx.Graph[String,Double] = org.apache.spark.graphx.impl.GraphImpl@303bbd02
我可以检查图形对象中的edgesRDD:
[IN] graph.edges.foreach(println)
Edge(5000005125774813,4000000029917080,72.9742898009203)
Edge(5000005125774814,5000005125774813,49.87951589790352)
Edge(5000005125775080,4000000029936370,69.62871049042008)
但是,当我检查顶点RDD时:
[IN] graph.vertices.foreach(println)
我的图形构造有问题吗?
ERROR Executor: Exception in task 0.0 in stage 15.0 (TID 13)
java.lang.ArrayStoreException: java.lang.Long
at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap.setMerge(GraphXPrimitiveKeyOpenHashMap.scala:87)
at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:61)
at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:60)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:328)
at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:325)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/08/17 12:27:16 WARN TaskSetManager: Lost task 0.0 in stage 15.0 (TID 13, localhost): java.lang.ArrayStoreException: java.lang.Long
at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap.setMerge(GraphXPrimitiveKeyOpenHashMap.scala:87)
at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:61)
at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:60)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:328)
at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:325)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/08/17 12:27:16 ERROR TaskSetManager: Task 0 in stage 15.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 13, localhost): java.lang.ArrayStoreException: java.lang.Long
at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap.setMerge(GraphXPrimitiveKeyOpenHashMap.scala:87)
at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:61)
at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:60)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:328)
at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:325)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:56)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:58)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:62)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
at $iwC$$iwC$$iwC.<init>(<console>:68)
at $iwC$$iwC.<init>(<console>:70)
at $iwC.<init>(<console>:72)
at <init>(<console>:74)
at .<init>(<console>:78)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ArrayStoreException: java.lang.Long
at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap.setMerge(GraphXPrimitiveKeyOpenHashMap.scala:87)
at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:61)
at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:60)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:328)
at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:325)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
编辑.经过一番挖掘,是this相关吗?我检查了VertexId的要求:
VertexId = type Long
A 64-bit vertex identifier that uniquely identifies a vertex within a graph.
我提供的独特字段,例如,5000005125036318是令人满意的.
最佳答案 是.您的问题与
ArrayStoreException有关,因为您当前的代码尝试将Long类型存储到String数组中.
ArrayStoreException is thrown to indicate that an attempt has been made to store the wrong
type of object into an array of objects
为什么要使用ArrayStoreException?
以下是vertices.json.gz文件的快照:
{"toid": "osgb4000000031043205", "index": 1, "point": [508180.748, 195333.973]}
{"toid": "osgb4000000031043206", "index": 2, "point": [508163.122, 195316.627]}
{"toid": "osgb4000000031043207", "index": 3, "point": [508172.075, 195325.719]}
{"toid": "osgb4000000031043208", "index": 4, "point": [508513, 196023]}
默认情况下,“index”值在创建vertices_raw DataFrame时读取为LongType,如下所示:
scala> vertices_raw.schema
res4: org.apache.spark.sql.types.StructType = StructType(StructField(index,LongType,true), StructField(point,ArrayType(DoubleType,true),true), StructField(toid,StringType,true))
当您创建图形时,Long类型将存储到String数组中,从而导致此异常:
val graph: Graph[(String),Double] = Graph(verticesRDD, edgesRDD)
解决方案1:
使用Long作为索引,即替换以下行:
val vertices = vertices_raw.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[String]("index")))
val verticesRDD: RDD[(VertexId, String)] = vertices
val graph: Graph[(String),Double] = Graph(verticesRDD, edgesRDD)
有:
val vertices = vertices_raw.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[Long]("index")))
val verticesRDD: RDD[(VertexId, Long)] = vertices
val graph: Graph[(Long),Double] = Graph(verticesRDD, edgesRDD)
解决方案2:
从vertices_raw创建一个新的DataFrame vertices_raw2,将索引的类型从LongType转换为StringType,如下所示:
import org.apache.spark.sql.functions._
val to_string = udf[String, Long]( _.toString)
val vertices_raw2 = vertices_raw.withColumn("index", to_string(vertices_raw("index"))).select("index", "toid")
然后进一步使用vertices_raw2 DataFrame创建顶点RDD:
val vertices = vertices_raw2.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[String]("index")))
输出:
scala> graph.edges.foreach(println)
Edge(5000005125740769,4000000029965899,51.55460482650549)
Edge(5000005125740770,5000005125740759,26.108461618676447)
Edge(5000005125740771,5000005125740763,30.841246458481766) ...
scala> graph.vertices.foreach(println)
(4000000029867298,58335)
(4000000029892180,10846)
(4000000027730512,338018)
(4000000023185673,43945) ...