文章目录
前言
提示:以下是本篇文章正文内容,下面案例可供参考
一、apscheduler是什么?
APScheduler基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个python定时任务系统。
二、使用步骤
1.安装库
代码如下(示例):
pip install apscheduler
2.引入库
代码如下(示例):
import time
from apscheduler.schedulers.blocking import BlockingScheduler
3.时区设置
设置时区,不然会报错
sched = BlockingScheduler(timezone='Asia/Shanghai')#设置时区,不设置会报错
4.添加定时任务
设置定时任务间隔1小时1分钟5秒执行一次,id:job_1
,设置id后方便修改
def job_function():
print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
job = sched.add_job(job_function,'interval',hours = 1,minutes = 1,seconds=5,id='job_1')
sched.start()
设置定时任务间隔2小时执行一次,设置定时起始日期和结束日期,id:job_1
job = sched.add_job(job_function, 'interval', hours=2, start_date='2022-01-01 09:30:00', end_date='2014-01-30 11:00:00',id='job_1')
5.暂停/恢复定时任务
暂停定时任务 job.pause()
恢复定时任务 job.resume()
6.修改定时任务
修改定时任务间隔2小时2分钟执行一次,id:job_1
,这个id与启动定时任务id要一致
sched.reschedule_job('job_1',trigger='interval', hours = 2,minutes = 2)
7.关闭定时任务
关闭所有的调度器和作业存储。
sched.shutdown() #关闭所有的调度器和作业存储
8.读取定时任务
读取所有任务
print(sched.get_jobs())
读取id:job_1
print(sched.get_job(job_id='job_1'))
9.总结
提示:这里对文章进行总结:
import time
from apscheduler.schedulers.blocking import BlockingScheduler
class timingTask():
def __init__(self):
super().__init__()
self.job = 0
self.sched = BlockingScheduler(timezone='Asia/Shanghai')
def job_function(self):
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
def start(self,hours=1,minutes=0,seconds=0):
print('定时任务启动:时间间隔:'+str(hours)+"小时 "+str(minutes)+"分钟"+str(seconds)+"秒")
if(self.job == 0):
self.job_function()
self.job = self.sched.add_job(self.job_function,trigger='interval', hours = hours,minutes = minutes,seconds = seconds,id='job_1')
# The same as before, but starts on 2010-10-10 at 9:30 and stops on 2014-06-15 at 11:00
# sched.add_job(job_function, 'interval', hours=2, start_date='2021-10-10 09:30:00', end_date='2014-06-15 11:00:00')
self.sched.start()
else:
print('定时任务修改:时间间隔:'+str(hours)+"小时 "+str(minutes)+"分钟"+str(seconds)+"秒")
self.job = sched.reschedule_job('job_1',trigger='interval',hours = hours,minutes = minutes,seconds = seconds)
def pause(self):
self.job.pause()
def delete(self):
self.job.remove()
self.job = 0
def shutdown(self):
self.sche.shutdown()
task = timingTask()#执行函数放入job_function中
task.start(0,0,5)#设置间隔0小时0分钟5秒执行一次
例如:以上就是今天要讲的内容,本文仅仅简单介绍了apscheduler的使用。