Spark UDF and functions(一)

1.创建与使用udf

udf有两种使用方法,一是通过sparkSession注册,在sql中直接使用;二是在dataset中通过Column使用。

udf用法一:注册(在sql中使用)

  • java:

        import org.apache.spark.sql.api.java.UDF1;
        import org.apache.spark.sql.types.DataTypes;
        sparkSession.udf().register("split", new UDF1<String, String[]>() {
          public String[] call(String s) throws Exception {
            return s.split(",");
          }
        }, DataTypes.createArrayType(DataTypes.StringType));
        //sparkSession.udf().register("split", (String value) -> value.split(","),DataTypes.createArrayType(DataTypes.StringType));
    
  • scala:

        spark.udf.register("split", (value: String) => {
          value.split(",")
        })
    

udf用法二:方法调用

  • java:

    spark >= 2.3

        import static org.apache.spark.sql.functions.*;
        import org.apache.spark.sql.expressions.UserDefinedFunction;
        UserDefinedFunction mode = udf(
          (Seq<String> ss) -> ss.headOption(), DataTypes.StringType
        );
        df.select(mode.apply(col("vs"))).show();
    
    

    spark < 2.3

        UDF1 mode = new UDF1<Seq<String>, String>() {
        public String call(final Seq<String> types) throws Exception {
            return types.headOption();
          }
        };
        sparkSession.udf().register("mode", mode, DataTypes.StringType);
        df.select(callUDF("mode", col("vs"))).show();
        df.selectExpr("mode(vs)").show();
        
    
  • scala:

        import org.apache.spark.sql.functions._
        val test_split = udf((value:String)=> value.split(","))
        ds.withColumn("test_split",test_split($"column"))
    

2.String* 参数传入数组

使用scala时,dataset的select和drop等方法中有要传入String*可变参数类型的,但如果只有数组形式,转换方法如下:
dataset.drop(columns: _*)dataset.select(columns.map(col(_)): _*)

    原文作者:lioversky
    原文地址: https://www.jianshu.com/p/705ea856a0c2
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞