asyncio 并发计算如何收集每个任务的返回值?
有 10 个独立的任务,每个任务都有返回值,且要执行 1-3 秒才能结束。
如果顺序执行,那么就需要 10-30 秒才能全部完成。能不能在 3 秒内完成,且获得每个任务的返回值?
使用 asyncio.gather() 函数
用 asyncio.create() 迅速产生任务(这个几乎不花时间),把每个任务放到一个叫做 aws 的列表中,最后将 aws 作为参数传入 asyncio.gather() 。 asyncio.gather() 会收集每个任务的结果,做成一个列表的形式,存储在 result 中。

result = asyncio.run(many_jobs_using_gather(10)) 的结果显示如下:
[(1, 'Job (#0): 1 seconds'), (2, 'Job (#1): 2 seconds'), (2, 'Job (#2): 2 seconds'), (3, 'Job (#3): 3 seconds'), (2, 'Job (#4): 2 seconds'), (2, 'Job (#5): 2 seconds'), (2, 'Job (#6): 2 seconds'), (3, 'Job (#7): 3 seconds'), (1, 'Job (#8): 1 seconds'), (1, 'Job (#9): 1 seconds')]

可以看到,结果 result 是一个元祖的列表:每个元祖的第一个元素代表 match() 函数的返回值 delay, 第二个元素代表返回值 what。注意,结果是按照任务产生的顺序存储的,Job(#0), Job(#1), ..., 符合预期。

使用 asyn with 与 Task Group
在 Python 3.11 中, 引入了 Task Group, 也可以使用 Task Group, 代码如下。这里使用 task.result() 来获得每个任务完成后的结果。

代码清单
import asyncio
import random
import time
async def match(delay, what):
await asyncio.sleep(delay)
print(what)
return delay, what
async def many_jobs_using_task_group():
tasks = []
async with asyncio.TaskGroup() as tg: # work for Python 3.11
for i in range(10):
wait_seconds = random.randint(1,3)
task = tg.create_task(match(wait_seconds, f'Job (#{i}): {wait_seconds} seconds'))
tasks.append(task)
print(f"Started at {time.strftime('%X')}")
# The await is implicit when the context manager exits.
print(f"Finished at {time.strftime('%X')}")
result = []
for task in tasks:
result.append(task.result()) # get the returned result from each task
return result
async def many_jobs_using_gather(n):
aws = [] # awaitables
for i in range(n):
wait_seconds = random.randint(1,3)
task = asyncio.create_task(match(wait_seconds, f'Job (#{i}): {wait_seconds} seconds'))
aws.append(task)
print(f"Started at {time.strftime('%X')}")
print('Gather results ...')
result = await asyncio.gather(*aws) # a list
print(f"Finished at {time.strftime('%X')}")
return result
#result = asyncio.run(many_jobs_using_task_group())
result = asyncio.run(many_jobs_using_gather(10))
print(result)