Flask本身不限定数据库的选择,你可以选择SQL或NOSQL的任何一种。也可以选择更方便的SQLALchemy,类似于Django的ORM。SQLALchemy实际上是对数据库的抽象,让开发者不用直接和SQL语句打交道,而是通过Python对象来操作数据库,在舍弃一些性能开销的同时,换来的是开发效率的较大提升。
SQLAlchemy是一个关系型数据库框架,它提供了高层的ORM和底层的原生数据库的操作。flask-sqlalchemy是一个简化了SQLAlchemy操作的flask扩展。
安装Flask-SQLALchemy
pip install flask-sqlalchemy
#同时需要安装数据库驱动层(有的人肯定很迷茫他们的关系,其实就是下面这样,我这里选择mysql官方推荐的mysql-python)
1、pymysql支持python3、python2
2、mysql-python支持python2
3、flask-mysqladb是在mysql-python(mysqpl官方提供的驱动包)基础上进行的封装
pip install mysql-python
常用的SQLAlchemy字段类型
类型名 | python中类型 | 说明 |
---|---|---|
Integer | int | 普通整数,一般是32位 |
SmallInteger | int | 取值范围小的整数,一般是16位 |
BigInteger | int或long | 不限制精度的整数 |
Float | float | 浮点数 |
Numeric | decimal.Decimal | 普通整数,一般是32位 |
String | str | 变长字符串 |
Text | str | 变长字符串,对较长或不限长度的字符串做了优化 |
Unicode | unicode | 变长Unicode字符串 |
UnicodeText | unicode | 变长Unicode字符串,对较长或不限长度的字符串做了优化 |
Boolean | bool | 布尔值 |
Date | datetime.date | 时间 |
Time | datetime.datetime | 日期和时间 |
LargeBinary | str | 二进制文件 |
代码实例
# coding:utf-8
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
#配置数据库连接的对象
class Config(object):
"""配置参数"""
# sqlalchemy的配置参数 帐号:密码 地址 端口 数据库
SQLALCHEMY_DATABASE_URI = "mysql://root:mysql@127.0.0.1:3306/db_test"
# 设置sqlalchemy自动更跟踪数据库(数据库表手动更新是同步跟新到对象)
SQLALCHEMY_TRACK_MODIFICATIONS = True
#查询时会显示原始SQL语句
SQLALCHEMY_ECHO = True
#加载数据库配置对象
app.config.from_object(Config)
# 创建数据库sqlalchemy工具对象
db = SQLAlchemy(app)
class Role(db.Model):
"""用户角色/身份表"""
#重命名表名
__tablename__ = "tbl_roles"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32), unique=True)
#这个重点介绍一下,这是额外添加的一个属性,并不会添加在表里面
users = db.relationship("User", backref="role")
def __repr__(self):
"""定义之后,可以让显示对象的时候更直观"""
return "Role object: name=%s" % self.name
# 表名的常见规范
# ihome -> ih_user 数据库名缩写_表名
# tbl_user tbl_表名
# 创建数据库模型类
class User(db.Model):
"""用户表"""
__tablename__ = "tbl_users" # 指明数据库的表名
id = db.Column(db.Integer, primary_key=True) # 整型的主键,会默认设置为自增主键
name = db.Column(db.String(64), unique=True)
email = db.Column(db.String(128), unique=True)
password = db.Column(db.String(128))
# 添加外键 关联角色表的id
role_id = db.Column(db.Integer, db.ForeignKey("tbl_roles.id"))
def __repr__(self):
return "User object: name=%s" % self.name
if __name__ == '__main__':
# 清除数据库里的所有数据
db.drop_all()
# 创建所有的表
db.create_all()
# 创建对象
role1 = Role(name="admin")
# session记录对象任务
db.session.add(role1)
# 提交任务到数据库中
db.session.commit()
role2 = Role(name="stuff")
db.session.add(role2)
db.session.commit()
us1 = User(name='wang', email='wang@163.com', password='123456', role_id=role1.id)
us2 = User(name='zhang', email='zhang@189.com', password='201512', role_id=role2.id)
us3 = User(name='chen', email='chen@126.com', password='987654', role_id=role2.id)
us4 = User(name='zhou', email='zhou@163.com', password='456789', role_id=role1.id)
# 一次保存多条数据
db.session.add_all([us1, us2, us3, us4])
db.session.commit()
常用的SQLAlchemy查询过滤器
过滤器 | 说明 |
---|---|
filter() | 把过滤器添加到原查询上,返回一个新查询 |
filter_by() | 把等值过滤器添加到原查询上,返回一个新查询 |
limit | 使用指定的值限定原查询返回的结果 |
offset() | 偏移原查询返回的结果,返回一个新查询 |
order_by() | 根据指定条件对原查询结果进行排序,返回一个新查询 |
group_by() | 根据指定条件对原查询结果进行分组,返回一个新查询 |
常用的SQLAlchemy查询执行器
方法 | 说明 |
---|---|
all() | 以列表形式返回查询的所有结果 |
first() | 返回查询的第一个结果,如果未查到,返回None |
first_or_404() | 返回查询的第一个结果,如果未查到,返回404 |
get() | 返回指定主键对应的行,如不存在,返回None |
get_or_404() | 返回指定主键对应的行,如不存在,返回404 |
count() | 返回查询结果的数量 |
paginate() | 返回一个Paginate对象,它包含指定范围内的结果 |
插入一条数据
ro = Role(name='admin')
db.session.add(ro)
db.session.commit()
查询:filter_by精确查询
User.query.filter_by(name='wang').all()
first()返回查询到的第一个对象
User.query.first()
all()返回查询到的所有对象
User.query.all()
filter模糊查询,返回名字结尾字符为g的所有数据。
User.query.filter(User.name.endswith('g')).all()
get(),参数为主键,如果主键不存在没有返回内容
User.query.get()
逻辑非,返回名字不等于wang的所有数据。
User.query.filter(User.name!='wang').all()
逻辑与,需要导入and,返回and()条件满足的所有数据。
from sqlalchemy import and_
User.query.filter(and_(User.name!='wang',User.email.endswith('163.com'))).all()
逻辑或,需要导入or_
from sqlalchemy import or_
User.query.filter(or_(User.name!='wang',User.email.endswith('163.com'))).all()
not_ 相当于取反
from sqlalchemy import not_
User.query.filter(not_(User.name=='chen')).all()
查询数据后删除
user = User.query.first()
db.session.delete(user)
db.session.commit()
User.query.all()
更新数据
user = User.query.first()
user.name = 'wang'
db.session.commit()
User.query.first()
使用update
User.query.filter_by(name='zhang').update({'name':'li'})
关联查询示例:角色和用户的关系是一对多的关系,一个角色可以有多个用户,一个用户只能属于一个角色。
查询角色的所有用户:查询roles表id为1的角色
ro1 = Role.query.get(1)
#查询该角色的所有用户
ro1.us
查询用户所属角色:
#查询users表id为2的用户
us1 = User.query.get(2)
#查询用户属于什么角色
us1.role