apscheduler明灯
作为一个python一个非常好用的定时框架,apscheduler拥有非常强大的定时功能、非常简陋的描述文档和非常糟糕的示例,甚至国内的文章在介绍这个框架的时候,大多只是简单地把apscheduler文档的首页机翻了一遍,这造成了非常坏的影响。写这篇文章给大家排排坑。
首先我们做一个测试,由于我的项目暂时还没用到job_stores,所以就主要针对executors和schedulers。(写完才发现这里logger传参应该是__file__,生米煮成熟饭就不统一改了,只是影响展示而已)
在例子中定义了一个能在控制台打印的logger,方便我们观察job运行的时间和信息;装饰器默认打印出任务的所在的进程和线程;为了适应复杂的需要,我定义了多种任务:包括异步的、阻塞的、文件IO的等等。
首先测试的第一种scheduler:BlockingScheduler。查阅官方文档,它说明了在apscheduler高层,希望用户调用的apscheduler无非以下几类:
我的项目中常用的是前三种,因为并没有用到tornado等框架。使用时需要根据框架具体选择何种scheduler:比如当我的任务都是异步任务的时候,那就选择AsyncIOScheduler。
BlockingScheduler很明确的说明了:当你的scheduler在start后,主进程不需要执行其他的动西时,就采用该框架。这并不代表着scheduler只能执行一个任务,而是说当scheduler.start之后,主进程中所有的内容都会阻塞于此,效果相当于在之后调用asyncio.get_event_loop().run_forever(),或者更确切的来说:相当于启用process或者thread后立马join它们。
输出如下:
通过观察结果可以得出以下结论;
①.在没有使用executor的情况下,每个任务触发时是在主线程下的不同进程。更准确的来说,我们根据apscheduler的文档:
可以得知,在不指定executor的情况下,apscheduler默认使用一个大小为5的线程池,复用每一个线程。这也就是为什么当blocking_job运行时期间触发了print_job,两者使用了不同的线程,而其他时候(比如执行print_job后马上执行os_job)复用了同一个线程。
也就是说apscheduler默认的线程池,并不是每一个任务绑定一个线程,而是任务开启时向线程池固定的、已创建的5个线程中acquire一个,运行完成后再将这个线程release回线程池,每一个任务的每一次执行都可能是在不同的线程下。
②.当我们把blocking_job的interval参数调整为1时(这里我就不仔细讲trigger触发器,以及如何给job传参,看看示例就能明白),会触发错误:
这是取决于job_defaults中的参数,这些参数共享给每一个job,当然你也可以在add_job中给每一个job单独指定这些参数。简单讲述下参数:
coalesce:合并,当因为某些原因一个任务积攒多次未运行时,True会让多次任务只运行一次,而False则是积攒多少次就运行多少次。
misfire_grace_time:容许延迟,当任务因为线程池占用满等原因被迫等待时,若等待超过延时,则会自动skip。默认参数是Undifine,会无限等待下去。
replace_existing: 当任务触发时,scheduler中有相同id的任务,则会自动替代它。这个功能常用在非阻塞的scheduler中,也就是说当你一开始创建一个id为"1"(要求id是字符串)的任务,在scheduler启动后(在BlockingScheduler中会阻塞,所以没办法演示在启动后再增加任务)再增加一个同样id为"1"但是function完全不同的任务,那么之前的任务就会被顶掉。也就是说apscheduler区别任务不是通过函数名,而是通过任务的id。
max_instances:同一id任务运行同时运行的最大次数限制,就像上文报错内容一样,因为我们指定同一个任务同时只能运行一个。这个参数默认是1,且该行为是先于executor的。也就是说当blocking_job的max_instances为2,而线程池只有1的大小时,第二个blocking_job是不会报错的,但第三第四第五个会立马报错;而如果max_instances为1且线程池也为1时,则第二个任务就会报错。总结一下就是:一个任务只有在满足max_instances的情况下才会被放行去aquire线程池,至于acquire的结果是否阻塞,并不在考虑的范围中。
其他参数还有显式设置id,指定第一次运行时间,指定特定的job存储方式(使用内存还算数据库),使用何种executor等等。
③.当我们试图添加async_job进入BlockingScheduler时,启动时则会立即报错说该任务never awaited,也就是说它是无法跑任何协程的。
我们尝试使用进程池:
这里非常值得提的一点就是,网络上的写法大致是这样的:
这样写并没有任何问题,但需要注意的是:是创建scheduler的时候其实并不会将所有任务自动调用processpool,它相当于只是给这个scheduler创立了一个属性,别名叫做processpool。所有的规定executor都必须要手动使用add_executor,或者在创建任务时添加executor参数,参数内容就用别名就行。
输出:
可以看出原理同线程池一样,它会复用所有的进程。由于blocking_job执行需要5秒,执行间隔是4秒,且max_instances为1,所以第二次执行的时候就自动被skip了。如果在线程池中跑异步任务会直接报错无法pickle,这很容易理解(因为你没有显式创建event_loop),总之它无法跑异步。
需要注意的是,add_scheduler时,如果在executor中已经指定了default,它会自动先使用default。逻辑在于:如果你没有规定default也没有手动add_executor,它就用默认线程池;如果你规定了default,它就自动调用default进行一个add_executor操作;如果在有default的情况下还想add_executor,那么你需要先把default给remove掉。所以我建议一切添加executor的行为都手动使用add_executor,而不是写在一个dict里然后传给scheduler实例化。
我们换做BackgroundScheduler:
我们新增了一个print_job_substitute来演示replace_existing的效果,需要注意的是scheduler启动后新增的job需要手动添加参数,它不会继承job_defaults中的内容。所以我建议所有的job_settings都手动在add_job中解决,越是共用就越麻烦。
输出:
我们可以注意到:
①.使用该种方法和BlockingScheduler的唯一区别就在于,start之后非阻塞,而是后台运行。其他功能和BlockingScheduler别无二致(实际上研读源码你也会知道,BackgroundScheduler是继承BlockingScheduler的,且改动很少)。所以start后的print被打印了出来。
②.print_job_substitute确实用同样的id顶掉了print_job,顶替的过程类似于:如果该任务正在执行,就让它执行完成;否则都立马更改任务内容并重新计算interval(也就是说假设print都是3秒执行一次,它在剩下1秒就要触发的时候被顶替,那么这个新的任务会重新等待3秒)。
③.当主进程结束时,scheduler就会被夹断,演示中没有展示被夹断时候的情况,大概是说:无法再进行任务了,当这个scheduler都已经shutdown的时候。所以从原理上看scheduler会在__del__的时候自动调用它的shutdown功能,当然你也可以手动shutdown。为了避免这种尴尬的情况出现,我们可以加一点东西:一个while True的input,让你能够手动的通过控制台输入来决定程序是否结束;或者直接点asyncio.get_event_loop().run_forever。我们可以美好地在外面包一层try KeyboardInterrupt,做一点shutdown后的遗言工作。
最后来说说重量级的AsyncIOScheduler。为了适应测试需要,我们改装几个异步函数:
先不加任何的executor:
输出:
非常符合直觉的:异步当然没办法使用默认的线程池,不同的线程怎么共享一个event_loop呢?所以它们当然都是运行在同一进程同一线程下的。
现在我增加一个同步任务,你会很惊讶的发现输出变为了:
也就是说AsyncIOScheduler也是可以跑同步任务的,它会自动不知道怎么(我等下会说明)创建一个线程池;如果你在主线程中打印一下pid和tid,你会发现所有异步任务用的进程和线程都和主线程一样。也就是说:对于异步的任务,本质上是跑在主进程主线程中的同一个event_loop当中的;同步的任务,它会使用进程池,本质上效果等于asyncio.to_thread。
你甚至可以对AsyncIOScheduler使用ProcessPoolExecutor,但这做是没有意义的,官方文档并不推荐这样做,该过程也不包含创建event_loop,这会导致所有的异步任务never awaited。AsyncIOScheduler底层的默认executor是AsyncIOExecutor。我们看AsyncIOExecutor的源码:
在if iscoroutinefunction_partial(job.func)的地方,它检测函数是否是异步的(底层代码就是调用asyncio.iscoroutinefuction),如果不是,就走else的地方,调用的是loop.run_in_executor(如果你看过我之前的文章,你就会知道这个方法高层封装之后就是asyncio.to_thread);如果是,就调用loop.create_task。
也就是说,一个同步任务,本质上是通过跑在concurrent.threadpoolexecutor里面,变成一个全await的异步任务并添加进了event_loop。
AsyncIOExecutor本身是不接受任何参数的,所以几乎没有办法客制化它(也没办法说我能够在run_in_executor时用进程池,或者我给每一个异步函数开一个单独的进程,创立单独的loop)。它本质是为AsyncIOScheduler服务的底层代码。
有没有办法把异步任务单独建立进程、线程来跑呢?我常常希望多进程、多线程并发还异步,让本来就巨慢无比的python代码上高速。有的,但是需要麻烦点,而且资源占用的事情我概不负责。
我们给它包一层:
然后它就变同步了,这时候我们再用BackgroundScheduler带着ProcessPoolExecutor来跑,竟然还真的跑得通:
这下是真的实现多进程异步了,就是资源占得有点多,还有管理起来有点困难。如果使用线程池,就需要改成new_event_loop,然后还要set一下(本质原因或许是子线程调用get_event_loop功能等同于get_running_loop,无法自动创建,所以需要手动创建):
输出:
你需要注意的是,由于每一个任务并非绑定固定的线程,所以一些使用线程id作为标志的单例是极不安全的。通过打印loop你会发现:任务之间、每个任务运行时,其loop的id都是完全不一样的,它是真的在不断地新建和消灭,所以从某种意义上讲这是一种浪费。这是否和你的需求相匹配,取决于你是否希望牺牲安全来换取速度。
最后祝诸君都写出能满足项目需求的屎山代码!