•If函数: if
•非空查找函数: COALESCE
•条件判断函数:CASE
• If 函数
语法: if(boolean testCondition, T valueTrue, T valueFalseOrNull)
返回值: T
说明: 当条件testCondition为TRUE时,返回valueTrue;否则返回valueFalseOrNull
举例:
hive> select if(1=2,100,200) from dual;
200
hive> select if(1=1,100,200) from dual;
100
• 非空查找函数 : COALESCE
COALESCE使用时和AS搭配使用,对于合并数据列非常有用。
语法: COALESCE(T v1, T v2, …)
返回值: T
说明: 返回参数中的第一个非空值;如果所有值都为NULL,那么返回NULL
举例:
hive> select COALESCE(null,'100','50′) from dual;
100
条件判断函数: CASE
语法 : CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END
返回值 : T
说明:如果 a 等于 b ,那么返回 c ;如果 a 等于 d ,那么返回 e ;否则返回 f
举例:
hive> Select case 100 when 50 then 'tom' when 100 then 'mary' else 'tim' end from dual;
mary
code例子
############################################################################
'''
# function:求取筛选字段
# 功能:求取两个表的存储字段,先取第一个表特有的字段,再取第二个表特有的字段,最后取两个表字段的交集(如果第一个表的该字段的值为空,则取第二个该字段的该值)
# 创建时间:2017/08/16
'''
#############################################################################
def colChoose(first_tb,second_tb):
"""两个表字段的全集等于两个表之间相互之间的差集和一个交集这三者之和"""
# 构建需要筛选的差集
col_difference_A = str(",".join([tmp for tmp in ['A.' +
i for i in list(set(hiveContext.sql("SELECT * FROM %s"%first_tb).columns). \
difference(set(hiveContext.sql("SELECT * FROM %s"%second_tb).columns)))]]))
col_difference_B = str(",".join([tmp for tmp in ['B.' +
i for i in list(set(hiveContext.sql("SELECT * FROM %s"%second_tb).columns). \
difference(set(hiveContext.sql("SELECT * FROM %s"%first_tb).columns)))]]))
# 求两个表之间的交集
col_intersection_A = [tmp for tmp in ['A.' + i for i in list(set(hiveContext.sql("SELECT * FROM %s"%first_tb).columns).intersection(
set(hiveContext.sql("SELECT * FROM %s"%second_tb).columns)))]]
col_intersection_B = [tmp for tmp in ['B.' +i for i in list(set(hiveContext.sql("SELECT * FROM %s"%first_tb).columns).intersection(
set(hiveContext.sql("SELECT * FROM %s"%second_tb).columns)))]]
col_intersection = [i for i in list(set(hiveContext.sql("SELECT * FROM %s"%first_tb).columns).intersection(
set(hiveContext.sql("SELECT * FROM %s"%second_tb).columns)))]
# col_intersection_AB = str(",".join([tmp for tmp in ["COALESCE" + str(i) for i in map(None, col_intersection_A,col_intersection_B)]]))
# col_intersection_AB = str(",".join([tmp for tmp in list(map(lambda x,y : x + " " + "AS" + " " + y, ["COALESCE" + str(i) for i in map(None, col_intersection_A,col_intersection_B)],col_intersection))]))
col_intersection_AB = str(",".join([tmp for tmp in list(map(lambda x,y,z : "COALESCE(" + x + "," + y + ")" + " " + "AS" + " " + z, col_intersection_A,col_intersection_B,col_intersection))]))
# col_interset = str(",".join([tmp for tmp in ["IFNULL" + str(i) for i in map(None, col_interset_A,col_interset_B)]])) sql语句中含有IFNULL,hive中并没有,hive中用COALESCE。
return col_difference_A,col_difference_B,col_intersection_AB
# 使用示例
# 1. colChoose("tb_source_data.loan_applications","tb_source_data.user_profiles")
# 2. print colChoose("tb_source_data.loan_applications","tb_source_data.user_profiles")
# 3. 其他函数调用:col_difference_A,col_difference_B,col_intersection_AB=colChoose(first_tb,second_tb)
############################################################################
'''
# function:join_two_tables
# 功能:可以用来聚合loan_application和user_profiles的三张表,(包含loan_application&user_profiles,nanyue_loan_application&nanyue_user_profiles,partner_loan_application&partner_user_profiles)
# 创建时间:2017/08/11 修改时间:2017/08/16
'''
#############################################################################
def joinLoanApp_UserProfilesTable(first_tb,second_tb,store_tb,first_tb_on_col,second_tb_on_col,joinway):
begin = datetime.datetime.now()
hiveContext.sql("DROP TABLE IF EXISTS %s PURGE"%store_tb)
col_difference_A, col_difference_B, col_intersection_AB = colChoose(first_tb,second_tb)
sql_join = """
SELECT %s, %s, %s
FROM
(SELECT * FROM
(SELECT *, row_number() OVER (PARTITION BY %s ORDER BY created_at DESC) AS num FROM %s) t1
WHERE t1.num=1 AND %s IS NOT NULL) A
%s JOIN
(SELECT * FROM
(SELECT *, row_number() OVER (PARTITION BY %s ORDER BY created_at DESC) AS num FROM %s) t2
WHERE t2.num=1 AND %s IS NOT NULL) B
ON A.%s=B.%s """%(col_difference_A, col_difference_B, col_intersection_AB, first_tb_on_col, first_tb, first_tb_on_col,joinway, second_tb_on_col, second_tb, second_tb_on_col, first_tb_on_col, second_tb_on_col)
print "-----------建表语句-----------"
print sql_join
print "-----------开始join表-----------"
hiveContext.sql(sql_join).write.saveAsTable("%s"%store_tb)
print "-----------join表结束-----------"
end = datetime.datetime.now()
print "耗时:",end-begin
# 使用示例
# joinLoanApp_UserProfilesTable("tb_source_data.loan_applications","tb_source_data.user_profiles","stage_data.loan_application_join_user_profiles","user_profile_id","id","FULL")