A function should be executed for several columns of a DataFrame:
def handleBias(df: DataFrame, colName: String, target: String = target) = {
  val w1 = Window.partitionBy(colName)
  val w2 = Window.partitionBy(colName, target)
  df.withColumn("cnt_group", count("*").over(w2))
    .withColumn("pre2_" + colName, mean(target).over(w1))
    .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
    .drop("cnt_group")
}
This can be written nicely as spark-SQL and a for loop, as shown above. However, it leads to a lot of shuffles (spark apply function to columns in parallel).
A minimal example:
val df = Seq(
  (0, "A", "B", "C", "D"),
  (1, "A", "B", "C", "D"),
  (0, "d", "a", "jkl", "d"),
  (0, "d", "g", "C", "D"),
  (1, "A", "d", "t", "k"),
  (1, "d", "c", "C", "D"),
  (1, "c", "B", "C", "D")
).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")

val columnsToDrop = Seq("col3TooMany")
val columnsToCode = Seq("col1", "col2")
val target = "TARGET"

val targetCounts = df.filter(df(target) === 1).groupBy(target)
  .agg(count(target).as("cnt_foo_eq_1"))
val newDF = df.join(broadcast(targetCounts), Seq(target), "left")

val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) {
  (currentDF, colName) => handleBias(currentDF, colName)
}
result.drop(columnsToDrop: _*).show
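One way (a suggestion, not part of the original question) to see where these shuffles come from is to inspect the query plan of the composed result; each window specification with a different partitioning shows up as an additional Exchange node in the physical plan:

// Print the parsed, analyzed, optimized and physical plans; every Exchange
// node in the physical plan corresponds to a shuffle introduced by one of
// the window or aggregation steps above.
result.explain(true)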
How can this be expressed more efficiently using the RDD API? aggregateByKey should be a good idea, but it is still not clear to me how to apply it here to substitute the window functions.
(More context / a bigger example is provided at https://github.com/geoHeil/sparkContrastCoding)
Edit
Initially, I started out with Spark dynamic DAG is a lot slower and different from hard coded DAG, which is shown below. The good thing is that each column seems to run independently / in parallel. The downside is that the joins (even for a small data set of 300 MB) get "too big" and lead to an unresponsive Spark.
handleBiasOriginal("col1", df)
.join(handleBiasOriginal("col2", df), df.columns)
.join(handleBiasOriginal("col3TooMany", df), df.columns)
.drop(columnsToDrop: _*).show
def handleBiasOriginal(col: String, df: DataFrame, target: String = target): DataFrame = {
  val pre1_1 = df
    .filter(df(target) === 1)
    .groupBy(col, target)
    .agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
    .drop(target)

  val pre2_1 = df
    .groupBy(col)
    .agg(mean(target).alias("pre2_" + col))

  df
    .join(pre1_1, Seq(col), "left")
    .join(pre2_1, Seq(col), "left")
    .na.fill(0)
}
This image is from Spark 2.1.0; the images in Spark dynamic DAG is a lot slower and different from hard coded DAG are from 2.0.2.
The DAG is a bit simpler when caching is applied:
df.cache
handleBiasOriginal("col1", df). …
What other possibilities than window functions are there to optimize the SQL?
At best it would be great if the SQL was generated dynamically.
Best answer: The main point here is to avoid unnecessary shuffles. Right now your code shuffles twice for each column you want to include, and the resulting data layout cannot be reused between columns.
For simplicity, I assume that target is always binary ({0, 1}) and that all the remaining columns you use are of StringType. Furthermore, I assume that the cardinality of the columns is low enough for the results to be grouped and handled locally. You can adjust these methods to handle other cases, but it requires more work.
RDD API
> Reshape data from wide to long:
import org.apache.spark.sql.functions._

val exploded = explode(array(
  (columnsToDrop ++ columnsToCode).map(c =>
    struct(lit(c).alias("k"), col(c).alias("v"))): _*
)).alias("level")

val long = df.select(exploded, $"TARGET")
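To sanity-check the reshaped layout, you can print the schema; each input row is expanded into one row per encoded column (the commented output is a sketch, the exact nullability flags may differ):

long.printSchema()
// root
//  |-- level: struct
//  |    |-- k: string
//  |    |-- v: string
//  |-- TARGET: integer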
> aggregateByKey, reshape and collect:
import org.apache.spark.util.StatCounter

val lookup = long.as[((String, String), Int)].rdd
  // You can use prefix partitioner (one that depends only on _._1)
  // to avoid reshuffling for groupByKey
  .aggregateByKey(StatCounter())(_ merge _, _ merge _)
  .map { case ((c, v), s) => (c, (v, s)) }
  .groupByKey
  .mapValues(_.toMap)
  .collectAsMap
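The prefix partitioner mentioned in the comment above is a Partitioner that hashes only the column-name part of the (column, level) key, so all levels of one column land in the same partition. A minimal sketch, not from the original answer (the class name PrefixPartitioner and the partition count are illustrative):

import org.apache.spark.Partitioner

// Hypothetical partitioner: the partition depends only on the first element
// of the (columnName, level) key, i.e. on the column name.
class PrefixPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    case (prefix, _) => math.abs(prefix.hashCode % numPartitions)
    case other       => math.abs(other.hashCode % numPartitions)
  }
}

It can be supplied through the aggregateByKey(zeroValue, partitioner)(seqOp, combOp) overload.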
> You can use lookup to get statistics for individual columns and levels. For example:
lookup("col1")("A")
org.apache.spark.util.StatCounter =
(count: 3, mean: 0.666667, stdev: 0.471405, max: 1.000000, min: 0.000000)
gives you the data for col1, level A. Based on the binary TARGET assumption, this information is complete (you get the count / fractions for both classes).
You can use a lookup like this to generate SQL expressions or pass it to a udf and apply it to the individual columns.
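As an illustration of the udf option, a minimal sketch assuming the lookup collected above; the names col1Stats and encodeCol1 are my own, not from the original answer:

import org.apache.spark.sql.functions.udf

// Per-level statistics for col1, collected on the driver above.
val col1Stats = lookup("col1")

// Hypothetical encoder: replace each level of col1 by the mean of TARGET
// observed for that level, falling back to 0.0 for unseen levels.
val encodeCol1 = udf { (v: String) =>
  col1Stats.get(v).map(_.mean).getOrElse(0.0)
}

df.withColumn("pre2_col1", encodeCol1($"col1")).show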
DataFrame API
> Convert data to long, as for the RDD API.
> Compute aggregates per level:
val stats = long
  .groupBy($"level.k", $"level.v")
  .agg(mean($"TARGET"), sum($"TARGET"))
> Depending on your preferences, you can reshape this to enable efficient joins, or convert it to a local collection, similarly to the RDD solution.
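For example, a sketch of the join variant for a single column, using the column names from the question; the aliases pre2_col1 and cnt_col1_eq_1 are my own choice:

// Per-level aggregates for col1 only, aliased so they can be joined back
// onto the original frame by the column value.
val col1Agg = long
  .filter($"level.k" === "col1")
  .groupBy($"level.v".alias("col1"))
  .agg(mean($"TARGET").alias("pre2_col1"), sum($"TARGET").alias("cnt_col1_eq_1"))

df.join(broadcast(col1Agg), Seq("col1"), "left").show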