hadoop – Spark – Finding overlapping values, or a variant of finding mutual friends

I have a problem I am trying to solve with Spark. I am quite new to Spark, so I am not sure what the best way to design it is.

Input:

group1=user1,user2
group2=user1,user2,user3
group3=user2,user4
group4=user1,user4
group5=user3,user5
group6=user3,user4,user5
group7=user2,user4
group8=user1,user5
group9=user2,user4,user5
group10=user4,user5

I want to find the number of mutual groups between every pair of users. So for the input above, the output I expect is:

Output:

1st user || 2nd user || mutual/intersection count || union count
------------------------------------------------------------
user1        user2           2                       7
user1        user3           1                       6
user1        user4           1                       9
user2        user4           3                       8

I think there are a few ways to solve this problem, and one solution could be:

> Create key-value pairs where the key is the user and the value is the group
> Group by key, so that we have a list of the groups each user belongs to
> Then find the intersection/union of the group lists of each pair of users (see the sketch after the example below)

Example:

(1st stage): Map
group1=user1,user2 ==>
          user1, group1
          user2, group1
group2=user1,user2,user3 ==>
          user1, group2
          user2, group2
          user3, group2
....
....
....


(2nd stage): Reduce by key
user1 -> group1, group2, group4, group8
user2 -> group1, group2, group3, group7, group9
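
A minimal sketch of those two stages in Spark might look like this (the file name and the use of groupByKey are my assumptions; the Update below uses aggregateByKey instead):

val userToGroups = sc.textFile("input/test.log")
  .flatMap { line =>
    // 1st stage: "group1=user1,user2" => (user1, group1), (user2, group1)
    val Array(group, members) = line.split("=")
    members.split(",").map(user => (user, group))
  }
  .groupByKey() // 2nd stage: user => all groups the user belongs to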

But my question is: after reducing by key, what is the best way to represent the counts in the form I want?

Is there a better way to handle this problem? The maximum number of users is constant and will not exceed 5000, so that is the maximum number of keys that will be created. But the input may contain close to 1B lines. I don't think that will be a problem; please correct me if I'm wrong.
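
For scale, the bound is easy to check: with at most 5000 users, the pairwise result has at most 5000 * 4999 / 2 = 12,497,500 rows, so the final output stays small even if the input has close to 1B lines.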

Update:

Here is the code I came up with to solve this problem using the little Spark knowledge I have (I started learning Spark only last month):

def createPair(line: String): Array[(String, String)] = {
    // "group1=user1,user2" => Array((user1, group1), (user2, group1))
    val splits = line.split("=")
    val kuid = splits(0)
    splits(1).split(",").map { segment => (segment, kuid) }
}


import sqlContext.implicits._                // for toDF() and $"..." (auto-imported in spark-shell)
import scala.collection.mutable.WrappedArray // parameter type of the UDFs below

val input = sc.textFile("input/test.log")
val pair = input.flatMap { line => createPair(line) }

val pairListDF = pair
  .aggregateByKey(scala.collection.mutable.ListBuffer.empty[String])(
    (kuidList, kuid) => { kuidList += kuid; kuidList },
    (kuidList1, kuidList2) => { kuidList1.appendAll(kuidList2); kuidList1 })
  .mapValues(_.toList).toDF().select($"_1".alias("user"), $"_2".alias("groups"))

pairListDF.registerTempTable("table")

sqlContext.udf.register("intersectCount", (list1: WrappedArray[String], list2: WrappedArray[String]) => list1.intersect(list2).size)
sqlContext.udf.register("unionCount", (list1: WrappedArray[String], list2: WrappedArray[String]) => list1.union(list2).distinct.size)

val populationDF = sqlContext.sql("SELECT t1.user AS user_first,"
  + "t2.user AS user_second,"
  + "intersectCount(t1.groups, t2.groups) AS intersect_count,"
  + "unionCount(t1.groups, t2.groups) AS union_count"
  + " FROM table t1 INNER JOIN table t2"
  + " ON t1.user < t2.user"
  + " ORDER BY user_first,user_second")

Output:

+----------+-----------+---------------+-----------+
|user_first|user_second|intersect_count|union_count|
+----------+-----------+---------------+-----------+
|     user1|      user2|              2|          7|
|     user1|      user3|              1|          6|
|     user1|      user4|              1|          9|
|     user1|      user5|              1|          8|
|     user2|      user3|              1|          7|
|     user2|      user4|              3|          8|
|     user2|      user5|              1|          9|
|     user3|      user4|              1|          8|
|     user3|      user5|              2|          6|
|     user4|      user5|              3|          8|
+----------+-----------+---------------+-----------+

I would love to get some feedback on my code and on what I am missing. Feel free to criticize the code, since I have just started learning Spark. Thanks again to @axiom for the answer, a smaller and neater solution than I expected.

Best answer

Summary:

Get the pair counts, then use the fact that

union(a, b) = count(a) + count(b) - intersection(a, b)
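
For example, user1 belongs to 4 groups and user2 to 5, and their intersection is 2, so union(user1, user2) = 4 + 5 - 2 = 7, which matches the expected output above.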

val data = sc.textFile("test")
//optionally data.cache(), depending on size of data.
val pairCounts  = data.flatMap(pairs).reduceByKey(_ + _)
val singleCounts = data.flatMap(singles).reduceByKey(_ + _)
val singleCountMap = sc.broadcast(singleCounts.collectAsMap())
val result = pairCounts.map { case ((user1, user2), intersectionCount) =>
  (user1, user2, intersectionCount,
    singleCountMap.value(user1) + singleCountMap.value(user2) - intersectionCount)
}

Details:

> With 5000 users in total, 25 million keys (at most one per pair) should not be too many. We can use reduceByKey to compute the intersection counts.
> The individual (per-user) counts can easily be broadcast as a map.
> Then, as is well known:

Union(user1, user2) = count(user1) + count(user2) - Intersection(user1, user2)

The first two counts are read from the broadcast map while we map over the RDD of pair counts.

Code:

//generate ((user1, user2), 1) for pair counts
def pairs(str: String) = {
  val users = str.split("=")(1).split(",")
  val n = users.length
  for (i <- 0 until n; j <- i + 1 until n) yield {
    val pair = if (users(i) < users(j)) {
      (users(i), users(j))
    } else {
      (users(j), users(i))
    } //order of the users in a pair shouldn't matter
    (pair, 1)
  }
}
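
As a quick sanity check (written informally, not actual REPL output), the second input line expands to one count per unordered pair:

// pairs("group2=user1,user2,user3") ==>
//   ((user1,user2),1), ((user1,user3),1), ((user2,user3),1)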

//generate (user, 1), to obtain single counts
def singles(str: String) = {
  for(user <- str.split("=")(1).split(",")) yield (user, 1)
}


//read the rdd
scala> val data = sc.textFile("test")
scala> data.collect.map(println)
group1=user1,user2
group2=user1,user2,user3
group3=user2,user4
group4=user1,user4
group5=user3,user5
group6=user3,user4,user5
group7=user2,user4
group8=user1,user5
group9=user2,user4,user5
group10=user4,user5

//get the pair counts
scala> val pairCounts  = data.flatMap(pairs).reduceByKey(_ + _)
pairCounts: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[16] at reduceByKey at <console>:25



//just checking
scala> pairCounts.collect.map(println)
((user2,user3),1)
((user1,user3),1)
((user3,user4),1)
((user2,user5),1)
((user1,user5),1)
((user2,user4),3)
((user4,user5),3)
((user1,user4),1)
((user3,user5),2)
((user1,user2),2)

//single counts
scala> val singleCounts = data.flatMap(singles).reduceByKey(_ + _)
singleCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at reduceByKey at <console>:25

scala> singleCounts.collect.map(println)

(user5,5)
(user3,3)
(user1,4)
(user2,5)
(user4,6)


//broadcast single counts
scala> val singleCountMap = sc.broadcast(singleCounts.collectAsMap())

//calculate the results:

Finally:

scala> val res = pairCounts.map{case ((user1, user2), intersectionCount) => (user1, user2, intersectionCount, singleCountMap.value(user1) + singleCountMap.value(user2) - intersectionCount)}
res: org.apache.spark.rdd.RDD[(String, String, Int, Int)] = MapPartitionsRDD[23] at map at <console>:33

scala> res.collect.map(println)
(user2,user3,1,7)
(user1,user3,1,6)
(user3,user4,1,8)
(user2,user5,1,9)
(user1,user5,1,8)
(user2,user4,3,8)
(user4,user5,3,8)
(user1,user4,1,9)
(user3,user5,2,6)
(user1,user2,2,7)

Notes:

> While generating the pairs, I sort the tuple, because we don't want the order of the users within a pair to matter.
> Encode the user-name strings as integers and you may get a significant performance boost (see the sketch below).
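
A hedged sketch of that encoding idea (names such as userIds and pairsAsInts are illustrative, not part of the original answer): build a dictionary of the at most 5000 user names once, broadcast it, and shuffle Int keys instead of String keys.

val userIds = sc.broadcast(
  data.flatMap(_.split("=")(1).split(","))
      .distinct()
      .collect()
      .zipWithIndex
      .toMap) // e.g. "user1" -> 0, "user2" -> 1, ...

//same pair generation as above, but with Int ids
def pairsAsInts(str: String) = {
  val users = str.split("=")(1).split(",").map(userIds.value) // names -> ids
  val n = users.length
  for (i <- 0 until n; j <- i + 1 until n) yield {
    val pair = if (users(i) < users(j)) (users(i), users(j))
               else (users(j), users(i))
    (pair, 1) // ((smallerId, largerId), 1); Int keys compare and shuffle cheaper
  }
}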
