用批量写入代替单个写入
最开始,我的代码逻辑是这样的:
for uid, data in user_dict.items():
user_collection.insert_one({'uid':uid, 'user_data': data})
这种方法在数据量较小时可以很好的工作,但是当数据量非常大时,此种操作会非常慢,我们需要通过批量写入的方式来写入数据。
user_data = ({'uid': uid, 'user_data': data} for uid, data in user_dict.items())
user_collection.insert_many(user_data)
调整insert_many参数
再来看是否可以通过调整insert_many参数来进一步优化性能。
- ordered: 这个参数为True时,迫使MongoDB按顺序同步插入数据;而如果为False,则MongoDB会并发的不按固定顺序进行批量插入。显然当我们对性能有要求时,将该参数设为False是非常必要的。
- bypass_document_validation: MongoDB3.2之后加入了document validation功能,用于验证写入的文档是否符合collection制定的规则,具体可以参考reference中的链接。而既然是验证就肯定需要花费时间,当我们对性能有极致要求时,也可以将此参数设为True,从而越过验证,直接写入。
- session: 关于session,请参考References中的Client Session链接。
修改后的代码如下:
user_data = ({'uid': uid, 'user_data': data} for uid, data in user_dict.items())
user_collection.insert_many(user_data, ordere=False, bypass_document_validation=True)
最终性能的提升是非常明显的,时间量级从天降为分钟。
批量更新
前面的例子在插入操作时非常有效,但是对于更新操作由于update_many无法针对每一个doc进行更新,如本例中针对每一个uid进行更新,那么就需要使用bulk_write
操作。
from pymongo import UpdateOne
update_operations = []
for uid, user_data in user_dict.items():
op = UpdateOne({'uid': uid}, {'$set': {'user_data': user_data}}, upsert=True)
update_operations.append(op)
user_collection.bulk_write(update_operations, ordered=False, bypass_document_validation=True)
批量读取
批量读取我们可以使用$in
操作符,但是需要注意的是如果$in
针对的list过大,那么可能会导致报错pymongo.errors.DocumentTooLarge
, 目前我的做法是将大的list分割成1000个一段,然后分段查询
list_length = len(uid_list)
iter_size = 1000
current = 0
while current < list_length:
end = current + iter_size
uid_segment = uid_list[current: end]
result_cursor = mongo_collection.find({"uid": {"$in": uid_segment}})
for user_info in result_cursor:
# do something
...
current = current + iter_size
异常处理
在实践过程中,会遇到异常的情况,尤其是写入的时候,可能由于各种原因导致写入失败,因此需要catch exception,并打印详细信息,如下:
try:
user_collection.insert_many(
data_iter, ordered=False, bypass_document_validation=True)
except BulkWriteError as e:
lg.error(e.details)