scala – 在spark中组合/加入ID行

给定一个充满ID行的文件,例如

i1, i2, i5
i3, i4
i2, i6, i7
i4, i8
i9, i3

你会如何通过链接相同的ID加入他们?因此,对于上面的示例,行1通过i2链接到行3,行2分别通过i4和i3链接到行4和5.这将给你以下(重复删除)

i1, i2, i5, i6, i7
i3, i4, i8, i9

我可以通过遍历行来完成它,但是想知道你将如何以功能的方式进行它?

最佳答案 当您使用Apache Spark时,您可以使用内置的GraphX组件为您完成工作.

import org.apache.spark.graphx._

def cross[Y](xs: Traversable[Y], ys: Traversable[Y]) = for { x <- xs; y <- ys } yield (x, y)

val data = sc.parallelize(List(
    "1\t5\t3", 
    "3\t9\t30",
    "7\t10\t12",
    "10\t7\t13"
))

val prep = data.map(x => x.split("\t").map(_.toLong).toList)

val vertex = prep
  .flatMap(x => x)
  .map(x => x -> s"ID=$x")

val edges = prep
  .map(x => cross(x, x))
  .flatMap(x => x)
  .map(x => new Edge(x._1, x._2, "likes"))

val graph = Graph(vertex, edges)
val linked = graph
  .connectedComponents
  .vertices
  .map(_.swap)
  .groupByKey

linked.take(10).foreach(println)

将打印出以下结果:

(1,CompactBuffer(30, 3, 9, 1, 5))
(7,CompactBuffer(7, 10, 12, 13))

Cross只是创建两个列表的叉积,因此我们可以在所有顶点之间创建边.

connectedComponents函数将遍历图形并找到共享边缘的所有顶点并创建新图形,其中每个顶点是顶点Id的元组 – > “主要”顶点ID.

所以:

graph.connectedComponents.vertices.take(10).foreach(println)

打印出来

(30,1)
(1,1)
(3,1)
(5,1)
(7,7)
(9,1)
(10,7)
(12,7)
(13,7)

如您所见,已选择1和7作为“主顶点”并链接到第一个图形中的所有连接顶点.因此,简单的交换和组将所有连接的ID组合在一起.

点赞