欢迎光临散文网 会员登陆 & 注册

Python异步编程中任务调度的实践

2023-06-14 11:17 作者:普通的魔王  | 我要投稿

背景:

在这时fastapi web框架时,需要一个实时任务调度、一个定时任务调度。

这是我首先想到的是,使用 fastapi.BackgroundTasks 完成 实时任务调用,使用AsyncIOScheduler 完成定时任务调度。以下是我踩过的一些坑:

一、同时使用多个任务调度,导致阻塞和死锁

先说结论:

    当我触发定时任务后,定时任务执行时会去注册后台任务,此时的后台任务中如果有IO阻塞(文件IO),整个AsyncIOScheduler的任务调度会被阻塞和死锁。

原因:

    AsyncIOScheduler 线程池来执行异步任务,并在其中使用 BackgroundTasks().add_task() 将异步后台任务添加到后台任务队列中以等待异步执行。由于任务调度是在 AsyncIOScheduler 中完成的,异步 I/O 操作可能会导致阻塞和死锁,因为事件循环不知道任务何时执行完成,从而可能会导致挂起。

对此我做了以下三种尝试:

(一)尝试:

1.fastapi中,我有一个AsyncIOScheduler 的job(使用的线程池),

2.这个job执行的代码有 fastapi.BackgroundTasks().add_task(asyncio.gather(*tasks))也就是添加后台任务去执行asyncio.gather(*tasks)

3.添加的这些tasks中有 一句aiofiles.open(self.full_path, mode='r') ,我debug看时执行到这里就会一直挂起了。

分析:

在第一种情况下,使用了 AsyncIOScheduler 线程池来执行异步任务,并在其中使用 BackgroundTasks().add_task() 将异步后台任务添加到后台任务队列中以等待异步执行。由于任务调度是在 AsyncIOScheduler 中完成的,您的异步 I/O 操作可能会导致阻塞和死锁,因为事件循环不知道任务何时执行完成,从而可能会导致挂起。

(二)尝试:

1.fastapi中,我有一个AsyncIOScheduler 的job(使用的线程池)

2.这个job执行的代码不使用fastapi.BackgroundTasks().add_task(),而是直接执行await asyncio.gather(*tasks)

3.添加的这些tasks中有 一句aiofiles.open(self.full_path, mode='r') ,但这里就不会挂起。

分析:

在第二种情况下,手动调用了异步 I/O 操作,因此,任务会在同一事件循环中按顺序执行。因此,在阻塞 I/O 操作执行时,它会挂起当前协程,等待I/O操作完成,不会导致任务挂起。

(三)尝试:

1.fastapi中,我不使用AsyncIOScheduler

2.直接请求调用fastapi.BackgroundTasks().add_task(asyncio.gather(*tasks))也就是添加后台任务去执行asyncio.gather(*tasks)

3.添加的这些tasks中有 一句aiofiles.open(self.full_path, mode='r') ,这时也不会挂起

分析:

在第三种情况下,完全跳过了使用 AsyncIOScheduler ,并使用 BackgroundTasks().add_task() 来调度一组异步任务。由于异步任务在同一个事件循环环境中执行,因此异步I/O操作不会导致其他异步任务的阻塞和死锁。

总结:

在三种尝试中,都使用了 aiofiles 库进行文件 I/O 操作。由于这个库提供了一些异步 I/O 操作,它可以更好的协同运行在异步编程模型下,避免I/O阻塞。

因此,由于 AsyncIOScheduler 的原因,第一种尝试失败。而在第二种和第三种尝试中,任务都在相同的事件循环中运行,并使用异步 I/O 操作对文件进行读取和写入,因此阻塞操作不会导致死锁或挂起的现象。








Python异步编程中任务调度的实践的评论 (共 条)

分享到微博请遵守国家法律