python – PySpark:当列是列表时,向DataFrame添加一列

我已经阅读了类似的问题,但无法找到解决我具体问题的方法.

我有一份清单

l = [1, 2, 3]

和一个DataFrame

df = sc.parallelize([
    ['p1', 'a'],
    ['p2', 'b'],
    ['p3', 'c'],
]).toDF(('product', 'name'))

我想获得一个新的DataFrame,其中列表l作为另一列添加,即

+-------+----+---------+
|product|name| new_col |
+-------+----+---------+
|     p1|   a|     1   |
|     p2|   b|     2   |
|     p3|   c|     3   |
+-------+----+---------+

JOIN的方法,我加入df的时候

 sc.parallelize([[1], [2], [3]])

失败了.使用withColumn的方法,如

new_df = df.withColumn('new_col', l)

失败,因为列表不是Column对象.

最佳答案 因此,通过阅读一些有趣的东西
here,我已经确定你不能真正只是将随机/任意列附加到给定的DataFrame对象.看起来你想要的更多的是拉链而不是连接.我环顾四周找到了
this ticket,这让我觉得如果你有DataFrame而不是RDD对象你将无法压缩.

我能够解决你的问题的唯一方法就是离开DataFrame对象的世界并返回到RDD对象.我还需要为连接创建索引,这可能适用于您的用例,也可能不适用.

l = sc.parallelize([1, 2, 3])
index = sc.parallelize(range(0, l.count()))
z = index.zip(l)

rdd = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']])
rdd_index = index.zip(rdd)

# just in case!
assert(rdd.count() == l.count())
# perform an inner join on the index we generated above, then map it to look pretty.
new_rdd = rdd_index.join(z).map(lambda (x, y): [y[0][0], y[0][1], y[1]])
new_df = new_rdd.toDF(["product", 'name', 'new_col'])

当我运行new_df.show()时,我得到:

+-------+----+-------+
|product|name|new_col|
+-------+----+-------+
|     p1|   a|      1|
|     p2|   b|      2|
|     p3|   c|      3|
+-------+----+-------+

旁注:我真的很惊讶这没用.看起来像外部联接?

from pyspark.sql import Row
l = sc.parallelize([1, 2, 3])
new_row = Row("new_col_name")
l_as_df = l.map(new_row).toDF()
new_df = df.join(l_as_df)

当我运行new_df.show()时,我得到:

+-------+----+------------+
|product|name|new_col_name|
+-------+----+------------+
|     p1|   a|           1|
|     p1|   a|           2|
|     p1|   a|           3|
|     p2|   b|           1|
|     p3|   c|           1|
|     p2|   b|           2|
|     p2|   b|           3|
|     p3|   c|           2|
|     p3|   c|           3|
+-------+----+------------+
点赞