声明
上篇地址:https://segmentfault.com/a/11…
虽然上一篇,已经说明,但还是强调一下,peewee是 python-ORM(只支持 MySQL,Sqlite,postgresql )
虽然ORM可以与多种数据库无缝相接,并且兼容性好, 但是某些细微的语法并不是数据库共有的。
我用MySQL, 所以下面说的都是基于MySQL(其他2种数据库也差不了多少, 99%是一样的)
总官档地址:http://docs.peewee-orm.com/en…
官方Github地址:https://github.com/coleifer/p…
增加数据
方式1:(推荐)
zhang = Owner.create(name='Zhang', age=20)
方式2:
zhang = Owner(name='Zhang1', age=18)
zhang.save()
# 你可以看见,它需要用save(),所以推荐用上一种方式。
方式3:(推荐)
cython = Owner.insert(name='Cython', age=15).execute()
# 方式1 和 方式2, 返回结果都是模型实例"(也就意味着创建了一个实例)"
# 而本方式,返回结果是 最新插入的主键值"(也就意味着不会创建实例)"
如果存在外键关联,假如存在 Pet类 引用的 Owner的主键,插入数据方式有2种:
方式1: 用新建对象变量传值:
lin = Owner.create(name='lin', age=20)
tom1 = Pet.create(name='Tom', age=1, owner=lin) # 注意 owner = lin
方式2: 手动维护主键 id,通过主键传值(或者通过查询id):
lin = Owner.create(id=100, name='lin', age=20) # id自己给的值为 100
tom1 = Pet.create(name='Tom', age=1, owner=100) # 注意 owner=100
插入多条数据:(官档有好几种方法,我只说最提倡,最快速的方法(好处就是一次性提交,不用循环))
方式1:
"""
注意格式 [ {},{},{} ]
每个字典,对应一条记录。
"""
data = [
{'name': 'Alice', 'age': 18},
{'name': 'Jack', 'age': 17},
]
Owner.insert_many(data).execute()
方式2: (就是不用在数据中都指定键了,方便一点)
"""
注意格式 [ (),(),() ]
每个元组,对应一条记录。
"""
data = [
('Alice', 18),
('Jack', 17),
]
User.insert_many(data, fields=[Owner.name, Owner.age]).execute()
注意一下:尾部都必须要带一个execute()
如果数据量过大,可能会出现OOM等问题。你可以手动分批,但是 peewee 给我们提供了成品的 api
from peewee import chunked
with mysql_db.atomic(): # 官档建议用事务包裹
for batch in chunked(data, 100): # 一次100条, chunked() 返回的是可迭代对象
Owner.insert_many(batch).execute()
防止数据重复插入的2种办法(或者防止设置了主键,重复插入抛出异常,导致程序无法运行):
方法1: INGORE关键字 (这种方式是如果冲突了,就自动忽略)
SQL:
insert ignore into owner (name,age) values ('lin',30);
peewee:
Owner.insert(name='lin', age=30).on_conflict_ignore()
方法2:用 ON DUPLICATE KEY UPDATE (这种方式,是如果冲突了,你还可以做一些操作)
SQL:
insert into owner (name,age) values ('lin',30)
ON DUPLICATE KEY
UPDATE name='lin', age=30; # 如果冲突了,可以重新设置值
peewee:
Owner.insert(name='lin', age=30).on_conflict(
preserve=[Owner.name, Owner.age], # 若冲突,你想保留不变的字段
update={Owner.name: 'lin', Owner.age: 30} # 若冲突,你想更新什么
).execute()
# 注: preserve 和 update 按情况用,一般设置一个用就行了。
删除数据
方法1:
php = Owner.get(name='PHP') # 获取单条对象
php.delete_instance()
# 注意: delete_instance() 只能删除单条对象, 如果用select()查出来的,需要遍历才能删
方法2:
Owner.delete().where(Owner.name == 'lin').execute()
# 注意这种方法和添加类似, 最后也必须有个 execute()
修改数据
方式1:(不推荐)
owner= 查询单个对象结果
owner.name = 'Pack'
owner.name = 50
owner.save() # 你可以看见,我们还需要手动调用一下save()
方式2:(推荐)
query = Owner.update(name='Pack', age=50).where(Owner.name == 'Zhang')
query.execute()
查询数据
查询单条数据 (特别注意,如果你有多条,它只会给你返回第一条)
"""存在则返回原有对象, 不存在则抛error"""
one_owner = Owner.get(name='Zhang2')
print(one_woner.age)
扩展版1: get_or_create
"""存在则返回原有对象。不存在则插入数据,并返回新对象"""
obj, status = Owner.get_or_create(name='Zhang23213',age=3123)
print(obj.name, status)
# obj就是返回的新对象
# status表示插入是否成功 True 或者 False
扩展版2: get_or_none
"""存在则返回原有对象, 不存在则返回 None (不会抛error)"""
Owner.get_or_none(name='abc')
查询多条数据
正常查询所有数据
owners = Owner.select() # 返回结果 owners 是对象集合,需要遍历
for owner in owners: # owner 是每个对象(对应每条记录)
print(woner.name)
当然你可以在查询后转为 python 类dict格式:
owners = Owner.select().dicts() # 返回结果 owners 是 "类字典对象集合"
for owner in owners: # owner是每个字典对象, (它 对应每条记录)
print(owner['name']) # 字典语法取值,懂了吧,不多说了。
上面的查询如果在数据大量的情况下可能会导致OOM,因此可转为迭代:
"""再每个查询的最后加上 .iterator() 即可"""
eg:
owners = Owner.select().iterator()
owners = Owner.select().dicts().iterator()
条件查询:
首先我先强调个,”MySQL是否区分大小写” 的事:
MySQL5.7+,是区分大小写的; (MySQL8,和 MariaDB 我没试, 应该和 5.7是一样的)
但这个区分大小写 仅仅仅仅仅仅 是 针对于 SQL语句的表名 "" 引号外面的(就是非字符串语法)
举个例子:
现有一表,名叫 owner
desc owner # 正确
desc OWNER # 错误,表不存在
这种情况下,因为不涉及字符串的 "" 引号操作,所以是严格区分大小写的。
"而引号里面" (其实就是涉及字符串)的数据语法,是 不区分 大小写的。
举个例子(因为下面例子都有 "" 字符串操作,所以都 不区分 大小写):
SQL:
查询例子:
select * from owner where name='zHang'
select * from owner where name='ZHANG'
他们俩查询的是同一个数据。
插入例子:
insert into owner values("zhaNg")
insert into owner values("zhang")
他们俩 插入的 也是同一个数据
peewee:
查询例子:
...where(name="zhang")
...where(name="ZHaNg")
他们俩查询的是 同一个数据。
插入例子:
...insert({'name':'Zhang')
...insert({'name': 'zhANG')
他们俩 插入的 也是同一个数据
官档-条件操作符:http://docs.peewee-orm.com/en…
上边的连接是官档操作符大全,下面我把部分常用摘出来说一下。
常用操作符
与或非:
与:&
模型类.where( (User.is_active == True) & (User.is_admin == True) )
或:|
模型类.where( (User.is_admin) | (User.is_superuser) )
非:~
模型类.where( ~(User.username.contains('admin')) )
我说两句,方便记忆:
1. SQL语句中"与或非" 是 "and or not" 语法, 为啥peewee不遵循?
答: 因为,"python原语法"也是这三个。。。冲突, 所以 peewee改了。
2. 看上面的例子, 每个条件操作符 "两边"的代码 都用 "()" 括起来了
范围:
# 查询年龄18到20的数据 (前闭后闭)
for owner in Owner.select().where(Owner.age.between(18,20)):
print(owner.age)
包含&不包含:
不包含:not_in (同下)
不包含:in_
# 将姓名包含 Alice和Tom的记录找出来
for owner in Owner.select().where(Owner.name.in_(['Alice', 'Tom'])):
print(owner.name)
是否为null:
# True 就代表把所有 name 为 null 的 记录都查出来
# False 就代表把所有 name 为 非null 的 记录都查出来
for owner in Owner.select().where( Owner.name.is_null(True) ):
print(owner.name)
以..开头 & 以..结尾
以..开头: startswith
以..结尾: endswith
# 把以 ali 开头的 都查询出来
for owner in Owner.select().where(Owner.name.startswith('ali')):
print(owner.name)
模糊查询:
# 将包含 li 字符串的数据查询出来
for owner in Owner.select().where(Owner.name.contains('li')):
print(owner.name)
正则查询:
这个就有意思了。前面我们强调过,MySQL带引号字符串是不区分大小写的。
而正则功能提供给我们区分大小写的API。(这是个特例,只有正则区分大小写的功能。记住)
例子条件:
假如我们有一个数据 name为 Alice
regexp: 严格区分大小写的正则
# 用的是 regexp,区分大小写, 条件给的是 al小写, 所以当然 查不出来,返回空
for owner in Owner.select().where(Owner.name.regexp('al*')):
print(owner.name)
iregexp:不区分大小写的正则
# 用的是 iregexp, 不区分大小写。 因此即使 你给 al小写, 也能够将 Alice查出来。
for owner in Owner.select().where(Owner.name.iregexp('al*')):
print(owner.name)
统计记录数 count
print(MyModel.select().count())
offset & limit
"""跳过前2行,从第2+1行开始,取1条, 其实取出的就是第3行"""
for x in Owner.select().offset(2).limit(1).dicts():
print(x)
分页 paginate
"""
1. paginate 第1个参数为 第几页
2. paginate 第2个参数为 一页几个数据
3. paginate会自动根据查询的所有记录总数 和 你传的 两个 参数来为你自动分页
"""
for obj in MyModel.select().paginate(1,3).dicts(): # 第一页,每页三个数据
print(obj)
# peewee提供给我们分页就这么多,想要更多需求,需要我们自己发散思维。
# 下面是我自己粗略写的一个笨拙的分页。。可以参考下。。
def page(document_count=None, per_page_size=None, start_page=1):
page_count = (document_count // per_page_size) # 整除的页数(可能有残页)
is_rest = (document_count % per_page_size) # 总数/每页数:是否能除尽
# 除尽代表整页直接返回,除不尽有残页 ,页码+1 返回
page_count = page_count if not is_rest else page_count + 1
for page in range(start_page, page_count + 1):
for obj in MyModel.select().paginate(page, per_page_size).dicts().iterator():
yield obj
document_count = MyModel.select().count() # 先获取记录总数
for obj in page(document_count=document_count, per_page_size=3, start_page=1):
print(obj)
# 如果你有需求分页切片或索引, 那么你可以封装成类,然后实现 __getitem__ 方法
document_count = MyModel.select().count()
for obj in page(document_count=document_count, per_page_size=3, start_page=1):
print(obj)
排序 order_by
# 默认升序 asc()
for owner in Owner.select().order_by(Owner.age):
print(owner.age)
# 降序 desc()
for owner in Owner.select().order_by(Owner.age.desc()):
print(owner.age)
分组 group_by
# 用姓名分组,统计人头数大于1的所有记录,降序查询
query = Owner.select(Owner.name, fn.count(Owner.name).alias('total_num')) \
.group_by(Owner.name) \
.having(fn.count(Owner.name) > 1) \
.order_by(SQL('total_num').desc())
for owner in query:
print(f'名字为{owner.name}的 人数为{owner.total_num}个')
分组注意事项,说几点:
1. 分组操作,和SQL的group by一样, group by后面写了什么字段, 前面select同时也必须包含
2. .alias('统计结果字段名'),是给统计后的结果起一个新字段名。
3. SQL('total_num') 的作用是给临时命名的查询字符串,当作临时字段使用,支持,desc()等API
4. peewee的API是高仿SQL写的,方便使用者。因此我们最好同步SQL的语法规范,按如下顺序:
where > group_by > having > order_by
聚合原理
一会讲peewee的fn聚合原理会涉及到 __getattr__(),如果你不了解,可以看下我之前写过的文章。
https://segmentfault.com/a/11…
聚合原理如下: (以上面分组的 fn.count() 为例)
fn是我事先导入进来的(开篇我就说过 from peewee import * )就导入了一切(建议练习使用)
fn可以使用聚合操作,我看了一下源码:讲解下思路(不一定特别正确):
fn是 Function类实例的出的对象
Function() 定义了 __getattr__方法,(__getattr__开头我已经给链接了,不懂的可以传送)
当你使用 fn.xx() :
xx 就会被当作字符串传到 __getattr__ ,
__getattr__里面用装饰器模式,将你 xx 这个字符串。
经过一系列操作,映射为同名的SQL语句 (这系列操作包括大小写转换等)
所以你用 fn.count 和 fn.CoUNt 是一样的
说到底 fn.xx() , 的意思就是 fn 把 xx 当作字符串映射到SQL语句,能映射到就能执行
常用fn聚合函数
fn.count()
统计总人头数:
for owner in Owner.select(fn.count(Owner.name).alias('total_num')):
print(owner.total_num)
fn.lower() / fn.upper()
名字转小写/大写(注意是临时转,并没有真的转),并查询出来:
for owner in Owner.select(fn.Upper(Owner.name).alias('lower_name')):
print(owner.lower_name)
fn.sum()
年龄求和:
for owner in Owner.select(fn.sum(Owner.age).alias('sum_age')):
print(owner.sum_age)
fn.avg()
求平均年龄:
for owner in Owner.select(fn.avg(Owner.age).alias('avg_age')):
print(owner.avg_age)
fn.min() / fn.max()
找出最小/最大年龄:
for owner in Owner.select(fn.max(Owner.age).alias('max_age')):
print(owner.max_age)
fn.rand()
通常用于乱序查询 (默认是升序的哦):
for owner in Owner.select().order_by()
print(owner.name)
关联查询前提数据准备
from peewee import *
mysql_db = MySQLDatabase('你的数据库名', user='你的用户名', password='你的密码',
host='你的IP', port=3306, charset='utf8mb4')
class BaseModel(Model):
class Meta:
database = mysql_db
class Teacher(BaseModel):
teacher_name = CharField()
class Student(BaseModel):
student_name = CharField()
teacher = ForeignKeyField(Teacher, backref='student')
class Course(BaseModel):
course_name = CharField()
teacher = ForeignKeyField(Teacher, backref='course')
student = ForeignKeyField(Student, backref='course')
mysql_db.create_tables([Teacher, Student, Course])
data = (
('Tom', ('stu1', 'stu2'), ('Chinese',)),
('Jerry', ('stu3', 'stu4'), ('English',)),
)
for teacher_name, stu_obj, course_obj in data:
teacher = Teacher.create(teacher_name=teacher_name)
for student_name in stu_obj:
student = Student.create(student_name=student_name, teacher=teacher)
for course_name in course_obj:
Course.create(teacher=teacher, student=student, course_name=course_name)
关联查询
方式1:join (连接顺序 Teacer -> Student , Student -> Course)
# 注意: 你不用写 on ,因为peewee会自动帮你配对
query = Teacher.select(Teacher.teacher_name, Student.student_name, Course.course_name) \
.join(Student, JOIN.LEFT_OUTER). \ # Teacer -> Student
join(Course, JOIN.LEFT_OUTER) \ # Student -> Course
.dicts()
for obj in query:
print(f"教师:{obj['teacher_name']},学生:{obj['student_name']},课程:{obj['course_name']}")
方式2:switch (连接顺序 Teacer -> Student , Teacher -> Course)
# 说明,我给的数据例子,可能并不适用这种方式的语义,只是单纯抛出语法。
query = Teacher.select(Teacher.teacher_name, Student.student_name, Course.course_name) \
.join(Student) \ # Teacher -> Student
.switch(Student) \ # 注意这里,把join上下文权力还给了 Teacher
.join(Course, JOIN.LEFT_OUTER) \ # Teacher -> Course
.dicts()
for obj in query:
print(f"教师:{obj['teacher_name']},学生:{obj['student_name']},课程:{obj['course_name']}")
方式3:join_from(和方式2是一样的效果,只不过语法书写有些变化)
query = Teacher.select(Teacher.teacher_name, Student.student_name, Course.course_name) \
.join_from(Teacher, Student) \ # 注意这里,直接指明连接首尾对象
.join_from(Teacher, Course, JOIN.LEFT_OUTER) \ # 注意这里,直接指明连接首尾对象
.dicts()
for obj in query:
print(f"教师:{obj['teacher_name']},学生:{obj['student_name']},课程:{obj['course_name']}")
方式4:关联子查询
(说明:关联子查询的意思就是:之前我们join的是个表,而现在join后面不是表,而是子查询。)
SQL版本如下:
SELECT `t1`.`id`, `t1`.`student_name`, `t1`.`teacher_id`, `t2`.`stu_count`
FROM `student` AS `t1`
INNER JOIN (
SELECT `t1`.`teacher_id` AS `new_teacher`, count(`t1`.`student_name`) AS `stu_count`
FROM `student` AS `t1` GROUP BY `t1`.`teacher_id`
) AS `t2`
ON (`t2`.`new_teacher` = `t1`.`teacher_id`
peewee版本如下:
# 子查询(以学生的老师外键分组,统计每个老师的学生个数)
temp_query = Student.select(
Student.teacher.alias('new_teacher'), # 记住这个改名
fn.count(Student.student_name).alias('stu_count') # 统计学生,记住别名,照应下面.c语法
).group_by(Student.teacher) # 以学生表中的老师外键分组
# 主查询
query = Student.select(
Student, # select 传整个类代表,查询
temp_query.c.stu_count # 指定查询字段为 子查询的字段, 所以需要用 .c 语法来指定
).join(
temp_query, # 关联 子查询
on=(temp_query.c.new_teacher == Student.teacher) # 关联条件
).dicts()
for obj in query:
print(obj)
方式5: 无外键关联查询 (无外键也可以join哦,自己指定on就行了)
重新建立一个无外键的表,并插入数据
class Teacher1(BaseModel):
teacher_name = CharField()
class Student1(BaseModel):
student_name = CharField()
teacher_id = IntegerField()
mysql_db.create_tables([Teacher1, Student1])
data = (
('Tom', ('zhang1', 1)),
('Jerry', ('zhang2', 2)),
)
for teacher_name, student_obj in data:
Teacher1.create(teacher_name=teacher_name)
student_name, teacher_id = student_obj
Student1.create(student_name=student_name, teacher_id=teacher_id)
现在我们实现无外键关联查询:
"""查询学生 对应老师 的姓名"""
query = Student1.select(
Student1, # 上面其实已经讲过了,select里面传某字段就查某字段,传类就查所有字段
Teacher1 # 因为后面是join了,但peewee默认是不列出 Teacher1这张外表的。
# 所以需要手动指定Teacher1 (如果我们想查Teacher1表信息,这个必须指定)
).join(
Teacher1, # 虽然无外键关联,但是依旧是可以join的(原生SQL也如此的)
on=(Student1.teacher_id==Teacher1.id) # 这个 on必须手动指定了
# 强调一下,有外键的时候,peewee会自动为我们做on操作,所以我们不需要指定
# 但是,这个是无外键关联的情况,所以必须手动指定on, 不然找不着
).dicts()
for obj in query:
print(obj)
方式6: 自关联查询
# 新定义个表
class Category(Model):
name = CharField()
parent = ForeignKeyField('self', backref='children')
# 注意一下,外键引用这里写的是 "self" ,这是是固定字符串哦 ;backref是反向引用,说过了。
# 创建表
mysql_db.create_tables([Category])
# 插入数据
data = ("son", ("father", ("grandfather", None)))
def insert_self(data):
if data[1]:
parent = insert_self(data[1])
return Category.create(name=data[0], parent=parent)
return Category.create(name=data[0])
insert_self(data) # 这是我自己定义的一个递归插入的方式。。可能有点low
# 可能有点绕,我把插入结果直接贴出来吧
mysql> select * from category;
+----+-------------+-----------+
| id | name | parent_id |
+----+-------------+-----------+
| 1 | grandfather | NULL |
| 2 | father | 1 |
| 3 | son | 2 |
+----+-------------+-----------+
# 开始查询
Parent = Category.alias() # 这是表的(临时查询)改名操作。 接受参数 Parent 即为表名
# 因为自关联嘛,自己和自己,复制一份(改名就相当于临时自我拷贝)
query = Category.select(
Category,
Parent
).join(
Parent,
join_type=JOIN.LEFT_OUTER, # 因为顶部类为空,并且默认连接方式为 inner
# 所以最顶端的数据(grandfather)是查不到的
# 所以查所有数据需要用 ==> 左连接
# on=(Parent.id == Category.parent) # 官档说 on 需要指定,但我试了,不写也能关联上
).dicts()
至此,关联查询操作介绍结束!
接下来对以上六种全部方式的做一些强调和说明:
你可以看见我之前六种方式都是用的dicts(),返回的是类字典格式。(此方式的字段名符合SQL规范)
当然你也可以以类对象的格式返回,(这种方式麻烦一点,我推荐还是用 dicts() )
如果想返回类对象,见如下代码(下面这种方式多了点东西):
query = Teacher.select(Teacher.teacher_name, Student.student_name, Course.course_name) \
.join_from(Teacher, Student) \
.join_from(Teacher, Course, JOIN.LEFT_OUTER) # 注意,我没有用dicts()
for obj in query:
print(obj.teacher_name) # 这行应该没问题吧。本身Teacher就有teacher_name字段
# 注意了,按SQL原理来说,既然已经做了join查询,那么查询结果就应该直接具有所有表的字段的
# 按理说 的确是这样,但是peewee,需要我们先指定多表的表名,在跟写多表的字段,正确写法如下
print(obj.student.student_name) # 而不是 obj.student_name直接调用
print(obj.course.course_name) # 而不是 obj.course_name直接调用
# 先埋个点, 如果你看到下面的 N+1查询问题的实例代码和这个有点像。
# 但我直接说了, 这个是用了预先join()的, 所以涉及到外表查询后,不会触发额外的外表查询
# 自然也不会出现N+1的情况。
# 但如果你没有用join,但查询中涉及了外表,那么就会触发额外的外表查询,就会出现N+1的情况。
关联N+1查询问题:
什么是N+1 query? 看下面例子:
# 数据没有什么特殊的,假设, 老师 和 学生的关系是一对多(注意,我们用了外键)。
class Teacher(BaseModel):
teacher_name = CharField()
class Student(BaseModel):
student_name = CharField()
teacher_id = ForeignKeyField(Teacher, backref='student')
# 查询
teachers = Teacher.select() # 这是 1 次, 查出N个数据
for teacher_obj in teachers:
for student in teacher_obj.student: # 这是 N 次循环(N代表查询的数据)
print(student.student_name)
# 每涉及一个外表属性,都需要对外表进行额外的查询, 额外N次
# 所以你可以看到, 我们总共查询 1+N次, 这就是 N+1 查询。
# (其实我们先做个 表连接,查询一次就可解决问题了。。 这 N+1这种方式 属实弟弟)
# 下面我们介绍2种避免 N+1 的方式
peewee解决N+1问题有两种方式:
方式1:(join)
用 join 先连接好,再查询(前面说了6种方式的join,总有一种符合你需求的)
因为 peewee是支持用户显示调用join语法的, 所以 join是个 特别好的解决 N+1 的问题
方式2: (peewee的prefetch)
# 当然,除了 join,你也可以使用peewee提供的下面这种方式
# 乍眼一看,你会发现和我们上面写的 n+1 查询方式的例子差不多,不一样,你仔细看看
teacher = Teacher.select() # 先预先把 主表 查出来
student = Student.select() # 先预先把 从表 查出来
teacher_and_student = prefetch(teacher, student) # 使用 prefetch方法 (关键)
for teacher in teacher_and_student: # 下面就和N+1一样了
print(teacher.teacher_name)
for student in teacher.student:
print(student.student_name)
说明:
0. prefetch, 原理是,将有外键关系的主从表,隐式"一次性"取出来。"需要时"按需分配即可。
1. 使用prefetch先要把,有外键关联的主从表查出来(注意,"必须必须要有外键,不然不好使")
2. prefetch(主表,从表) # 传进去就行,peewee会自动帮我们根据外键找关系
3. 然后正常 以外键字段 为桥梁 查其他表的信息即可
4. (
题外话,djnago也有类似的prefetch功能,(反正都是避免n+1,优化ORM查询)
貌似给外键字段 设置select_related() 和 prefetch_related() 属性
)
未结束语
本篇主要讲了,CRUD, 特别是针对查询做了大篇幅说明。
我还会有下一篇来介绍peewee的扩展功能。
上一篇传送门:https://segmentfault.com/a/11…
下一篇传送门:https://segmentfault.com/a/11…