Python 操作 MySQL

一. Python DB API

官方文档:https://www.python.org/dev/peps/pep-0249/

《Python 操作 MySQL》 Python DB API
《Python 操作 MySQL》 Python DB API 流程

二、数据库连接对象 connection

建立方法(两种方式是一样的):
①pymysql.connections
②pymysql.connect

常用参数:

  • host:MySQL 服务器地址 — 字符串
  • port:MySQL 服务器端口号 — 数字
  • user:用户名 — 字符串
  • passwd:密码 — 字符串
  • db:数据库名 — 字符串
  • charset:连接编码 — 字符串

connection 对象常用方法:

  • cursor():使用该连接创建并返回游标
  • commit():提交当前事务
  • rollback():回滚当前事务
  • close():关闭连接
  • 示例:
import pymysql


connect = pymysql.connect(host='127.0.0.1',
                          port=3306,
                          user='pythonic',
                          passwd='pythonic',
                          db='test',
                          charset='utf8')
cursor = connect.cursor()

print(cursor)
print(connect)

cursor.close()
connect.close()

示例输出结果:

<pymysql.cursors.Cursor object at 0x02FD9C10>
<pymysql.connections.Connection object at 0x02FD9CD0>

三、数据库游标对象 cursor

游标对象用于执行查询和获取结果
cursor 对象常用方法:

  • execute(op[,args]):执行一个数据库命令
  • fetchone():获取结果集下一行
  • fetchmany(size):获取结果集的下 size 行
  • fetchall():获取结果集中剩下的所有行
  • rowcount:最近一次执行 execute 返回的数据行数或影响行数
  • close():关闭游标对象

《Python 操作 MySQL》 cursor 对象 execute 方法
《Python 操作 MySQL》 cursor 对象 fetch* 方法和 rowcount 属性

四、实例演示

  1. 查询数据库(select)

《Python 操作 MySQL》 查询数据库

  • 示例

《Python 操作 MySQL》 数据库原始数据

import pymysql


connect = pymysql.connect(host='127.0.0.1',
                          port=3306,
                          user='pythonic',
                          passwd='pythonic',
                          db='test',
                          charset='utf8')
cursor = connect.cursor()

sql = 'select * from user'
result = cursor.execute(sql)
print(result)  # 事实上执行 cursor.execute() 方法的时候就会自动返回受影响行数,可以用一个变量进行接收
print(cursor.rowcount)  # 获取受影响行数
print(cursor.fetchone())  # 获取 1 条数据
print(cursor.fetchmany(3))  # 获取 3 条数据
print(cursor.fetchall())  # 获取剩下全部数据

cursor.close()
connect.close()
  • 示例输出结果
6
6
(1, 'name1')  # 结果为元组
((2, 'name2'), (3, 'name3'), (4, 'name4'))
((5, 'name5'), (6, 'name6'))

通过以上示例可以发现,fetch*() 方法返回的结果是元组或二维元组

  • 示例
import pymysql


connect = pymysql.connect(host='127.0.0.1',
                          port=3306,
                          user='pythonic',
                          passwd='pythonic',
                          db='test',
                          charset='utf8')
cursor = connect.cursor()

sql = 'select * from user'
cursor.execute(sql)
for row in cursor.fetchall():
    print(f'userid:{row[0]} -- username:{row[1]}')
    # print('userid:%s -- username:%s' % row)

cursor.close()
connect.close()
  • 示例输出结果
userid:1 -- username:name1
userid:2 -- username:name2
userid:3 -- username:name3
userid:4 -- username:name4
userid:5 -- username:name5
userid:6 -- username:name6
  1. 更新数据库(insert/update/delete)

《Python 操作 MySQL》 更新数据库
《Python 操作 MySQL》 事务

  • pymysql 默认情况下会开启事务

  • 示例

《Python 操作 MySQL》 原始数据

import pymysql


connect = pymysql.connect(host='127.0.0.1',
                          port=3306,
                          user='pythonic',
                          passwd='pythonic',
                          db='test',
                          charset='utf8')
cursor = connect.cursor()

sql_insert = 'insert into user values(7, "name7")'
sql_update = 'update user set username="66" where userid="6"'
sql_delete ='delete from user where userid<3'

cursor.execute(sql_insert)
print(cursor.rowcount)

cursor.execute(sql_update)
print(cursor.rowcount)

cursor.execute(sql_delete)
print(cursor.rowcount)

connect.commit()

cursor.close()
connect.close()
  • 示例输出结果
1
1
2

《Python 操作 MySQL》 执行程序后结果数据

  • 示例

《Python 操作 MySQL》 原始数据

import pymysql


connect = pymysql.connect(host='127.0.0.1',
                          port=3306,
                          user='pythonic',
                          passwd='pythonic',
                          db='test',
                          charset='utf8')
cursor = connect.cursor()

sql_insert = 'insert into user values(7, "name7")'
sql_update = 'update user set username="66" where userid="6"'
sql_delete ='delete from user where userd<3'  # 错误的 sql 语句,没有 userd 这个字段

try:
    cursor.execute(sql_insert)
    print(cursor.rowcount)

    cursor.execute(sql_update)
    print(cursor.rowcount)

    cursor.execute(sql_delete)  # 执行错误的 sql 语句会出现异常
    print(cursor.rowcount)

    connect.commit()
except Exception as e:
    print(e)
    connect.rollback()  # 捕获到异常并回滚事务

cursor.close()
connect.close()
  • 示例输出结果
1
1
(1054, "Unknown column 'userd' in 'where clause'")

《Python 操作 MySQL》 执行错误的 sql 语句并进行 rollback 后结果并没有发生变化

  • 示例[关于事务]

《Python 操作 MySQL》 原始数据

import pymysql


connect = pymysql.connect(host='127.0.0.1',
                          port=3306,
                          user='pythonic',
                          passwd='pythonic',
                          db='test',
                          charset='utf8',
                          autocommit=True)  # 可以通过传递关键字参数的方法开启或关闭事务
# 可以通过connect.autocommit() 方法开启或关闭事务
# 默认 False,事务为开启状态,改为 True 可以关闭事务
# connect.autocommit(True)
cursor = connect.cursor()

sql_insert = 'insert into user values(7, "name7")'

cursor.execute(sql_insert)
print(cursor.rowcount)

# connect.commit()  # 关闭事务后不需要显式的调用 connect.commit() 来进行提交


cursor.close()
connect.close()
  • 示例输出结果
1

《Python 操作 MySQL》 执行关闭事务程序后结果数据

五、实例演示 — 模拟银行转账

  • 账户 1 给账户 2 转账 100 元

《Python 操作 MySQL》 账户 1 给账户 2 转账 100 元

  • 示例

《Python 操作 MySQL》 原始数据

import sys
import pymysql


connect = pymysql.connect(host='127.0.0.1',
                          port=3306,
                          user='pythonic',
                          passwd='pythonic',
                          db='test',
                          charset='utf8',
                          autocommit=False)


class TransferMoney(object):

    def __init__(self, conn):
        self.conn = conn

    def check_account_available(self, accountid):
        """
            检查账号是否正常
        Args:
            accountid: 账号 id
        Returns:
            None
        """
        cursor = self.conn.cursor()
        try:
            sql = 'select * from account where accountid=%s' % accountid
            cursor.execute(sql)
            result = cursor.fetchall()
            print(f'check_account_available: {sql}')
            if len(result) != 1:
                raise Exception('账号 %s 不存在' % accountid)
        finally:
            cursor.close()

        # with self.conn.cursor as cursor:
        #     sql = 'select * from account where sccountid=%s' % accountid
        #     cursor.execute(sql)
        #     result = cursor.fetchall()
        #     if len(result) != 1:
        #         raise Exception('账号 %s 不存在' % accountid)

    def have_enough_money(self, accountid, money):
        """
            判断账号是否有足够的钱
        Args:
            accountid: 账号 id
            money: 需要判断是否满足的金额
        Returns:
            None
        """
        cursor = self.conn.cursor()
        try:
            sql = 'select * from account where accountid=%s and money>%s' % (accountid, money)
            cursor.execute(sql)
            result = cursor.fetchall()
            print(f'have_enough_money: {sql}')
            if len(result) != 1:
                raise Exception('账号 %s 余额不足' % accountid)
        finally:
            cursor.close()

    def reduce_money(self, accountid, money):
        """
            减款
        Args:
            accountid: 减款的账号 id 
            money: 减款金额
        Returns:
            None
        """
        cursor = self.conn.cursor()
        try:
            sql = 'update account set money=money-%s where accountid=%s' % (money, accountid)
            cursor.execute(sql)
            print(f'reduce_money: {sql}')
            if cursor.rowcount != 1:
                raise Exception('账号 %s 减款失败' % accountid)
        finally:
            cursor.close()

    def add_money(self, accountid, money):
        """
            加款
        Args:
            accountid: 加款的账号 id 
            money: 加款金额
        Returns:
            None
        """
        cursor = self.conn.cursor()
        try:
            sql = 'update account set money=money+%s where accountid=%s' % (money, accountid)
            cursor.execute(sql)
            print(f'add_money: {sql}')
            if cursor.rowcount != 1:
                raise Exception('账号 %s 加款失败' % accountid)
        finally:
            cursor.close()

    def transfer(self, source_accountid, target_accountid, money):
        """
            转账
        Args:
            source_accountid: 转账发起者账号 id
            target_accountid: 转账接收者账账号 id
            money: 转账金额
        Returns:
            None
        """
        try:
            self.check_account_available(source_accountid)
            self.check_account_available(target_accountid)
            self.have_enough_money(source_accountid, money)
            self.reduce_money(source_accountid, money)
            self.add_money(target_accountid, money)
            self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            raise e


if __name__ == '__main__':
    source_accountid = sys.argv[1]  # 接收命令行参数
    target_accountid = sys.argv[2]
    money = sys.argv[3]
    transfer_money = TransferMoney(connect)
    try:
        transfer_money.transfer(source_accountid, target_accountid, money)
    except Exception as e:
        print(f'转账出错:{e}')
        connect.rollback()
  • 配置命令行参数

《Python 操作 MySQL》 配置命令行参数

  • 示例输出结果
check_account_available: select * from account where accountid=1
check_account_available: select * from account where accountid=2
have_enough_money: select * from account where accountid=1 and money>100
reduce_money: update account set money=money-100 where accountid=1
add_money: update account set money=money+100 where accountid=2

《Python 操作 MySQL》 运行程序后成功转账后的结果数据

  • 此时如果对结果数据再次运行程序,会输出转账失败
check_account_available: select * from account where accountid=1
check_account_available: select * from account where accountid=2
have_enough_money: select * from account where accountid=1 and money>100
转账出错:账号 1 余额不足

总结

《Python 操作 MySQL》 总结

    原文作者:CursiveS
    原文地址: https://www.jianshu.com/p/e508c2049f48
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞