【python】apscheduler 时间定时

如果没有接触过定时模块,现在有个需求让定时启动任务,该怎么实现?我想过几种方法:
(1)sleep,直接pass,sleep会阻塞,在等待的这段时间里,啥都做不了,效率不太好。
(2)将定时的时间和当前时间想办法做比对,如果一样,再调用要运行的任务,再配合一个while Ture,应该也可以实现吧。
(3)调用第三方接口,比如我或许可以找到平台自身的定时模块,比如windows的闹钟、linux的cron,然后用程序去调用接口。
如果不查找资料,让自己去做定时,估计就会从上面的(2)或者(3)去思考了。幸好python库里已经有这样一个模块:apscheduler,可以直接拿来用。省去很多事。
1、模块功能
时间调度模块,可以用于定时、周期性跑任务。可以添加任务、删除任务。
APScheduler有四大组件:
触发器-trigger:触发器就是时间到了要调什么任务,有三种方式触发任务调用:date、cron、interval。
date: 定时,精确到几年几月几日几点几分几秒
cron: 很灵活,date和interval能表示的,它基本上都可以表示,掌握cron就能应对大部分定时场景了。
interval:周期性执行,比如:每隔5秒执行一次任务调用
任务存储器-job store:作业存储的位置,默认情况下放在内存里,也可以存放在数据库中。
执行器-executor:会把任务放到进程池或者线程池中运行任务,任务完成,执行器会通知调度器。默认是线程池。如果你调用的任务是计算密集型的,推荐使用进程池。
调度器-scheduler:我们一般会使用到调度器,调度器能干的活很多,比如配置任务存储器、配置执行器、编辑作业等等。
2、模块安装
pip install apscheduler
3、模块使用
从我自己使用该模块的情况,重点应该是放在触发器上,它是定时模块的核心。
使用该模块分三步:定义调度器、构造触发器添加任务、启动调度器。
先举一个最简单的例子:
from apscheduler.schedulers.blocking import BlockingScheduler
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s-%(levelname)s-%(funcName)s: %(message)s')
logger = logging.getLogger("阿提艾斯")
def func():
logger.info("你好,阿提艾斯")
def scheduler_example():
# 定义调度器实例
sche = BlockingScheduler(timezone='Asia/Shanghai')
# 构造触发器,添加job
sche.add_job(func, 'cron', year="*", month="*", day="*", hour="15", minute="07", second="00")
# 启动调度器
sche.start()
上面例子的运行效果:

3.1 定义调度器
调度器也不止一种,根据情况选择合适的调度器:
BlockingScheduler: 阻塞式调度器,适用于只跑调度器的程序
BackgroundScheduler:后台式调度器,适用于非阻塞的情况,调度器会在后台运行
AsyncIOScheduler: AsyncIO调度器,还没用过
TornadoScheduler: Tornado调度器,还没用过
TwistedScheduler: Twisted调度器,还没用过
QtScheduler: Qt调度器,还没用过
假如不想使用调度器默认的配置,可以参考这个例子:
# 定义调度器
def get_instance():
executors = {
'default': ThreadPoolExecutor(max_workers=30),
#'processorpool': ProcessPoolExecutor(max_workers=4)
}
job_defaults = {
'coalesce': True, # 如果系统因为某些原因没有执行任务,导致任务累计,为True只运行最后一次,为False则累计的任务全部跑一遍。
'max_instances': 1 # 默认情况下,每个作业只能同时运行一个实例。该参数可以为调度程序设置允许并发运行的特定作业的最大实例。
}
sche = BackgroundScheduler(timezone='Asia/Shanghai', executors=executors, job_defaults=job_defaults)
return sche
3.2 构造触发器添加任务
这儿是重点!!!
add_job方法传参源码:
add_job(func, trigger=None, args=None, kwargs=None, id=None, \
name=None, misfire_grace_time=undefined, coalesce=undefined, \
max_instances=undefined, next_run_time=undefined, \
jobstore='default', executor='default', \
replace_existing=False, **trigger_args)
参数解释:
func: 调用的函数,比如: def hello(name)
trigger: 触发器类型,比如:“date”、“cron”、“interval”
args: 传递给func的参数,比如:args=["xxx"]
kwargs: 以字典形式传递给func的参数,比如: kwargs={"name": "atiaisi"}
id: 任务id,要保证唯一
name: job的名称,默认是调用函数的名字
misfire_grace_time: 在指定的运行时间之后的几秒钟,作业仍然可以允许运行。(处理任务定时没启动的情况)
coalesce: True或False,为True表示只运行任务累计的最后一次
max_instances: 每个作业运行实例个数
next_run_time: datetime类型,不用管触发器如何配置,这个字段都会先生效。当然原先配置的触发器定时也会生效。
jobstore: 作业存储,可以不配置,使用默认
executor: 执行器,可以不配置,使用默认
replace_existing: True或False,为True的,job_id相同的话,后面的job会覆盖之前的job。
各种定时情况如下:
“*”号表示所有
sche = BlockingScheduler(timezone='Asia/Shanghai', executors=executors, job_defaults=job_defaults)
# 每个月的第3个周3的23:59:59执行hello
sche.add_job(hello, 'cron', month="*", day=f"{self.num_dict[3]} {self.week_dict[2]}", hour="23", minute="59", second="59")
# 每隔5秒执行一次
sche.add_job(hello, 'interval', days=0, hours=0, minutes=0, seconds=5)
sche.add_job(hello, 'interval', seconds=5)
# 排列组合(3月15日6:7:10、3月15日6:7:30、...)这些时间点分别执行hello
sche.add_job(hello, 'cron', month='3, 4', day="15, 20", hour="6, 9", minute="7, 8", second="10, 30")
完整例子代码:https://gitee.com/atiaisi/py_learn/blob/master/apscheduler_test.py
4、遇到的问题
4.1 PytzUsageWarning: The zone attribute is specific to pytz's interface; please migrate to a new time zone provider

解决方法:定义调度器的时候,把时区加上,比如:
sche = BlockingScheduler(timezone='Asia/Shanghai')
5、参考资料
5.1 https://apscheduler.readthedocs.io/en/stable/userguide.html
5.2 涉及到的代码可以在这儿查看:
https://gitee.com/atiaisi/py_learn/blob/master/apscheduler_test.py

日拱一卒无有尽,功不唐捐终入海。