Essential_SQLAlchemy2th学习笔记之Core模块

SQL Expression Language对原生SQL语言进行了简单的封装
两大模块SQLAlchemy Core and ORM:

  • Core:提供执行SQL Expression Language的接口

  • ORM

安装:SQLAlchemy及相关数据库驱动
pip install sqlalchemy pymysql

连接到数据库

数据库连接字符串格式:请参考这里

mysql://username:password@hostname/database
postgresql://username:password@hostname/database
sqlite:////absolute/path/to/database
oracle://scott:tiger@127.0.0.1:1521/orcl

比如SQLite如下:

from sqlalchemy import create_engine
engine = create_engine('sqlite:///cookies.db')
engine2 = create_engine('sqlite:///:memory:')
engine3 = create_engine('sqlite:////home/cookiemonster/cookies.db')
engine4 = create_engine('sqlite:///c:\\Users\\cookiemonster\\cookies.db')

注意:create_engine函数返回以一个engine实例,但是不会立即获取数据库连接,直到在engine上进行操作如查询时才会去获取connection

关于MySQL空闲连接8小时自动关闭的解决方案:传入 pool_recycle=3600参数

from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://cookiemonster:chocolatechip@mysql01.monster.internal/cookies', pool_recycle=3600)

create_engine其余的一些参数:

  • echo:是否log打印执行的sql语句及其参数。默认为False

  • encoding:默认utf-8

  • isolation_level:隔离级别

  • pool_recycle:指定连接回收间隔,这对于MySQL连接的8小时机制特别重要。默认-1

获取连接

from sqlalchemy import create_engine
engine = create_engine('mysql+pymysql://cookiemonster:chocolatechip' \
'@mysql01.monster.internal/cookies', pool_recycle=3600)
connection = engine.connect()

Schema and Types

四种类型集合:
• Generic
• SQL standard
• Vendor specific
• User defined

SQLAlchemy定义了很多generic types以兼容不同数据库。这些类型都定义在sqlalchemy.types模块中,为了方便也可以从sqlalchemy直接导入这些类型。
类型对应表如下:

SQLAlchemyPythonSQL
BigIntegerintBIGINT
BooleanboolBOOLEAN or SMALLINT
Datedatetime.dateDATE (SQLite: STRING)
DateTimedatetime.datetimeDATETIME (SQLite: STRING)
EnumstrENUM or VARCHAR
Floatfloat or DecimalFLOAT or REAL
IntegerintINTEGER
Intervaldatetime.timedeltaINTERVAL or DATE from epoch
LargeBinarybyteBLOB or BYTEA
Numericdecimal.DecimalNUMERIC or DECIMAL
UnicodeunicodeUNICODE or VARCHAR
TextstrCLOB or TEXT
Timedatetime.timeDATETIME

如果这些类型不能满足你,比如有些数据库支持json类型,那么你需要用到sqlalchemy.dialects模块中对应数据库的类型。比如from sqlalchemy.dialects.postgresql import JSON

Metadata & Table & Column

Metadata为了快速访问数据库。可以看作是很多Table对象的集合,还有一些关于engin,connection的信息。可以通过MetaData.tables访问这些表对象字典
定义表对象之前需要先实例化Metadata:

from sqlalchemy import MetaData
metadata = MetaData()

Table对象构建如下:第一个参数为名称,第二个参数为Metadata对象,后续参数为Column对象. Column对象参数为,名称,类型,及其余等

from sqlalchemy import Table, Column, Integer, Numeric, String, ForeignKey
cookies = Table('cookies', metadata,
Column('cookie_id', Integer(), primary_key=True),
Column('cookie_name', String(50), index=True),
Column('cookie_recipe_url', String(255)),
Column('cookie_sku', String(55)),
Column('quantity', Integer()),
Column('unit_cost', Numeric(12, 2))
)
from datetime import datetime
from sqlalchemy import DateTime
users = Table('users', metadata,
Column('user_id', Integer(), primary_key=True),
Column('username', String(15), nullable=False, unique=True),
Column('email_address', String(255), nullable=False),
Column('phone', String(20), nullable=False),
Column('password', String(25), nullable=False),
Column('created_on', DateTime(), default=datetime.now),
Column('updated_on', DateTime(), default=datetime.now, onupdate=datetime.now)

注意:这里default,onupdate属性是一个callable对象而不是直接值,比如datetime.now(),因为这样的话,就永远是这个值,而不是每个实例实例化、更新时的时间了。
比较有用的就是onupdate,每次更新时都会调用该方法或函数。

键和约束(Keys and Constraints)
键和约束既可以像上面那样通过kwargs定义在Column中,也可以在之后通过对象添加。相关类定义在基础的 sqlalchemy模块中,比如最常用的三个:
from sqlalchemy import PrimaryKeyConstraint, UniqueConstraint, CheckConstraint

PrimaryKeyConstraint('user_id', name='user_pk'),它也支持同时定义多个形成联合主键。
UniqueConstraint('username', name='uix_username')
CheckConstraint('unit_cost >= 0.00', name='unit_cost_positive')

索引(Index)

from sqlalchemy import Index
Index('ix_cookies_cookie_name', 'cookie_name')

这个定义需要放置在Table构造器中。也可以在之后定义,比如
Index('ix_test', mytable.c.cookie_sku, mytable.c.cookie_name))

关联关系和外键约束(Relationships and ForeignKeyConstraints)

from sqlalchemy import ForeignKey
orders = Table('orders', metadata,
Column('order_id', Integer(), primary_key=True),
Column('user_id', ForeignKey('users.user_id')),
Column('shipped', Boolean(), default=False)
)
line_items = Table('line_items', metadata,
Column('line_items_id', Integer(), primary_key=True),
Column('order_id', ForeignKey('orders.order_id')),
Column('cookie_id', ForeignKey('cookies.cookie_id')),
Column('quantity', Integer()),
Column('extended_cost', Numeric(12, 2))
)

注意:这里ForeignKey用的是字符串参数(这些字符串对应的是数据库中的表名.列名),而非引用。这样隔离了模块间相互依赖
我们也可以使用:
ForeignKeyConstraint(['order_id'], ['orders.order_id'])

创建或持久化表模式(Persisting the Tables)
通过示例代码我们知道所有的Table定义,以及额外的模式定义都会与一个metadata对象关联。我们可以通过这个metadata对象来创建表:

metadata.create_all(engine)

注意:默认情况下create_all不会重新创建已有表,所以它可以安全地多次调用,而且也非常友好地与数据库迁移库如Ablembic集成而不需要你进行额外手动编码。

本节代码完整如下:

from datetime import datetime
from sqlalchemy import (MetaData, Table, Column, Integer, Numeric, String,
DateTime, ForeignKey, create_engine)
metadata = MetaData()
cookies = Table('cookies', metadata,
Column('cookie_id', Integer(), primary_key=True),
Column('cookie_name', String(50), index=True),
Column('cookie_recipe_url', String(255)),
Column('cookie_sku', String(55)),
Column('quantity', Integer()),
Column('unit_cost', Numeric(12, 2))
)
users = Table('users', metadata,
Column('user_id', Integer(), primary_key=True),
Column('customer_number', Integer(), autoincrement=True),
Column('username', String(15), nullable=False, unique=True),
Column('email_address', String(255), nullable=False),
Column('phone', String(20), nullable=False),
Column('password', String(25), nullable=False),
Column('created_on', DateTime(), default=datetime.now),
Column('updated_on', DateTime(), default=datetime.now, onupdate=datetime.now)
)
orders = Table('orders', metadata,
Column('order_id', Integer(), primary_key=True),
Column('user_id', ForeignKey('users.user_id'))
)
line_items = Table('line_items', metadata,
Column('line_items_id', Integer(), primary_key=True),
Column('order_id', ForeignKey('orders.order_id')),
Column('cookie_id', ForeignKey('cookies.cookie_id')),
Column('quantity', Integer()),
Column('extended_cost', Numeric(12, 2))
)
engine = create_engine('sqlite:///:memory:')
metadata.create_all(engine)

SQLAlchemy-Core模块

插入数据:

ins = cookies.insert().values(
cookie_name="chocolate chip",
cookie_recipe_url="http://some.aweso.me/cookie/recipe.html",
cookie_sku="CC01",
quantity="12",
unit_cost="0.50"
)
print(str(ins))

当然你也可以这么做:

from sqlalchemy import insert
ins = insert(cookies).values(
cookie_name="chocolate chip",
cookie_recipe_url="http://some.aweso.me/cookie/recipe.html",
cookie_sku="CC01",
quantity="12",
unit_cost="0.50"
)

上述编译成预编译语句如下:

INSERT INTO cookies
(cookie_name, cookie_recipe_url, cookie_sku, quantity, unit_cost)
VALUES
(:cookie_name, :cookie_recipe_url, :cookie_sku, :quantity, :unit_cost)

实际过程会是如下ins对象内部会调用compile()方法编译成上述语句,然后将参数存储到ins.compile().params字典中。
接下来我们通过前面获取的connection对象执行statement:

result = connection.execute(ins)

当然你也可以这么查询:

ins = cookies.insert()
result = connection.execute(
ins,
cookie_name='dark chocolate chip',
cookie_recipe_url='http://some.aweso.me/cookie/recipe_dark.html',
cookie_sku='CC02',
quantity='1',
unit_cost='0.75'
)
result.inserted_primary_key

批量插入:

inventory_list = [
{
'cookie_name': 'peanut butter',
'cookie_recipe_url': 'http://some.aweso.me/cookie/peanut.html',
'cookie_sku': 'PB01',
'quantity': '24',
'unit_cost': '0.25'
},
{
'cookie_name': 'oatmeal raisin',
'cookie_recipe_url': 'http://some.okay.me/cookie/raisin.html',
'cookie_sku': 'EWW01',
'quantity': '100',
'unit_cost': '1.00'
}
]
result = connection.execute(ins, inventory_list)

注意:一定要确保所有字典参数拥有相同的keys

查询

from sqlalchemy.sql import select
s = select([cookies])
rp = connection.execute(s)
results = rp.fetchall()

当然我们也可以使用字符串来代替:

s = select("""SELECT cookies.cookie_id, cookies.cookie_name,
cookies.cookie_recipe_url, cookies.cookie_sku, cookies.quantity,
cookies.unit_cost FROM cookies""")

connection.execute返回的rp变量是一个ResultProxy对象(它是DBAPI中cursor对象的封装)。

我们也可以这样写:

from sqlalchemy.sql import select
s = cookies.select()
rp = connection.execute(s)
results = rp.fetchall()

ResultProxy使得查询结果可以通过index,name,or Column object访问列数据。例如:

first_row = results[0]
first_row[1] #游标列索引从1开始,by index
first_row.cookie_name # by name
first_row[cookies.c.cookie_name] #by Column object.

你也可以迭代ResultProxy,如下:

rp = connection.execute(s)
for record in rp:
print(record.cookie_name)

ResultProxy其余可用来获取结果集的方法

  • first()

  • fetchone()

  • fetchall()

  • scalar():Returns a single value if a query results in a single record with one column.

  • keys() 获取列名

关于选择ResultProxy上述的方法的建议:
1、使用first()而不是fetchone()来获取单条记录,因为fetchone()调用之后仍然保留着打开的connections共后续使用,如果不小心的话很容易引起问题。
2、使用迭代方式获取所有结果,而不是fetchall(),更加省内存。
3、使用scalar()获取单行单列结果时需要注意,如果返回多于一行,它会抛出异常。

控制返回列的数目

s = select([cookies.c.cookie_name, cookies.c.quantity])
rp = connection.execute(s)
print(rp.keys())
result = rp.first()

排序

s = select([cookies.c.cookie_name, cookies.c.quantity])
s = s.order_by(cookies.c.quantity)
rp = connection.execute(s)
for cookie in rp:
print('{} - {}'.format(cookie.quantity, cookie.cookie_name))

#倒序desc
from sqlalchemy import desc
s = select([cookies.c.cookie_name, cookies.c.quantity])
s = s.order_by(desc(cookies.c.quantity))

限制返回结果集的条数

s = select([cookies.c.cookie_name, cookies.c.quantity])
s = s.order_by(cookies.c.quantity)
s = s.limit(2)
rp = connection.execute(s)
print([result.cookie_name for result in rp])

内置SQL函数

在sqlalchemy.sql.func模块中

#sum
from sqlalchemy.sql import func
s = select([func.sum(cookies.c.quantity)])
rp = connection.execute(s)
print(rp.scalar())

#count
s = select([func.count(cookies.c.cookie_name)])
rp = connection.execute(s)
record = rp.first()
print(record.keys())
print(record.count_1) #字段名是自动生成的,<func_name>_<position>,可以设置别名的,看下面

#设置别名
s = select([func.count(cookies.c.cookie_name).label('inventory_count')])
rp = connection.execute(s)
record = rp.first()
print(record.keys())
print(record.inventory_count)

过滤

#where
s = select([cookies]).where(cookies.c.cookie_name == 'chocolate chip')
rp = connection.execute(s)
record = rp.first()
print(record.items()) #调用row对象的items()方法。

#like
s = select([cookies]).where(cookies.c.cookie_name.like('%chocolate%'))
rp = connection.execute(s)
for record in rp.fetchall():
    print(record.cookie_name)

可以在where中使用的子句元素

  • between(cleft, cright)

  • concat(column_two) Concatenate column with column_two

  • distinct()

  • in_([list])

  • is_(None) Find where the column is None (commonly used for Null checks with None)

  • contains(string) Find where the column has string in it (case-sensitive)

  • endswith(string) Find where the column ends with string (case-sensitive)

  • like(string) Find where the column is like string (case-sensitive)

  • startswith(string) Find where the column begins with string (case-sensitive)

  • ilike(string) Find where the column is like string (this is not case-sensitive)

当然还包括一系列的notxxx方法,比如notin_(),唯一的例外是isnot()

操作符

  • +,-,*,/,%

  • ==,!=,<,>,<=,>=

  • AND,OR,NOT,由于python关键字的原因,使用and_(),or_(),not_()来代替

+号还可以用于字符串拼接:

s = select([cookies.c.cookie_name, 'SKU-' + cookies.c.cookie_sku])
for row in connection.execute(s):
print(row)
from sqlalchemy import cast
s = select([cookies.c.cookie_name,
    cast((cookies.c.quantity * cookies.c.unit_cost),
        Numeric(12,2)).label('inv_cost')])
for row in connection.execute(s):
    print('{} - {}'.format(row.cookie_name, row.inv_cost))

注意:cast是另外一个函数,允许我们进行类型转换,上述转换是将数字转换为货币形式,和
print(‘{} – {:.2f}’.format(row.cookie_name, row.inv_cost)).这个行为一致。

from sqlalchemy import and_, or_, not_
s = select([cookies]).where(
    and_(
        cookies.c.quantity > 23,
        cookies.c.unit_cost < 0.40
    )
)
for row in connection.execute(s):
    print(row.cookie_name)


from sqlalchemy import and_, or_, not_
s = select([cookies]).where(
    or_(
        cookies.c.quantity.between(10, 50),
        cookies.c.cookie_name.contains('chip')
    )
)
for row in connection.execute(s):
    print(row.cookie_name)

update

from sqlalchemy import update
u = update(cookies).where(cookies.c.cookie_name == "chocolate chip")
u = u.values(quantity=(cookies.c.quantity + 120))
result = connection.execute(u)
print(result.rowcount)
s = select([cookies]).where(cookies.c.cookie_name == "chocolate chip")
result = connection.execute(s).first()
for key in result.keys():
    print('{:>20}: {}'.format(key, result[key]))

delete

from sqlalchemy import delete
u = delete(cookies).where(cookies.c.cookie_name == "dark chocolate chip")
result = connection.execute(u)
print(result.rowcount)

s = select([cookies]).where(cookies.c.cookie_name == "dark chocolate chip")
result = connection.execute(s).fetchall()
print(len(result))

joins

join(),outerjoin()函数,select_from()函数

columns = [orders.c.order_id, users.c.username, users.c.phone,
           cookies.c.cookie_name, line_items.c.quantity,
           line_items.c.extended_cost]
cookiemon_orders = select(columns)
cookiemon_orders = cookiemon_orders.select_from(orders.join(users).join(
    line_items).join(cookies)).where(users.c.username ==
                                     'cookiemon')
result = connection.execute(cookiemon_orders).fetchall()
for row in result:
    print(row)

最终产生的SQL语句如下:

SELECT orders.order_id, users.username, users.phone, cookies.cookie_name,
line_items.quantity, line_items.extended_cost FROM users JOIN orders ON
users.user_id = orders.user_id JOIN line_items ON orders.order_id =
line_items.order_id JOIN cookies ON cookies.cookie_id = line_items.cookie_id
WHERE users.username = :username_1

outerjoin

columns = [users.c.username, func.count(orders.c.order_id)]
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders))
all_orders = all_orders.group_by(users.c.username)
result = connection.execute(all_orders).fetchall()
for row in result:
    print(row)

表别名函数alias()

>>> manager = employee_table.alias('mgr')
>>> stmt = select([employee_table.c.name],
            ... and_(employee_table.c.manager_id==manager.c.id,
            ... manager.c.name=='Fred'))
>>> print(stmt)
SELECT employee.name
FROM employee, employee AS mgr
WHERE employee.manager_id = mgr.id AND mgr.name = ?

分组

columns = [users.c.username, func.count(orders.c.order_id)]
all_orders = select(columns)
all_orders = all_orders.select_from(users.outerjoin(orders))
all_orders = all_orders.group_by(users.c.username)
result = connection.execute(all_orders).fetchall()
for row in result:
    print(row)

chaining

def get_orders_by_customer(cust_name, shipped=None, details=False):
    columns = [orders.c.order_id, users.c.username, users.c.phone]
    joins = users.join(orders)
    if details:
        columns.extend([cookies.c.cookie_name, line_items.c.quantity,
            line_items.c.extended_cost])
        joins = joins.join(line_items).join(cookies)
    cust_orders = select(columns)
    cust_orders = cust_orders.select_from(joins)

    cust_orders = cust_orders.where(users.c.username == cust_name)
    if shipped is not None:
        cust_orders = cust_orders.where(orders.c.shipped == shipped)
    result = connection.execute(cust_orders).fetchall()
    return result

执行原生SQL

返回的还是ResultProxy对象
1、完全采用原始SQL

result = connection.execute("select * from orders").fetchall()
print(result)

2、部分采用原始SQL,text()函数

from sqlalchemy import text
stmt = select([users]).where(text("username='cookiemon'"))
print(connection.execute(stmt).fetchall())

异常

SQLALchemy定义了很多异常。我们通过关心:AttributeErrors,IntegrityErrors.等
为了进行相关试验与说明,请先执行下面这些语句

from datetime import datetime
from sqlalchemy import (MetaData, Table, Column, Integer, Numeric, String,
                        DateTime, ForeignKey, Boolean, create_engine,
                        CheckConstraint)
metadata = MetaData()
cookies = Table('cookies', metadata,
                Column('cookie_id', Integer(), primary_key=True),
                37
                Column('cookie_name', String(50), index=True),
                Column('cookie_recipe_url', String(255)),
                Column('cookie_sku', String(55)),
                Column('quantity', Integer()),
                Column('unit_cost', Numeric(12, 2)),
                CheckConstraint('quantity > 0', name='quantity_positive')
                )
users = Table('users', metadata,
              Column('user_id', Integer(), primary_key=True),
              Column('username', String(15), nullable=False, unique=True),
              Column('email_address', String(255), nullable=False),
              Column('phone', String(20), nullable=False),
              Column('password', String(25), nullable=False),
              Column('created_on', DateTime(), default=datetime.now),
              Column('updated_on', DateTime(),
                     default=datetime.now, onupdate=datetime.now)
              )
orders = Table('orders', metadata,
               Column('order_id', Integer()),
               Column('user_id', ForeignKey('users.user_id')),
               Column('shipped', Boolean(), default=False)
               )
line_items = Table('line_items', metadata,
                   Column('line_items_id', Integer(), primary_key=True),
                   Column('order_id', ForeignKey('orders.order_id')),
                   Column('cookie_id', ForeignKey('cookies.cookie_id')),
                   Column('quantity', Integer()),
                   Column('extended_cost', Numeric(12, 2))
                   )
engine = create_engine('sqlite:///:memory:')
metadata.create_all(engine)
connection = engine.connect()
from sqlalchemy import select, insert
ins = insert(users).values(
username="cookiemon",
email_address="mon@cookie.com",
phone="111-111-1111",
password="password"
)
result = connection.execute(ins)
s = select([users.c.username])
results = connection.execute(s)
for result in results:
print(result.username)
print(result.password) #此处包AttributeError异常

在违反约束的情况下会出现IntegrityError异常。比如违反唯一性约束等。

s = select([users.c.username])
connection.execute(s).fetchall()
[(u'cookiemon',)]
ins = insert(users).values(
    username="cookiemon",
    email_address="damon@cookie.com",
    phone="111-111-1111",
    password="password"
)
result = connection.execute(ins) #此处报IntegrityError, UNIQUE constraint failed: users.username
#异常处理
try:
    result = connection.execute(ins)
except IntegrityError as error:
    print(error.orig.message, error.params)

所有的SQLAlchemy异常处理方式都是上面那种思路,通过[SQLAlchemyError](http://docs.sqlal
chemy.org/en/latest/core/exceptions.html)可以获取到的信息由如下:

  • orig :The DBAPI exception object.

  • params:The parameter list being used when this exception occurred.

  • statement :The string SQL statement being invoked when this exception occurred.

事务Transactions

from sqlalchemy.exc import IntegrityError


def ship_it(order_id):
    s = select([line_items.c.cookie_id, line_items.c.quantity])
    s = s.where(line_items.c.order_id == order_id)
    transaction = connection.begin() #开启事务
    cookies_to_ship = connection.execute(s).fetchall()
    try:
        for cookie in cookies_to_ship:
            u = update(cookies).where(cookies.c.cookie_id == cookie.cookie_id)
            u = u.values(quantity=cookies.c.quantity - cookie.quantity)
            result = connection.execute(u)
        u = update(orders).where(orders.c.order_id == order_id)
        u = u.values(shipped=True)
        result = connection.execute(u)
        print("Shipped order ID: {}".format(order_id))
        transaction.commit() #提交事务
    except IntegrityError as error:
        transaction.rollback()   #事务回滚
        print(error)
    原文作者:xbynet
    原文地址: https://segmentfault.com/a/1190000007835240
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞