在开始正式数据处理之前,我觉得有必要去学习理解下UDF。
UDF
UDF全称User-Defined Functions
,用户自定义函数,是Spark SQL的一项功能,用于定义新的基于列的函数,这些函数扩展了Spark SQL的DSL用于转换数据集的词汇表。
我在databricks上找到一个比较简单理解的入门栗子:
Register the function as a UDF
val squared = (s: Int) => {
s * s
}
spark.udf.register("square", squared)
Call the UDF in Spark SQL
spark.range(1, 20).registerTempTable("test")
%sql select id, square(id) as id_squared from test
我理解就是先定义一个函数squared
,返回输入数字的平方,然后register,并绑定square
方法名为square
,然后就在Spark SQL中直接使用square
方法。
实例一:温度转化
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
object ScalaUDFExample {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Scala UDF Example")
val spark = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()
val ds = spark.read.json("temperatures.json")
ds.createOrReplaceTempView("citytemps")
// Register the UDF with our SparkSession spark.udf.register("CTOF", (degreesCelcius: Double) => ((degreesCelcius * 9.0 / 5.0) + 32.0))
spark.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show()
}
}
我们将定义一个 UDF 来将以下 JSON 数据中的温度从摄氏度(degrees Celsius)转换为华氏度(degrees Fahrenheit):
{"city":"St. John's","avgHigh":8.7,"avgLow":0.6}
{"city":"Charlottetown","avgHigh":9.7,"avgLow":0.9}
{"city":"Halifax","avgHigh":11.0,"avgLow":1.6}
{"city":"Fredericton","avgHigh":11.2,"avgLow":-0.5}
{"city":"Quebec","avgHigh":9.0,"avgLow":-1.0}
{"city":"Montreal","avgHigh":11.1,"avgLow":1.4}
...
实例二:时间转化
case class Purchase(customer_id: Int, purchase_id: Int, date: String, time: String, tz: String, amount:Double)
val x = sc.parallelize(Array(
Purchase(123, 234, "2007-12-12", "20:50", "UTC", 500.99),
Purchase(123, 247, "2007-12-12", "15:30", "PST", 300.22),
Purchase(189, 254, "2007-12-13", "00:50", "EST", 122.19),
Purchase(187, 299, "2007-12-12", "07:30", "UTC", 524.37)
))
val df = sqlContext.createDataFrame(x)
df.registerTempTable("df")
自定义函数
def makeDT(date: String, time: String, tz: String) = s"$date $time $tz"
sqlContext.udf.register("makeDt", makeDT(_:String,_:String,_:String))
// Now we can use our function directly in SparkSQL. sqlContext.sql("SELECT amount, makeDt(date, time, tz) from df").take(2)
// but not outside df.select($"customer_id", makeDt($"date", $"time", $"tz"), $"amount").take(2) // fails
如果想要在SQL外面使用,必须通过spark.sql.function.udf
来创建UDF
import org.apache.spark.sql.functions.udf
val makeDt = udf(makeDT(_:String,_:String,_:String))
// now this works df.select($"customer_id", makeDt($"date", $"time", $"tz"), $"amount").take(2)
实践操作
写一个UDF来将一些Int数字分类
val formatDistribution = (view: Int) => {
if (view < 10) {
"<10"
} else if (view <= 100) {
"10~100"
} else if (view <= 1000) {
"100~1K"
} else if (view <= 10000) {
"1K~10K"
} else if (view <= 100000) {
"10K~100K"
} else {
">100K"
}
}
注册:
session.udf.register("formatDistribution", UDF.formatDistribution)
SQL:
session.sql("select user_id, formatDistribution(variance_digg_count) as variance from video")
写到这里,再回顾UDF,我感觉这就像是去为了方便做一个分类转化等操作,和Python里面的函数一样,只不过这里的UDF一般特指Spark SQL里面使用的函数。然后发现这里和SQL中的自定义函数挺像的:
CREATE FUNCTION [函数所有者.]<函数名称>
(
-- 添加函数所需的参数,可以没有参数 [<@param1> <参数类型>]
[,<@param1> <参数类型>]…
)
RETURNS TABLE
AS
RETURN
(
-- 查询返回的SQL语句 SELECT查询语句
)
/* * 创建内联表值函数,查询交易总额大于1W的开户人个人信息 */
create function getCustInfo()
returns @CustInfo table --返回table类型
(
--账户ID
CustID int,
--帐户名称
CustName varchar(20) not null,
--身份证号
IDCard varchar(18),
--电话
TelePhone varchar(13) not null,
--地址
Address varchar(50) default('地址不详')
)
as
begin
--为table表赋值
insert into @CustInfo
select CustID,CustName,IDCard,TelePhone,Address from AccountInfo
where CustID in (select CustID from CardInfo
where CardID in (select CardID from TransInfo group by CardID,transID,TransType,TransMoney,TransDate having sum(TransMoney)>10000))
return
end
go
-- 调用内联表值函数 select * from getCustInfo()
go
好像有异曲同工之妙~
欢迎关注:Spark实战–学习UDF