欢迎光临散文网 会员登陆 & 注册

asyncio 并发计算如何收集每个任务的返回值?

2023-08-01 10:00 作者:Mr蓝珲  | 我要投稿

有 10 个独立的任务,每个任务都有返回值,且要执行 1-3 秒才能结束。


如果顺序执行,那么就需要 10-30 秒才能全部完成。能不能在 3 秒内完成,获得每个任务的返回值? 


使用 asyncio.gather() 函数


用 asyncio.create() 迅速产生任务(这个几乎不花时间),把每个任务放到一个叫做 aws 的列表中,最后将 aws 作为参数传入 asyncio.gather() 。 asyncio.gather() 会收集每个任务的结果,做成一个列表的形式,存储在 result 中。


用 await asyncio.gather(*aws) 收集结果


result = asyncio.run(many_jobs_using_gather(10)) 的结果显示如下:

[(1, 'Job (#0): 1 seconds'), (2, 'Job (#1): 2 seconds'), (2, 'Job (#2): 2 seconds'), (3, 'Job (#3): 3 seconds'), (2, 'Job (#4): 2 seconds'), (2, 'Job (#5): 2 seconds'), (2, 'Job (#6): 2 seconds'), (3, 'Job (#7): 3 seconds'), (1, 'Job (#8): 1 seconds'), (1, 'Job (#9): 1 seconds')]


运行过程的输出


可以看到,结果 result 是一个元祖的列表:每个元祖的第一个元素代表 match() 函数的返回值 delay, 第二个元素代表返回值 what。注意,结果是按照任务产生的顺序存储的,Job(#0), Job(#1), ..., 符合预期。

match() 函数(coroutine)



使用 asyn with 与 Task Group


在 Python 3.11 中, 引入了 Task Group, 也可以使用 Task Group, 代码如下。这里使用 task.result() 来获得每个任务完成后的结果。


用 async with asyncio.TaskGroup() Context Manager (Python 3.11 版本之后)



代码清单


import asyncio

import random

import time


async def match(delay, what):

    await asyncio.sleep(delay)

    print(what)

    return delay, what


async def many_jobs_using_task_group():

    tasks = []

    async with asyncio.TaskGroup() as tg: # work for Python 3.11

        for i in range(10):

            wait_seconds = random.randint(1,3)

            task = tg.create_task(match(wait_seconds, f'Job (#{i}): {wait_seconds} seconds'))

            tasks.append(task)

            print(f"Started at {time.strftime('%X')}")

    # The await is implicit when the context manager exits.

    print(f"Finished at {time.strftime('%X')}") 

    result = []

    for task in tasks:

        result.append(task.result()) # get the returned result from each task

    return result


async def many_jobs_using_gather(n):

    aws = [] # awaitables

    for i in range(n):

        wait_seconds = random.randint(1,3)

        task = asyncio.create_task(match(wait_seconds, f'Job (#{i}): {wait_seconds} seconds'))        

        aws.append(task)

        print(f"Started at {time.strftime('%X')}")

    print('Gather results ...')

    result = await asyncio.gather(*aws) # a list

    print(f"Finished at {time.strftime('%X')}") 

    return result


#result = asyncio.run(many_jobs_using_task_group())

result = asyncio.run(many_jobs_using_gather(10))

print(result)


asyncio 并发计算如何收集每个任务的返回值?的评论 (共 条)

分享到微博请遵守国家法律