python实现sqlserver表导出为excel

文章目录

前言

使用python完成将sqlserver数据库中的表导出为excel,并将文件打包成zip格式压缩包,添加定时器功能,实现每日十点定时生成文件并打包

一、将sqlserver导出为excel

# -*- coding: utf-8 -*-
from cgi import print_arguments
from datetime import datetime
import pymssql #引入pymssql模块 
import xlwt #引入xlwt模块 
import os
import random
 
# current runng file path
# os.path.abspath() 是 os 模块当中的一个函数,这个函数接收一个 path 路径对象,返回 path 标准化的绝对路径。current_dir==e:\code\Python\newExcel\
current_dir = os.path.abspath(os.path.dirname(__file__))+"\\newExcel"+"\\"
connect = pymssql.connect('192.168.1.1:1433', 'haha', '123', 'test',charset="utf8") #服务器名,端口,账户,密码,数据库名
 
def export_excel():
   
    if connect:
        print("连接成功!")
 
    cursor = connect.cursor()   #创建一个游标对象,python里的sql语句都要通过cursor来执行
    
    sql = "select name from sysobjects where xtype='U'"  #获取数据库中所有的表
    cursor.execute(sql)   #执行sql语句
    responses = cursor.fetchall()
    for response in responses:
        res=''.join(response) # res为数据表的名名称 字符串形式
        cursor.execute('select * FROM [%s] where 1=1'%res) #对单个表进行处理,获取表中内容
        fields = [field[0] for field in cursor.description]  # 获取所有字段名
        all_data = cursor.fetchall()  # 所有数据
        aa=datetime.now().strftime('%Y-%m-%d')
        # 写入excel
        book = xlwt.Workbook()
        sheet = book.add_sheet('sheet1')

        for col,field in enumerate(fields):
            sheet.write(0,col,field)

        row = 1
        for data in all_data:
            for col,field in enumerate(data):
                sheet.write(row,col,field)
            row += 1
        book.save(current_dir+res+"_"+"%s" % aa+"_"+str(random.random())+".xls")
 
    print("Export to excel success!")
   
 
if __name__ == '__main__':
    # export data from SQL server
    export_excel()
 
    # close database connection
    connect.close() 

二、当表中数据量巨大时

对于表中数据量巨大时(内容超过一百万条数据时,使用多个sheet)
使用xlwt 库大概一个sheet中能导出五六万条数据?,而openpyxl大概一百万,当数据量巨大时,需要多个sheet

# 导出excel函数
def export_excel():
   
    if connect:
        print("连接成功!")

    cursor = connect.cursor()   #创建一个游标对象,python里的sql语句都要通过cursor来执行
    
    sql = "select name from sysobjects where xtype='U'"
    cursor.execute(sql)   #执行sql语句
    responses = cursor.fetchall()
    maxrow=0
    for response in responses:
        res=''.join(response)
        #获取每一个表的行数
        cursor.execute('select count(*) as rows from [%s]'%res)
        rows = cursor.fetchall()
        # print(type(rows[0][0]))
        if maxrow<rows[0][0]:
            maxrow = rows[0][0]

    for response in responses:
        res=''.join(response)
        print('表名:'+res)

        cursor.execute('select * FROM [%s] where 1=1'%res)
        fields = [field[0] for field in cursor.description]  # 获取所有字段名
        all_data = cursor.fetchall()  # 所有数据
        curtime=datetime.datetime.now().strftime('%Y-%m-%d')
        r=int(maxrow/1000000) #每一个sheet可以容纳一百万条数据
        # 写入excel
        book = openpyxl.Workbook()
        sheet=[]
        for i in range(r+1):
            sheet.append(book.create_sheet(index=i))
            # print(sheet[i])
        
        for col,field in enumerate(fields):
            for i in range(r+1):
                sheet[i].cell(1,col+1,field) #row的第一行为名称,从第二行起才是值

        row = 2
        i=0
        for data in all_data:
            for col,field in enumerate(data):
                field=ILLEGAL_CHARACTERS_RE.sub(r'', str(field))
                try:
                    field=field.encode('latin1').decode('gbk')
                except:
                    print(field)
                #field=field.encode('utf8','ignore').decode('gbk')
                sheet[i].cell(row,col+1,field) 
            row += 1
            if row>=1000002:
                i+=1
                row=2
        book.save(current_dir+'/'+res+"_"+"%s" % curtime+"_"+".xls")
        #open(current_dir+'/'+res+"_"+"%s" % curtime+"_"+".xls","r",encoding='GBK')
        
    
    print("Export to excel success!")
   
   
 
if __name__ == '__main__':
    # export data from SQL server
    export_excel()
 
    # close database connection
    connect.close() 

三、加入定时器

代码如下(示例):

import datetime
import threading
marktime=" 9:24:20" #启动的时间,最前面有个空格不要删除
 
 
# 运行函数
def func():
    # 在这里加你的函数即可,86400是3600*24 
    print("haha")
    timer = threading.Timer(86400, func)
    timer.start()
# preFun预处理函数 
def preFun():
    now_time = datetime.datetime.now()
    marktimes = datetime.datetime.strptime(str(now_time.date()) + marktime, "%Y-%m-%d %H:%M:%S")
    if (now_time <= marktimes):
        next_time = marktimes
        print("今日" + marktime + '执行代码')
    else:
        # 明日启动
        next_time = now_time + datetime.timedelta(days=+1)
        print("明日" + marktime + '执行代码')
    next_year = next_time.date().year
    next_month = next_time.date().month
    next_day = next_time.date().day
 
    next_time = datetime.datetime.strptime(str(next_year) +
                                           "-" + str(next_month) +
                                           "-" + str(next_day) + marktime,
                                           "%Y-%m-%d %H:%M:%S")
    timer_start_time = (next_time - now_time).total_seconds()
    return timer_start_time
def main():
    timer_start_time=preFun()
    timer = threading.Timer(timer_start_time, func)
    timer.start()
    print('冷启动后启动func的时间',timer_start_time)
    pass
if __name__ == '__main__':
    main()
 

四、打包成zip压缩包

满足将导出的excel文件打包成zip的需求

#打包目录为zip文件(未压缩)
#filedir为文件目录
def zip_file(filedir):
    """ 压缩文件夹至同名zip文件 """
    file_news = filedir + '.zip'
    z = zipfile.ZipFile(file_news,'w',zipfile.ZIP_DEFLATED) #参数一:文件夹名
    for dirpath, dirnames, filenames in os.walk(filedir):
        fpath = dirpath.replace(filedir,'') #这一句很重要,不replace的话,就从根目录开始复制
        fpath = fpath and fpath + os.sep or ''#这句话理解我也点郁闷,实现当前文件夹以及包含的所有文件的压缩
        for filename in filenames:
            z.write(os.path.join(dirpath, filename),fpath+filename)
    z.close()

五、删除某一目录下的所有文件

需求为每日定时讲数据表导出为excel,因此需要将昨日产生的excel删除

def del_files(path_file):
    ls = os.listdir(path_file)
    for i in ls:
        f_path = os.path.join(path_file, i)
        # 判断是否是一个目录,若是,则递归删除
        if os.path.isdir(f_path):
            del_files(f_path)
        else:
            os.remove(f_path)

六、完整代码实例

# -*- coding: utf-8 -*-
from base64 import encode
from datetime import datetime
from openpyxl.cell.cell import ILLEGAL_CHARACTERS_RE
import pymssql #引入pymssql模块 
import openpyxl
import os
import datetime
import threading
import zipfile

#几点开始执行 有个空格不要删除
marktime=" 22:00:00"
current_dir = 'E:/apps/temp'
connect = pymssql.connect('127.0.0.1:1466', 'sa', 'xxx', 'xxx',charset="utf8") #服务器名,端口,账户,密码,数据库名
 
# 运行函数
def func():
    if os.path.exists(current_dir):
        print("临时目录已经存在")
    else:
        os.mkdir(current_dir)
    # 删除临时目录下的所有文件
    del_files(current_dir)
    # 生成excle的函数
    export_excel()
    # 将生成的excel压缩成zip
    zip_file(current_dir)  #指定要压缩的文件夹路径
    #86400是3600*24 Timer第一个参数指定时间(秒)
    timer = threading.Timer(86400, func)  
    timer.start()

# preFun预处理函数 
def preFun():
    now_time = datetime.datetime.now()
    marktimes = datetime.datetime.strptime(str(now_time.date()) + marktime, "%Y-%m-%d %H:%M:%S")
    if (now_time <= marktimes):
        next_time = marktimes
        print("今日" + marktime + '执行代码')
    else:
        # 明日启动
        next_time = now_time + datetime.timedelta(days=+1)
        print("明日" + marktime + '执行代码')
    next_year = next_time.date().year
    next_month = next_time.date().month
    next_day = next_time.date().day
   
    next_time = datetime.datetime.strptime(str(next_year) +
                                           "-" + str(next_month) +
                                           "-" + str(next_day) + marktime,
                                           "%Y-%m-%d %H:%M:%S")
    timer_start_time = (next_time - now_time).total_seconds()
    return timer_start_time

# 导出excel函数
def export_excel():
   
    if connect:
        print("连接成功!")

    cursor = connect.cursor()   #创建一个游标对象,python里的sql语句都要通过cursor来执行
    
    sql = "select name from sysobjects where xtype='U'"
    cursor.execute(sql)   #执行sql语句
    responses = cursor.fetchall()
    maxrow=0
    for response in responses:
        res=''.join(response)
        #获取每一个表的行数
        cursor.execute('select count(*) as rows from [%s]'%res)
        rows = cursor.fetchall()
        # print(type(rows[0][0]))
        if maxrow<rows[0][0]:
            maxrow = rows[0][0]

    for response in responses:
        res=''.join(response)
        print('表名:'+res)

        cursor.execute('select * FROM [%s] where 1=1'%res)
        fields = [field[0] for field in cursor.description]  # 获取所有字段名
        all_data = cursor.fetchall()  # 所有数据
        curtime=datetime.datetime.now().strftime('%Y-%m-%d')
        r=int(maxrow/1000000) #每一个sheet可以容纳一百万条数据
        # 写入excel
        book = openpyxl.Workbook()
        sheet=[]
        for i in range(r+1):
            sheet.append(book.create_sheet(index=i))
            # print(sheet[i])
        
        for col,field in enumerate(fields):
            for i in range(r+1):
                sheet[i].cell(1,col+1,field) #row的第一行为名称,从第二行起才是值

        row = 2
        i=0
        for data in all_data:
            for col,field in enumerate(data):
                field=ILLEGAL_CHARACTERS_RE.sub(r'', str(field))
                try:
                    field=field.encode('latin1').decode('gbk')
                except:
                    print(field)
                #field=field.encode('utf8','ignore').decode('gbk')
                sheet[i].cell(row,col+1,field) 
            row += 1
            if row>=1000002:
                i+=1
                row=2
        book.save(current_dir+'/'+res+"_"+"%s" % curtime+"_"+".xls")
        #open(current_dir+'/'+res+"_"+"%s" % curtime+"_"+".xls","r",encoding='GBK')
        
    
    print("Export to excel success!")
   
#打包目录为zip文件(未压缩)
def zip_file(filedir):
    """ 压缩文件夹至同名zip文件 """
    file_news = filedir + '.zip'
    z = zipfile.ZipFile(file_news,'w',zipfile.ZIP_DEFLATED) #参数一:文件夹名
    for dirpath, dirnames, filenames in os.walk(filedir):
        fpath = dirpath.replace(filedir,'') #这一句很重要,不replace的话,就从根目录开始复制
        fpath = fpath and fpath + os.sep or ''#这句话理解我也点郁闷,实现当前文件夹以及包含的所有文件的压缩
        for filename in filenames:
            z.write(os.path.join(dirpath, filename),fpath+filename)
    z.close()

#删除某一目录下的所有文件
def del_files(path_file):
    ls = os.listdir(path_file)
    for i in ls:
        f_path = os.path.join(path_file, i)
        # 判断是否是一个目录,若是,则递归删除
        if os.path.isdir(f_path):
            del_files(f_path)
        else:
            os.remove(f_path)


def main():
    timer_start_time=preFun()
    timer = threading.Timer(timer_start_time, func)
    timer.start()
    print('冷启动后启动func的时间',timer_start_time)
    # close database connection
    # connect.close() 
    pass

if __name__ == '__main__':
    main()

七、Python打包EXE

  1. 首先安装pyinstaller,使用安装命令:pip3 install pyinstaller
  2. cmd切换到想要打包的py文件所在目录,执行命令:pyinstaller-F 文件名.py
  3. 执行完毕之后,会生成几个文件夹,如下图所示。
    《python实现sqlserver表导出为excel》
    pyinstaller打包exe文件及过程中 no module named 问题处理
    《python实现sqlserver表导出为excel》

pyinstaller -F test.py –hidden-import openpyxl

八、openpyxl.utils.exceptions.IllegalCharacterError问题

数据写入时excel中含有异常字符报错
解决方法:

from openpyxl.cell.cell import ILLEGAL_CHARACTERS_RE
s = '谢谢你\x00\x00\x00\x00\t' # \x00为非法字符
print(s.encode())
s = ILLEGAL_CHARACTERS_RE.sub(r'', s)
print(s.encode())

输出结果:

b'\xe8\xb0\xa2\xe8\xb0\xa2\xe4\xbd\xa0\x00\x00\x00\x00'
b'\xe8\xb0\xa2\xe8\xb0\xa2\xe4\xbd\xa0'

可见\x00被处理掉了

九、解决中文乱码问题

在sqlserver数据库中输入
SELECT COLLATIONPROPERTY('Chinese_PRC_Stroke_CI_AI_KS_WS', 'CodePage') 查看结果为936
《python实现sqlserver表导出为excel》

936 简体中文GBK
950 繁体中文BIG5
437 美国/加拿大英语
932 日文
949 韩文
866 俄文
65001 unicode UFT-8

查看表可知使用的编码方式为GBK
因此修改开头连接出的charset=“GBK”

connect = pymssql.connect('192.168.1.1:1433', 'haha', '123', 'test',charset="GBK") #服务器名,端口,账户,密码,数据库名

《python实现sqlserver表导出为excel》
导出结果显示正常!
解决UnicodeDecodeError: ‘gbk’ codec can’t decode byte 0xb9 in position x: illegal multibyte sequence问题
第一种解决方法,增加encoding=‘UTF-8’:
FILE_OBJECT= open( 'train.txt','r', encoding='UTF-8' )
第二种方法,二进制读取:
FILE_OBJECT= open( 'train.txt', 'rb' )
因此将field=ILLEGAL_CHARACTERS_RE.sub(rb’’, str(field)) 改为rb

中文乱码问题补充醒:

问题描述🤷‍️:
对于connect = pymssql.connect(‘192.168.1.1:1433’, ‘haha’, ‘123’, ‘test’,charset=“GBK”) #服务器名,端口,账户,密码,数据库名
在数据库连接出如果设置charset=”utf8″则导出的excel会出现中文乱码问题,
但是如果设置charset=”GBK”则会报UnicodeDecodeError: ‘utf-8’ codec can’t decode byte 0xa3 in position 4: invalid start byte

处理方法啊:
在连接数据库时charset依然设置为”utf8″,在后面对数据进行写入的时候再转变编码格式
field=field.encode(‘latin1’).decode(‘gbk’) ,先将其latin1解码,然后再gbk编码
另外可以通过这样的方式来定位解码失败的行和行内容:

for data in all_data:
            for col,field in enumerate(data):
                #print(field)
                field=ILLEGAL_CHARACTERS_RE.sub(r'', str(field))
                try:
                    field=field.encode('latin1').decode('gbk')
                except:
                    print(field)
                sheet[i].cell(row,col+1,field) 
            row += 1
            if row>=1000002:
                i+=1
                row=2
        book.save(current_dir+'/'+res+"_"+"%s" % curtime+"_"+".xls")

遇到UnicodeDecodeError: ‘utf-8’ codec can’t decode byte 0xa3 in position 4: invalid start byte的几种处理方法
pymssql中文显示乱码

    原文作者:HoPE_st
    原文地址: https://blog.csdn.net/HoPE_er/article/details/125478949
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞