尝试将StringType强制转换为
JSON的ArrayType,以生成CSV格式的数据帧.
在Spark2上使用pyspark
我正在处理的CSV文件;如下 –
date,attribute2,count,attribute3
2017-09-03,'attribute1_value1',2,'[{"key":"value","key2":2},{"key":"value","key2":2},{"key":"value","key2":2}]'
2017-09-04,'attribute1_value2',2,'[{"key":"value","key2":20},{"key":"value","key2":25},{"key":"value","key2":27}]'
如上所示,它在文字字符串中包含一个属性“attribute3”,从技术上讲,它是一个完全长度为2的字典(JSON)列表.
(这是函数输出的区别)
printSchema()的片段
attribute3: string (nullable = true)
我试图将“attribute3”转换为ArrayType,如下所示
temp = dataframe.withColumn(
"attribute3_modified",
dataframe["attribute3"].cast(ArrayType())
)
06003
实际上,ArrayType期望数据类型作为参数.我试过“json”,但它没有用.
期望的输出 –
最后,我需要将attribute3转换为ArrayType()或简单的Python列表. (我试图避免使用eval)
如何将其转换为ArrayType,以便将其视为JSON列表?
我在这里错过了什么吗?
(documentation,没有直截了当地解决这个问题)
最佳答案 使用
from_json
与schema3列中的实际数据匹配的模式将json转换为ArrayType:
原始数据框:
df.printSchema()
#root
# |-- date: string (nullable = true)
# |-- attribute2: string (nullable = true)
# |-- count: long (nullable = true)
# |-- attribute3: string (nullable = true)
from pyspark.sql.functions import from_json
from pyspark.sql.types import *
创建架构:
schema = ArrayType(
StructType([StructField("key", StringType()),
StructField("key2", IntegerType())]))
使用from_json:
df = df.withColumn("attribute3", from_json(df.attribute3, schema))
df.printSchema()
#root
# |-- date: string (nullable = true)
# |-- attribute2: string (nullable = true)
# |-- count: long (nullable = true)
# |-- attribute3: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- key: string (nullable = true)
# | | |-- key2: integer (nullable = true)
df.show(1, False)
#+----------+----------+-----+------------------------------------+
#|date |attribute2|count|attribute3 |
#+----------+----------+-----+------------------------------------+
#|2017-09-03|attribute1|2 |[[value, 2], [value, 2], [value, 2]]|
#+----------+----------+-----+------------------------------------+