I am trying to split a dataframe by the value(s) of one (or more) columns and rotate each resulting dataframe independently of the rest of the columns. That is, given the input dataframe:
val inputDF = Seq(("tom","20","a","street a","germany"),("jimmy","30","b","street b","germany"),
("lola","50","c","street c","argentina"), ("maria","60","d","street d","argentina"), ("joe","70","e","street e","argentina")
.toDF("name","age","company","address","country")
//+-----+---+-------+--------+---------+
//| name|age|company| address| country|
//+-----+---+-------+--------+---------+
//| tom| 20| a|street a| germany|
//|jimmy| 30| b|street b| germany|
//| lola| 40| c|street c|argentina|
//|maria| 50| d|street d|argentina|
//| joe| 60| e|street e|argentina|
//+-----+---+-------+--------+---------+
I need to split the records by the distinct values of the "country" column. For the input dataframe above, the split should produce the two dataframes below (a minimal sketch of this splitting step is shown right after them):
//+-----+---+-------+--------+---------+
//| name|age|company| address| country|
//+-----+---+-------+--------+---------+
//| tom| 20| a|street a| germany|
//|jimmy| 30| b|street b| germany|
//+-----+---+-------+--------+---------+
//+-----+---+-------+--------+---------+
//| name|age|company| address| country|
//+-----+---+-------+--------+---------+
//| lola| 40| c|street c|argentina|
//|maria| 50| d|street d|argentina|
//| joe| 60| e|street e|argentina|
//+-----+---+-------+--------+---------+
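For reference, the split step alone can be sketched like this (assuming the number of distinct values in "country" is small enough to collect the keys to the driver; perCountryDFs is just an illustrative name):
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
// minimal sketch of the split: collect the distinct keys, then filter the main df once per key
val perCountryDFs: Array[DataFrame] = inputDF.select("country").distinct().collect()
  .map(_.getString(0))
  .map(c => inputDF.filter(col("country") === c))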
I also have to rotate the "name" and "age" columns within each of those dataframes, so that every person ends up with a different company and address while the rest of the columns stay intact (a small window-based sketch illustrating this rotation follows the expected output). The desired output dataframe looks like this:
//+-----+---+-------+--------+---------+
//| name|age|company| address| country|
//+-----+---+-------+--------+---------+
//|jimmy| 30| a|street a| germany|
//| tom| 20| b|street b| germany|
//| joe| 60| c|street c|argentina|
//| lola| 40| d|street d|argentina|
//|maria| 50| e|street e|argentina|
//+-----+---+-------+--------+---------+
The final row order does not matter.
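Just to make the rotation requirement concrete, here is a sketch of a deterministic shift-by-one inside each country using window functions. This is not the random shuffle I try below, only an illustration of the expected pairing; the rn/cnt/target_rn columns are made up for this sketch:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit, row_number}
// row i keeps its company/address/country but takes name/age from row i+1 (wrapping around)
val byCountry = Window.partitionBy("country").orderBy("name")
val indexed = inputDF
  .withColumn("rn", row_number().over(byCountry))
  .withColumn("cnt", count(lit(1)).over(Window.partitionBy("country")))
// the (name, age) of row rn is sent to the row whose position is (rn % cnt) + 1
val shiftedKeys = indexed
  .withColumn("target_rn", (col("rn") % col("cnt")) + 1)
  .select(col("country").as("k_country"), col("target_rn"), col("name"), col("age"))
val fixedPart = indexed.select("company", "address", "country", "rn")
val rotatedSketch = fixedPart
  .join(shiftedKeys, fixedPart("country") === shiftedKeys("k_country") &&
    fixedPart("rn") === shiftedKeys("target_rn"))
  .select("name", "age", "company", "address", "country")
As long as a country has at least two rows, nobody keeps their original company/address with this shift.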
My first attempt (run on the spark-shell)
I tried assigning a unique id to each row, then shuffling the desired columns (name and age) and joining the reordered dataframe back to the rest of the dataframe through that auxiliary id. The main problems here are the use of collect(), which can be dangerous with large dataframes, and repartition(1), which pretty much goes against distributed computing and Spark (it is used to avoid the exception thrown when zipping RDDs with different numbers of partitions).
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, rand}
import org.apache.spark.sql.types.LongType
// name(s) of the column(s) used to split the input dataframe
val colToSplit = Seq("country")
val splitCols = colToSplit.map(col)
// list of column names to be rotated (together)
val colsToRotate = Seq("name", "age")
// add an auxiliary column for joining the dataframes in the final step
val auxCol = "aux"
val rotateCols = colsToRotate.map(col) :+ col(auxCol)
val dfWithID = inputDF.withColumn(auxCol, monotonically_increasing_id())
val splitValuesSchema = dfWithID.select(splitCols: _*).schema
// create one dataframe for each value of the splitting column
val splitValuesDFs = dfWithID.select(splitCols: _*).distinct().collect()
.map(row => spark.sparkContext.makeRDD(List(row)))
.map(rdd => spark.createDataFrame(rdd, splitValuesSchema))
val rotateIDCols = Array(auxCol) ++ colsToRotate
// join the split values with their records (DFs with id + colsToRotate)
val splittedDFs = splitValuesDFs
.map(df => df.join(dfWithID, colToSplit).selectExpr(rotateIDCols: _*))
// randomly reorder the auxiliary id column (DFs with random ids)
val randIdDFs = splittedDFs
.map(df => df.select(auxCol).orderBy(rand()).toDF())
// get rdds with random ids
val randIdRdds = randIdDFs
.map(df => df.select(auxCol).rdd.map(row => row(0)))
// finally, zip and append the rdds with the random ids to the dataframes created by
// splitting the main df to obtain the rotated dataframe with all the data
val tuples = (splittedDFs, randIdRdds).zipped
val newRdds = tuples
.map((df: DataFrame, rdd) => df.rdd.repartition(1).zip(rdd.repartition(1))
.map(row => Row.fromSeq(row._1.toSeq ++ Seq(row._2))))
val tuples2 = (splittedDFs, newRdds).zipped
val rotatedDF = tuples2.map((df: DataFrame, rdd) => spark
.createDataFrame(rdd, df.schema.add("rotated_id", LongType)).drop(auxCol))
.reduce(_ union _).withColumnRenamed("rotated_id", "column2join")
// get the rest of the columns
val noRotateCols = dfWithID.columns.diff(colsToRotate).map(col)
val noRotatedDF = dfWithID.select(noRotateCols: _*)
.withColumnRenamed(auxCol, "column2join")
// join both dataframes
val outputDF = noRotatedDF.join(rotatedDF, "column2join")
.select(inputDF.columns.map(col): _*) // to keep the initial columns order
Showing the output dataframe gives a result similar to the expected output above (the exact pairing depends mainly on the rand() ordering).
I would like to avoid collect and repartition as much as possible, in order to get a more functional solution.
Any comments or ideas are welcome!
Best answer I have kept working to get a better, clearer and more functional solution by removing the poorly performing calls (repartition and some collects) wherever possible. I added an auxiliary method to index dataframe rows so that unrelated parts (columns or dfs that cannot be joined through any common column) can be joined. This is my current development; it also removes several conversions between RDDs and dataframes and looks more readable and understandable.
I hope this helps anyone with the same concern.
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, rand}
import org.apache.spark.sql.types.{LongType, StructField, StructType}
// auxiliary method to index the rows of a dataframe
def addRowIndex(df: DataFrame) = spark.createDataFrame(
df.rdd.zipWithIndex.map { case (row, index) => Row.fromSeq(row.toSeq :+ index) },
StructType(df.schema.fields :+ StructField("index", LongType, false))
)
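As a quick sanity check (a tiny throw-away dataframe, not part of the actual flow), the helper simply appends a 0-based "index" column; the output should look roughly like this:
val tiny = Seq(("x", 1), ("y", 2)).toDF("k", "v")
addRowIndex(tiny).show()
// +---+---+-----+
// |  k|  v|index|
// +---+---+-----+
// |  x|  1|    0|
// |  y|  2|    1|
// +---+---+-----+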
// name(s) of the column(s) used to split the input dataframe
val colToSplit = Seq("country")
val splitCols = colToSplit.map(col)
// list of column names to be rotated (together)
val colsToRotate = Seq("name", "age")
// add an auxiliary column for joining the dataframes in the final step
val auxCol = "aux"
val dfWithID = inputDF.withColumn(auxCol, monotonically_increasing_id())
val rotateIDCols = (Array(auxCol) ++ colsToRotate).map(col)
// get an array of dfs with the different values of the splitter column(s)
// --assuming there will not be too many different values in the splitter column(s)--
val filterValues = dfWithID.select(splitCols: _*).distinct().collect()
// retrieve the different dfs according to the splitter values
val splitDfs = filterValues.map(filterRow => filterRow.getValuesMap(colToSplit)
  .foldLeft(dfWithID) {
    // apply every filter first; keep only the aux + rotate columns once all filters are applied
    (df, filterField) => df.filter(col(filterField._1) === filterField._2)
  }
  .select(rotateIDCols: _*))
// get and randomly reorder the aux id column for each dataframe
val randIdDfs = splitDfs.map(_.select(auxCol).orderBy(rand()).toDF())
// remove aux column for each dataframe
val splitWithoutIdDfs = splitDfs.map(_.drop(auxCol))
val dfsTuples = splitWithoutIdDfs.zip(randIdDfs)
// index row of dfs with columns to rotate and dfs with random ids
val indexedDfsTuples = dfsTuples.map {
case (colsDf, idsDf) => (addRowIndex(colsDf), addRowIndex(idsDf))
}
// join reordered-ids dfs and cols to rotate dataframes by the index
val reorderedDfs = indexedDfsTuples.map {
case (df1, df2) => df1.join(df2, Seq("index"))
.drop("index").withColumnRenamed(auxCol, "column2join")
}
// union all the per-country dataframes to create the rotated df
val rotatedDF = reorderedDfs.tail.foldLeft(reorderedDfs.head) { (acc, df) => acc.union(df) }
// get the rest of the columns to get the part of the main df which does not change
val noRotateCols = dfWithID.columns.diff(colsToRotate).map(col)
val noRotatedDF = dfWithID.select(noRotateCols: _*)
.withColumnRenamed(auxCol, "column2join")
// join the rotated and no rotated dataframes
val outputDF = noRotatedDF.join(rotatedDF, "column2join")
.select(inputDF.columns.map(col): _*) // to keep the initial columns order
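A couple of quick sanity checks I run on the spark-shell (just a sketch; the exact name/age pairing changes between runs because of rand()):
outputDF.show(false)
// every (company, address, country) triple from the input should still appear exactly once
assert(outputDF.select("company", "address", "country").except(
  inputDF.select("company", "address", "country")).count() == 0)
// and the per-country row counts should match the input
outputDF.groupBy("country").count().show()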