scala – 如何在Spark UDF中使用Option

我有这样的数据集:

+----+------+
|code|status|
+-----------+
|   1| "new"|
|   2|  null|
|   3|  null|
+----+------+

我想写一个依赖于两列的UDF.

我按照this answer中的第二种方法来处理它,即在UDF外部处理null,并编写myFn以将布尔值作为第二个参数:

df.withColumn("new_column",
  when(pst_regs("status").isNull, 
    myFnUdf($"code", lit(false))
  )
  .otherwise(
    myFnUdf($"code", lit(true))
  )
)

为了在UDF中处理null,我看到的方法是每this answer,它讨论“使用Options包装参数”.我试过这样的代码:

df.withColumn("new_column", myFnUdf($"code", $"status"))

def myFn(code: Int, status: String) = (code, Option(status)) match {
  case (1, "new") => "1_with_new_status"
  case (2, Some(_)) => "2_with_any_status"
  case (3, None) => "3_no_status"
}

但是一行为null会导致类型不匹配; found:None.type required String.我还尝试在udf创建期间使用Option包装参数但没有成功.这个的基本形式(没有Option)看起来像这样:

myFnUdf = udf[String, Int, String](myFn(_:Int, _:String))

我是Scala的新手所以我确定我错过了一些简单的东西.我的一些困惑可能是从函数创建udfs的不同语法(例如,每https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-udfs.html),所以我不确定我是否使用最好的方法.任何帮助赞赏!

编辑

编辑为@ user6910411和@sgvd评论添加缺失(1,“新”)案例.

最佳答案 首先,您可能正在使用一些我们在此处缺少的代码.当我尝试你的例子myFn,用val myFnUdf = udf(myFn _)制作一个UDF并用df.withColumn(“new_column”,myFnUdf($“code”,$“status”))运行它.显示,我不要不会出现类型不匹配,而是出现MatchError,同样也是user6910411所指出的.这是因为没有匹配的模式(1,“新”).

除此之外,虽然通常使用Scala的选项而不是原始的空值更好,但在这种情况下你不必这样做.以下示例直接使用null:

val my_udf = udf((code: Int, status: String) => status match {
    case null => "no status"
    case _ => "with status"
})

df.withColumn("new_column", my_udf($"code", $"status")).show

结果:

+----+------+-----------+
|code|status| new_column|
+----+------+-----------+
|   1|   new|with status|
|   2|  null|  no status|
|   2|  null|  no status|
+----+------+-----------+

使用选项包装仍然有效:

val my_udf = udf((code: Int, status: String) => Option(status) match {
    case None => "no status"
    case Some(_) => "with status"
})

这给出了相同的结果.

点赞