scala – Spark将sql窗口函数迁移到RDD以获得更好的性能

应该为数据框中的多个列执行一个函数

def handleBias(df: DataFrame, colName: String, target: String = target) = {
    val w1 = Window.partitionBy(colName)
    val w2 = Window.partitionBy(colName, target)

    df.withColumn("cnt_group", count("*").over(w2))
      .withColumn("pre2_" + colName, mean(target).over(w1))
      .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
      .drop("cnt_group")
  }

这可以很好地编写,如上面的spark-SQL和for循环中所示.然而,这导致了大量的洗牌(spark apply function to columns in parallel).

一个最小的例子:

  val df = Seq(
    (0, "A", "B", "C", "D"),
    (1, "A", "B", "C", "D"),
    (0, "d", "a", "jkl", "d"),
    (0, "d", "g", "C", "D"),
    (1, "A", "d", "t", "k"),
    (1, "d", "c", "C", "D"),
    (1, "c", "B", "C", "D")
  ).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")

  val columnsToDrop = Seq("col3TooMany")
  val columnsToCode = Seq("col1", "col2")
  val target = "TARGET"

  val targetCounts = df.filter(df(target) === 1).groupBy(target)
    .agg(count(target).as("cnt_foo_eq_1"))
  val newDF = df.join(broadcast(targetCounts), Seq(target), "left")

  val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) {
    (currentDF, colName) => handleBias(currentDF, colName)
  }

  result.drop(columnsToDrop: _*).show

如何使用RDD API更有效地表达这一点? aggregateByKeys应该是一个好主意,但我仍然不清楚如何在这里应用它来替换窗口函数.

(提供更多背景/更大的例子https://github.com/geoHeil/sparkContrastCoding)

编辑

最初,我从Spark dynamic DAG is a lot slower and different from hard coded DAG开始,如下所示.好消息是,每列似乎都是独立/并行的.缺点是连接(即使是300 MB的小数据集)变得“太大”并导致无响应的火花.

handleBiasOriginal("col1", df)
    .join(handleBiasOriginal("col2", df), df.columns)
    .join(handleBiasOriginal("col3TooMany", df), df.columns)
    .drop(columnsToDrop: _*).show

  def handleBiasOriginal(col: String, df: DataFrame, target: String = target): DataFrame = {
    val pre1_1 = df
      .filter(df(target) === 1)
      .groupBy(col, target)
      .agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
      .drop(target)

    val pre2_1 = df
      .groupBy(col)
      .agg(mean(target).alias("pre2_" + col))

    df
      .join(pre1_1, Seq(col), "left")
      .join(pre2_1, Seq(col), "left")
      .na.fill(0)
  }

此图像为spark 2.1.0,Spark dynamic DAG is a lot slower and different from hard coded DAG的图像为2.0.2
《scala – Spark将sql窗口函数迁移到RDD以获得更好的性能》

应用缓存时,DAG会更简单一些
    df.cache
    handleBiasOriginal(“col1”,df). …

除了窗口函数之外还有哪些其他可能性来优化SQL?
如果SQL是动态生成的,那么最好是很好的.

《scala – Spark将sql窗口函数迁移到RDD以获得更好的性能》

最佳答案 这里的要点是避免不必要的洗牌.现在,您的代码会为您要包含的每个列重复两次,并且无法在列之间重复使用生成的数据布局.

为简单起见,我假设target始终是二进制({0,1}),并且您使用的所有剩余列都是StringType.此外,我假设列的基数足够低,以便将结果分组并在本地处理.您可以调整这些方法来处理其他情况但需要更多工作.

RDD API

>从长到长重塑数据:

import org.apache.spark.sql.functions._

val exploded = explode(array(
  (columnsToDrop ++ columnsToCode).map(c => 
    struct(lit(c).alias("k"), col(c).alias("v"))): _*
)).alias("level")

val long = df.select(exploded, $"TARGET")

> aggregateByKey,重塑和收集:

import org.apache.spark.util.StatCounter

val lookup = long.as[((String, String), Int)].rdd
  // You can use prefix partitioner (one that depends only on _._1)
  // to avoid reshuffling for groupByKey
  .aggregateByKey(StatCounter())(_ merge _, _ merge _)
  .map { case ((c, v), s) => (c, (v, s)) }
  .groupByKey
  .mapValues(_.toMap)
  .collectAsMap

>您可以使用查找来获取各个列和级别的统计信息.例如:

lookup("col1")("A")
org.apache.spark.util.StatCounter = 
  (count: 3, mean: 0.666667, stdev: 0.471405, max: 1.000000, min: 0.000000)

给你col1,level A的数据.根据二进制TARGET假设,这个信息是完整的(你得到两个类的计数/分数).

您可以使用这样的查找来生成SQL表达式或将其传递给udf并将其应用于各个列.

DataFrame API

>将数据转换为long,与RDD API一样.
>根据级别计算聚合:

val stats = long
  .groupBy($"level.k", $"level.v")
  .agg(mean($"TARGET"), sum($"TARGET"))

>根据您的偏好,您可以对其进行重新整形,以实现高效连接或转换为本地集合,类似于RDD解决方案.

点赞