python操作sqlite的示例代码:
import sqlite3
import threading
import time


def nomal_producer(conn, interval=0.1, max_items=None):
    """Insert rows into the ``datas`` table at a fixed pace.

    The connection is switched to autocommit (``isolation_level = None``),
    so every INSERT is persisted immediately and no explicit ``commit()``
    is required.

    :param conn: open ``sqlite3`` connection; must have been created with
        ``check_same_thread=False`` when this runs in another thread.
    :param interval: seconds to sleep after each insert.
    :param max_items: stop after this many inserts; ``None`` (the default)
        keeps producing forever.
    """
    conn.isolation_level = None
    conn.row_factory = sqlite3.Row

    counter = 0
    while max_items is None or counter < max_items:
        cur = conn.cursor()
        cur.execute("INSERT INTO datas(content, flag) VALUES (?, ?);",
                    ("content %s" % counter, False))
        counter += 1
        time.sleep(interval)


def nomal_consumer(conn, interval=0.5, max_rounds=None):
    """Repeatedly fetch the ten oldest rows of ``datas`` and delete them.

    :param conn: open ``sqlite3`` connection; must have been created with
        ``check_same_thread=False`` when this runs in another thread.
    :param interval: seconds to sleep after each polling round.
    :param max_rounds: stop after this many polling rounds; ``None`` (the
        default) keeps consuming forever.
    """
    conn.isolation_level = None
    conn.row_factory = sqlite3.Row

    rounds = 0
    while max_rounds is None or rounds < max_rounds:
        cur = conn.cursor()
        cur.execute("SELECT * FROM datas ORDER BY id LIMIT 10;")
        records = cur.fetchall()
        if records:
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print("begin to delete: ")
            print(records)
            # Rows are addressable by column name thanks to sqlite3.Row.
            conn.executemany("DELETE FROM datas WHERE id = ?;",
                             [(r["id"],) for r in records])
        rounds += 1
        time.sleep(interval)


def main():
    """Run one producer and one consumer thread against ./db.sqlite."""
    # check_same_thread=False is required because the connection is created
    # here but used from the worker threads; without it sqlite3 refuses any
    # use outside the creating thread.
    # NOTE: both workers share this one connection; the sqlite3 docs advise
    # serializing writes yourself in that case.
    conn = sqlite3.connect('./db.sqlite', check_same_thread=False)

    # Make the demo runnable against a fresh database file. The columns are
    # the ones the producer/consumer statements reference.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS datas ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, content TEXT, flag BOOLEAN);"
    )
    conn.commit()

    producer = threading.Thread(target=nomal_producer, args=(conn,))
    consumer = threading.Thread(target=nomal_consumer, args=(conn,))

    producer.start()
    consumer.start()


if __name__ == "__main__":
    main()
在上面多线程操作 SQLite 的示例代码中,采用 producer 和 consumer 的模式来处理,没有特殊之处;但需要注意的是:由于两个线程共用同一个连接,在建立 sqlite3 的 connection 时需要设置 check_same_thread=False,否则在创建该连接的线程之外使用它会抛出 sqlite3.ProgrammingError。
另外,为了达到真正的 thread-safe,可以对 Python 的 sqlite3 做进一步封装,保证始终只有一个线程在操作 SQLite。原理很简单:所有操作请求都先放入一个请求 queue,由唯一的工作线程按到达顺序依次执行;查询结果则通过另一个 queue 返回给调用方。示例代码如下:
import logging
import sqlite3
from threading import Thread

try:
    from queue import Queue  # Python 3
except ImportError:
    from Queue import Queue  # Python 2

logger = logging.getLogger(__name__)


class SqliteMultithread(Thread):
    """Serialize access to one sqlite connection across many threads.

    Requests from any thread are put on an internal queue and executed one
    at a time, in arrival order, by this worker thread, which is the only
    thread that ever touches the connection.

    A statement that fails no longer kills the worker: the error is logged,
    forwarded to the caller when a result queue was supplied, and the worker
    keeps serving later requests.
    """

    def __init__(self, filename, autocommit, journal_mode):
        """Open the worker for *filename* and start it immediately.

        :param filename: database path handed to ``sqlite3.connect``.
        :param autocommit: when true, the connection is opened with
            ``isolation_level=None`` and ``commit()`` is called after every
            statement.
        :param journal_mode: value interpolated into
            ``PRAGMA journal_mode = ...``.
        """
        super(SqliteMultithread, self).__init__()
        self.filename = filename
        self.autocommit = autocommit
        self.journal_mode = journal_mode
        self.reqs = Queue()  # request queue of unlimited size
        # Daemon thread: a forgotten close() must not keep the process alive.
        self.daemon = True
        self.start()

    def run(self):
        """Worker loop: open the connection and process queued requests."""
        if self.autocommit:
            conn = sqlite3.connect(self.filename, isolation_level=None,
                                   check_same_thread=False)
        else:
            conn = sqlite3.connect(self.filename, check_same_thread=False)
        try:
            # PRAGMA arguments cannot be bound with ?-placeholders, so the
            # value is interpolated; journal_mode must come from trusted code.
            conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
            conn.text_factory = str
            cursor = conn.cursor()
            cursor.execute('PRAGMA synchronous=OFF')
            while True:
                req, arg, res = self.reqs.get()
                if req == '--close--':
                    break
                try:
                    if req == '--commit--':
                        conn.commit()
                    else:
                        cursor.execute(req, arg)
                        if res is not None:
                            for rec in cursor:
                                res.put(rec)
                            res.put('--no more--')
                        if self.autocommit:
                            conn.commit()
                except Exception as exc:
                    # Thread boundary: report the failure instead of letting
                    # the worker die, which would leave every later select()
                    # blocked forever on its result queue.
                    logger.exception("sqlite request failed: %r", req)
                    if res is not None:
                        res.put(exc)
                        # Still terminate the stream for consumers that only
                        # look for the end marker.
                        res.put('--no more--')
        finally:
            conn.close()

    def execute(self, req, arg=None, res=None):
        """Queue a request and return immediately (non-blocking).

        :param req: SQL text, or one of the control strings ``'--commit--'``
            / ``'--close--'``.
        :param arg: parameters for the statement; defaults to an empty tuple.
        :param res: optional queue that receives each result row followed by
            the string ``'--no more--'``. If the statement fails, the
            exception instance is put on the queue before the end marker.
        """
        self.reqs.put((req, arg or tuple(), res))

    def executemany(self, req, items):
        """Queue *req* once for every parameter set in *items*."""
        for item in items:
            self.execute(req, item)

    def select(self, req, arg=None):
        """Yield the rows produced by *req*.

        The worker pushes every row into a private queue as soon as the
        request is dequeued, so the whole result set is held in memory even
        if the caller stops iterating early. An exception raised by the
        statement in the worker is re-raised here.

        Because the call waits on that queue, it blocks indefinitely if the
        worker is no longer running (e.g. after ``close()``).
        """
        res = Queue()  # rows of this select arrive through this queue
        self.execute(req, arg, res)
        while True:
            rec = res.get()
            if isinstance(rec, Exception):
                raise rec
            if rec == '--no more--':
                break
            yield rec

    def select_one(self, req, arg=None):
        """Return the first row of the SELECT, or None if nothing matches."""
        return next(self.select(req, arg), None)

    def commit(self):
        """Queue a commit; it runs after all previously queued requests."""
        self.execute('--commit--')

    def close(self, force=False):
        """Ask the worker to stop and close its connection.

        :param force: by default the call waits until the worker has
            processed everything queued before the close request, so queued
            writes are not dropped when the daemon thread is killed at
            interpreter exit. Pass ``True`` to return immediately instead.
        """
        self.execute('--close--')
        if not force:
            self.join()