SQLAlchemy —— 多表查询

1. join 查询

假设这样一个业务场景,知道一个邮箱地址,要查询这个地址所属的用户,第一个办法是用连接多个 filter() 来查询。

for u, a in session.query(User, Address).\
    filter(User.id==Address.user_id).\
    filter(Address.email_address=='jack@google.com').\
    all():

    print(u)
    print(a)

# 执行结果
jack
jack@google.com

更简便的方法是使用 join() 方法:

u =  session.query(User).join(Address).\
    filter(Address.email_address=='jack@google.com').\
    one()

print(u)

# 执行结果
jack

Query.join() 知道如何在 UserAddress 之间进行连接,因为我们设定了外键。假如我们没有指定外键,比如这样:

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    fullname = Column(String(50))
    password = Column(String(12))


class Address(Base):
    __tablename__ = 'addresses'
    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)
    user_id = Column(Integer)

我们可以用下面方法来让 join 生效:

query.join(Address, User.id==Address.user_id)    # explicit condition
query.join(User.addresses)                       # specify relationship from left to right
query.join(Address, User.addresses)              # same, with explicit target
query.join('addresses')                          # same, using a string

例子:

session.query(User).\
    join(Address, User.id==Address.user_id).\
    filter(Address.email_address=='jack@google.com').all()

2. 子查询(subquery)

现在需要查询每个用户所拥有的邮箱地址数量,思路是先对 addresses 表按用户 ID 分组,统计各组数量,这样我们得到一张新表;然后用 JOIN 连接新表和 users 两个表,在这里,我们应该使用 LEFT OUTER JOIN,因为使用 INTER JOIN 所得出的新表只包含两表的交集。

from sqlalchemy.sql import func

stmt = session.query(Address.user_id, func.count('*').\
    label('address_count')).\
    group_by(Address.user_id).subquery()

for u, count in session.query(User, stmt.c.address_count).\
    outerjoin(stmt, User.id==stmt.c.user_id).order_by(User.id):
    print(u, count)

# 执行结果
ed None
wendy None
mary None
fred None
jack 2

如果上面的暂时看不懂,我们先来看看第一个 stmt 的情况。

from sqlalchemy.sql import func

stmt = session.query(Address.user_id, func.count('*').\
    label('address_count')).\
    group_by(Address.user_id).all()

for i in stmt:
    print(i)

# 执行结果
(5, 2)

可以理解成 group_by() 方法生成了一张新的表,该表有两列,第一列是 user_id ,第二列是该 user_id 所拥有的 addresses 的数量,这个值由 func() 跟着的方法产生,我们可以使用 c() 方法来访问这个值。

from sqlalchemy.sql import func

stmt = session.query(Address.user_id, func.count('*').\
    label('address_count')).\
    group_by(Address.user_id).subquery()

q = session.query(User, stmt.c.address_count).\
    outerjoin(stmt, User.id==stmt.c.user_id).order_by(User.id).all()

for i in q:
    print(i)

# 执行结果
(ed, None)
(wendy, None)
(mary, None)
(fred, None)
(jack, 2)

如果不用 outerjoin() 而使用 join(),就等于使用 SQL 中的 INTER JOIN,所得出的表只为两者交集,不会包含 None 值的列。

from sqlalchemy.sql import func

stmt = session.query(Address.user_id, func.count('*').\
    label('address_count')).\
    group_by(Address.user_id).subquery()

q = session.query(User, stmt.c.address_count).\
    join(stmt, User.id==stmt.c.user_id).order_by(User.id).all()

for i in q:
    print(i)

# 执行结果
(jack, 2)

3.使用别名(aliased)

SQLAlchemy 使用 aliased() 方法表示别名,当我们需要把同一张表连接多次的时候,常常需要用到别名。

from sqlalchemy.orm import aliased

# 把 Address 表分别设置别名
adalias1 = aliased(Address)
adalias2 = aliased(Address)

for username, email1, email2 in \
    session.query(User.name, adalias1.email_address, adalias2.email_address).\
    join(adalias1, User.addresses).\
    join(adalias2, User.addresses).\
    filter(adalias1.email_address=='jack@google.com').\
    filter(adalias2.email_address=='j25@yahoo.com'):
    
    print(username, email1, email2)

# 执行结果
jack jack@google.com j25@yahoo.com

上述代码查询同时拥有两个名为:”jack@google.com” 和 “j25@yahoo.com” 邮箱地址的用户。

别名也可以在子查询里使用:

from sqlalchemy.orm import aliased

stmt = session.query(Address).\
    filter(Address.email_address != 'j25@yahoo.com').\
    subquery()

adalias = aliased(Address, stmt)

for user, address in session.query(User, adalias).\
    join(adalias, User.addresses):

    print(user)
    print(address)

# 执行结果
jack
jack@google.com

4. EXISTS 关键字

EXISTS 关键字可以在某些场景替代 JOIN 的使用。

from sqlalchemy.sql import exists

stmt = exists().where(Address.user_id==User.id)

for name, in session.query(User.name).filter(stmt):
    print(name)

# 执行结果
jack

使用 any() 方法也能得到同意的效果:

for name, in session.query(User.name).\
    filter(User.addresses.any()):
    
    print(name)

使用 any() 方法时也可加上查询条件:

for name, in session.query(User.name).\
    filter(User.addresses.any(Address.email_address.like('%google%'))):
    print(name)

使用 has() 方法也能起到 JOIN 的作用:

session.query(Address).filter(~Address.user.has(User.name=='jack')).all()

注意:这里的 ~ 符号是 “不” 的意思。

关系运算符

1. 等于、不等于
query = session.query(Address)
jack = session.query(User).filter(User.name == 'jack').one()

# 筛选 user 为 jack 的邮箱
query.filter(Address.user == jack)

# 筛选 user 不为 jack 的邮箱
query.filter(Address.user != jack)
2. 为空、不为空
# 筛选 user 为空的邮箱
query.filter(Address.user == None)

# 筛选 user 不为空的邮箱
query.filter(Address.user != None)
3. 包含
query = session.query(User)
address = session.query(Address).filter(Address.id == 1).one()

# 筛选包含某地址的用户
query.filter(User.addresses.contains(address))
    原文作者:SingleDiego
    原文地址: https://www.jianshu.com/p/bf9f98479026
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞