1. Generate Hive CREATE TABLE statements from the all_tab_columns dictionary view
create or replace view create_sql as
--generate Hive CREATE TABLE statements from the all_tab_columns dictionary view
select owner,table_name, case
when nm = 1 then
'create table ' || owner || '.' || TABLE_NAME || ' (' ||
COLUMN_NAME || ' ' || DATA_TYPE || ','
when np = 1 then
COLUMN_NAME || ' ' || DATA_TYPE || ') partitioned by (dt string);'
else
COLUMN_NAME || ' ' || DATA_TYPE || ','
end create_sql
from (
SELECT OWNER,
TABLE_NAME,
COLUMN_NAME,
CASE
WHEN DATA_TYPE IN ('VARCHAR2','LONG','CLOB','CHAR','NCHAR','BLOB') THEN
'STRING'
WHEN DATA_TYPE IN ('NUMBER') then
'DECIMAL' || '(' || DATA_PRECISION || ',' || DATA_SCALE || ')'
WHEN DATA_TYPE IN ('DATE', 'TIMESTAMP(6)') THEN
'STRING'
ELSE
DATA_TYPE||'(' || DATA_PRECISION || ',' || DATA_SCALE || ')'
END DATA_TYPE,
COLUMN_ID,
NM,
NP
FROM (select t.*,
row_number() over(partition by owner, TABLE_NAME order by COLUMN_ID) nm,
row_number() over(partition by owner, TABLE_NAME order by COLUMN_ID desc) np
from all_tab_columns t
))
ORDER BY OWNER, TABLE_NAME, COLUMN_ID;
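The view emits one DDL fragment per column; to turn those rows into usable .sql files you still need a small exporter. Below is a minimal sketch, not part of the original post, assuming the cx_Oracle driver is available; the credentials and output directory are placeholders, and the host/port/SID are taken from the jdbc string used later in this post.
# Sketch: export the generated Hive DDL from the create_sql view, one .sql file per table.
import os
import cx_Oracle  # assumption: cx_Oracle is installed

dsn = cx_Oracle.makedsn("10.90.87.35", 11521, sid="bdccdb2")   # hypothetical connection details
conn = cx_Oracle.connect("sqoopuser", "<password>", dsn)
cur = conn.cursor()
# the view already carries ORDER BY owner, table_name, column_id
cur.execute("select owner, table_name, create_sql from create_sql where owner = 'BS'")

ddl = {}
for owner, table, line in cur:
    ddl.setdefault((owner, table), []).append(line)

os.makedirs("hive_ddl", exist_ok=True)
for (owner, table), lines in ddl.items():
    with open(os.path.join("hive_ddl", "%s_%s.sql" % (owner, table)), "w", encoding="utf-8") as f:
        f.write("\n".join(lines) + "\n")

cur.close()
conn.close()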
2. Generate the sqoop job configuration table
select
owner||table_name job_name, --job name
'' table_type, --table type: dim = dimension, fact_i = insert-only fact, fact_iud = fact with insert/update/delete
'' partition, --whether the hive table is partitioned: 0 = no, 1 = yes
'BS' source_db, --source business system, usually the source schema/user or database name
owner||'.'||table_name source_table, --source table name
'' datatime_cloumn, --incremental timestamp column
'APPEND' incremental, --load mode: append = incremental, overwrite = full load
'' SPLIT_BY, --split column for parallelism; pick one with few duplicate values
'APP_NO' row_key, --primary key column
'fz_bs' hive_db, --target hive database
table_name hive_table, --target hive table; must be the bare table name without a database prefix
'' check_column, --source column used for the incremental check
columns
from (
select owner,table_name,wm_concat(column_name)over(partition by owner,table_name order by column_id) columns,rn from (
select owner,table_name,column_name,column_id,row_number()over(partition by owner,table_name order by column_id desc) rn from all_tab_columns
where owner||'_'||table_name in(
'BS_S_FAULT_RPT',
'BS_ARC_S_FAULT_RPT',
'HIS_ARC_S_FAULT_RPT',
'BS_S_FAULT_HANDLE',
'BS_ARC_S_FAULT_HANDLE',
'HIS_ARC_S_FAULT_HANDLE',
'BS_S_RETVISIT',
'BS_ARC_S_RETVISIT',
'HIS_ARC_S_RETVISIT',
'BS_S_95598_WKST_RELA',
'HIS_S_95598_WKST_RELA')
order by owner,table_name,column_id
))
where rn=1
order by owner,table_name;
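The result of this query is what the Python script below consumes: it is expected to be saved as a tab-delimited file named tablelist with a header row. A hypothetical example of that layout follows (the values are illustrative only; note that the script additionally reads a pattition_by field, used as the dedup key, which the query above does not produce):
# Hypothetical tablelist layout: a tab-separated header row followed by one row per table.
header = ["job_name", "table_type", "partition", "source_db", "source_table",
          "datatime_cloumn", "incremental", "split_by", "row_key",
          "hive_db", "hive_table", "check_column", "columns", "pattition_by"]
row = ["BSS_FAULT_RPT", "fact_i", "0", "BS", "BS.S_FAULT_RPT",
       "REPORT_TIME", "APPEND", "APP_NO", "APP_NO",
       "fz_bs", "S_FAULT_RPT", "", "APP_NO,REPORT_TIME,HANDLE_FLAG", "APP_NO"]
with open("tablelist", "w", encoding="utf-8") as f:
    f.write("\t".join(header) + "\n")
    f.write("\t".join(row) + "\n")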
3. Python script that auto-generates the sqoop ingestion jobs
Hive jobs
#!/usr/bin/python3
# coding=utf-8
import json,os
def json_make(input_file='./tablelist'):
#input_file ='./sqoopjob/tablelist'
lines = open(input_file, "r",encoding="utf_8_sig").readlines()
lines = [i for i in lines if i not in ('', '\n')]  # drop blank lines
lines = [line.strip() for line in lines]
# the first line provides the keys (column names)
keys = lines[0].split('\t')
line_num = 1
total_lines = len(lines)
parsed_datas = []
while line_num < total_lines:
values = lines[line_num].split("\t")
parsed_datas.append(dict(zip(keys, values)))
line_num = line_num + 1
json_str = json.dumps(parsed_datas, ensure_ascii=False, indent=4)
output_file = input_file+'.json'
# write to the file
f = open(output_file, "w", encoding="utf-8")
f.write(json_str)
f.close()
print('JSON conversion finished, see %s' % output_file)
def create_job(tablelist):
if os.path.exists('sqoopjob'):
os.system('rm -fr sqoopjob/*')
os.system('mkdir sqoopjob/hive-bin')
os.system('mkdir sqoopjob/impala-bin')
os.system('mkdir sqoopjob/partition-bin')
os.system('mkdir sqoopjob/job-bin')
else:
os.mkdir('sqoopjob')
sqoopmeta='jdbc:hsqldb:hsql://localhost:16000/sqoop'
jdbc='jdbc:oracle:thin:@10.90.87.35:11521:bdccdb2 --username sqoopuser --password Bigdata_2016'
sjf=open('sqoopjob/createjob.sh', "w")
hcf = open('./sqoopjob/hql_corntab.cron', 'w')
imcf=open('./sqoopjob/imql_corntab.cron', 'w')
crontabf=open('./sqoopjob/crontab.cron', 'w')
df=open('./sqoopjob/deletejob.sh', 'w')
#scmd=open('./sqoopjob/sqoopcmd.sh', 'w')
#scmd.write('''yesdate=`date -d last-day +%Y-%m-%d`;todday=`date +%Y-%m-%d`''')
#cf=open('./sqoopjob/cron.sh', 'w')
kerboros='kinit -kt /keytab/sqoopdba.keytab sqoopdba \n'
with open(tablelist, 'r') as load_f:
load_dict = json.load(load_f)
for job in load_dict:
if job['table_type'].lower()=='dim' and job['partition']=='0':
# non-partitioned dimension (archive) tables
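# Flow for this branch:
#   1. truncate the Hive staging table {hive_table}_new
#   2. a saved sqoop job imports the source rows into {hive_table}_new (incremental append)
#   3. an HQL/Impala step unions _new with the existing table, keeps the latest row per key
#      (row_number ... desc, rn = 1) in a _tmp table, then drops the old table and renames _tmp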
hivetruncate='''hive -e"use {hive_db};truncate table {hive_table}_new"\n'''.format(hive_db=job['hive_db'],hive_table=job['hive_table'])
createjob = '''sqoop job --create {job_name} --meta-connect {sqoopmeta} \\
-- import --connect {jdbc} \\
--table {source_table} \\
--incremental append \\
--split-by {split_by} \\
--hive-import \\
--hive-drop-import-delims \\
--hive-overwrite \\
--hive-table {hive_db}.{hive_table}_NEW \\
--check-column {check_column} \\
--last-value '2011-01-01 11:00:00' \\
-m 2;\n'''.format(
job_name=job['job_name'].lower(),
sqoopmeta=sqoopmeta,
jdbc=jdbc,
hive_db=job['hive_db'],
source_table=job['source_table'].upper(),
split_by=job['split_by'].upper(),
hive_table=job['hive_table'].upper(),
check_column=job['datatime_cloumn'].upper()
).replace(' ','')
execjob = '''sqoop job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
--exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace(' ','')
deledrop = '''sqoop job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
--delete {job_name};\n'''.format(job_name=job['job_name'].lower() ).replace(' ','')
hql = '''hive -e "use {hive_db};
create table {tmp_table} as
select {columns} from
(select t.*,row_number()over(partition by {pattition_by} ORDER BY t.{order_by} desc) as rn
from (select * from {table_name}_new union all select * from {table_name}) t
) tm
where rn =1;
drop table {table_name};
alter table {table_name}_TMP rename to {table_name};
"\n'''.format(
hive_db=job['hive_db'],
tmp_table=job['source_table'].replace('.', '_')+'_tmp',
columns=job['columns'],
pattition_by=','.join(['t.' + cl for cl in job['pattition_by'].split(',')]),
order_by=job['datatime_cloumn'],
table_name=job['source_table'].replace('.', '_'),
).replace(' ','')
imql = '''impala-shell -i bigdata-w-001 -q "invalidate metadata;use {hive_db};
create table {tmp_table} as
select {columns} from
(select t.*,row_number()over(partition by {pattition_by} ORDER BY t.{order_by} desc) as rn
from (select * from {table_name}_new union all select * from {table_name}) t
) tm
where rn =1;
drop table {table_name};
alter table {table_name}_TMP rename to {table_name};
"\n'''.format(
hive_db=job['hive_db'],
tmp_table=job['source_table'].replace('.', '_') + '_tmp',
columns=job['columns'],
pattition_by=','.join(['t.' + cl for cl in job['pattition_by'].split(',')]),
order_by=job['datatime_cloumn'],
table_name=job['source_table'].replace('.', '_'),
).replace(' ','')
#print(sjf)
sjf.write(hivetruncate+createjob)
df.write(deledrop)
open('./sqoopjob/hive-bin/' + job['job_name'].lower() + '_hql.sh', 'w').write(kerboros + execjob + hql)
open('./sqoopjob/impala-bin/' + job['job_name'].lower() + '_imql.sh', 'w').write(kerboros + execjob + imql)
hcf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}_hql.sh >>../logs/{job_name}_hql.out 2>&1\n'''.format(
job_name=job['job_name'].lower()
)
)
imcf.write(
'''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}_imql.sh >>../logs/{job_name}_imql.out 2>&1\n'''.format(
job_name=job['job_name'].lower()))
execjob = '''sqoop job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
--exec {job_name};\n'''.format(
job_name=job['job_name'].lower()).replace(' ','')
open('./sqoopjob/exec_run.sh', 'a').write(execjob)
elif job['table_type'].lower()=='fact_iud'and job['partition']=='0':
# fact tables that also receive updates and deletes
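# Rows can change or disappear in the source, so incremental append is not safe here:
# the Hive table is truncated and the whole source table is re-imported on every run.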
hivetruncate = '''hive -e"use {hive_db};truncate table {hive_table}"\n'''.format(
hive_db=job['hive_db'], hive_table=job['hive_table'])
createjob = '''sqoop job --create {job_name} --meta-connect {sqoopmeta} \\
-- import --connect {jdbc} \\
--table {source_table} \\
--split-by {split_by} \\
--hive-import \\
--hive-drop-import-delims \\
--hive-table {hive_db}.{hive_table} \\
--delete-target-dir \\
-m 2;\n'''.format(
job_name=job['job_name'].lower(),
sqoopmeta=sqoopmeta,
jdbc=jdbc,
hive_db=job['hive_db'],
source_table=job['source_table'].upper(),
split_by=job['split_by'].upper(),
hive_table=job['hive_table'].upper(),
).replace(' ','')
sjf.write(hivetruncate+createjob)
execjob = '''sqoop job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
--exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace(' ',
'')
open('./sqoopjob/job-bin/' + job['job_name'].lower() + '.sh', 'w').write(kerboros + execjob)
open('./sqoopjob/exec_run.sh', 'a').write(execjob)
crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
job_name=job['job_name'].lower()))
elif job['table_type'].lower()=='fact_i'and job['partition']=='0':
# insert-only fact tables (append-only writes)
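# Loaded with a saved sqoop job using incremental append on the timestamp column;
# the sqoop metastore keeps track of --last-value between runs.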
hivetruncate = '''hive -e"use {hive_db};truncate table {hive_table}"\n'''.format(
hive_db=job['hive_db'], hive_table=job['hive_table'])
createjob = '''sqoop job --create {job_name} --meta-connect {sqoopmeta} \\
-- import --connect {jdbc} \\
--table {source_table} \\
--incremental append \\
--split-by {split_by} \\
--hive-import \\
--hive-drop-import-delims \\
--hive-table {hive_db}.{hive_table} \\
--check-column {check_column} \\
--last-value '2011-01-01 11:00:00' \\
-m 2;\n'''.format(
job_name=job['job_name'].lower(),
sqoopmeta=sqoopmeta,
jdbc=jdbc,
hive_db=job['hive_db'],
source_table=job['source_table'].upper(),
split_by=job['split_by'].upper(),
hive_table=job['hive_table'].upper(),
check_column=job['datatime_cloumn'].upper()
).replace(' ', '')
sjf.write(hivetruncate+createjob)
execjob = '''sqoop job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
--exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace(' ', '')
open('./sqoopjob/job-bin/' + job['job_name'].lower() + '.sh', 'w').write(kerboros + execjob)
open('./sqoopjob/exec_run.sh', 'a').write(execjob)
crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
job_name=job['job_name'].lower()))
elif job['partition']=='1' and job['table_type'] in ['fact_i','fact_iud']:
# partitioned fact tables: pull the data with a date-range WHERE condition
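# The generated shell script recreates yesterday's dt partition, then imports only the rows
# whose timestamp falls in [yesterday, today) via --where, writing them into that partition
# through --hive-partition-key dt / --hive-partition-value $yesdate.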
shell_cmd='''if [ $# -gt 1 ]; then
yesdate=$1
today=$2
var_len1=`echo ${yesdate} |wc -L`
var_len2=`echo ${today} |wc -L`
if [[ ${var_len1} != 10 || ${var_len2} != 10 ]];then
echo 'vars is wrong'
echo 'var input like:2017-01-01 2017-01-21'
exit
fi
else
yesdate=`date -d "today -1 day " +%Y-%m-%d`
today=`date -d today +%Y-%m-%d`
fi
echo "data:${yesdate} ${today}"\n'''.replace(' ',' ')
createjob = '''hive -e"use {hive_db};alter table {hive_table} drop if exists partition (dt='$yesdate');
alter table {hive_table} add partition(dt='$yesdate') "
sqoop import --connect {jdbc} \\
--table {source_table} \\
--where "{where} >= date'$yesdate' and {where}<date'$today' " \\
--split-by {split_by} \\
--hive-import \\
--hive-partition-key dt \\
--hive-partition-value $yesdate \\
--hive-drop-import-delims \\
--hive-table {hive_db}.{hive_table} \\
--delete-target-dir \\
-m 2;\n'''.format(
job_name=job['job_name'].lower(),
hive_db=job['hive_db'],
sqoopmeta=sqoopmeta,
jdbc=jdbc,
source_table=job['source_table'].upper(),
where=job['datatime_cloumn'],
split_by=job['split_by'].upper(),
hive_table=job['hive_table'].upper(),
).replace(' ','')
#scmd.write(createjob)
open('./sqoopjob/partition-bin/'+job['job_name'].lower()+'.sh', 'w').write(shell_cmd+createjob)
open('./sqoopjob/exec_run.sh', 'a').write(shell_cmd+createjob)
crontabf.write(
'''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
job_name=job['job_name'].lower()))
elif job['partition'] == '1' and job['table_type'] in ['dim']:
# partitioned dimension tables: same daily-partition pattern as above, without the WHERE filter
shell_cmd = '''if [ $# -gt 1 ]; then
yesdate=$1
today=$2
var_len1=`echo ${yesdate} |wc -L`
var_len2=`echo ${today} |wc -L`
if [[ ${var_len1} != 10 || ${var_len2} != 10 ]];then
echo 'vars is wrong'
echo 'var input like:2017-01-01 2017-01-21'
exit
fi
else
yesdate=`date -d "today -1 day " +%Y-%m-%d`
today=`date -d today +%Y-%m-%d`
fi
echo "data:${yesdate} ${today}"\n'''.replace(' ',' ')
createjob = '''hive -e"use {hive_db};alter table {hive_table} drop if exists partition (dt='$yesdate');
alter table {hive_table} add partition(dt='$yesdate') "
sqoop import --connect {jdbc} \\
--table {source_table} \\
--split-by {split_by} \\
--hive-import \\
--hive-partition-key dt \\
--hive-partition-value $yesdate \\
--hive-drop-import-delims \\
--hive-table {hive_db}.{hive_table} \\
--delete-target-dir \\
-m 2;\n'''.format(
job_name=job['job_name'].lower(),
hive_db=job['hive_db'],
sqoopmeta=sqoopmeta,
jdbc=jdbc,
source_table=job['source_table'].upper(),
where=job['datatime_cloumn'],
split_by=job['split_by'].upper(),
hive_table=job['hive_table'].upper(),
).replace(' ', '')
#scmd.write(createjob)
open('./sqoopjob/partition-bin/' + job['job_name'].lower() + '.sh', 'w').write(shell_cmd+createjob)
open('./sqoopjob/exec_run.sh', 'a').write(shell_cmd+createjob)
crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
job_name=job['job_name'].lower()))
sjf.close()
hcf.close()
imcf.close()
df.close()
#cf.close()
print('Script generation finished, see ./sqoopjob/*')
if __name__=='__main__':
# generate the json file
json_make(input_file='./tablelist')
# generate the sqoop scripts
create_job(tablelist='tablelist.json')
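After the script runs, sqoopjob/ holds everything needed for deployment: createjob.sh registers the saved sqoop jobs in the metastore (and truncates the staging tables first), deletejob.sh removes those jobs, exec_run.sh runs every job once, the per-table run scripts live under hive-bin/, impala-bin/, job-bin/ and partition-bin/, and the *.cron files contain the crontab entries. A typical rollout, roughly: run createjob.sh once, test a single table with its script under job-bin/ (or hive-bin/ for dimension tables), then append the cron entries to crontab.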
Below is the script that generates the HBase jobs.
HBase jobs
#!/usr/bin/python3
# coding=utf-8
import json,os
def json_make(input_file='./hbase_tablelist'):
#input_file ='./sqoopjob/tablelist'
lines = open(input_file, "r",encoding="utf_8_sig").readlines()
lines = [i for i in lines if i not in ('', '\n')]  # drop blank lines
lines = [line.strip() for line in lines]
# the first line provides the keys (column names)
keys = lines[0].split('\t')
line_num = 1
total_lines = len(lines)
parsed_datas = []
while line_num < total_lines:
values = lines[line_num].split("\t")
parsed_datas.append(dict(zip(keys, values)))
line_num = line_num + 1
json_str = json.dumps(parsed_datas, ensure_ascii=False, indent=4)
output_file = input_file+'.json'
# write to the file
f = open(output_file, "w", encoding="utf-8")
f.write(json_str)
f.close()
print('JSON conversion finished, see %s' % output_file)
def create_job(tablelist):
if os.path.exists('sqoopjob'):
os.system('rm -fr sqoopjob/*')
os.system('mkdir sqoopjob/hbase-bin')
else:
os.mkdir('sqoopjob')
sqoopmeta='jdbc:hsqldb:hsql://localhost:16000/sqoop'
jdbc='jdbc:oracle:thin:@10.90.87.35:11521:bdccdb2 --username sqoopuser --password Bigdata_2016'
sjf=open('sqoopjob/createjob.sh', "w")
crontabf=open('./sqoopjob/crontab.cron', 'w')
df=open('./sqoopjob/deletejob.sh', 'w')
createtablef=open('./sqoopjob/createtable.sh', 'w')
kerboros='kinit -kt /keytab/sqoopdba.keytab sqoopdba \n'
with open(tablelist, 'r') as load_f:
load_dict = json.load(load_f)
for job in load_dict:
if job['table_type'].lower()=='dim' and job['partition']=='0':
# non-partitioned dimension (archive) tables
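# HBase route: a saved sqoop job incrementally appends the source rows into an HBase table
# (row key = row_key, single column family cf); a Hive external table mapped through the
# HBaseStorageHandler is also generated so the data can be queried from Hive/Impala.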
createjob = '''sqoop job --create {job_name} --meta-connect {sqoopmeta} \\
-- import --connect {jdbc} \\
--table {source_table} \\
--incremental append \\
--split-by {split_by} \\
--hbase-create-table \\
--hbase-table {hive_table} \\
--check-column {check_column} \\
--last-value '2011-01-01 11:00:00' \\
--hbase-row-key {row_key} \\
--column-family cf \\
-m 2;\n'''.format(
job_name=job['job_name'].lower(),
sqoopmeta=sqoopmeta,
jdbc=jdbc,
source_table=job['source_table'].upper(),
split_by=job['split_by'].upper(),
hive_table=job['hive_table'].upper(),
check_column=job['datatime_cloumn'].upper(),
row_key=job['row_key']
).replace(' ','')
execjob = '''sqoop job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
--exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace(' ','')
deledrop = '''sqoop job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
--delete {job_name};\n'''.format(job_name=job['job_name'].lower() ).replace(' ','')
createtable='''hive -e "use {hive_db};CREATE EXTERNAL TABLE {hive_table}_external(key string,{columns_hive})
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ('hbase.columns.mapping' = '{columns_hbase}')
TBLPROPERTIES('hbase.table.name' = '{hive_table}',
'hbase.mapred.output.outputtable' = '{hive_table}')";\n '''.format(
hive_table=job['hive_table'],
hive_db=job['hive_db'],
columns_hive=' string,'.join(job['columns'].split(','))+' string',
columns_hbase=':key,cf:'+',cf:'.join(job['columns'].split(','))
).replace(' ','')
sjf.write(createjob)
createtablef.write(createtable)
df.write(deledrop)
open('./sqoopjob/hbase-bin/' + job['job_name'].lower() + '.sh', 'w').write(execjob)
crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
job_name=job['job_name'].lower()))
execjob = '''sqoop job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
--exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace(' ','')
open('./sqoopjob/exec_run.sh', 'a').write(execjob)
else :pass
sjf.close()
df.close()
#cf.close()
print('Script generation finished, see ./sqoopjob/*')
if __name__=='__main__':
# generate the json file
json_make(input_file='./hbase_tablelist')
# generate the sqoop scripts
create_job(tablelist='./hbase_tablelist.json')