apache-spark – Writing a Spark function that accepts Column arguments and returns a Column

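I want to write a yeardiff function similar to datediff. yeardiff should take two Column arguments and return a Column containing the number of years between the two argument columns.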

We will use the following sample data:

val testDf = Seq(
  ("2016-09-10", "2001-08-10"),
  ("2016-04-18", "2010-05-18"),
  ("2016-01-10", "2013-08-10")
)
  .toDF("first_datetime", "second_datetime")
  .withColumn("first_datetime", $"first_datetime".cast("timestamp"))
  .withColumn("second_datetime", $"second_datetime".cast("timestamp"))

We can run this to get the difference in days:

testDf.withColumn("num_days", datediff(col("first_datetime"), col("second_datetime")))

I would like to be able to run this:

testDf.withColumn("num_years", yeardiff(col("first_datetime"), col("second_datetime")))

I tried defining a yeardiff function with the required method signature, but did not get anywhere:

def yeardiff(end: Column, start: Column): Column = {
  // what do I do here
}    

Here is a hacky DataFrame-transformation solution that I came up with and do not like:

def yearDiff(end: String, start: String)(df: DataFrame): DataFrame = {
  val c = s"${end}_${start}_datediff"
  df
    .withColumn(c, datediff(col(end), col(start)))
    .withColumn("yeardiff", col(c) / 365)
}

Edit

I started digging into the Spark source code to understand how datediff works. Here is the datediff function definition:

def datediff(end: Column, start: Column): Column = withExpr { DateDiff(end.expr, start.expr) }

Here is the DateDiff case class:

case class DateDiff(endDate: Expression, startDate: Expression)
  extends BinaryExpression with ImplicitCastInputTypes {

  override def left: Expression = endDate
  override def right: Expression = startDate
  override def inputTypes: Seq[AbstractDataType] = Seq(DateType, DateType)
  override def dataType: DataType = IntegerType

  override def nullSafeEval(end: Any, start: Any): Any = {
    end.asInstanceOf[Int] - start.asInstanceOf[Int]
  }

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    defineCodeGen(ctx, ev, (end, start) => s"$end - $start")
  }
}

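Best answer

Column defines arithmetic operators, so the Column returned by datediff can be divided directly. This gives an approximate, fractional number of years (it ignores leap years) and should solve your problem: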

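import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.datediff

// Approximate years between two date/timestamp columns: days / 365.
def yearDiff(end: Column, start: Column): Column = {
  datediff(end, start) / 365
}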
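
If whole calendar years are wanted rather than days divided by 365, one possible variant is sketched below. It is not part of the original answer: the name yearDiffWhole is hypothetical, and it assumes that "number of years" means completed years. It relies on the built-in months_between and floor functions from org.apache.spark.sql.functions.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{floor, months_between}

// Hypothetical helper (not from the original answer):
// completed calendar years between two date/timestamp columns.
// months_between returns a fractional number of months as a Double;
// dividing by 12 and flooring drops the partial year.
def yearDiffWhole(end: Column, start: Column): Column =
  floor(months_between(end, start) / 12)
```

It can be used the same way as datediff, for example testDf.withColumn("num_years", yearDiffWhole(col("first_datetime"), col("second_datetime"))). Note that floor rounds toward negative infinity, so the result for an end date earlier than the start date may not be what you expect.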