我正在尝试在两个Spark RDD上进行连接.我有一个与类别链接的事务日志.我已将事务RDD格式化为具有类别ID作为键.
transactions_cat.take(3)
[(u'707', [u'86246', u'205', u'7', u'707', u'1078778070', u'12564', u'2012-03-02 00:00:00', u'12', u'OZ', u'1', u'7.59']),
(u'6319', [u'86246', u'205', u'63', u'6319', u'107654575', u'17876', u'2012-03-02 00:00:00', u'64', u'OZ', u'1', u'1.59']),
(u'9753', [u'86246', u'205', u'97', u'9753', u'1022027929', u'0', u'2012-03-02 00:00:00', u'1', u'CT', u'1', u'5.99'])]
categories.take(3)
[(u'2202', 0), (u'3203', 0), (u'1726', 0)]
事务日志大约为20 GB(3.5亿行).
类别列表小于1KB.
我跑的时候
transactions_cat.join(categories).count()
Spark开始变得很慢.我有一个有643个任务的舞台.前10个任务大约需要1分钟.然后每个任务都越来越慢(在第60个任务周围约15分钟).我不确定是什么问题.
请检查这些截图以获得更好的主意.
我正在使用python shell运行Spark 1.1.0和4名工作人员,总内存为50 GB.
只计算事务RDD非常快(30分钟)
最佳答案 怎么可能是Spark没有注意到你有一个简单的连接问题.当你加入的两个RDD中的一个是如此之小时,你最好不要成为RDD.然后你可以滚动你自己的
hash join实现,这实际上比听起来简单得多.基本上,你需要:
>使用collect()将您的类别列表从RDD中拉出 – 结果通信将很容易为自己付费(或者,如果可能的话,首先不要使其成为RDD)
>将其转换为哈希表,其中一个条目包含一个键的所有值(假设您的键不是唯一的)
>对于大型RDD中的每一对,在哈希表中查找密钥并为列表中的每个值生成一对(如果未找到则该特定对不会产生任何结果)
我有一个implementation in Scala – 随意提出有关翻译的问题,但它应该很容易.
另一个有趣的可能性是尝试使用Spark SQL.我很确定该项目的长期目标将包括自动为您执行此操作,但我不知道他们是否已实现此目标.