apache-spark – 检查GraphX图形对象

Spark 1.6.1版

创建边缘和顶点RDD

val vertices_raw = sqlContext.read.json("vertices.json.gz")

val vertices = vertices_raw.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[String]("index")))

val verticesRDD: RDD[(VertexId, String)] = vertices

val edges_raw = sqlContext.read.json("edges.json.gz")

val edgesRDD = edges_raw.rdd.map(row=>(Edge(row.getAs[String]("positiveNode").stripPrefix("osgb").toLong, row.getAs[String]("negativeNode").stripPrefix("osgb").toLong, row.getAs[Double]("length"))))

我有一个可以检查的EdgesRDD

[IN] edgesRDD
res10: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = MapPartitionsRDD[19] at map at <console>:38
[IN] edgesRDD.foreach(println)

Edge(5000005125036254,5000005125036231,42.26548472559799)
Edge(5000005125651333,5000005125651330,29.557979625165135)
Edge(5000005125651329,5000005125651330,81.9310872300414)

我有一个顶点RDD

[IN] verticesRDD
res12: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[9] at map at <console>:38

[IN] verticesRDD.foreach(println)
(5000005125651331,343722)
(5000005125651332,343723)
(5000005125651333,343724)

我将这些结合起来创建一个图表.

[IN] val graph: Graph[(String),Double] = Graph(verticesRDD, edgesRDD)
graph: org.apache.spark.graphx.Graph[String,Double] = org.apache.spark.graphx.impl.GraphImpl@303bbd02

我可以检查图形对象中的edgesRDD:

[IN] graph.edges.foreach(println)

Edge(5000005125774813,4000000029917080,72.9742898009203)
Edge(5000005125774814,5000005125774813,49.87951589790352)
Edge(5000005125775080,4000000029936370,69.62871049042008)

但是,当我检查顶点RDD时:

[IN] graph.vertices.foreach(println)

我的图形构造有问题吗?

ERROR Executor: Exception in task 0.0 in stage 15.0 (TID 13)
java.lang.ArrayStoreException: java.lang.Long
        at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
        at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap.setMerge(GraphXPrimitiveKeyOpenHashMap.scala:87)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:61)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:60)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:328)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:325)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
16/08/17 12:27:16 WARN TaskSetManager: Lost task 0.0 in stage 15.0 (TID 13, localhost): java.lang.ArrayStoreException: java.lang.Long
        at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
        at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap.setMerge(GraphXPrimitiveKeyOpenHashMap.scala:87)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:61)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:60)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:328)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:325)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

16/08/17 12:27:16 ERROR TaskSetManager: Task 0 in stage 15.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 13, localhost): java.lang.ArrayStoreException: java.lang.Long
        at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
        at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap.setMerge(GraphXPrimitiveKeyOpenHashMap.scala:87)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:61)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:60)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:328)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:325)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:56)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:58)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:62)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
        at $iwC$$iwC$$iwC.<init>(<console>:68)
        at $iwC$$iwC.<init>(<console>:70)
        at $iwC.<init>(<console>:72)
        at <init>(<console>:74)
        at .<init>(<console>:78)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ArrayStoreException: java.lang.Long
        at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
        at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap.setMerge(GraphXPrimitiveKeyOpenHashMap.scala:87)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:61)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:60)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:328)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:325)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

编辑.经过一番挖掘,是this相关吗?我检查了VertexId的要求:

VertexId = type Long
A 64-bit vertex identifier that uniquely identifies a vertex within a graph.

我提供的独特字段,例如,5000005125036318是令人满意的.

最佳答案 是.您的问题与
ArrayStoreException有关,因为您当前的代码尝试将Long类型存储到String数组中.

ArrayStoreException is thrown to indicate that an attempt has been made to store the wrong
type of object into an array of objects

为什么要使用ArrayStoreException?

以下是vertices.json.gz文件的快照:

{"toid": "osgb4000000031043205", "index": 1, "point": [508180.748, 195333.973]}
{"toid": "osgb4000000031043206", "index": 2, "point": [508163.122, 195316.627]}
{"toid": "osgb4000000031043207", "index": 3, "point": [508172.075, 195325.719]}
{"toid": "osgb4000000031043208", "index": 4, "point": [508513, 196023]}

默认情况下,“index”值在创建vertices_raw DataFrame时读取为LongType,如下所示:

scala> vertices_raw.schema
res4: org.apache.spark.sql.types.StructType = StructType(StructField(index,LongType,true), StructField(point,ArrayType(DoubleType,true),true), StructField(toid,StringType,true))

当您创建图形时,Long类型将存储到String数组中,从而导致此异常:

val graph: Graph[(String),Double] = Graph(verticesRDD, edgesRDD)

解决方案1:

使用Long作为索引,即替换以下行:

val vertices = vertices_raw.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[String]("index")))

val verticesRDD: RDD[(VertexId, String)] = vertices

val graph: Graph[(String),Double] = Graph(verticesRDD, edgesRDD)

有:

val vertices = vertices_raw.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[Long]("index")))

val verticesRDD: RDD[(VertexId, Long)] = vertices

val graph: Graph[(Long),Double] = Graph(verticesRDD, edgesRDD)

解决方案2:

从vertices_raw创建一个新的DataFrame vertices_raw2,将索引的类型从LongType转换为StringType,如下所示:

import org.apache.spark.sql.functions._

val to_string = udf[String, Long]( _.toString)

val vertices_raw2 = vertices_raw.withColumn("index", to_string(vertices_raw("index"))).select("index", "toid")

然后进一步使用vertices_raw2 DataFrame创建顶点RDD:

val vertices = vertices_raw2.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[String]("index")))

输出:

scala> graph.edges.foreach(println)
Edge(5000005125740769,4000000029965899,51.55460482650549)
Edge(5000005125740770,5000005125740759,26.108461618676447)
Edge(5000005125740771,5000005125740763,30.841246458481766) ...

scala> graph.vertices.foreach(println)
(4000000029867298,58335)
(4000000029892180,10846)
(4000000027730512,338018)
(4000000023185673,43945) ...
点赞