【Python】Using RQ Queues

1. What is a Job?

A Job is, literally, a piece of work: it can be any Python function, and you can write any task you want to run asynchronously as a Job function. In short, a Job is the operation you want to perform. For example, to count the words on an arbitrary web page, I can write a Job function like this:

import requests

def count_words(url):
    return len(requests.get(url).text.split())

A function like this is what we call a Job.
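Since a Job is just an ordinary Python function, you can call it directly to test it before ever enqueuing it. A network-free variant for illustration (count_words_in_text is a hypothetical local helper, not part of RQ):

```python
def count_words_in_text(text):
    """Count whitespace-separated words in a string (no network needed)."""
    return len(text.split())

print(count_words_in_text("hello redis queue"))  # 3
```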

2. What is a Queue?

When I have many Jobs, say three of them named j1, j2 and j3, the computer executes them in the order they were added: j1, then j2, then j3. A structure that lets you add Jobs and then executes them in order is called a Queue.

For example, we can build a Queue like this:

import redis
from rq import Queue


redis_conn = redis.Redis()
q = Queue('default', connection=redis_conn)  # the first argument is the Queue name; it is optional and defaults to 'default'
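The first-in-first-out behavior described above can be sketched with a plain deque. This is only a toy stand-in to illustrate the ordering; real RQ serializes Jobs into Redis:

```python
from collections import deque

# Toy FIFO queue: jobs are (name, function) pairs popped in insertion order.
jobs = deque()
jobs.append(("j1", lambda: "result-1"))
jobs.append(("j2", lambda: "result-2"))
jobs.append(("j3", lambda: "result-3"))

order = []
while jobs:
    name, fn = jobs.popleft()   # the earliest-enqueued job runs first
    order.append((name, fn()))

print(order)  # [('j1', 'result-1'), ('j2', 'result-2'), ('j3', 'result-3')]
```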

3. How do I put a Job into the queue?

j = q.enqueue(count_words, args=('https://www.baidu.com',))

The first argument to enqueue is the Job function; args holds its positional arguments, and keyword arguments can be passed via kwargs.
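Conceptually, enqueue just records the callable together with its positional and keyword arguments so a worker can apply them later. A minimal toy model of that idea (toy_enqueue/toy_work are illustrative names, not RQ's API):

```python
# Toy model: store (func, args, kwargs), then apply them later.
def toy_enqueue(queue, func, args=(), kwargs=None):
    queue.append((func, args, kwargs or {}))

def toy_work(queue):
    func, args, kwargs = queue.pop(0)
    return func(*args, **kwargs)

def greet(name, punctuation="!"):
    return "hello " + name + punctuation

q = []
toy_enqueue(q, greet, args=("rq",), kwargs={"punctuation": "?"})
result = toy_work(q)
print(result)  # hello rq?
```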

4. What is a Worker?

A Worker is the consumer of Jobs. Put simply: you have added many Jobs to the Queue, so who runs them? The Worker, of course. As this suggests, a Worker must be a separate process: it fetches a Job's information (the function, its arguments, and so on) from Redis and then runs the Job.

Starting a Worker process is also simple:

$ rq worker low high default
16:56:02 RQ worker 'rq:worker:s2.6443' started, version 0.8.1                                            
16:56:02 Cleaning registries for queue: low         
16:56:02 Cleaning registries for queue: high        
16:56:02 Cleaning registries for queue: default     
16:56:02                                            
16:56:02 *** Listening on low, high, default...

The three trailing arguments (low, high and default) name the Queues whose Jobs this Worker will run. Their order matters: Jobs in Queues listed earlier are run first.
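That priority rule can be sketched in a few lines: a worker listening on several queues always takes the next Job from the first listed queue that is non-empty. A toy simulation (not RQ's actual scheduler):

```python
# Pick the next job from the first non-empty queue in listen order.
def next_job(queues, listen_order):
    for name in listen_order:
        if queues[name]:
            return name, queues[name].pop(0)
    return None

queues = {"low": ["l1"], "high": ["h1", "h2"], "default": ["d1"]}
listen_order = ["low", "high", "default"]   # queues listed earlier win

executed = []
while any(queues.values()):
    executed.append(next_job(queues, listen_order))

print(executed)
# [('low', 'l1'), ('high', 'h1'), ('high', 'h2'), ('default', 'd1')]
```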

5. A complete example

jobs.py
import requests
import redis
from rq import Queue
import pymysql

def count_words(url):
    return len(requests.get(url).text.split())
    

def recover_to_db(sql, dbinfo):
    dbinfo['charset'] = 'utf8mb4'
    dbinfo['autocommit'] = True
    dbconn = pymysql.Connect(**dbinfo)

    cur = dbconn.cursor()
    cur.execute(sql)
    cur.close()
    dbconn.close()

app.py

from jobs import count_words, recover_to_db
import requests
import redis
from rq import Queue
import time


def run():
    redis_conn = redis.Redis()
    q = Queue(connection=redis_conn)
    for i in range(92, 99):
        j = q.enqueue(recover_to_db,
                      'insert into `tt`(`id`) VALUES (%d);' % i,
                      {'host': '47.93.243.162', 'password': 'ESBecs00',
                       'port': 3306, 'user': 'root', 'db': 'test'})
    # j = q.enqueue(count_words, 'https://www.baidu.com')
    # print(j.result)  # None until the worker has finished the job
    # time.sleep(3)
    # print(j.result)

if __name__ == '__main__':
    run()

Start the Worker:

$ rq worker  
Run the script:
$ python app.py

Then check the rq worker's log:

17:08:53 default: jobs.recover_to_db(insert into `tt`(`id`) VALUES (92);, {host: 47.93.243.162, password: ESBecs00, db: test, port: 3306, user: root}) (de54dfcbc2c04d7ea082273ae40e5316)
17:08:53 default: Job OK (de54dfcbc2c04d7ea082273ae40e5316)
17:08:53 Result is kept for 500 seconds
17:08:53
17:08:53 *** Listening on default...
17:08:53 default: jobs.recover_to_db(insert into `tt`(`id`) VALUES (93);, {host: 47.93.243.162, password: ESBecs00, db: test, port: 3306, user: root}) (46fb8ef27ad64369837780e728fa3129)
17:08:53 default: Job OK (46fb8ef27ad64369837780e728fa3129)
17:08:53 Result is kept for 500 seconds
17:08:53
17:08:53 *** Listening on default...
17:08:53 default: jobs.recover_to_db(insert into `tt`(`id`) VALUES (94);, {host: 47.93.243.162, password: ESBecs00, db: test, port: 3306, user: root}) (334ef4daea8f4ec18c82952fa9300f6f)
17:08:53 default: Job OK (334ef4daea8f4ec18c82952fa9300f6f)
17:08:53 Result is kept for 500 seconds
17:08:53
17:08:53 *** Listening on default...
17:08:53 default: jobs.recover_to_db(insert into `tt`(`id`) VALUES (95);, {host: 47.93.243.162, password: ESBecs00, db: test, port: 3306, user: root}) (e7fcbed60d7540ffae89f135d1f6fb45)
17:08:53 default: Job OK (e7fcbed60d7540ffae89f135d1f6fb45)
17:08:53 Result is kept for 500 seconds
17:08:53
17:08:53 *** Listening on default...
17:08:53 default: jobs.recover_to_db(insert into `tt`(`id`) VALUES (96);, {host: 47.93.243.162, password: ESBecs00, db: test, port: 3306, user: root}) (3642c31ac2af4cdc99b8773c00d1dbd5)
17:08:53 default: Job OK (3642c31ac2af4cdc99b8773c00d1dbd5)
17:08:53 Result is kept for 500 seconds
17:08:53
17:08:53 *** Listening on default...
17:08:53 default: jobs.recover_to_db(insert into `tt`(`id`) VALUES (97);, {host: 47.93.243.162, password: ESBecs00, db: test, port: 3306, user: root}) (99c4fdf31c41494bbb70b287f59cf452)
17:08:54 default: Job OK (99c4fdf31c41494bbb70b287f59cf452)
17:08:54 Result is kept for 500 seconds
17:08:54
17:08:54 *** Listening on default...
17:08:54 default: jobs.recover_to_db(insert into `tt`(`id`) VALUES (98);, {host: 47.93.243.162, password: ESBecs00, db: test, port: 3306, user: root}) (842ea53afeba4f54af77ef191a28014b)
17:08:54 default: Job OK (842ea53afeba4f54af77ef191a28014b)
17:08:54 Result is kept for 500 seconds
17:08:54
17:08:54 *** Listening on default...

References:

https://segmentfault.com/a/1190000010654775

    Original author: 小亮520cl
    Original link: http://blog.itpub.net/29096438/viewspace-2154300/
    This article was reposted from the web to share knowledge; if it infringes your rights, please contact the blogger to have it removed.