1.创建与使用udf
udf有两种使用方法,一是通过sparkSession注册,在sql中直接使用;二是在dataset中通过Column使用。
udf用法一:注册(在sql中使用)
java:
import org.apache.spark.sql.api.java.UDF1; import org.apache.spark.sql.types.DataTypes; sparkSession.udf().register("split", new UDF1<String, String[]>() { public String[] call(String s) throws Exception { return s.split(","); } }, DataTypes.createArrayType(DataTypes.StringType)); //sparkSession.udf().register("split", (String value) -> value.split(","),DataTypes.createArrayType(DataTypes.StringType));
scala:
spark.udf.register("split", (value: String) => { value.split(",") })
udf用法二:方法调用
java:
spark >= 2.3
import static org.apache.spark.sql.functions.*; import org.apache.spark.sql.expressions.UserDefinedFunction; UserDefinedFunction mode = udf( (Seq<String> ss) -> ss.headOption(), DataTypes.StringType ); df.select(mode.apply(col("vs"))).show();
spark < 2.3
UDF1 mode = new UDF1<Seq<String>, String>() { public String call(final Seq<String> types) throws Exception { return types.headOption(); } }; sparkSession.udf().register("mode", mode, DataTypes.StringType); df.select(callUDF("mode", col("vs"))).show(); df.selectExpr("mode(vs)").show();
scala:
import org.apache.spark.sql.functions._ val test_split = udf((value:String)=> value.split(",")) ds.withColumn("test_split",test_split($"column"))
2.String* 参数传入数组
使用scala时,dataset的select和drop等方法中有要传入String*可变参数类型的,但如果只有数组形式,转换方法如下:
dataset.drop(columns: _*)
或 dataset.select(columns.map(col(_)): _*)