Batch-generating Sqoop scripts

  • Generate Hive CREATE TABLE statements from the all_tab_columns dictionary view

create or replace view create_sql as
-- generate Hive CREATE TABLE statements from the all_tab_columns dictionary view
select owner,table_name, case

     when nm = 1 then
      'create table ' || owner || '.' || TABLE_NAME || ' (' ||
      COLUMN_NAME || ' ' || DATA_TYPE || ','
     when np = 1 then
      COLUMN_NAME || ' ' || DATA_TYPE || ') partitioned by (dt string);'
     else
      COLUMN_NAME || ' ' || DATA_TYPE || ','
   end create_sql

from (

    SELECT OWNER,
            TABLE_NAME,
            COLUMN_NAME,
            CASE
              WHEN DATA_TYPE IN ('VARCHAR2','LONG','CLOB','CHAR','NCHAR','BLOB') THEN
               'STRING'
              WHEN DATA_TYPE IN ('NUMBER') then
               'DECIMAL' || '(' || DATA_PRECISION || ',' || DATA_SCALE || ')'
              WHEN DATA_TYPE IN ('DATE', 'TIMESTAMP(6)') THEN
               'STRING'
              ELSE
               DATA_TYPE||'(' || DATA_PRECISION || ',' || DATA_SCALE || ')'
            END DATA_TYPE,
            COLUMN_ID,
            NM,
            NP
      FROM (select t.*,
                    row_number() over(partition by owner, TABLE_NAME order by COLUMN_ID) nm,
                    row_number() over(partition by owner, TABLE_NAME order by COLUMN_ID desc) np
             
               from all_tab_columns t
              ))

ORDER BY OWNER, TABLE_NAME, COLUMN_ID;
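Each output row of this view is one line of Hive DDL, so dumping the create_sql column in order yields a script you can run with hive -f. Below is a minimal sketch of spooling the view from Python, assuming the cx_Oracle driver; the DSN, credentials, and output file name are placeholders rather than anything from the original article.

import cx_Oracle

# placeholders: adjust host/port/SID and credentials to your environment
dsn = cx_Oracle.makedsn("10.90.87.35", 11521, sid="bdccdb2")
conn = cx_Oracle.connect("sqoopuser", "password", dsn)
cur = conn.cursor()
# the view already orders its rows by owner, table_name, column_id
cur.execute("select create_sql from create_sql")
with open("create_hive_tables.sql", "w", encoding="utf-8") as out:
    for (ddl_line,) in cur:
        out.write(ddl_line + "\n")
conn.close()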

  • Generate the Sqoop job configuration table
select
owner||table_name job_name, -- job name
'' table_type, -- table type: dim = dimension table, fact_i = insert-only fact, fact_iud = fact with inserts/updates/deletes
'' partition, -- whether the hive table is partitioned: 0 no, 1 yes
'BS' source_db, -- source business system; usually the source schema or database name
owner||'.'||table_name source_table, -- source table name
'' datatime_cloumn, -- incremental timestamp column
'APPEND' incremental, -- load mode; append: incremental load, overwrite: full load
'' SPLIT_BY, -- split column for parallelism; pick a column with few duplicate values
'APP_NO' row_key, -- primary key column
'fz_bs' hive_db, -- target hive database
table_name hive_table, -- target hive table; must be the bare table name, without a database prefix
'' check_column, -- source table columns to ingest
columns
from (
select owner,table_name,wm_concat(column_name)over(partition by owner,table_name order by column_id) columns,rn from (
select owner,table_name,column_name,column_id,row_number()over(partition by owner,table_name order by column_id desc) rn from all_tab_columns
where owner||'_'||table_name in(
'BS_S_FAULT_RPT',
'BS_ARC_S_FAULT_RPT',
'HIS_ARC_S_FAULT_RPT',
'BS_S_FAULT_HANDLE',
'BS_ARC_S_FAULT_HANDLE',
'HIS_ARC_S_FAULT_HANDLE',
'BS_S_RETVISIT',
'BS_ARC_S_RETVISIT',
'HIS_ARC_S_RETVISIT',
'BS_S_95598_WKST_RELA',
'HIS_S_95598_WKST_RELA')
order by owner,table_name,column_id
))
where rn=1
order by owner,table_name;
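The generator script below expects this query's result exported as a tab-separated file named tablelist, with the column aliases as a header row. Here is a minimal export sketch, again assuming cx_Oracle with placeholder credentials; headers are lower-cased because the generator looks up keys such as job['table_type']:

import cx_Oracle

CONFIG_SQL = "..."  # the configuration query above, without the trailing semicolon

dsn = cx_Oracle.makedsn("10.90.87.35", 11521, sid="bdccdb2")  # placeholder
conn = cx_Oracle.connect("sqoopuser", "password", dsn)        # placeholder
cur = conn.cursor()
cur.execute(CONFIG_SQL)
with open("tablelist", "w", encoding="utf-8") as out:
    # header row: lower-cased column aliases, tab-separated
    out.write("\t".join(col[0].lower() for col in cur.description) + "\n")
    for row in cur:
        out.write("\t".join("" if v is None else str(v) for v in row) + "\n")
conn.close()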

  • Below is the Python script that auto-generates the Sqoop jobs from the configuration

Hive jobs

#!/usr/bin/python3
# coding=utf-8

import json, os

def json_make(input_file='./tablelist'):
    # input_file = './sqoopjob/tablelist'
    lines = open(input_file, "r", encoding="utf_8_sig").readlines()
    # drop empty lines before stripping (removing from a list while iterating it skips items)
    lines = [line.strip() for line in lines if line not in ('', '\n')]

    # the first line holds the keys
    keys = lines[0].split('\t')
    line_num = 1
    total_lines = len(lines)
    parsed_datas = []
    while line_num < total_lines:
        values = lines[line_num].split("\t")
        parsed_datas.append(dict(zip(keys, values)))
        line_num = line_num + 1
    json_str = json.dumps(parsed_datas, ensure_ascii=False, indent=4)
    output_file = input_file + '.json'

    # write to the file
    f = open(output_file, "w", encoding="utf-8")
    f.write(json_str)
    f.close()
    print('JSON conversion finished! See %s' % output_file)
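# For illustration (hypothetical values), an input file like
#     job_name<TAB>table_type<TAB>partition<TAB>hive_db<TAB>hive_table<TAB>...
#     bss_fault_rpt<TAB>dim<TAB>0<TAB>fz_bs<TAB>s_fault_rpt<TAB>...
# is converted by json_make into tablelist.json:
#     [{"job_name": "bss_fault_rpt", "table_type": "dim", "partition": "0", ...}]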

def create_job(tablelist):
    if os.path.exists('sqoopjob'):
        os.system('rm -fr sqoopjob/*')
    else:
        os.mkdir('sqoopjob')
    # the sub-directories must exist in both cases, or the open() calls below fail
    for sub in ('hive-bin', 'impala-bin', 'partition-bin', 'job-bin'):
        os.mkdir('sqoopjob/' + sub)
    sqoopmeta = 'jdbc:hsqldb:hsql://localhost:16000/sqoop'
    jdbc = 'jdbc:oracle:thin:@10.90.87.35:11521:bdccdb2 --username sqoopuser --password Bigdata_2016'
    sjf = open('sqoopjob/createjob.sh', "w")
    hcf = open('./sqoopjob/hql_corntab.cron', 'w')
    imcf = open('./sqoopjob/imql_corntab.cron', 'w')
    crontabf = open('./sqoopjob/crontab.cron', 'w')
    df = open('./sqoopjob/deletejob.sh', 'w')
    # scmd = open('./sqoopjob/sqoopcmd.sh', 'w')
    # scmd.write('''yesdate=`date -d last-day +%Y-%m-%d`;todday=`date +%Y-%m-%d`''')
    # cf = open('./sqoopjob/cron.sh', 'w')
    kerboros = 'kinit -kt /keytab/sqoopdba.keytab sqoopdba \n'

    with open(tablelist, 'r') as load_f:
        load_dict = json.load(load_f)
        for job in load_dict:
            if job['table_type'].lower() == 'dim' and job['partition'] == '0':
                # dimension (archive) tables
                hivetruncate = '''hive -e"use {hive_db};truncate table {hive_table}_new"\n'''.format(hive_db=job['hive_db'], hive_table=job['hive_table'])
                createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                    -- import --connect {jdbc} \\
                    --table {source_table} \\
                    --incremental append \\
                    --split-by {split_by} \\
                    --hive-import \\
                    --hive-drop-import-delims \\
                    --hive-overwrite \\
                    --hive-table {hive_db}.{hive_table}_NEW \\
                    --check-column  {check_column} \\
                    --last-value '2011-01-01 11:00:00' \\
                    -m 2;\n'''.format(
                    job_name=job['job_name'].lower(),
                    sqoopmeta=sqoopmeta,
                    jdbc=jdbc,
                    hive_db=job['hive_db'],
                    source_table=job['source_table'].upper(),
                    split_by=job['split_by'].upper(),
                    hive_table=job['hive_table'].upper(),
                    check_column=job['datatime_cloumn'].upper()
                    ).replace('    ', '')

                execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ', '')

                deledrop = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --delete {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ', '')

                hql = '''hive -e "use {hive_db};
                create table {tmp_table} as
                select {columns} from
                (select t.*,row_number()over(partition by {pattition_by} ORDER BY  t.{order_by} desc) as rn
                from (select * from {table_name}_new union all select * from {table_name}) t
                ) tm
                where rn =1;
                drop table {table_name};
                alter table {table_name}_TMP rename to {table_name};
                "\n'''.format(
                    hive_db=job['hive_db'],
                    tmp_table=job['source_table'].replace('.', '_') + '_tmp',
                    columns=job['columns'],
                    pattition_by=','.join(['t.' + cl for cl in job['pattition_by'].split(',')]),
                    order_by=job['datatime_cloumn'],
                    table_name=job['source_table'].replace('.', '_'),
                    ).replace('    ', '')
                imql = '''impala-shell -i bigdata-w-001 -q "invalidate metadata;use {hive_db};
                create table {tmp_table} as
                select {columns} from
                (select t.*,row_number()over(partition by {pattition_by} ORDER BY  t.{order_by} desc) as rn
                from (select * from {table_name}_new union all select * from {table_name}) t
                ) tm
                where rn =1;
                drop table {table_name};
                alter table {table_name}_TMP rename to {table_name};
                "\n'''.format(
                    hive_db=job['hive_db'],
                    tmp_table=job['source_table'].replace('.', '_') + '_tmp',
                    columns=job['columns'],
                    pattition_by=','.join(['t.' + cl for cl in job['pattition_by'].split(',')]),
                    order_by=job['datatime_cloumn'],
                    table_name=job['source_table'].replace('.', '_'),
                    ).replace('    ', '')
                # print(sjf)
                sjf.write(hivetruncate + createjob)
                df.write(deledrop)
                open('./sqoopjob/hive-bin/' + job['job_name'].lower() + '_hql.sh', 'w').write(kerboros + execjob + hql)
                open('./sqoopjob/impala-bin/' + job['job_name'].lower() + '_imql.sh', 'w').write(kerboros + execjob + imql)
                hcf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}_hql.sh >>../logs/{job_name}_hql.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))
                imcf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}_imql.sh >>../logs/{job_name}_imql.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))
                execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ', '')
                open('./sqoopjob/exec_run.sh', 'a').write(execjob)
            elif job['table_type'].lower() == 'fact_iud' and job['partition'] == '0':
                # incremental fact tables with inserts, updates and deletes
                hivetruncate = '''hive -e"use {hive_db};truncate table {hive_table}"\n'''.format(
                    hive_db=job['hive_db'], hive_table=job['hive_table'])
                createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                    -- import --connect {jdbc} \\
                    --table {source_table} \\
                    --split-by {split_by} \\
                    --hive-import \\
                    --hive-drop-import-delims \\
                    --hive-table {hive_db}.{hive_table} \\
                    --delete-target-dir \\
                    -m 2;\n'''.format(
                    job_name=job['job_name'].lower(),
                    sqoopmeta=sqoopmeta,
                    jdbc=jdbc,
                    hive_db=job['hive_db'],
                    source_table=job['source_table'].upper(),
                    split_by=job['split_by'].upper(),
                    hive_table=job['hive_table'].upper(),
                    ).replace('    ', '')
                sjf.write(hivetruncate + createjob)
                execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ', '')
                open('./sqoopjob/job-bin/' + job['job_name'].lower() + '.sh', 'w').write(kerboros + execjob)
                open('./sqoopjob/exec_run.sh', 'a').write(execjob)
                crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))
            elif job['table_type'].lower() == 'fact_i' and job['partition'] == '0':
                # insert-only fact tables
                hivetruncate = '''hive -e"use {hive_db};truncate table {hive_table}"\n'''.format(
                    hive_db=job['hive_db'], hive_table=job['hive_table'])
                createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                    -- import --connect {jdbc} \\
                    --table {source_table} \\
                    --incremental append \\
                    --split-by {split_by} \\
                    --hive-import \\
                    --hive-drop-import-delims \\
                    --hive-table {hive_db}.{hive_table} \\
                    --check-column  {check_column} \\
                    --last-value '2011-01-01 11:00:00' \\
                    -m 2;\n'''.format(
                    job_name=job['job_name'].lower(),
                    sqoopmeta=sqoopmeta,
                    jdbc=jdbc,
                    hive_db=job['hive_db'],
                    source_table=job['source_table'].upper(),
                    split_by=job['split_by'].upper(),
                    hive_table=job['hive_table'].upper(),
                    check_column=job['datatime_cloumn'].upper()
                    ).replace('    ', '')
                sjf.write(hivetruncate + createjob)
                execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ', '')
                open('./sqoopjob/job-bin/' + job['job_name'].lower() + '.sh', 'w').write(kerboros + execjob)
                open('./sqoopjob/exec_run.sh', 'a').write(execjob)
                crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))


            elif job['partition'] == '1' and job['table_type'] in ['fact_i', 'fact_iud']:
                # partitioned fact tables, loaded day by day with a where filter
                shell_cmd = '''if [ $# -gt 1 ]; then
                        yesdate=$1
                        today=$2
                        var_len1=`echo ${yesdate} |wc -L`
                        var_len2=`echo ${today} |wc -L`
                        if [[ ${var_len1} != 10 || ${var_len2} != 10 ]];then
                            echo 'vars is wrong'
                            echo 'var input like:2017-01-01 2017-01-21'
                            exit
                        fi
                    else
                        yesdate=`date -d "today -1 day " +%Y-%m-%d`
                        today=`date -d today +%Y-%m-%d`
                    fi

                echo "data:${yesdate}  ${today}"\n'''.replace('    ', ' ')
                createjob = '''hive -e"use {hive_db};alter table {hive_table} drop if  exists  partition (dt='$yesdate');
                    alter table {hive_table} add partition(dt='$yesdate') "
                    sqoop  import --connect {jdbc} \\
                    --table {source_table} \\
                    --where "{where} >= date'$yesdate' and {where}<date'$today' " \\
                    --split-by {split_by} \\
                    --hive-import \\
                    --hive-partition-key dt  \\
                    --hive-partition-value  $yesdate \\
                    --hive-drop-import-delims \\
                    --hive-table {hive_db}.{hive_table} \\
                    --delete-target-dir \\
                    -m 2;\n'''.format(
                    job_name=job['job_name'].lower(),
                    hive_db=job['hive_db'],
                    sqoopmeta=sqoopmeta,
                    jdbc=jdbc,
                    source_table=job['source_table'].upper(),
                    where=job['datatime_cloumn'],
                    split_by=job['split_by'].upper(),
                    hive_table=job['hive_table'].upper(),
                    ).replace('    ', '')
                # scmd.write(createjob)
                open('./sqoopjob/partition-bin/' + job['job_name'].lower() + '.sh', 'w').write(shell_cmd + createjob)
                open('./sqoopjob/exec_run.sh', 'a').write(shell_cmd + createjob)
                crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))
            elif job['partition'] == '1' and job['table_type'] in ['dim']:
                # partitioned dimension tables, full load into the day's partition
                shell_cmd = '''if [ $# -gt 1 ]; then
                        yesdate=$1
                        today=$2
                        var_len1=`echo ${yesdate} |wc -L`
                        var_len2=`echo ${today} |wc -L`
                        if [[ ${var_len1} != 10 || ${var_len2} != 10 ]];then
                            echo 'vars is wrong'
                            echo 'var input like:2017-01-01 2017-01-21'
                            exit
                        fi
                    else
                        yesdate=`date -d "today -1 day " +%Y-%m-%d`
                        today=`date -d today +%Y-%m-%d`
                    fi

                echo "data:${yesdate}  ${today}"\n'''.replace('    ', ' ')
                createjob = '''hive -e"use {hive_db};alter table {hive_table} drop if  exists  partition (dt='$yesdate');
                    alter table {hive_table} add partition(dt='$yesdate') "
                    sqoop  import --connect {jdbc} \\
                    --table {source_table} \\
                    --split-by {split_by} \\
                    --hive-import \\
                    --hive-partition-key dt  \\
                    --hive-partition-value  $yesdate \\
                    --hive-drop-import-delims \\
                    --hive-table {hive_db}.{hive_table} \\
                    --delete-target-dir \\
                    -m 2;\n'''.format(
                    job_name=job['job_name'].lower(),
                    hive_db=job['hive_db'],
                    sqoopmeta=sqoopmeta,
                    jdbc=jdbc,
                    source_table=job['source_table'].upper(),
                    where=job['datatime_cloumn'],
                    split_by=job['split_by'].upper(),
                    hive_table=job['hive_table'].upper(),
                    ).replace('    ', '')
                # scmd.write(createjob)
                open('./sqoopjob/partition-bin/' + job['job_name'].lower() + '.sh', 'w').write(shell_cmd + createjob)
                open('./sqoopjob/exec_run.sh', 'a').write(shell_cmd + createjob)
                crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))
    sjf.close()
    hcf.close()
    imcf.close()
    crontabf.close()
    df.close()
    # cf.close()
    print('Script generation finished; see ./sqoopjob/*')

if __name__ == '__main__':
    # generate the json file
    json_make(input_file='./tablelist')
    # generate the sqoop scripts
    create_job(tablelist='tablelist.json')
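The dim branch above implements the incremental merge as: append new source rows into {table}_new, union it with the current table, rank the rows per key with row_number() ordered by the timestamp column descending, and keep only rn = 1, so the newest version of each record wins. A tiny in-memory sketch of that keep-the-newest semantics (column names hypothetical):

# keep-the-newest-per-key, as the rn = 1 filter does in the generated HQL
rows = [
    {'app_no': '1', 'update_time': '2018-01-01', 'src': 'old'},
    {'app_no': '1', 'update_time': '2018-06-01', 'src': 'new'},
    {'app_no': '2', 'update_time': '2018-03-01', 'src': 'old'},
]
latest = {}
for r in sorted(rows, key=lambda r: r['update_time']):
    latest[r['app_no']] = r  # later timestamps overwrite earlier ones
print([r['src'] for r in latest.values()])  # -> ['new', 'old']

Typical usage (file names illustrative, not from the original article): save the script as gen_sqoop.py next to the exported tablelist, run it with python3, review sqoopjob/createjob.sh and run it once to register the jobs in the running Sqoop metastore, then install the generated .cron files with crontab.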


HBase jobs

#!/usr/bin/python3
# coding=utf-8

import json, os

def json_make(input_file='./hbase_tablelist'):
    # input_file = './sqoopjob/tablelist'
    lines = open(input_file, "r", encoding="utf_8_sig").readlines()
    # drop empty lines before stripping (removing from a list while iterating it skips items)
    lines = [line.strip() for line in lines if line not in ('', '\n')]

    # the first line holds the keys
    keys = lines[0].split('\t')
    line_num = 1
    total_lines = len(lines)
    parsed_datas = []
    while line_num < total_lines:
        values = lines[line_num].split("\t")
        parsed_datas.append(dict(zip(keys, values)))
        line_num = line_num + 1
    json_str = json.dumps(parsed_datas, ensure_ascii=False, indent=4)
    output_file = input_file + '.json'

    # write to the file
    f = open(output_file, "w", encoding="utf-8")
    f.write(json_str)
    f.close()
    print('JSON conversion finished! See %s' % output_file)

def create_job(tablelist):
    if os.path.exists('sqoopjob'):
        os.system('rm -fr sqoopjob/*')
    else:
        os.mkdir('sqoopjob')
    # the sub-directory must exist in both cases, or the open() calls below fail
    os.mkdir('sqoopjob/hbase-bin')
    sqoopmeta = 'jdbc:hsqldb:hsql://localhost:16000/sqoop'
    jdbc = 'jdbc:oracle:thin:@10.90.87.35:11521:bdccdb2 --username sqoopuser --password Bigdata_2016'
    sjf = open('sqoopjob/createjob.sh', "w")
    crontabf = open('./sqoopjob/crontab.cron', 'w')
    df = open('./sqoopjob/deletejob.sh', 'w')
    createtablef = open('./sqoopjob/createtable.sh', 'w')
    kerboros = 'kinit -kt /keytab/sqoopdba.keytab sqoopdba \n'

    with open(tablelist, 'r') as load_f:
        load_dict = json.load(load_f)
        for job in load_dict:
            if job['table_type'].lower() == 'dim' and job['partition'] == '0':
                # dimension (archive) tables
                createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                    -- import --connect {jdbc} \\
                    --table {source_table} \\
                    --incremental append \\
                    --split-by {split_by} \\
                    --hbase-create-table \\
                    --hbase-table {hive_table} \\
                    --check-column  {check_column} \\
                    --last-value '2011-01-01 11:00:00' \\
                    --hbase-row-key {row_key} \\
                    --column-family cf \\
                    -m 2;\n'''.format(
                    job_name=job['job_name'].lower(),
                    sqoopmeta=sqoopmeta,
                    jdbc=jdbc,
                    source_table=job['source_table'].upper(),
                    split_by=job['split_by'].upper(),
                    hive_table=job['hive_table'].upper(),
                    check_column=job['datatime_cloumn'].upper(),
                    row_key=job['row_key']
                    ).replace('    ', '')

                execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ', '')

                deledrop = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --delete {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ', '')

                createtable = '''hive -e "use {hive_db};CREATE EXTERNAL TABLE {hive_table}_external(key string,{columns_hive})
                    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
                    WITH SERDEPROPERTIES ('hbase.columns.mapping' = '{columns_hbase}')
                    TBLPROPERTIES('hbase.table.name' = '{hive_table}',
                    'hbase.mapred.output.outputtable' = '{hive_table}')";\n '''.format(
                    hive_table=job['hive_table'],
                    hive_db=job['hive_db'],
                    columns_hive=' string,'.join(job['columns'].split(',')) + ' string',
                    columns_hbase=':key,cf:' + ',cf:'.join(job['columns'].split(','))
                    ).replace('    ', '')
                sjf.write(createjob)
                createtablef.write(createtable)
                df.write(deledrop)
                open('./sqoopjob/hbase-bin/' + job['job_name'].lower() + '.sh', 'w').write(execjob)
                crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))
                execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ', '')
                open('./sqoopjob/exec_run.sh', 'a').write(execjob)
            else:
                pass

    sjf.close()
    crontabf.close()
    createtablef.close()
    df.close()
    # cf.close()
    print('Script generation finished; see ./sqoopjob/*')

if __name__ == '__main__':
    # generate the json file
    json_make(input_file='./hbase_tablelist')
    # generate the sqoop scripts
    create_job(tablelist='./hbase_tablelist.json')
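For reference, here is what the two mapping strings built for createtable expand to, using a hypothetical three-column table:

# worked example of the hbase.columns.mapping strings (hypothetical columns)
columns = 'LAST_ANALYZED,SAMPLE_SIZE,CHARACTER_SET_NAME'
columns_hive = ' string,'.join(columns.split(',')) + ' string'
# -> 'LAST_ANALYZED string,SAMPLE_SIZE string,CHARACTER_SET_NAME string'
columns_hbase = ':key,cf:' + ',cf:'.join(columns.split(','))
# -> ':key,cf:LAST_ANALYZED,cf:SAMPLE_SIZE,cf:CHARACTER_SET_NAME'

The leading :key entry maps the HBase row key to the table's first column, and every source column is placed in the single column family cf, matching the --column-family cf flag in the generated Sqoop job.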

    Original author: zyzloner
    Original article: https://segmentfault.com/a/1190000016010796
    This article is reposted from the web to share knowledge; if it infringes your rights, please contact the blogger for removal.